#async-and-concurrency

12 messages · Page 77 of 1

low tapir
#

yes infact it is already integrated in so many lines of code, that I really need to fix this issue

dull ocean
#

I'd recommend just writing your own and use some async websocket library directly

low tapir
#

I never knew about this error when I initially started using it

#

Wait can I just copy paste it like this into my code and then just do asyncio.run(main()) instead of doing connector.start() will that give me a handle for later on from inside the event loop being able to just end it?

severe lynx
#

You have that stop event right?

low tapir
#

Which stop event? How do I know, if I have the stop event or not

dull ocean
#

You just return from that function and asyncio.run will stop the loop for you

severe lynx
low tapir
#

Yes but connector.close is not closing the event loop I already tried calling it

severe lynx
#
import asyncio
from lcu_driver import Connector


connector = Connector()
_close = asyncio.Event()

@connector.close
async def disconnect(connection):
    _close.set()


async def amain():
    # Some of the below code is copied/adjusted from
    # Connector.start() without the code which runs the loop
    process = next(_return_ux_process(), None)
    while not process:
        await asyncio.sleep(0.5)
        process = next(_return_ux_process(), None)

    connection = Connection(self, process)
    connector.register_connection(connection)

    task = asyncio.create_task(connector.init())

    await _close.wait()
    task.cancel()

    try:
        await asyncio.wait_for(task, 0)
    except asyncio.TimeoutError:
        return

if __name__ == '__main__':
    asyncio.run(amain())
dull ocean
#

Better to do await asyncio.wait_for(task, 0) and catch the TimeoutError generally

low tapir
#

do I even have to call disconnect itsself or can I just call _close.set()

severe lynx
#

The library calls disconnect when the connector closes

#

If you want to spontaneously close everything, you can call _close.set()

tidal phoenixBOT
#

lcu_driver/utils.py line 15

def _return_ux_process() -> Generator[Process, None, None]:```
severe lynx
#

Ah I didn't want to change too much, but you should be able to replicate it with: ```python
from psutil import process_iter

...

Inside of amain()

found = list(filter(lambda p: p.name() in {'LeagueClientUx.exe', 'LeagueClientUx'}, process_iter()))
while not found:
await asyncio.sleep(0.5)
found = list(filter(lambda p: p.name() in {'LeagueClientUx.exe', 'LeagueClientUx'}, process_iter()))

process = found[0]

#

Essentially that copies the implementation into amain(); I took the liberty to make it a bit shorter and skip the generators

#

Oh I just realized, filter() returns an iterator right

low tapir
# severe lynx ```python import asyncio from lcu_driver import Connector connector = Connecto...

