#Write in docs how to refresh worker ( async handler )

1 messages · Page 1 of 1 (latest)

scenic hamlet
#

Title says it all, just the async handler worker wont work with return that has any values. and docs doesnt explain how to do it in async handler

if you guys got any solution pls help by sending it here and pinging me

scenic hamlet
#

@near field btw check this

near field
#

Is this what you're looking for?

strong bear
#

Yep but he wanted it for async not for normal handler

scenic hamlet
near field
#
import runpod
import asyncio

async def async_generator_handler(job):
    results = []
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}"
        results.append(output)

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(1)
    
    # Return the results and indicate the worker should be refreshed
    return {"refresh_worker": True, "job_results": results}

# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,  # Required: Specify the async handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run endpoint
    }
)
#

Async Generator Handler:

The function async_generator_handler generates asynchronous outputs within a loop.

await asyncio.sleep(1) simulates asynchronous processing.
Collecting Results:

Results are collected in a list results instead of yielding directly.
This way, they can be returned as a single response.

scenic hamlet
#

my ide says the otherwise, let me try it later

near field
#

I tried it locally and got:

python hello_world.py --test_input '{"input": {"name": "World"}}'
--- Starting Serverless Worker |  Version 1.6.2 ---
INFO   | test_input set, using test_input as job input.
DEBUG  | Retrieved local job: {'input': {'name': 'World'}, 'id': 'local_test'}
INFO   | local_test | Started.
DEBUG  | local_test | Handler output: {'refresh_worker': True, 'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}
DEBUG  | local_test | run_job return: {'output': {'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}, 'stopPod': True}
INFO   | Job local_test completed successfully.
INFO   | Job result: {'output': {'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}, 'stopPod': True}
INFO   | Local testing complete, exiting.
scenic hamlet
#

How about generator handler then?

near field
#
import runpod
import asyncio

async def async_generator_handler(job):
    for i in range(5):
        output = f"Generated async token output {i}"
        yield {"job_results": output}

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(1)
    
    # Indicate the worker should be refreshed after yielding all results
    yield {"refresh_worker": True}

async def run_async_generator_handler(job):
    results = []
    async for result in async_generator_handler(job):
        results.append(result)
    return results

# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": run_async_generator_handler, 
        "return_aggregate_stream": True,
    }
)

Like this?

scenic hamlet
#

hey again, finally

scenic hamlet
#

Yeah something like that, but it doesnt refresh the worker does it?

#

maybe the functionality isn't there its fine

near field
#

I am only running it locally - but that's what I would image. I don't have a clear use case to build out an example of this.

scenic hamlet
#

thats what you would image? wdym

#

use case huh, probably some "stream" task, with other utilities such as downloading, moving files that needs to refresh the worker

#

but refreshing workers or not doesnt really matters right

near field
#

okay - hold on. Checking some of the code.

#

Okay, so:

  • Refresh After Every Job: The refresh_worker flag causes the worker to refresh after every job. This means that once a job is completed, the worker stops and will only start processing again when a new request is received.

  • Stops the Pod Until a New Request: After completing a job, the pod stops and remains idle until it receives a new request. This can be useful for resource management, ensuring that the worker is only active when needed.

  • No Per-Job Streaming at the Moment: Currently, RunPod does not support per-job streaming of results. This means that you won't receive a continuous stream of results for each job as it processes. Instead, results are aggregated and returned once the job is fully completed.

runpod.serverless.start(
    {
        "handler": async_generator_handler,  # Required: Specify the async handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run endpoint
        "refresh_worker": True,  # Worker refreshes after every job
    }
)
scenic hamlet
#

And the stream works great with generator handlers, why does that say no per job streaming atm?

hollow umbra