#async-and-concurrency
12 messages · Page 77 of 1
I'd recommend just writing your own and use some async websocket library directly
I never knew about this error when I initially started using it
Wait can I just copy paste it like this into my code and then just do asyncio.run(main()) instead of doing connector.start() will that give me a handle for later on from inside the event loop being able to just end it?
You have that stop event right?
Which stop event? How do I know, if I have the stop event or not
You just return from that function and asyncio.run will stop the loop for you
@connector.close was what I was looking for
Yes but connector.close is not closing the event loop I already tried calling it
import asyncio
from lcu_driver import Connector
connector = Connector()
_close = asyncio.Event()
@connector.close
async def disconnect(connection):
_close.set()
async def amain():
# Some of the below code is copied/adjusted from
# Connector.start() without the code which runs the loop
process = next(_return_ux_process(), None)
while not process:
await asyncio.sleep(0.5)
process = next(_return_ux_process(), None)
connection = Connection(self, process)
connector.register_connection(connection)
task = asyncio.create_task(connector.init())
await _close.wait()
task.cancel()
try:
await asyncio.wait_for(task, 0)
except asyncio.TimeoutError:
return
if __name__ == '__main__':
asyncio.run(amain())
Better to do await asyncio.wait_for(task, 0) and catch the TimeoutError generally
do i forcefully have to add connection as a argument for the disconnect function?
do I even have to call disconnect itsself or can I just call _close.set()
I was just following their example: https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/examples/create_custom_lobby.py
The library calls disconnect when the connector closes
If you want to spontaneously close everything, you can call _close.set()
I already tried doing from lcu_driver.utils import _return_ux_process but that did not work
lcu_driver/utils.py line 15
def _return_ux_process() -> Generator[Process, None, None]:```
Ah I didn't want to change too much, but you should be able to replicate it with: ```python
from psutil import process_iter
...
Inside of amain()
found = list(filter(lambda p: p.name() in {'LeagueClientUx.exe', 'LeagueClientUx'}, process_iter()))
while not found:
await asyncio.sleep(0.5)
found = list(filter(lambda p: p.name() in {'LeagueClientUx.exe', 'LeagueClientUx'}, process_iter()))
process = found[0]
Essentially that copies the implementation into amain(); I took the liberty to make it a bit shorter and skip the generators
Oh I just realized, filter() returns an iterator right
now it has an issue with python connection = Connection(self, process) because first off Connection( was not defined so I imported it (from: https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/lcu_driver/connection.py#L17) and then it still kept saying self there and self is not defined. So then I tried removing self and I ended up with what you see on this screenshot
lcu_driver/connection.py line 17
class Connection:```
AAhh I thought I changed that, self should be connection. Hmm I need to start doing this in my editor so that Pylance can scream at me 😅
Wait but until that line connection is not even defined, I can't use connection instead of self, I end up with a unboundlocalerror
and I can't use connector either, because AttributeError: Connector Object has no attribute register_connection
Sorry it is getting late, I'll step away. I meant the global, connector right? There's way too much differences in such small variance of variables haha
lcu_driver/connector.py lines 40 to 41
def register_connection(self, connection):
self.connection = connection```
Hm?
¯_(ツ)_/¯
oh lol it was the next line that errored
but why
I am not sure what to say, I am looking at the source and it is right there
Can you Ctrl + Click on Connector on like 2299 to go to the source? Compare it to what is on GitHub
i think it is because connection and connector classes are defined in lcu_driver/connector.py but also in lcu_driver/connection.py
i am mixing up to import them correctly
actually i am confused myself
We're both confused, I am still not sure why you don't just reimplement the library and give the access you need with a bit of better design
It's a super small library
I already tried doing this but I don't know what to reimplement
you mean like taking a local copy of it and then editing it there right
all my edits to it, caused me to get more brain damage from errors
it's not like it's easy to locally modify the source without really knowing what you're doing with async
Everything? Remove it as a dependency. Create a folder in your project and redesign it, keeping the names and attributes that you use the same so that you don't need to change much other code.
Yes man, I already tried doing that but as i just mentioned I have no clue how to edit it so that it just closes that event loop with connector.stop() or connector.close() or whatever (since that is the function that is broken -> https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/lcu_driver/connector.py#L74)
ah yes
@severe lynx the pypi package's connector class does not even have the same source as the github code
i could cry
I'll just download the one from github and put it into my local folder and then import it from there
it would be funny if the github version would work out of box, while the pypi package is just a outdated broken version
funny and upsetting
😭
the way you usually start the connector is by doing connector.start() but even that didn't work for a replacement of conn.init(). becaue then it says Connection.init was never awaited even when I add await infront of it
The fact that the GitHub version is so different from the PyPI makes it very difficult to do much
I am literally using the github version now
Hm? You're connecting two errors which are separate
I downloaded it from github and importing it from the local folder
and I still can't do conn.init()
the error on the screenshot above is from the github version
If you're still getting the has no attribute 'init' then I am not sure you have done so correctly? I do really need to go to bed now, sorry.
I figured out it has to be connection.init but you wrote connector.init and connector does not have init
@severe lynx but just the line after that i got the next error
what I am guessing is because I am importing the class from the library, but in the library it is creating another loop (here: https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/lcu_driver/connector.py#L64) this error is happening
You will probably need a dirty work-around that delays the instantiation of the Event until you call asyncio.run
can i just remove the function start or something
or in start / in that wrapper say get running loop?
This line complains because Event I guess captures the loop which is different before it is actually running, compared to when asyncio.run unconditionally creates a new one
I usually don't get this issue 🤔 ```pycon
import asyncio
e = asyncio.Event()async def task():
... e.set()
...
async def main():
... t = asyncio.create_task(task())
... await e.wait()
... await t
...
asyncio.run(main())
what do I do
i just tried editing start to instead do self.loop.get_running_loop(connection.init()) but that also did not work
actually wait i am nto even sure if start got triggered
ah yes you were talkign about a event
yeah no clue
how?
can we not just do a dirty workaround but instead just edit the github source so that it stops creating the event in first place
friend pls help 🙏🏻
_close = None
async def amain():
global _close
_close = asyncio.Event()
...
This would only create the Event when the real event loop is running really
it's executing until the print('waiting') and then getting stuck there forever
even though the event already long started (the leagueclientux.exe process)
do you think it's because of the generator stuff you were talking about before
That's a good thing, when the next line executes the process will exit really
The event is set to signal to exit
but man it is not executing the right code
the connection.init() is not doign what it is supposed to
like it's not initialising as if i was calling .start
the functionality is entirely missing I just end up doing nothing, lcu driver is not starting and I'm waiting infinitely at that wait
the issue was the generator stuff
I replaced the lambda stuff with the original code and now it started
@severe lynx did starting it as a task somehow break the requests library?
ahh actually it's not requests
connection.request
that's actually apart of lcu driver
how come I can initialise it now but I can't use status
Did you maybe update/downgrade AioHTTP? I think this night be because it now expects you to do async with? Seems odd, not sure what's going on other than what I suggested.
just uninstalled aiohttp again and used the setup.py to reinstall everything and I still get the same error
What did you even suggest
do you think this is because on the github source, he wrote the connection.request stuff differently?
then what he had written on the pypi one, so now I can't use that part anymore without merging it back from the pypi stuff to the github version I am using now
apart from that _close.set() does not even work, I can't trigger the event to close. seems like for some reason everything is getting ported through aiohttp's requestcontextmanager to the point I can't even return out of my exit function without an error
Once i turn off the devices, gather will get stuck with the notify task and never leave it. As the script is stuck i am not able to clean up and cancel the awaited futures.
Wouldn't the same happen to using anyio?
Update: Its also resolving in an endless loop.
I am starting to think that there is no solution. The code just gets stuck.
Might have to change the approach
anyio implements level cancellation
So once you cancel your task group it will keep cancelling tasks until they finish
Thats great but i run into an infinite loop. So anyio will not do a thing, if i am not mistaken.
There is no expection or something happening just the code running forever
Can someone explain me how discord py handle callbacks?
How do I callback a function with the name that I give as an input
example:
@bot.on_message
def something(ctx):
pass
@bot.on_message
def something2(ctx):
pass
and have a way to only respond to one depending on the input
(sorry I dont have any async yet in my code. but I guess that is what I want. that is why I'm asking it here)
def on_message(self, func:callable):
while True:
req = str(input("User: "))
if func.__name__.startswith(req):
func(req)
something like this
When do you want your code to stop? You just call tg.cancel_scope.cancel() and anyio will stop everything in the task group
I have no indication of when the event will happen. So the code will run suddenly into an permanent loop leaving me no options to handle that after the event.
Unfortunately, i do not see any way to deal with that
Meaning i cannot call tg.cancel_scope.cancel() as the code is stuck in the permament loop before it can be executed
Any framework for asynchronous live graph you can suggest?
Cant make matplotlib update the graph
how can I hardset that asyncio can only run 1 task of the same kind at the same time?
I have this one task, that manages to launch at two points in my code at the same time even though I tried so hard to not let it do that by checking if the task is running already and stuff but it still launches
Semaphore?
Is this the ble thing?
no clue how you manage to constantly mix me and the other guy up
Honestly it's tough lol
Ah yes, that's it I'm used to hash coloured nicks like in element.io and irccloud
if I want to freeze the code execution at the creation of a task, where do I place my await? infront of asyncio.create_task or infront of the actual function that is supposed to be started as task? so like: asyncio.create_task(await myfunction())
await asyncio.create_task(myfunction())or 2.asyncio.create_task(await myfunction())?
How long do you wish to freeze/wait for?
until the code inside the task finishes
Why even start a task then? Just do await myfunction() like usual
because I wrote my async code to be able to start the same function at two places in my code and turning it into a task helped me to stop it from starting twice
by just looking at asyncio.all_tasks() and then checking if it is running already, so that there's no way for the rest of my async code to start the now task twice
ah nevermind it is still starting twice

Why not use a lock? You're using asyncio.all_tasks() for all the wrong reasons
I am pretty sure asyncio.all_tasks() can only reasonably be used for debugging
how can I set a lock then, so that every task that is running can only be ran once simultaneously no matter how many times it is trying to get started
it's not like i ever knew about async locks
Do you want to abort the task, or enqueue it?
If you're trying to start the task, but one is already running, what behaviour are you looking for?
abort the task
just nothing should happen; it shouldn't start the task a second time and just continue with executing the rest of my code
# Inside of globals, or somewhere which all points reach
lock = asyncio.Lock()
# Where you start the task
if not lock.locked():
t = asyncio.create_task(...)
# Inside of the task
async with lock:
...
do I ever need to remove the lock again or does it auto remove when the task is done?
also can I still cancel the task manually to also get rid of the lock?
The async context manager (async with) ensures that the lock is correctly released
You don't need a lock here though, you only need to keep track of the task you created and not create another one
hey, why should i use job queues like rq, celery etc instead of running the jobs somewhere on a python thread?
If you're using gunicorn it can kill your subprocess mid execution
ahh okay ty, so youd recommend me to use something like celery
Not necessarily
Starlette/FastAPI has a nice background task system
A lot of people reach for celery/rq unnecessarily
Never had good experiences with Celery
Celery is pretty great when you need it
Unless you need like, one task running at most across several processes which aren't aware of each other, then probably not required
But terrible if you don't need it
It's cool, but it was using 40GB of ram on startup 
That's probably your own fault
Mmm definitely shouldn't have been, was only a cron job scheduled at midnight and once per hour
I it was my fault
Then not sure how I feel about it being super easy to accidentally get it trying to consume until it OOM
@agile pike basically it depends on precisely what, how and where you're running tasks
Well if you didn't report a bug with a minimal reproducer it's your fault xD
😅 Should have issued a bug report but alas, other priorities got in the way
im gonna use it with fastapi to run some stuff that can take a little to complete like doing api requests and for scheduling certain tasks
Are you running JS on your client?
You can use a background fetch on your client and run the task entirely in a request
Then the client knows if the process finished and your server can cancel it if the client disconnects
You can even stream progress to the client
ahh yeah i can do that with some of the requests, but i still cant do it with other requests like uploading an image to imgur, otherwise id need to send my api key to the frontend right
No
You'd stream the image to FastAPI from JS with fetch
And stream the image from your FastAPI to imgur in a normal foreground async function for the lifetime of the request
You need to make sure your proxy doesn't buffer requests
Although I'm surprised imgur doesn't have a pre-signed upload API - where you create a URL to upload to on your server, send it to the client, then the client uploads to imgur
Upload Amazon S3 objects using presigned URLs when someone has given you permissions to access the object identified in the URL.
ahh nice thanks! im going to check that out
nope im on google cloud, but im probably gonna switch
Google cloud probably has a similar thingy
yeah im gonna check it out it looks interesting to use
Get the client to upload to a bucket of some sort then tell imgur to download from it
yeah seems like google cloud has signed urls also
yeah i was also checking the docs but couldnt find it, ill take a better look at it tomorrow
and how would you recommend me to do scheduled tasks?
Depends
What, how and where your tasks run
And how often
Pretty sure Google cloud functions has a facility for chron jobs
every hour its making a few api requests for all users tho it can take like 5 min to complete, and the user can also schedule certain functions btw thatll run just once
would that be worth it compared to something like rq?
Yes
okay thanks im gonna check that out
But also you might not even need it
A last updated timestamp in the database with on access refresh might be all you need
does anyone know how to async listen for/wait until a process has started (without constantly polling)?
await asyncio.wait_for(...)
Or using asyncio.Event
can you be more specific please; about how exactly I would be using these async functions in relation to awaiting a process to start, without polling for it (to know that it started or not)?
What is the criteria for the process to be considered ready?
if it is ready or not does not even matter, I just wanna know if it has launched or not the second it launches
Is it a process you start?
no
I'm not sure then. I would think it's OS-specific if there even is anything that would notify when a process starts.
For example, on Linux https://stackoverflow.com/questions/6075013/how-to-detect-the-launching-of-programs-on-linux/8255487
I'm on windows, with python 3.8.10
I don't know. If you want to avoid polling you can research how to be notified of process creation/start on Windows.
That's basically what I just did for Linux above and found something in a minute
I've researched already and afterwards asked my question in this discord since I've not found satisfying results to my search
This suggests WMI. Have you already researched that? https://stackoverflow.com/questions/3556048/how-to-detect-win32-process-creation-termination-in-c
no applicable with python / not accessible with ctypes
That's not the point, there's multiple libraries for access to WMI, but they're all limited and do not provide access to the entire environment of WMI
also all the libraries are based on using ctypes mostly, which is the reason why I wrote not accessible with ctypes
Like if it was possible with python, I would want to just interrupt CreateProcess through injecting my own dll into kernel32.dll that hooks it and then I would basically know whenever anything launches, but how do you do that ever with python. like this is a example for that https://github.com/vmcall/ayyxam/blob/master/ayyxam/handler.cpp#L53
If that's the case perhaps you could use WMI through a Python extension?
nah that's over complicating my use case way too much, I was searching for easily applicable thing through python
Why isn't this applicable for your use case? http://timgolden.me.uk/python/wmi/tutorial.html#monitoring
Which part of WMI do you need access to that is missing from Python libs?
uhh I guess I missed that one, my bad 😕 thanks!
Using this approach is effectively polling too though so I'm not sure if it's worth the trouble in your case.
There is an async version of the WMI event API but I don't think that Python library wraps it
Can't you use a multiprocessing event, then setup either a callback, or a waiting thread, which finally sets an asyncio Event after the multiprocessing Event is sey
Oh I see it looks like you hsve absolutely 0 control over this process
This is a child process of the current process that you're looking for?
no, a entirely different program is creating this
guys is this
await asyncio.gather(*[aiosmtplib.send(msg, **config) for msg in messages])
Equal to this?
async def send_msg(msg):
await aiosmtplib.send(msg, **config)
await asyncio.gather(*[send_msg(msg) for msg in messages])
because the await in the second line makes me think that it will stop all other coroutines
Yes they're equivalent. Gather runs them concurrently.
The await on that line will not make them run serially
Depends if send returns a Future or a Coroutine
Oh also you forgot the return await
So any return value will be garbage collected at a different time
is how concurrency works the same in python as golang ?
no
Which type of concurrency? Goroutines?
yep
If you want to learn more about those, you can search around about green threads
Stackless and gevent are both, different types of green threads
Async is not green threads
ok thanks
https://github.com/python/cpython/issues/89419 python 3.11 is most of the way to stackless
And subinterpreters https://peps.python.org/pep-0554/#concurrency
Python Enhancement Proposals (PEPs)
Yeah but I don't think CPython will add native green threads
There's also https://github.com/dabeaz/thredo which is fun
I know it was you, thredo
?
Michael kisses his brother in Cuba: a scene from The Godfather Part II.
is there a simple way to finish a coroutine upon condition instead of using an event?
Looking to use if(cond): await foo
But thats not working because foo will run forever as it contains a notify function
I could gather and add a event handler function i guess but that seems overkill imo
can you wrap it in a task and cancel it conditionally ?
async def foo:
await notify(adr)
async def main:
await foo()
asyncio.run(main)
How can i cancel foo? (Foo is permanently running cause it will always await futures through notify)
You didn't await notifiy(adr) and you didn't call foo()
Is notify doing blocking IO? You need to run it in a thread or refactor it to wait for IO using the event loop
i just forgot to write it in here. Its awaited in my actual script. Its non blocking
When do you want to cancel it?
Got a question. I am trying to run asyncio.create_subprocess_exec with the ability to do things with the stdout/stderr as the subprocess is running (not after). Something like "call this function every time you get buffered output". How do I go about doing this?
naturally, the moment I ask a question, I find something promising in a blog post
I keep finding myself facing this problem where I want to run an external process in Python. There are a few ways to do it in the standard library: subprocess os.system os.spawn os.popen popen2 commands All of...
You don't want readline that waits for a whole line
yeah I think you'd want to wait for any output at all and buffer it yourself
But also stream sockets don't provide framing
"don't provide framing"?
i recognise the terminology but I'm unsure how it applies to this situation exactly
like....yeah, streams don't provide you a delimiter, but that's...fine?
If the other end calls stream.write(chunk)
You might only get half the chunk at the other side
Or nothing
where does the chunk fall through?
And you might need to wait for the other end to call stream.write(chunk); stream.write(chunk2);
or flush?
even then
oh i see what this code is doing, yeah this code is incomplete
still, the core conceit of awaiting on stream.read seems like it could work?
oh of course this could've been avoided if I hadn't misread the python docs in the first place
Yeah I think it's supposed to be a read1
trio processes have a recieve_some method
You also have to be very careful with process streams as if you don't have a task per stream it can dreadlock
after a condition is met: cond_var is True
Hello! There's no "Schedulers" chat, so I'll ask here. What's the best scheduler to run along your main app in Flask or other frameworks? We're running APScheduler in production, it keeps dying randomly or losing jobs on re-deployment. Using persistent mongo storage
Of course, I tried fixing it, but so far only found discussion on its github and other places of other people having same issues.
Need to have a shared objects between programs on Windows... any ideas?
What object are you trying to share?
1st program has access to interface to 2nd programs class interface. The second program currently is merely a monitor which periodically reports the state values hopefully changed by the 1st program.
Why not do the reporting from the same program?
anyone experienced with httpx? my code is breaking when i try to run 12 + clients as coroutines
What's the error?
which client do you prefer https://github.com/aaugustin/websockets or aiohttp.ClientWebSocketResponse
if your project already has aiohttp as dependency and you just need a websocket client, aiohttp.ClientWebSocketResponse is fine
alright thanks
can you use the shelve module asynchronously
because im caching some data with shelve but i want it to run asynchronously
You have to use it with a thread currently
does that work well with asyncio?
Threads works well with coroutines, but some data structures like asyncio.Queue aren't thread-safe
if your code has a producer consumer workflow, how do you name the python file that deals with creating threads and launching workers (contains worker classes and routines for synchronization, etc)?
import asyncio
async def counter(name: str):
for i in range(3):
print(f"{name}: {i!s}")
await asyncio.sleep(0)
async def main():
tasks = [asyncio.create_task(counter(f"task{number}")) for number in range(4)]
asyncio.run(main())
Output:
task0: 0
task1: 0
task2: 0
How does themaincoro yields control when there's no await?
It returns
Can you elaborate?
https://github.com/python/cpython/blob/3.10/Lib/asyncio/runners.py#L44 when your asyncio.run task completes you end up here
Lib/asyncio/runners.py line 44
return loop.run_until_complete(main)```
And that works by using asyncio.create_task(main).add_done_callback
And a task effectively does an await asyncio.sleep(0) before calling callbacks
Why this doesn't output anything and yield control as above?
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
tasks = [asyncio.create_task(say_after(number, "hi")) for number in range(4)]
asyncio.run(main())
It does - it's just that asyncio.sleep raises asyncio.CancelledError
Interesting, moving print to the top generated an output like this:
hi
hi
hi
hi
Fuck, this whole asyncio shit is too paradoxical. :P
?
I'm struggling. :P It mostly creates unexpected outputs and behaviors for me.
I'd recommend using a task group instead of using asyncio.create_task
I'm just experimenting, the tutorial I'm following uses this kind of examples. So far, so bad. :P
Thanks. Will keep being overwhelmed with asyncio's weirdness. :P
So I'm getting a RuntimeError: Event loop is closed with this code:
Entry: https://paste.pythondiscord.com/heniyegici
Database: https://paste.pythondiscord.com/qidilanuba
Stacktrace: https://paste.pythondiscord.com/obosugifit
I tried making a new loop event and running the function through it, but to no avail.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Is there something that I'm missing? I'm using Linux, python 3.10.
You're using asyncio.run too many times
You should only call it once
Like this:
async def amain():
await bot.start(token=await db.read_database('config', 'token'))
def main():
asyncio.run(amain())
Alright, I'll look into this tommorow. Thanks!
I see quite a few of:
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f0af3783e20>: SSL handshake took 12911.3 ms when I have logging turned up to debug. Is this unexpected?
12 seconds is a long time to wait for an ssl handshake
Try running in -X Dev mode?
is it possible to run async functions after certain thing happens?
sync function()
after and only after sync function runs
async func1()
async func2()
async func3()
``` ?
What do you mean specifically?
so for example async func1 will use values coming from the sync function, so if async func1 runs before sync function finishes running, it will return an error
cause some data will be missing
meaning I need async func1 (actually all my async funcs)to wait till sync function finishes running
cause in my code, certain funcs (that are now sync but should be async) could be running when certain sync funcs finish running
if it makes sense
So you just call
function()
await func1()
so basically await waits for something to finish and then it executes whats next
ok yeah I'll go watch async tutorials 🤣
tysm
Hello, folks! New here! May i ask a question here? Noone came to help me on the help channel. Im doing something asyncio related, but i'm having trouble even before asyncio part starts 🥶
win32com / pythoncom stuff
Yes
I need to create multiple instances of finite element pre/post processor called Simcenter Femap (Siemens Femap) and pass a function into each of the instances for some asyncio fun.
The issue is that when I try to create the instances the actual app windows never open. I set the number of windows of create and the processes ‘femap.exe’ are actually being run – I can see the correct number of them in task manager. But after like 3 or 4 seconds they get terminated and I have no idea why. Tried using ‘win32com.client.Dispatch’ (which is not the correct one to use here), ‘win32com.client.DispatchEx’ (which is actually used for multiinstances as far as I know from google search), ‘pythoncom. CoCreateInstance’. The last 2 work as I described earlier.
Any ideas? I guess it would be helpful to catch some error code / event that’s happening?
Here’s the function to create instances If you need it.
win32 = win32com.client and there should be DispatchEx on the screenshot, sorry...
another thought i have is that this function is actually have to be async too. i'm issuing for example 4 commands to run 4 instances, but as far as i know python is non-concurrent by default so it has to wait until the command passes but there's some error here because the app takes several seconds to start?... just a thought
I put it all in 1 async function but I still keep getting Event loop closed
Stacktrace: https://paste.pythondiscord.com/erikaperok
Main: https://paste.pythondiscord.com/solitefete
Database: Unchanged
@dull ocean How did you improve yourself in asyncio? From which resources? Any prior async experience?
I used the tutorials in the pins
And I read the curio source code
Any prior async experience?
import pygame
from pygame.locals import *
import asyncio
from sys import exit
from random import randint
class Game:
def __init__(self):
pygame.init()
self.screen = pygame.display.set_mode((640, 480))
pygame.display.set_caption('My Game')
self.clock = pygame.time.Clock()
@staticmethod
async def loop(event_queue):
while True:
await event_queue.get()
event_queue.put(pygame.event.wait())
event_queue.task_done()
@staticmethod
async def update():
pygame.display.update()
await asyncio.sleep(0.01)
async def events(self, event_queue):
while True:
self.clock.tick(60)
event = await event_queue.get()
if event.type == QUIT:
pygame.exit()
exit()
self.screen.fill((0, 0, 0))
await self.update()
async def main():
game = Game()
event_queue = asyncio.Queue()
task = asyncio.create_task(game.loop(event_queue))
await event_queue.join()
task.cancel()
await asyncio.gather(task, return_exceptions=True)
await game.events(event_queue)
if __name__ == "__main__":
asyncio.run(main())
can someone help me with connecting the events from pygame to the event_queue
You need await asyncio.to_thread(pygame.event.wait)
@staticmethod
async def loop(event_queue):
while True:
await event_queue.get()
event = await asyncio.to_thread(pygame.event.wait)
event_queue.put(event)
event_queue.task_done()
is that correct or?
ah
# so this?
@staticmethod
async def loop(event_queue):
while True:
await event_queue.get()
event = await asyncio.to_thread(pygame.event.wait)
event_queue.put_nowait(event)
event_queue.task_done()
The consumer should call task_done, not the producer.
ic
But you probably don't need task_done or join
Update: I forgot await in read_database. Spent 2 days trying to fix this thing ._.
Hey everyone! I would like to know how should I design something like this, and whether asyncio.Events would be suitable for this.
I have a socket connection over TCP, and let's just say each packet has its own ID. Every now and then in the code I would want to wait until another packet of id A arrives. Of course, in the function which reads and parses packets, if it detects a packet with id A, it could trigger an event. But, i don't need to do this for every packet A, only sometimes. I thought of something like this: ```py
self.packet_A_event = asyncio.Event()``````py
data = await self.read(1024)
packet = parse_packet(data)
if packet.id == 'A':
self.packet_A_event.set()
self.packet_A_event.clear()
# somewhere else
do_something()
await self.packet_A_event.wait()
do_something_else```However, this is not very practical for me because I also need the packet data at the point where I wait for it. How can I implement this behaviour?
it seems like you're multiplexing multiple channels over socket
you might be best off with a queue for each packet type
wdym exactly by that?
I have specific packet handlers for each packet type, and they get called as soon as possible (more precisely, they get scheduled as tasks, internally I don't care what they do, they don't have to be awaited)
Oh just schedule them from your socket read loop
yeah, that works, but I want to give signals whenever specific type of a packet is received
You can give those signals from your packet handlers
exactly, that is what I'm trying to do
would asyncio.Event s be appropriate for that task?
(similar to the code that i sent above)
Well the way you have it setup is that it's easy to skip events
So you'll have to tell me a bit more about your usecase
But a Queue is usually better - but it sounds like you want a broadcast channel?
I have a class that basically handles the connection
and responds with keep alive packets - you know, as usual
it acts like an API. In the main file, i subclass it, register some listeners and then run forever
i basically need a function which blocks until a packet of specific type is received
I don't understand how a queue would be used; you mean if something is currently waiting for a packet, a listener/handler would put it to the queue? If nothing is waiting for a packet then it has to be skipped
im quite new to asycronous stuff and im having issues getting some code running using mcstatus's async_lookup when i run py server = JavaServer.async_lookup(address=ip, timeout=5) print(await server) i get <mcstatus.server.JavaServer object at 0x000002127DE42920> which im assuming means it has actually done the server lookup i just have no idea how to access the data that should be shown
if using JavaServer.lookup instead of the async i can just call server.status() however running with the async gives me the error 'coroutine' object has no attribute 'status'
Show the full traceback?
there is no traceback
Show a screenshot?
there is no traceback im simply asking how i would retrive the data from the object that is returned
or is that object literally just the definition of server
there is literally no documentation for this section of the module/library/whatever so im going in blind
What do you have if you have no traceback?
Show the full code you wrote and what happens when you run it
I've already shown the code and what happens when I run it
@minor plinth there is not any problem with asyncio. You did a server lookup, you awaited it because you actually wanted the data returned. What you got is a JavaServer object, you just have to access some of it's properties. See:https://github.com/py-mine/mcstatus/blob/c3cc75e78588220260c7b84f6d1953ee1fc6a407/mcstatus/server.py#L87
mcstatus/server.py line 87
async def async_lookup(cls, address: str, timeout: float = 3) -> Self:```
where does asyncio.create_task run in another thread?
It doesn't
where does it exactly 😅
It schedules it on the current event loop
but how does it not block any other coroutine? e.g you pass a blocking loop inside of the coroutine that has been scheduled?
I wrote a simple demo program that just schedules coroutines with no IO or time here #async-and-concurrency message maybe messing with that will help explain it?
But yeah if you pass a blocking loop inside of the coroutine it will block every other coroutine - except the current one
like the current one being ran? because e.g i have a while True loop that blocks everything but in a task it doesnt block it.
so im kinda confused ngl😅
Can you reproduce your issue with my fake event loop?
well its not really an issue im just curious
I can't understand your question, you didn't provide enough information. Can you show some code that shows the issue you have?
import asyncio
async def test():
while True:
await asyncio.sleep(0)
asyncio.create_task(test())
e.g here it doesnt block anything over just awaiting the coroutine
That's not a blocking while loop
well what does it block? because on execution if you called it normally it would not execute anything else?
That's actually even more insidious issue - it's warm rescheduling
Try repeating it in the event loop code I wrote?
You need to call asyncio_sleep(0) instead of asyncio.sleep(0) and loop.create_task instead of asyncio.create_task - but it will all work the same
Just replace the code in async def amain(loop) :
sorry im kinda bad at explaining but if i make a talk with a loop that e.g isnt async and can block further code how does making a task not block the code block under the function call? does create task just schedule the coroutine and then the further code gets executed?
ah ic guess i was just having a brain fart😅 😭
tornado's gen.coroutine does call coro.send before scheduling the coro so it's a bit odd in that way
Hi, guys. Why below code will get error?
I want to create a process to do some auxiliary works. It will still work some time even main script exit, so I could not use join on it.
import multiprocessing as mp
def foo(q):
print("hhh")
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
error:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
You have to use join on it
And you'll want to use the def main(): https://docs.python.org/3/library/__main__.html#idiomatic-usage
Also you don't need to use set_start_method, use context = mp.get_context("spawn")
It's not clear why you couldn't use join
Emmm, if I use join my main process have to wait this process finish to exit
I'll take a look, thanks
yeah, this helpful.
Yes your main process has to wait that's correct
You do all the stuff you want to do then call join
Emmm. You are right, I should have messed up something. I'm reading a large library, it start a process do some work. I just remember it called join in another place. Thank you for help!😬
It's also marginally better to use a process Pool
Is it unrecommended to use async and threading at the same time?
things like this ```py
async def _run(self) -> None:
pass
def run(self):
self.loop.create_task(self._run())
in my situation its not necessarily plausable
glad I'm not you, then
thanks
honestly I can't think of a reason why you'd ever need both. iiuc, they solve roughly the same problems
if I were you I'd use whichever one you're forced to use (probably async)
try to get 'er done without the other one
im mixing selenium and websockets
but i need multiple selenium instances running in multiple different threads
that is way, WAY above my pay grade
why am I having to do it when my pay grade is lower 😢
I'm going to rephrase my question
This stack overflow https://stackoverflow.com/questions/29269370/how-to-properly-create-and-run-concurrent-tasks-using-pythons-asyncio-module is what I am trying to solve but both solutions don't use asynchronous code, is there a way to solve his solution with using asynchronous functions?
I want to be able to run asynchronous functions similiar to threads
some data structures like asyncio.Queue are not thread-safe, but I think it is okay, there is even asyncio.to_thread function that runs IO-bound non-coroutine functions in threads to get asynchronous execution
selenium is sync, you need threads instead of asyncio
that stackoverflow solution both use async code, they just dont have the async keyword
ah no they used process pool
im implementing my own event loop and i dont understand this error:
if not iscoroutine(event[1]):
event[1]()
RuntimeWarning: coroutine 'main' was never awaited
event[1]()
surely if its not a coroutine then it doesnt need to be awaited?
note: im using the inspect modules iscoroutine
anyway you need asyncio only if you have async IO code
you probably need inspect.iscoroutinefunction() IIRC
coroutine is created when a coroutine function is called
that is the function im using
from inspect import iscoroutine
is that the only way to determine if a function is asynchronous or not then?
async def main():
its definitely asynchronous, so why is the if statement triggering 🤔
iscoroutine is not iscoroutinefunction
iscoroutine checks is given object coroutine
iscoroutinefunction checks is function returns coroutine
ah, i didnt realise they were seperate methods
async def main(): is coroutine function, it is a function that returns a coroutine
Selenium and async don’t mix well
If your website isn’t very security heavy then maybe look at playwright
thanks 👌
if not iscoroutinefunction(event[1]):
event[1]()
if event[2]: event[2]()
continue
try:
command = event[1].send(None)
...
new error 😄 'function' object has no attribute 'send'
how come a normal function escapes the notice of the if statement and continue. it was my understanding that you can send to a coroutine
'function' object has no attribute 'send
oh does it has to be the actual instance?
yes, the coroutine function is not a coroutine
only coroutines has send()
its really confusing
it used to be that they had the same name
yes, in theory coroutine means cooperative routines, and routines means functions
it is like using of 'generator' and 'generator function' for function that yields
it was a mistake in the documentation - technically an async def is a coroutine that doesn't support generators and the object it returns is a non-iterable generator
another quirk of history - a generator function is just a coroutine that doesn't support with
I think coroutine in python is a kind of generator
yeah, I'm talking about the meaning of the term "coroutine"
in 'common parlance' coroutine is the kind of routine/function. In python it refers to the object used to implement async def delegation
it's not supported to call create_task in that way - you need to keep a reference to the task you create
Can python do multithreading, or not? Because I keep finding tutorials about multithreading, yet I also saw people talking about the GIL and how that prevents multithreading.
or perhaps i'm mistaking it with multiprocessing
It can but not very efficiently
Python doesn’t necessarily have true multi threading but it attempts to emulate it while being held back by the Gil
Multiprocessing is a bit of an bypass because it spawns a whole new process with a whole new Gil and thus bypassing the Gil
But it’s very expensive for something like doing mass http requests or light work
what do you mean by 'emulating' it?
ah nvm, i think i found an explanation online
thanks
Depends on Python implementation, there is python-nogil project where you can use threads for parallelism
Threads aren't emulated they are real posix threads. Which means they show up to the operating system scheduler as threads
Some languages do emulate threads, sometimes known as green threads
I thought I had this but I'm confused again. Can someone clarify something for me, I can send to an object implementing the await dunder however I can't send to a function defined with async so how to I initially call the asynchronous function to run it until the point where the await statements get to one of those awaitable classes
Clarify is the wrong word, explain*
You have to call the function before you can "prime" it. If you tell me more about your usecase I can help you here. curio trio and anyio all implement this behavior and asyncio, twisted and tornado do not. In addition Tornado call and prime the generator before scheduling it
is their any difference of making a function a coroutine, like a simple function that doesnt do any async tasks, because await passes the coroutine to the event loop to manage it and if a function isnt it doesnt get managed by it, is it bad when that happens or does it not matter since the code under it would still get executed normally and if theirs other coroutines it would still get scheduled without a problem?
ah thanks for the offer but dont worry, i think (more like hope) that ive finally implemented it now, ofc its not comparable to asyncio or whatever but it should (might) work :)
one question though, how does asyncio and twisted work if not priming the coroutine?
from Asyncy import EventLoop, Sleep
from Socket import Socket
from time import time
async def main():
client = Socket()
await client.connect("127.0.0.1", 8080)
await Sleep(time() + 2)
await client.send(b"hello")
data = await client.recv(1024)
print(f"Received: {data}")
EventLoop.schedule(main(), time())
EventLoop.start()
this is the interface, the headache was getting the async sockets to work
If you have function without awaits inside, it won't pass control to the event loop even if it is a coroutine function
Making it async would just be misleading
It schedules the send(None) with call_soon
I don't get how that's different?
It's different from calling .send(None) then scheduling the coro later
So it schedules the send(None) and the execution of the coro at the same time but just at a later date?
No
Lib/asyncio/tasks.py line 120
self._loop.call_soon(self.__step, context=self._context)```
Calling send(None) on a coroutine object is running it
So it doesn't call send(None) but runs the Coro soon? How does the call soon method work if it's not calling send?
self.__step calls self._coro.send(None)
so it doesnt prime the coroutine but sends anyway?
sorry, this async stuff is really hard for me to wrap my head around
The first send(None)to a generator is special and called priming
but you said asyncio does not implement this behaviour? i think i misunderstood
Why does the multiprocessing module keep crashing my computer? Is it because I’m running more processes than there are cores on my computer?
no, that shouldn't affect it if you don't go into unreasonably high numbers (and the os doesn't limit you)
asyncio doesn't prime the generator before scheduling the task
it also doesn't wait for the generator to be primed before cancelling it
if you throw a CancelledError into an unprimed generator then it cannot suppress it
ahh got you, i think i do it the way asyncio does it, check it out:
https://github.com/Evorage0/Asyncy
@dull ocean you seem to know what your doing lol so any feedback from you would be greatly appreciated
Looks great tbh! I'd highly recommend you read the curio source code to see how it's done there
You have a bunch of fun bugs to fix ;)
😫 and after i thought i got it working as well
thanks for the response! ill look at the code now
oh no there's years of fun bugs here
I mean that when writing an event loop it's impossible to get it right first time - and if you're building one for fun, part of that fun comes from fixing all the bugs
can you show me one of the bugs, it makes me anxious that you can see some that i cant. of course, i know there are some things wrong with the code, for instance, it assumes you MUST pass in an already instantiated coro and i know some of the terminology vis-a-vis variable naming is iffy lol but apart from that, i dont see much else wrong with it. im not liking the match statement to see which event is being processed but i dont see how else you can implement it without a long if-else chain
Your time goes backwards sometimes
you need monotonic time
As Graingert mentioned, the function you're using can go backwards if the system clock is updated between calling it again. https://docs.python.org/3/library/time.html#time.time
Use time.monotonic() or time.perf_counter()
Hello, everyone
I'm doing some trivial tests with a ProcessPoolExecutor, but the results are driving me nuts. Consider this script that does some single-threaded compute, then does it again multiple times, but in parallel:
from concurrent.futures import ProcessPoolExecutor
import time
def do_work(x: int):
for i in range(100 * 1000 * 1000):
i ** 20
if __name__ == "__main__":
t_0 = time.time()
do_work(1)
single_run_delta = time.time() - t_0
print(f"A single run takes {single_run_delta}s")
num_parallel_tasks = 2
args = [1] * num_parallel_tasks
with ProcessPoolExecutor(max_workers=num_parallel_tasks) as executor:
t_0_parallel = time.time()
_ = list(executor.map(do_work, args))
parallel_delta = time.time() - t_0_parallel
print(f"{num_parallel_tasks} parallel tasks took: {parallel_delta}s")
I expected the single run and the parallel run to take roughly the same time, but I get something like:
A single run takes 39.55275535583496s
2 parallel tasks took: 47.86863374710083s
I understand that ProcessPoolExecutor has some scheduling/serialization overhead, but those several seconds worth of difference are definitely too much for that. Am I just hitting a CPU power limit or something? Does anyone have any ideas what this could mean? Thanks o/
time.monotonic: "The reference point of the returned value is undefined", it means i cant schedule things to happen at a specific time, i can only do things by offset to the current time
is there a way around this?
Use one of the other clocks which does have a reference time.
The reason is that it's possible for a number of reasons (including the user doing it manually) for the system time to be moved backwards, so it's no longer monotonic.
each of your parallel tasks is doing the same work
so when you run the two jobs in pararallel, you're doing twice as much work as when you run the single task, but it only takes ten seconds more. That's pretty good.
also my numbers are a little different ```
python3 async-and-concurrency.py
A single run takes 35.84136199951172s
2 parallel tasks took: 36.92131781578064s
I'm doing twice as much work, true, but I'm also using twice as much resources, so it just feels I wasted 10 seconds. The parallel tasks are running for 20% longer than the serial one, which is not nothing.
Your numbers look far more sensible, though
maybe the parallel measurement also includes subprocess launch time? try repeating the tests and taking average.
I imagine that because t0 is taken inside the context manager, that the subprocesses would have already spawned
I am not much sure about that
I guess its lazy spawned as demand increases until hit max
hello i wanna ask question which form of threading is best for concurrent tasks
It really depends on what your tasks are doing. If they are doing IO (especially network requests), then asyncio and Twisted are great choices, followed by standard threading. If they are doing CPU-intensive work, multiprocessing would be the approach I recommend
You can also do CPU-intensive work with threads, but unless you're running C code that releases the global interpreter lock (GIL) it won't bring you any performance gains.
for some reason py async with aiofiles.open(filenameis, "r", errors="backslashreplace") as f: async for email in f: await worker(email) is much much slower than py with open(filenameis, "r", errors="backslashreplace") as f: for email in f: await worker(email)
any ideas as to why?
using the second one also fires off the output from py asyncio.run(..., debug=True) whereas the 1st one doesnt
It's slower because you have more trips around the ThreadPoolExecutor and event loop
What's async def worker() doing?
https://trio.readthedocs.io/en/stable/reference-io.html#asynchronous-filesystem-i-o is the best explanation for what's going on here
from asyncio import sleep as aiosleep, run
from time import sleep as block, perf_counter as pfc
async def foo():
block(1) # Blocking work
await aiosleep(1) # Suspended
async def measure(coro):
u = pfc()
ret = await coro
print(f"Time spent blocking: {pfc() - u}, Suspended: ...")
return ret
async def main():
c = foo()
await measure(c)
run(main())
# Time spent blocking: 2.0188845000229776, Suspended: ...
# Which is incorrect because 1 second is spent blocking and 1 second is spent suspended.
# How to get how much time is spent suspended for a given coroutine?
Ie, how do I measure how much time is spent waiting for coroutines?
Hey is there anyway to collect concurrent post requests and process them one by one ?
You can make a collections.abc.Coroutine that measures send time
is there good material for python asyncio? (apart from python docs)
see pinned message
I'm building a project that requires concurrent access to a synchronous method and I'd like each async call to this synchronous method to 'queue'.
Is there an easy way to do this?
More than anything, I just need to make sure nothing starts running the method twice at the same time.
I realize now what I'm describing is a lock
Go on, why is concurrently running this method to be avoided?
You don't want out of order calls? That means there's temporal coupling at play.
So you want to try solving that if possible instead of slapping a bandage (a lock) on it.
I think they just need a common lock
If this method is synchronous, it cannot be concurrently accessed.
gotcha
finally had some time to reply, im just taking in a string and then doing lower and strip for now
Ok so don't make it an async function
im eventually going to use async funcs which is why it's there atm
or do you mean just py def worker()?
Yes, if you don't await anything then don't make it async def
so i am trying to exit my asyncio loop eventually this code executes inside my exit function ```python
started_from = asyncio.current_task().get_coro().name
print(f'\nHello from exit (started from: {started_from})')
tloop = asyncio.get_running_loop()
for task in asyncio.all_tasks():
tname = task.get_coro().name
if tname in [started_from]:continue
print(f'Cancelling Task: {tname}')
with contextlib.suppress(asyncio.CancelledError):
task.cancel()
task.cancel() won't raise asyncio.CancelledError
I think you need to use whatever network API directly rather than relying on a utility library that messes with cancellation
It's a SPI interface on a microcontroller.
Sorry for not responding, I kinda closed my (personal) laptop. Don't have Discord on my work laptop.
Essentially, it needs to write bytes to a port on a single threaded microcontroller. If they tried to access simultaneously, I don't know what would happen. So I'd like to avoid it.
However, the places at which writing will occur come from two different sources - a mainloop on a timer and a MQTT subscription handler.
I'm honestly not really good/practiced at async and preferably I need a solution that won't require me to learn anything or pray for things to magically work out - so I'm tempted to drop any kind of asynchronous crap in favor of whatever works for now (on a deadline)