now it has an issue with python connection = Connection(self, process) because first off Connection( was not defined so I imported it (from: https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/lcu_driver/connection.py#L17) and then it still kept saying self there and self is not defined. So then I tried removing self and I ended up with what you see on this screenshot

tidal phoenixBOT
#

lcu_driver/connection.py line 17

class Connection:```
severe lynx
#

AAhh I thought I changed that, self should be connection. Hmm I need to start doing this in my editor so that Pylance can scream at me 😅

low tapir
#

and I can't use connector either, because AttributeError: Connector Object has no attribute register_connection

severe lynx
#

Sorry it is getting late, I'll step away. I meant the global, connector right? There's way too much differences in such small variance of variables haha

tidal phoenixBOT
#

lcu_driver/connector.py lines 40 to 41

def register_connection(self, connection):
    self.connection = connection```
severe lynx
#

Hm?

low tapir
#

oh lol it was the next line that errored

#

but why

severe lynx
#

I am not sure what to say, I am looking at the source and it is right there

#

Can you Ctrl + Click on Connector on like 2299 to go to the source? Compare it to what is on GitHub

low tapir
#

i think it is because connection and connector classes are defined in lcu_driver/connector.py but also in lcu_driver/connection.py

#

i am mixing up to import them correctly

#

actually i am confused myself

severe lynx
#

We're both confused, I am still not sure why you don't just reimplement the library and give the access you need with a bit of better design

#

It's a super small library

low tapir
#

you mean like taking a local copy of it and then editing it there right

#

all my edits to it, caused me to get more brain damage from errors

#

it's not like it's easy to locally modify the source without really knowing what you're doing with async

severe lynx
#

Everything? Remove it as a dependency. Create a folder in your project and redesign it, keeping the names and attributes that you use the same so that you don't need to change much other code.

low tapir
#

Yes man, I already tried doing that but as i just mentioned I have no clue how to edit it so that it just closes that event loop with connector.stop() or connector.close() or whatever (since that is the function that is broken -> https://github.com/sousa-andre/lcu-driver/blob/35da052e47807e6ea7a246093778c58a43f2210e/lcu_driver/connector.py#L74)

GitHub

🐍 Python3 interface for League of Legends Client API - lcu-driver/connector.py at 35da052e47807e6ea7a246093778c58a43f2210e · sousa-andre/lcu-driver

#

ah yes

#

@severe lynx the pypi package's connector class does not even have the same source as the github code

severe lynx
#

Ah-

#

Odd, so it hasn't gotten a release in a while then

low tapir
#

i could cry

#

I'll just download the one from github and put it into my local folder and then import it from there

#

it would be funny if the github version would work out of box, while the pypi package is just a outdated broken version

#

funny and upsetting

low tapir
#

the way you usually start the connector is by doing connector.start() but even that didn't work for a replacement of conn.init(). becaue then it says Connection.init was never awaited even when I add await infront of it

severe lynx
#

The fact that the GitHub version is so different from the PyPI makes it very difficult to do much

low tapir
severe lynx
low tapir
#

I downloaded it from github and importing it from the local folder

#

and I still can't do conn.init()

#

the error on the screenshot above is from the github version

severe lynx
#

If you're still getting the has no attribute 'init' then I am not sure you have done so correctly? I do really need to go to bed now, sorry.

low tapir
#

@severe lynx but just the line after that i got the next error

severe lynx
#

You will probably need a dirty work-around that delays the instantiation of the Event until you call asyncio.run

low tapir
#

can i just remove the function start or something

#

or in start / in that wrapper say get running loop?

severe lynx
#

I usually don't get this issue 🤔 ```pycon

import asyncio
e = asyncio.Event()

async def task():
... e.set()
...
async def main():
... t = asyncio.create_task(task())
... await e.wait()
... await t
...
asyncio.run(main())

low tapir
#

what do I do

#

i just tried editing start to instead do self.loop.get_running_loop(connection.init()) but that also did not work

#

actually wait i am nto even sure if start got triggered

#

ah yes you were talkign about a event

#

yeah no clue

low tapir
#

can we not just do a dirty workaround but instead just edit the github source so that it stops creating the event in first place

#

friend pls help 🙏🏻

severe lynx
# low tapir how?
_close = None

async def amain():
    global _close
    _close = asyncio.Event()

    ...

This would only create the Event when the real event loop is running really

low tapir
#

even though the event already long started (the leagueclientux.exe process)

#

do you think it's because of the generator stuff you were talking about before

severe lynx
#

The event is set to signal to exit

low tapir
#

but man it is not executing the right code

#

the connection.init() is not doign what it is supposed to

#

like it's not initialising as if i was calling .start

#

the functionality is entirely missing I just end up doing nothing, lcu driver is not starting and I'm waiting infinitely at that wait

#

the issue was the generator stuff

#

I replaced the lambda stuff with the original code and now it started

#

@severe lynx did starting it as a task somehow break the requests library?

#

ahh actually it's not requests

#

connection.request

#

that's actually apart of lcu driver

#

how come I can initialise it now but I can't use status

severe lynx
#

Did you maybe update/downgrade AioHTTP? I think this night be because it now expects you to do async with? Seems odd, not sure what's going on other than what I suggested.

low tapir
#

What did you even suggest

#

do you think this is because on the github source, he wrote the connection.request stuff differently?

#

then what he had written on the pypi one, so now I can't use that part anymore without merging it back from the pypi stuff to the github version I am using now

low tapir
#

apart from that _close.set() does not even work, I can't trigger the event to close. seems like for some reason everything is getting ported through aiohttp's requestcontextmanager to the point I can't even return out of my exit function without an error

rocky spade
#

Once i turn off the devices, gather will get stuck with the notify task and never leave it. As the script is stuck i am not able to clean up and cancel the awaited futures.

#

Wouldn't the same happen to using anyio?

Update: Its also resolving in an endless loop.
I am starting to think that there is no solution. The code just gets stuck.

Might have to change the approach

dull ocean
#

anyio implements level cancellation

#

So once you cancel your task group it will keep cancelling tasks until they finish

rocky spade
lusty hedge
#

Can someone explain me how discord py handle callbacks?
How do I callback a function with the name that I give as an input

example:

@bot.on_message
def something(ctx):
  pass
@bot.on_message
def something2(ctx):
  pass

and have a way to only respond to one depending on the input
(sorry I dont have any async yet in my code. but I guess that is what I want. that is why I'm asking it here)

#
def on_message(self, func:callable):
  while True:
    req = str(input("User: "))
    if func.__name__.startswith(req):
      func(req)
#

something like this

dull ocean
rocky spade
#

Meaning i cannot call tg.cancel_scope.cancel() as the code is stuck in the permament loop before it can be executed

rocky spade
#

Any framework for asynchronous live graph you can suggest?

Cant make matplotlib update the graph

low tapir
#

how can I hardset that asyncio can only run 1 task of the same kind at the same time?

#

I have this one task, that manages to launch at two points in my code at the same time even though I tried so hard to not let it do that by checking if the task is running already and stuff but it still launches

low tapir
dull ocean
#

Honestly it's tough lol

hollow badge
#

people on discord all tend to look the same

#

just little rows of letters

dull ocean
#

Ah yes, that's it I'm used to hash coloured nicks like in element.io and irccloud

low tapir
#

if I want to freeze the code execution at the creation of a task, where do I place my await? infront of asyncio.create_task or infront of the actual function that is supposed to be started as task? so like: asyncio.create_task(await myfunction())

#
  1. await asyncio.create_task(myfunction()) or 2. asyncio.create_task(await myfunction()) ?
severe lynx
low tapir
severe lynx
#

Why even start a task then? Just do await myfunction() like usual

low tapir
#

by just looking at asyncio.all_tasks() and then checking if it is running already, so that there's no way for the rest of my async code to start the now task twice

#

ah nevermind it is still starting twice

severe lynx
#

Why not use a lock? You're using asyncio.all_tasks() for all the wrong reasons

#

I am pretty sure asyncio.all_tasks() can only reasonably be used for debugging

low tapir
#

how can I set a lock then, so that every task that is running can only be ran once simultaneously no matter how many times it is trying to get started

#

it's not like i ever knew about async locks

severe lynx
#

If you're trying to start the task, but one is already running, what behaviour are you looking for?

low tapir
low tapir
severe lynx
#
# Inside of globals, or somewhere which all points reach
lock = asyncio.Lock()

# Where you start the task
if not lock.locked():
    t = asyncio.create_task(...)

# Inside of the task
async with lock:
    ...
low tapir
#

do I ever need to remove the lock again or does it auto remove when the task is done?

#

also can I still cancel the task manually to also get rid of the lock?

severe lynx
#

The async context manager (async with) ensures that the lock is correctly released

dull ocean
#

You don't need a lock here though, you only need to keep track of the task you created and not create another one

agile pike
#

hey, why should i use job queues like rq, celery etc instead of running the jobs somewhere on a python thread?

dull ocean
agile pike
dull ocean
#

Not necessarily

#

Starlette/FastAPI has a nice background task system

#

A lot of people reach for celery/rq unnecessarily

marsh orchid
#

002_sad Never had good experiences with Celery

dull ocean
#

Celery is pretty great when you need it

marsh orchid
#

Unless you need like, one task running at most across several processes which aren't aware of each other, then probably not required

dull ocean
#

But terrible if you don't need it

marsh orchid
dull ocean
#

That's probably your own fault

marsh orchid
#

Mmm definitely shouldn't have been, was only a cron job scheduled at midnight and once per hour

#

I it was my fault bloblul Then not sure how I feel about it being super easy to accidentally get it trying to consume until it OOM

dull ocean
#

@agile pike basically it depends on precisely what, how and where you're running tasks

#

Well if you didn't report a bug with a minimal reproducer it's your fault xD

marsh orchid
#

😅 Should have issued a bug report but alas, other priorities got in the way

agile pike
dull ocean
#

You can use a background fetch on your client and run the task entirely in a request

#

Then the client knows if the process finished and your server can cancel it if the client disconnects

#

You can even stream progress to the client

agile pike
dull ocean
#

No

#

You'd stream the image to FastAPI from JS with fetch

#

And stream the image from your FastAPI to imgur in a normal foreground async function for the lifetime of the request

#

You need to make sure your proxy doesn't buffer requests

#

Although I'm surprised imgur doesn't have a pre-signed upload API - where you create a URL to upload to on your server, send it to the client, then the client uploads to imgur

agile pike
#

ahh nice thanks! im going to check that out

dull ocean
#

Can the imgur api download from an s3 url?

#

(are you on aws?)

agile pike
dull ocean
#

Google cloud probably has a similar thingy

agile pike
#

yeah im gonna check it out it looks interesting to use

dull ocean
#

Get the client to upload to a bucket of some sort then tell imgur to download from it

agile pike
#

yeah seems like google cloud has signed urls also

dull ocean
#

imgur should have a facility to do this too

#

But I can't see it in the docs

agile pike
#

yeah i was also checking the docs but couldnt find it, ill take a better look at it tomorrow

#

and how would you recommend me to do scheduled tasks?

dull ocean
#

Depends

#

What, how and where your tasks run

#

And how often

#

Pretty sure Google cloud functions has a facility for chron jobs

agile pike
#

every hour its making a few api requests for all users tho it can take like 5 min to complete, and the user can also schedule certain functions btw thatll run just once

agile pike
dull ocean
#

Yes

agile pike
#

okay thanks im gonna check that out

dull ocean
#

But also you might not even need it

#

A last updated timestamp in the database with on access refresh might be all you need

low tapir
#

does anyone know how to async listen for/wait until a process has started (without constantly polling)?

gilded plover
#
await asyncio.wait_for(...)

Or using asyncio.Event

low tapir
devout harness
low tapir
devout harness
#

Is it a process you start?

low tapir
devout harness
#

I'm not sure then. I would think it's OS-specific if there even is anything that would notify when a process starts.

low tapir
#

I'm on windows, with python 3.8.10

devout harness
#

I don't know. If you want to avoid polling you can research how to be notified of process creation/start on Windows.

#

That's basically what I just did for Linux above and found something in a minute

low tapir
#

I've researched already and afterwards asked my question in this discord since I've not found satisfying results to my search

devout harness
low tapir
devout harness
#

Pretty sure I've seen a Python library for WMI before

#

Maybe it was just pywin32

low tapir
#

That's not the point, there's multiple libraries for access to WMI, but they're all limited and do not provide access to the entire environment of WMI

#

also all the libraries are based on using ctypes mostly, which is the reason why I wrote not accessible with ctypes

#

Like if it was possible with python, I would want to just interrupt CreateProcess through injecting my own dll into kernel32.dll that hooks it and then I would basically know whenever anything launches, but how do you do that ever with python. like this is a example for that https://github.com/vmcall/ayyxam/blob/master/ayyxam/handler.cpp#L53

GitHub

Bypass for The Digital Exam Monitor developed by the Danish Ministry of Education (Den Digitale Prøvevagt) and ExamCookie - ayyxam/handler.cpp at master · vmcall/ayyxam

devout harness
#

If that's the case perhaps you could use WMI through a Python extension?

low tapir
#

nah that's over complicating my use case way too much, I was searching for easily applicable thing through python

devout harness
#

Which part of WMI do you need access to that is missing from Python libs?

low tapir
devout harness
#

Using this approach is effectively polling too though so I'm not sure if it's worth the trouble in your case.

#

There is an async version of the WMI event API but I don't think that Python library wraps it

severe lynx
#

Can't you use a multiprocessing event, then setup either a callback, or a waiting thread, which finally sets an asyncio Event after the multiprocessing Event is sey

#

Oh I see it looks like you hsve absolutely 0 control over this process

dull ocean
low tapir
nova plover
#

guys is this

await asyncio.gather(*[aiosmtplib.send(msg, **config) for msg in messages])
#

Equal to this?

async def send_msg(msg):
    await aiosmtplib.send(msg, **config)


await asyncio.gather(*[send_msg(msg) for msg in messages])
nova plover
devout harness
#

Yes they're equivalent. Gather runs them concurrently.

#

The await on that line will not make them run serially

dull ocean
#

Depends if send returns a Future or a Coroutine

dull ocean
#

Oh also you forgot the return await

#

So any return value will be garbage collected at a different time

vernal spire
#

is how concurrency works the same in python as golang ?

hollow badge
#

no

severe lynx
vernal spire
severe lynx
#

If you want to learn more about those, you can search around about green threads

#

Stackless and gevent are both, different types of green threads

#

Async is not green threads

vernal spire
#

ok thanks

dull ocean
severe lynx
#

Yeah but I don't think CPython will add native green threads

dull ocean
hollow badge
#

I know it was you, thredo

dull ocean
hollow badge
rocky spade
#

is there a simple way to finish a coroutine upon condition instead of using an event?
Looking to use if(cond): await foo
But thats not working because foo will run forever as it contains a notify function

I could gather and add a event handler function i guess but that seems overkill imo

rain atlas
#

can you wrap it in a task and cancel it conditionally ?

rocky spade
dull ocean
#

Is notify doing blocking IO? You need to run it in a thread or refactor it to wait for IO using the event loop

rocky spade
dull ocean
#

When do you want to cancel it?

stark forge
#

Got a question. I am trying to run asyncio.create_subprocess_exec with the ability to do things with the stdout/stderr as the subprocess is running (not after). Something like "call this function every time you get buffered output". How do I go about doing this?

#

naturally, the moment I ask a question, I find something promising in a blog post

dull ocean
#

You don't want readline that waits for a whole line

stark forge
#

yeah I think you'd want to wait for any output at all and buffer it yourself

dull ocean
#

But also stream sockets don't provide framing

stark forge
#

"don't provide framing"?

#

i recognise the terminology but I'm unsure how it applies to this situation exactly

#

like....yeah, streams don't provide you a delimiter, but that's...fine?

dull ocean
#

If the other end calls stream.write(chunk)

#

You might only get half the chunk at the other side

#

Or nothing

stark forge
#

where does the chunk fall through?

dull ocean
#

And you might need to wait for the other end to call stream.write(chunk); stream.write(chunk2);

stark forge
#

or flush?

dull ocean
#

even then

stark forge
#

oh i see what this code is doing, yeah this code is incomplete

#

still, the core conceit of awaiting on stream.read seems like it could work?

#

oh of course this could've been avoided if I hadn't misread the python docs in the first place

dull ocean
#

Yeah I think it's supposed to be a read1

dull ocean
#

You also have to be very careful with process streams as if you don't have a task per stream it can dreadlock

rocky spade
placid current
#

Hello! There's no "Schedulers" chat, so I'll ask here. What's the best scheduler to run along your main app in Flask or other frameworks? We're running APScheduler in production, it keeps dying randomly or losing jobs on re-deployment. Using persistent mongo storage

#

Of course, I tried fixing it, but so far only found discussion on its github and other places of other people having same issues.

tough blade
#

Need to have a shared objects between programs on Windows... any ideas?

dull ocean
tough blade
#

1st program has access to interface to 2nd programs class interface. The second program currently is merely a monitor which periodically reports the state values hopefully changed by the 1st program.

dull ocean
#

Why not do the reporting from the same program?

oblique yew
#

anyone experienced with httpx? my code is breaking when i try to run 12 + clients as coroutines

abstract zinc
molten comet
#

if your project already has aiohttp as dependency and you just need a websocket client, aiohttp.ClientWebSocketResponse is fine

abstract zinc
#

alright thanks

atomic lava
#

can you use the shelve module asynchronously

#

because im caching some data with shelve but i want it to run asynchronously

dull ocean
atomic lava
dull ocean
#

You can use threads with all good event loops

#

Asyncio has asyncio.to_thread

gilded plover
astral verge
#

if your code has a producer consumer workflow, how do you name the python file that deals with creating threads and launching workers (contains worker classes and routines for synchronization, etc)?

rancid ember
#
import asyncio


async def counter(name: str):
    for i in range(3):
        print(f"{name}: {i!s}")
        await asyncio.sleep(0)


async def main():
    tasks = [asyncio.create_task(counter(f"task{number}")) for number in range(4)]


asyncio.run(main())

Output:

task0: 0
task1: 0
task2: 0
How does the main coro yields control when there's no await?

rancid ember
dull ocean
tidal phoenixBOT
#

Lib/asyncio/runners.py line 44

return loop.run_until_complete(main)```
dull ocean
#

And that works by using asyncio.create_task(main).add_done_callback

#

And a task effectively does an await asyncio.sleep(0) before calling callbacks

rancid ember
#

Why this doesn't output anything and yield control as above?

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    tasks = [asyncio.create_task(say_after(number, "hi")) for number in range(4)]


asyncio.run(main())
dull ocean
#

It does - it's just that asyncio.sleep raises asyncio.CancelledError

rancid ember
#

Interesting, moving print to the top generated an output like this:

hi
hi
hi
hi

#

Fuck, this whole asyncio shit is too paradoxical. :P

dull ocean
#

?

rancid ember
#

I'm struggling. :P It mostly creates unexpected outputs and behaviors for me.

dull ocean
#

I'd recommend using a task group instead of using asyncio.create_task

rancid ember
dull ocean
#

Eg async with anyio.create_task_group() as tg:

#

Then you use tg.start_soon(async_fn)

rancid ember
#

Thanks. Will keep being overwhelmed with asyncio's weirdness. :P

dull relic
dull ocean
#

You should only call it once

dull ocean
dull relic
#

Alright, I'll look into this tommorow. Thanks!

clear mountain
#

I see quite a few of:
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f0af3783e20>: SSL handshake took 12911.3 ms when I have logging turned up to debug. Is this unexpected?

dull ocean
#

Try running in -X Dev mode?

proper crow
#

is it possible to run async functions after certain thing happens?

sync function()

after and only after sync function runs

async func1()
async func2()
async func3()
``` ?
dull ocean
proper crow
#

so for example async func1 will use values coming from the sync function, so if async func1 runs before sync function finishes running, it will return an error

#

cause some data will be missing

#

meaning I need async func1 (actually all my async funcs)to wait till sync function finishes running

#

cause in my code, certain funcs (that are now sync but should be async) could be running when certain sync funcs finish running

#

if it makes sense

dull ocean
#

So you just call

function()
await func1()
proper crow
#

so basically await waits for something to finish and then it executes whats next

#

ok yeah I'll go watch async tutorials 🤣

#

tysm

wind drum
#

Hello, folks! New here! May i ask a question here? Noone came to help me on the help channel. Im doing something asyncio related, but i'm having trouble even before asyncio part starts 🥶

#

win32com / pythoncom stuff

dull ocean
#

Yes

wind drum
#

I need to create multiple instances of finite element pre/post processor called Simcenter Femap (Siemens Femap) and pass a function into each of the instances for some asyncio fun.
The issue is that when I try to create the instances the actual app windows never open. I set the number of windows of create and the processes ‘femap.exe’ are actually being run – I can see the correct number of them in task manager. But after like 3 or 4 seconds they get terminated and I have no idea why. Tried using ‘win32com.client.Dispatch’ (which is not the correct one to use here), ‘win32com.client.DispatchEx’ (which is actually used for multiinstances as far as I know from google search), ‘pythoncom. CoCreateInstance’. The last 2 work as I described earlier.
Any ideas? I guess it would be helpful to catch some error code / event that’s happening?
Here’s the function to create instances If you need it.

#

win32 = win32com.client and there should be DispatchEx on the screenshot, sorry...

#

another thought i have is that this function is actually have to be async too. i'm issuing for example 4 commands to run 4 instances, but as far as i know python is non-concurrent by default so it has to wait until the command passes but there's some error here because the app takes several seconds to start?... just a thought

dull relic
rancid ember
#

@dull ocean How did you improve yourself in asyncio? From which resources? Any prior async experience?

dull ocean
#

And I read the curio source code

rancid ember
#

Any prior async experience?

dull ocean
#

Oh mostly from JavaScript

#

Eg looking into regenerator runtime

atomic lava
#
import pygame
from pygame.locals import *
import asyncio
from sys import exit
from random import randint


class Game:
    def __init__(self):
        pygame.init()
        self.screen = pygame.display.set_mode((640, 480))
        pygame.display.set_caption('My Game')
        self.clock = pygame.time.Clock()

    @staticmethod
    async def loop(event_queue):
        while True:
            await event_queue.get()
            event_queue.put(pygame.event.wait())
            event_queue.task_done()


    @staticmethod
    async def update():
        pygame.display.update()
        await asyncio.sleep(0.01)

    async def events(self, event_queue):
        while True:
            self.clock.tick(60)

            event = await event_queue.get()

            if event.type == QUIT:
                pygame.exit()
                exit()

            self.screen.fill((0, 0, 0))
            await self.update()
            

async def main():
    game = Game()
    event_queue = asyncio.Queue()
    task = asyncio.create_task(game.loop(event_queue))
    await event_queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    await game.events(event_queue)


if __name__ == "__main__":
    asyncio.run(main())
#

can someone help me with connecting the events from pygame to the event_queue

dull ocean
#

You need await asyncio.to_thread(pygame.event.wait)

atomic lava
#

is that correct or?

dull ocean
#

You need to await event_queue.put(...

#

Or use put_nowait

atomic lava
#

ah

atomic lava
# dull ocean Or use put_nowait
# so this?
@staticmethod
    async def loop(event_queue):
        while True:
            await event_queue.get()
            event = await asyncio.to_thread(pygame.event.wait)
            event_queue.put_nowait(event)
            event_queue.task_done()
plucky glen
atomic lava
#

ic

dull ocean
#

But you probably don't need task_done or join

dull relic
still lark
#

Hey everyone! I would like to know how should I design something like this, and whether asyncio.Events would be suitable for this.
I have a socket connection over TCP, and let's just say each packet has its own ID. Every now and then in the code I would want to wait until another packet of id A arrives. Of course, in the function which reads and parses packets, if it detects a packet with id A, it could trigger an event. But, i don't need to do this for every packet A, only sometimes. I thought of something like this: ```py
self.packet_A_event = asyncio.Event()``````py
data = await self.read(1024)
packet = parse_packet(data)
if packet.id == 'A':
self.packet_A_event.set()
self.packet_A_event.clear()

# somewhere else
do_something()
await self.packet_A_event.wait()
do_something_else```However, this is not very practical for me because I also need the packet data at the point where I wait for it. How can I implement this behaviour?
dull ocean
#

you might be best off with a queue for each packet type

still lark
#

I have specific packet handlers for each packet type, and they get called as soon as possible (more precisely, they get scheduled as tasks, internally I don't care what they do, they don't have to be awaited)

dull ocean
#

Oh just schedule them from your socket read loop

still lark
#

yeah, that works, but I want to give signals whenever specific type of a packet is received

dull ocean
#

You can give those signals from your packet handlers

still lark
#

exactly, that is what I'm trying to do
would asyncio.Event s be appropriate for that task?
(similar to the code that i sent above)

dull ocean
#

Well the way you have it setup is that it's easy to skip events

#

So you'll have to tell me a bit more about your usecase

#

But a Queue is usually better - but it sounds like you want a broadcast channel?

still lark
#

i basically need a function which blocks until a packet of specific type is received

#

I don't understand how a queue would be used; you mean if something is currently waiting for a packet, a listener/handler would put it to the queue? If nothing is waiting for a packet then it has to be skipped

minor plinth
#

im quite new to asycronous stuff and im having issues getting some code running using mcstatus's async_lookup when i run py server = JavaServer.async_lookup(address=ip, timeout=5) print(await server) i get <mcstatus.server.JavaServer object at 0x000002127DE42920> which im assuming means it has actually done the server lookup i just have no idea how to access the data that should be shown

#

if using JavaServer.lookup instead of the async i can just call server.status() however running with the async gives me the error 'coroutine' object has no attribute 'status'

minor plinth
#

there is no traceback

dull ocean
#

Show a screenshot?

minor plinth
#

there is no traceback im simply asking how i would retrive the data from the object that is returned

#

or is that object literally just the definition of server

#

there is literally no documentation for this section of the module/library/whatever so im going in blind

dull ocean
#

What do you have if you have no traceback?

#

Show the full code you wrote and what happens when you run it

minor plinth
#

I've already shown the code and what happens when I run it

dull ocean
#

Your code doesn't have status in

#

So can't raise that error message

still lark
tidal phoenixBOT
#

mcstatus/server.py line 87

async def async_lookup(cls, address: str, timeout: float = 3) -> Self:```
minor plinth
#

where does asyncio.create_task run in another thread?

minor plinth
dull ocean
#

It schedules it on the current event loop

minor plinth
dull ocean
#

I wrote a simple demo program that just schedules coroutines with no IO or time here #async-and-concurrency message maybe messing with that will help explain it?

#

But yeah if you pass a blocking loop inside of the coroutine it will block every other coroutine - except the current one

minor plinth
#

so im kinda confused ngl😅

dull ocean
#

What is it?

#

Show your code?

dull ocean
minor plinth
dull ocean
#

I can't understand your question, you didn't provide enough information. Can you show some code that shows the issue you have?

minor plinth
dull ocean
#

That's not a blocking while loop

minor plinth
#

well what does it block? because on execution if you called it normally it would not execute anything else?

dull ocean
#

That's actually even more insidious issue - it's warm rescheduling

#

Try repeating it in the event loop code I wrote?

#

You need to call asyncio_sleep(0) instead of asyncio.sleep(0) and loop.create_task instead of asyncio.create_task - but it will all work the same

#

Just replace the code in async def amain(loop) :

minor plinth
dull ocean
#

Yes

#

asyncio.create_task doesn't call coro.send

minor plinth
dull ocean
#

tornado's gen.coroutine does call coro.send before scheduling the coro so it's a bit odd in that way

knotty bramble
#

Hi, guys. Why below code will get error?
I want to create a process to do some auxiliary works. It will still work some time even main script exit, so I could not use join on it.

import multiprocessing as mp

def foo(q):
    print("hhh")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()

error:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/Users/sienna/opt/anaconda3/envs/eflow/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
dull ocean
#

Also you don't need to use set_start_method, use context = mp.get_context("spawn")

dull ocean
knotty bramble
dull ocean
#

You do all the stuff you want to do then call join

knotty bramble
#

Emmm. You are right, I should have messed up something. I'm reading a large library, it start a process do some work. I just remember it called join in another place. Thank you for help!😬

dull ocean
silk fern
#

Is it unrecommended to use async and threading at the same time?

hollow badge
#

heh

#

I personally avoid both whenever possible

silk fern
#

things like this ```py
async def _run(self) -> None:
pass

def run(self):
    self.loop.create_task(self._run())
silk fern
hollow badge
#

glad I'm not you, then

silk fern
#

thanks

hollow badge
#

honestly I can't think of a reason why you'd ever need both. iiuc, they solve roughly the same problems

#

if I were you I'd use whichever one you're forced to use (probably async)

#

try to get 'er done without the other one

silk fern
#

but i need multiple selenium instances running in multiple different threads

hollow badge
#

that is way, WAY above my pay grade

silk fern
#

why am I having to do it when my pay grade is lower 😢

silk fern
#

I'm going to rephrase my question

#

I want to be able to run asynchronous functions similiar to threads

gilded plover
molten comet
#

that stackoverflow solution both use async code, they just dont have the async keyword

#

ah no they used process pool

foggy reef
#

im implementing my own event loop and i dont understand this error:

            if not iscoroutine(event[1]):
                event[1]()
RuntimeWarning: coroutine 'main' was never awaited
  event[1]()

surely if its not a coroutine then it doesnt need to be awaited?
note: im using the inspect modules iscoroutine

molten comet
#

anyway you need asyncio only if you have async IO code

molten comet
#

coroutine is created when a coroutine function is called

foggy reef
#

that is the function im using
from inspect import iscoroutine

#

is that the only way to determine if a function is asynchronous or not then?

#
async def main():

its definitely asynchronous, so why is the if statement triggering 🤔

gilded plover
#

iscoroutine is not iscoroutinefunction

#

iscoroutine checks is given object coroutine
iscoroutinefunction checks is function returns coroutine

foggy reef
#

ah, i didnt realise they were seperate methods

gilded plover
#

async def main(): is coroutine function, it is a function that returns a coroutine

vast jasper
#

If your website isn’t very security heavy then maybe look at playwright

foggy reef
#
            if not iscoroutinefunction(event[1]):
                event[1]()
                if event[2]: event[2]()
                continue

            try:
                command = event[1].send(None)
            ...

new error 😄 'function' object has no attribute 'send'
how come a normal function escapes the notice of the if statement and continue. it was my understanding that you can send to a coroutine

molten comet
#

wekk, call event1 to get the coro, then coro.send(None)

foggy reef
#

'function' object has no attribute 'send

foggy reef
molten comet
#

yes, the coroutine function is not a coroutine

#

only coroutines has send()

#

its really confusing

dull ocean
#

it used to be that they had the same name

molten comet
#

yes, in theory coroutine means cooperative routines, and routines means functions

gilded plover
#

it is like using of 'generator' and 'generator function' for function that yields

dull ocean
#

it was a mistake in the documentation - technically an async def is a coroutine that doesn't support generators and the object it returns is a non-iterable generator

dull ocean
molten comet
#

I think coroutine in python is a kind of generator

dull ocean
#

yeah, I'm talking about the meaning of the term "coroutine"

#

in 'common parlance' coroutine is the kind of routine/function. In python it refers to the object used to implement async def delegation

dull ocean
deft ruin
#

Can python do multithreading, or not? Because I keep finding tutorials about multithreading, yet I also saw people talking about the GIL and how that prevents multithreading.

#

or perhaps i'm mistaking it with multiprocessing

vast jasper
#

It can but not very efficiently

vast jasper
vast jasper
#

But it’s very expensive for something like doing mass http requests or light work

deft ruin
#

ah nvm, i think i found an explanation online

#

thanks

dull ocean
#

it's real posix threads - not emulated

#

you just can't run python bytecode

deft ruin
#

i'm a beginner

gilded plover
dull ocean
#

Some languages do emulate threads, sometimes known as green threads

foggy reef
#

I thought I had this but I'm confused again. Can someone clarify something for me, I can send to an object implementing the await dunder however I can't send to a function defined with async so how to I initially call the asynchronous function to run it until the point where the await statements get to one of those awaitable classes

#

Clarify is the wrong word, explain*

dull ocean
minor plinth
#

is their any difference of making a function a coroutine, like a simple function that doesnt do any async tasks, because await passes the coroutine to the event loop to manage it and if a function isnt it doesnt get managed by it, is it bad when that happens or does it not matter since the code under it would still get executed normally and if theirs other coroutines it would still get scheduled without a problem?

foggy reef
#

one question though, how does asyncio and twisted work if not priming the coroutine?

#
from Asyncy import EventLoop, Sleep
from Socket import Socket
from time import time

async def main():
    client = Socket()
    await client.connect("127.0.0.1", 8080)
    await Sleep(time() + 2)
    await client.send(b"hello")
    data = await client.recv(1024)
    print(f"Received: {data}")

EventLoop.schedule(main(), time())
EventLoop.start()

this is the interface, the headache was getting the async sockets to work

minor lava
#

Making it async would just be misleading

dull ocean
foggy reef
dull ocean
foggy reef
#

So it schedules the send(None) and the execution of the coro at the same time but just at a later date?

tidal phoenixBOT
#

Lib/asyncio/tasks.py line 120

self._loop.call_soon(self.__step, context=self._context)```
dull ocean
#

Calling send(None) on a coroutine object is running it

foggy reef
#

So it doesn't call send(None) but runs the Coro soon? How does the call soon method work if it's not calling send?

dull ocean
foggy reef
#

so it doesnt prime the coroutine but sends anyway?

#

sorry, this async stuff is really hard for me to wrap my head around

dull ocean
foggy reef
#

but you said asyncio does not implement this behaviour? i think i misunderstood

shadow umbra
#

Why does the multiprocessing module keep crashing my computer? Is it because I’m running more processes than there are cores on my computer?

minor lava
#

no, that shouldn't affect it if you don't go into unreasonably high numbers (and the os doesn't limit you)

shadow umbra
#

Why does my computer keep crashing then?

#

Like it works with like

#

2 processes

dull ocean
#

it also doesn't wait for the generator to be primed before cancelling it

#

if you throw a CancelledError into an unprimed generator then it cannot suppress it

foggy reef
#

@dull ocean you seem to know what your doing lol so any feedback from you would be greatly appreciated

dull ocean
#

You have a bunch of fun bugs to fix ;)

foggy reef
#

😫 and after i thought i got it working as well
thanks for the response! ill look at the code now

dull ocean
#

I mean that when writing an event loop it's impossible to get it right first time - and if you're building one for fun, part of that fun comes from fixing all the bugs

foggy reef
#

can you show me one of the bugs, it makes me anxious that you can see some that i cant. of course, i know there are some things wrong with the code, for instance, it assumes you MUST pass in an already instantiated coro and i know some of the terminology vis-a-vis variable naming is iffy lol but apart from that, i dont see much else wrong with it. im not liking the match statement to see which event is being processed but i dont see how else you can implement it without a long if-else chain

dull ocean
#

Your time goes backwards sometimes

foggy reef
#

what

#

😂

dull ocean
#

you need monotonic time

severe lynx
#

Use time.monotonic() or time.perf_counter()

real fox
#

Hello, everyone

I'm doing some trivial tests with a ProcessPoolExecutor, but the results are driving me nuts. Consider this script that does some single-threaded compute, then does it again multiple times, but in parallel:

from concurrent.futures import ProcessPoolExecutor
import time


def do_work(x: int):
    for i in range(100 * 1000 * 1000):
        i ** 20

if __name__ == "__main__":
    t_0 = time.time()
    do_work(1)
    single_run_delta = time.time() - t_0

    print(f"A single run takes {single_run_delta}s")

    num_parallel_tasks = 2
    args = [1] * num_parallel_tasks

    with ProcessPoolExecutor(max_workers=num_parallel_tasks) as executor:
        t_0_parallel = time.time()
        _ = list(executor.map(do_work, args))
        parallel_delta = time.time() - t_0_parallel

        print(f"{num_parallel_tasks} parallel tasks took: {parallel_delta}s")

I expected the single run and the parallel run to take roughly the same time, but I get something like:

A single run takes 39.55275535583496s
2 parallel tasks took: 47.86863374710083s

I understand that ProcessPoolExecutor has some scheduling/serialization overhead, but those several seconds worth of difference are definitely too much for that. Am I just hitting a CPU power limit or something? Does anyone have any ideas what this could mean? Thanks o/

foggy reef
#

time.monotonic: "The reference point of the returned value is undefined", it means i cant schedule things to happen at a specific time, i can only do things by offset to the current time

#

is there a way around this?

teal knot
#

Use one of the other clocks which does have a reference time.

#

The reason is that it's possible for a number of reasons (including the user doing it manually) for the system time to be moved backwards, so it's no longer monotonic.

hollow badge
#

so when you run the two jobs in pararallel, you're doing twice as much work as when you run the single task, but it only takes ten seconds more. That's pretty good.

#

also my numbers are a little different ```
python3 async-and-concurrency.py
A single run takes 35.84136199951172s
2 parallel tasks took: 36.92131781578064s

real fox
molten comet
real fox
molten comet
#

I am not much sure about that

#

I guess its lazy spawned as demand increases until hit max

minor plinth
#

hello i wanna ask question which form of threading is best for concurrent tasks

iron nest
#

It really depends on what your tasks are doing. If they are doing IO (especially network requests), then asyncio and Twisted are great choices, followed by standard threading. If they are doing CPU-intensive work, multiprocessing would be the approach I recommend

#

You can also do CPU-intensive work with threads, but unless you're running C code that releases the global interpreter lock (GIL) it won't bring you any performance gains.

indigo dagger
#

for some reason py async with aiofiles.open(filenameis, "r", errors="backslashreplace") as f: async for email in f: await worker(email) is much much slower than py with open(filenameis, "r", errors="backslashreplace") as f: for email in f: await worker(email)

#

any ideas as to why?

#

using the second one also fires off the output from py asyncio.run(..., debug=True) whereas the 1st one doesnt

dull ocean
#

What's async def worker() doing?

buoyant trout
#
from asyncio import sleep as aiosleep, run
from time import sleep as block, perf_counter as pfc


async def foo():
    block(1)  # Blocking work
    await aiosleep(1)  # Suspended


async def measure(coro):
    u = pfc()
    ret = await coro
    print(f"Time spent blocking: {pfc() - u}, Suspended: ...")
    return ret


async def main():
    c = foo()
    await measure(c)


run(main())
# Time spent blocking: 2.0188845000229776, Suspended: ...
# Which is incorrect because 1 second is spent blocking and 1 second is spent suspended.
# How to get how much time is spent suspended for a given coroutine?
#

Ie, how do I measure how much time is spent waiting for coroutines?

amber geyser
#

Hey is there anyway to collect concurrent post requests and process them one by one ?

dull ocean
sour valve
#

is there good material for python asyncio? (apart from python docs)

molten comet
uneven steeple
#

I'm building a project that requires concurrent access to a synchronous method and I'd like each async call to this synchronous method to 'queue'.

#

Is there an easy way to do this?

#

More than anything, I just need to make sure nothing starts running the method twice at the same time.

#

I realize now what I'm describing is a lock

plucky glen
molten comet
#

I think they just need a common lock

severe lynx
sour valve
indigo dagger
dull ocean
indigo dagger
#

or do you mean just py def worker()?

severe lynx
low tapir
#

so i am trying to exit my asyncio loop eventually this code executes inside my exit function ```python
started_from = asyncio.current_task().get_coro().name
print(f'\nHello from exit (started from: {started_from})')
tloop = asyncio.get_running_loop()
for task in asyncio.all_tasks():
tname = task.get_coro().name
if tname in [started_from]:continue
print(f'Cancelling Task: {tname}')
with contextlib.suppress(asyncio.CancelledError):
task.cancel()

dull ocean
dull ocean
uneven steeple
#

Sorry for not responding, I kinda closed my (personal) laptop. Don't have Discord on my work laptop.

#

Essentially, it needs to write bytes to a port on a single threaded microcontroller. If they tried to access simultaneously, I don't know what would happen. So I'd like to avoid it.

#

However, the places at which writing will occur come from two different sources - a mainloop on a timer and a MQTT subscription handler.

#

I'm honestly not really good/practiced at async and preferably I need a solution that won't require me to learn anything or pray for things to magically work out - so I'm tempted to drop any kind of asynchronous crap in favor of whatever works for now (on a deadline)