#Write in docs how to refresh worker (async handler)
@near field btw check this
Yep, but he wanted it for an async handler, not a normal one.
Yes, I need it for an async handler, and that example is for a normal handler without the async keyword.
import runpod
import asyncio


async def async_generator_handler(job):
    results = []
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}"
        results.append(output)
        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(1)
    # Return the results and indicate the worker should be refreshed
    return {"refresh_worker": True, "job_results": results}


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,  # Required: Specify the async handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run endpoint
    }
)
Async Generator Handler:
Despite its name, async_generator_handler is a plain async function (it never yields); it builds its outputs within a loop.
await asyncio.sleep(1) simulates asynchronous processing.
Collecting Results:
Results are collected in a list results instead of yielding directly.
This way, they can be returned as a single response.
Hmm okay cool, so it's allowed?
My IDE says otherwise, let me try it later.
I tried it locally and got:
python hello_world.py --test_input '{"input": {"name": "World"}}'
--- Starting Serverless Worker | Version 1.6.2 ---
INFO | test_input set, using test_input as job input.
DEBUG | Retrieved local job: {'input': {'name': 'World'}, 'id': 'local_test'}
INFO | local_test | Started.
DEBUG | local_test | Handler output: {'refresh_worker': True, 'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}
DEBUG | local_test | run_job return: {'output': {'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}, 'stopPod': True}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': {'job_results': ['Generated async token output 0', 'Generated async token output 1', 'Generated async token output 2', 'Generated async token output 3', 'Generated async token output 4']}, 'stopPod': True}
INFO | Local testing complete, exiting.
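Reading the log above: the handler returned a dict with a top-level refresh_worker key, and by the time run_job returns, that key is gone from the output and 'stopPod': True has been added next to it. A minimal sketch of that transformation follows. The helper name split_refresh_flag is hypothetical and this is not the SDK's actual code, only an imitation of what the log shows.

```python
# Hypothetical helper: NOT the runpod SDK's implementation, only a sketch
# that mirrors the input/output pair visible in the local test log.
def split_refresh_flag(handler_output):
    """Separate the refresh_worker flag from the payload sent back to the caller."""
    result = {}
    if isinstance(handler_output, dict):
        payload = dict(handler_output)  # copy so the handler's dict is not mutated
        refresh = payload.pop("refresh_worker", False)
        result["output"] = payload
        if refresh:
            result["stopPod"] = True
    else:
        # Non-dict outputs (for example a list) carry no top-level flag.
        result["output"] = handler_output
    return result


handler_output = {"refresh_worker": True, "job_results": ["token 0", "token 1"]}
job_result = split_refresh_flag(handler_output)
print(job_result)
# {'output': {'job_results': ['token 0', 'token 1']}, 'stopPod': True}
```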
Oh okay thanks
Oh, my bad, sorry. I was asking about a generator handler. I just checked: yes, your solution works with async, but not with a generator, because once the function has a yield it won't let you return a value.
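That restriction comes from Python itself rather than from RunPod. A small standard-library sketch: an async generator with `return <value>` is rejected at compile time, while a plain (sync) generator accepts it but the value is dropped by normal iteration. The handler bodies are illustrative only.

```python
ASYNC_GEN_WITH_RETURN = """
async def handler(job):
    yield {"job_results": "chunk"}
    return {"refresh_worker": True}
"""

SYNC_GEN_WITH_RETURN = """
def handler(job):
    yield {"job_results": "chunk"}
    return {"refresh_worker": True}
"""


def compiles(source):
    """Return True if the source compiles, False on SyntaxError."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError:
        return False


async_ok = compiles(ASYNC_GEN_WITH_RETURN)  # 'return' with value in async generator
sync_ok = compiles(SYNC_GEN_WITH_RETURN)    # allowed for sync generators
print(async_ok, sync_ok)  # False True


def sync_generator(job):
    yield {"job_results": "chunk"}
    return {"refresh_worker": True}  # ends up in StopIteration.value


# Plain iteration never sees the returned value, only the yielded items.
seen = list(sync_generator({}))
print(seen)  # [{'job_results': 'chunk'}]
```

So in a generator handler the flag would have to be yielded, or the chunks collected and returned from a non-generator function.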
How about generator handler then?
import runpod
import asyncio


async def async_generator_handler(job):
    for i in range(5):
        output = f"Generated async token output {i}"
        yield {"job_results": output}
        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(1)
    # Indicate the worker should be refreshed after yielding all results
    yield {"refresh_worker": True}


async def run_async_generator_handler(job):
    results = []
    async for result in async_generator_handler(job):
        results.append(result)
    return results


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": run_async_generator_handler,
        "return_aggregate_stream": True,
    }
)
Like this?
hey again, finally
Oh, does it work?
Yeah, something like that, but it doesn't refresh the worker, does it?
Maybe the functionality isn't there, it's fine.
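One possible explanation, offered as an assumption rather than confirmed behaviour: in the local log earlier, the flag was picked up from a top-level key of a returned dict, whereas the wrapper above returns a list, so {"refresh_worker": True} ends up nested as the last list element. The sketch below uses only asyncio to compare the two shapes. collect_as_dict is a hypothetical alternative wrapper, and whether the SDK then honours the flag would still need to be tested.

```python
import asyncio


async def async_generator_handler(job):
    for i in range(3):
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        yield {"job_results": f"Generated async token output {i}"}
    yield {"refresh_worker": True}


async def collect_as_list(job):
    # Same shape as the wrapper above: the flag is buried in the last element.
    return [chunk async for chunk in async_generator_handler(job)]


async def collect_as_dict(job):
    # Hypothetical alternative: fold the chunks into one dict so that
    # refresh_worker sits at the top level, as in the plain async handler.
    merged = {"job_results": []}
    async for chunk in async_generator_handler(job):
        if chunk.get("refresh_worker"):
            merged["refresh_worker"] = True
        else:
            merged["job_results"].append(chunk["job_results"])
    return merged


job = {"id": "local_test", "input": {}}
as_list = asyncio.run(collect_as_list(job))
as_dict = asyncio.run(collect_as_dict(job))
print(as_list[-1])   # {'refresh_worker': True}
print(sorted(as_dict))  # ['job_results', 'refresh_worker']
```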
I am only running it locally, but that's what I would imagine. I don't have a clear use case to build out an example of this.
That's what you would imagine? wdym
Use case, huh. Probably some "stream" task combined with other work, such as downloading or moving files, that requires refreshing the worker afterwards.
But whether the worker refreshes or not doesn't really matter, right?
okay - hold on. Checking some of the code.
Okay, so:
- Refresh After Every Job: The refresh_worker flag causes the worker to refresh after every job. This means that once a job is completed, the worker stops and will only start processing again when a new request is received.
- Stops the Pod Until a New Request: After completing a job, the pod stops and remains idle until it receives a new request. This can be useful for resource management, ensuring that the worker is only active when needed.
- No Per-Job Streaming at the Moment: Currently, RunPod does not support per-job streaming of results. This means that you won't receive a continuous stream of results for each job as it processes. Instead, results are aggregated and returned once the job is fully completed.
runpod.serverless.start(
    {
        "handler": async_generator_handler,  # Required: Specify the async handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run endpoint
        "refresh_worker": True,  # Worker refreshes after every job
    }
)
Yeah, alright, this may work, but won't refreshing the worker after every job leave FlashBoot effectively unused?
And streaming works great with generator handlers, so why does that say there is no per-job streaming at the moment?
FlashBoot and refresh_worker won't work together, since the cache gets ejected when a worker refresh occurs.
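Given that trade-off, one option might be to set the flag from the handler only for jobs that really need a clean worker, instead of refreshing after every job. A rough sketch using only asyncio: the needs_clean_state input field is hypothetical, and it assumes the per-job flag in the handler's return value behaves as in the local log earlier in the thread.

```python
import asyncio


async def handler(job):
    job_input = job.get("input", {})
    await asyncio.sleep(0)  # stand-in for the real asynchronous work
    result = {"job_results": f"Hello, {job_input.get('name', 'World')}!"}
    # "needs_clean_state" is a hypothetical input field used for illustration:
    # request a refresh only when this job left state behind that must be cleared.
    if job_input.get("needs_clean_state", False):
        result["refresh_worker"] = True
    return result


normal = asyncio.run(handler({"id": "a", "input": {"name": "World"}}))
dirty = asyncio.run(
    handler({"id": "b", "input": {"name": "World", "needs_clean_state": True}})
)
print(normal)  # {'job_results': 'Hello, World!'}
print(dirty)   # {'job_results': 'Hello, World!', 'refresh_worker': True}
```

Jobs that return without the flag would leave the worker as it is, so cached state could still be reused between them.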