#How to close event stream

manic frost
#

Hi,

I’m working on a chessboard project based on an ESP32 and am having trouble manually closing the Lichess event stream.

If you’d like to take a look, see LichessWorkerEvent (you don’t have to; I’ll explain the idea and the issue below):
https://github.com/onlytrialanderror/chessboard/blob/main/ha_lichess_adapter/lichess_components.py

The current implementation is based on the berserk library. The logic opens an event stream (in a separate thread) for the user currently selected on the chessboard. This works fine. However, when I select a different user, I want to close the event stream of the previous user and immediately open a new stream for the new user.

The stream only closes when the next event is received. That’s the problem: I have to wait until the next event arrives.

According to the documentation, an “empty” event is sent every 7 seconds, which means I should only have to wait up to 6–7 seconds.

Here is a minimal example:

import berserk
API_TOKEN = 'xxxx'
def main():
    session = berserk.TokenSession(API_TOKEN)
    _client_lichess_event = berserk.Client(session=session)
    count = 0
    for event in _client_lichess_event.board.stream_incoming_events():
        if event:                    
            print(f"Event: {event}")
        print(f'Count: {count}-> {event}')
        count += 1

if __name__ == "__main__":
    main()

I would expect to see the “Count” message printed every 7 seconds, but I don’t see that happening.

So my questions are:

What is the best way to close the event stream?

Is there a way to avoid waiting for the next event (i.e. skip the 7-second delay)?

Why don’t I see the “Count” message every 7 seconds? (What am I misunderstanding here?)

I’ve been thinking about triggering a kind of “dummy” event to force the stream to return, but I can’t find a suitable way to do this. Sending a dummy message to the user might be possible, but it doesn’t feel like a good solution.

Do you have any suggestions?

Thanks!

#

Here is the event stream loop I would like to break out of manually:

def _handle_incoming_events(self, token_init):

        """
        Stream incoming board events from Lichess and optionally publish reduced payloads.

        This method runs inside the event worker thread.
        """
        self._lichess_stream_event_init_value = token_init

        self._log(f"Starting the stream (event): {token_init}")

        # open the stream for whole chess game
        for event in self._client_lichess.board.stream_incoming_events():
            self._log(f"Event loop: {event}")
            if event:
                reduced_data = lh.reduce_response_event(event)
                if reduced_data is not None:
                    reduced_data_json = json.dumps(reduced_data)
                    self._log(f"Event: {reduced_data_json}")
                    if self._mqtt_publish_function is not None:
                        self._mqtt_publish_function(reduced_data_json)
                else:
                    self._log(f"Event: manually skipped event {event}")
            if self._stop_event_stream.is_set():
                self._log(f"Stream stop event set, terminating the stream (event): {self._lichess_stream_event_init_value}")
                break
            with self._lock:
                if self._lichess_stream_event_init_value != self._current_token:
                    self._log(f"Terminating the event stream (token changed): {self._lichess_stream_event_init_value}->{self._current_token}")
                    # close the stream
                    break

        self._log(f"Terminated the stream (event): {self._lichess_stream_event_init_value}")
        self._lichess_stream_event_init_value = self._idle_token
faint kayak
#

how to close a connection depends on the client, and here the client is the berserk library, so it sounds like you should be creating an issue there

manic frost
#

I would be happy to use any Python library that works, let's say httpx:

with httpx.stream("GET", URL, headers=headers, timeout=None) as response:
    for line in response.iter_lines():
        if line:
            data = json.loads(line)
            print(data)  # Process the JSON object

Would I have to address my question to the httpx developers here as well?

How is this "usually" done? Am I the first person with this issue? Do you know any project that explicitly closes the stream?

faint kayak
#

idk and I rarely use python. I suppose the stream handling is done by a separate thread, so can you maybe terminate that?
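
Python's threading module offers no way to terminate a thread from outside, so this suggestion does not translate directly. One alternative, sketched here under assumptions, is to run the stream consumer as an asyncio task, because cancelling a task interrupts a pending await right away instead of waiting for the next line. The names fake_event_lines, consume, and switch_user_demo are hypothetical, and the fake generator only stands in for a real async streaming response.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_event_lines() -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming response: one event, then silence."""
    yield '{"type": "gameStart"}'
    await asyncio.sleep(3600)  # a quiet stream; nothing arrives for a long time
    yield '{"type": "gameFinish"}'


async def consume(lines: AsyncIterator[str], seen: List[str]) -> None:
    """Collect non-empty lines until the stream ends or the task is cancelled."""
    try:
        async for line in lines:
            if line:
                seen.append(line)
    except asyncio.CancelledError:
        # Cleanup for the old user would go here; re-raise so the task ends as cancelled.
        raise


async def switch_user_demo() -> List[str]:
    """Start a consumer, then cancel it while it is waiting for the next line."""
    seen: List[str] = []
    task = asyncio.create_task(consume(fake_event_lines(), seen))
    await asyncio.sleep(0.1)  # give the first event time to arrive
    task.cancel()             # interrupts the pending await immediately
    try:
        await task
    except asyncio.CancelledError:
        pass                  # expected: the consumer was cancelled on purpose
    return seen


if __name__ == "__main__":
    print(asyncio.run(switch_user_demo()))
```

With a real async HTTP client, the same cancel call would apply to the task that iterates over the response lines; that part is not shown here.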

manic frost
#

As I said, it is not the best workaround, but the only possibility I see at the moment is to send a "dummy" request, and I currently see only one: send a (correspondence) challenge to the user and immediately cancel it 🤪

#

We need some kind of "heartbeat" request 🤣

  • hey, are you alive ?
  • yes, I'm fine
    😆
#

With httpx, an empty line seems to arrive every 7s ... so there is at least a "possibility" to cancel, but a "slow" one
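
The difference observed here could come down to whether the line parser drops empty lines or passes them on. As a minimal sketch (parse_ndjson and its keep_alive flag are hypothetical, not part of berserk or httpx), a parser can surface each empty keep-alive line as None so the calling loop wakes up and gets a chance to check its stop flag:

```python
import json
from typing import Any, Iterable, Iterator, Optional


def parse_ndjson(lines: Iterable[str], keep_alive: bool = True) -> Iterator[Optional[Any]]:
    """Parse newline-delimited JSON.

    Empty lines are yielded as None when keep_alive is True,
    and silently skipped when it is False.
    """
    for line in lines:
        if line.strip():
            yield json.loads(line)
        elif keep_alive:
            yield None  # keep-alive tick: no event, but the caller's loop body runs


if __name__ == "__main__":
    sample = ['{"type": "gameStart"}', "", '{"type": "gameFinish"}']
    for event in parse_ndjson(sample):
        print("tick" if event is None else event)
```

With keep_alive=False the loop body only runs for real events, which would match the behaviour where the "Count" line is not printed every 7 seconds.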

manic frost
#

JFYI: in the end I replaced berserk with httpx (for the event stream only). So in the worst case the delay is 7s. I haven't found a better solution:

        self._lichess_stream_event_init_value = token_init

        self._log(f"Starting the stream (event): {token_init}")

        URL = "https://lichess.org/api/stream/event"
        headers = {
            "Authorization": f"Bearer {token_init}"
        }

        # We use httpx here because the berserk stream does not surface the keep-alive line sent every 7s
        with httpx.stream("GET", URL, headers=headers, timeout=None) as response:
            for line in response.iter_lines():
                if line:
                    event = json.loads(line)
                    reduced_data = lh.reduce_response_event(event)
                    if reduced_data is not None:
                        reduced_data_json = json.dumps(reduced_data)
                        self._log(f"Event: {reduced_data_json}")
                        if self._mqtt_publish_function is not None:
                            self._mqtt_publish_function(reduced_data_json)
                    else:
                        self._log(f"Event: manually skipped event {event}")
                if self._stop_event_stream.is_set():
                    self._log(f"Stream stop event set, terminating the stream (event): {self._lichess_stream_event_init_value}")
                    break
                with self._lock:
                    if self._lichess_stream_event_init_value != self._current_token:
                        self._log(f"Terminating the event stream (token changed): {self._lichess_stream_event_init_value}->{self._current_token}")
                        # close the stream
                        break

        self._log(f"Terminated the stream (event): {self._lichess_stream_event_init_value}")
        self._lichess_stream_event_init_value = self._idle_token
faint kayak
#

I see, you wait for response.iter_lines() to yield a line so you can break

#

that's not ideal

#

I don't know much python, but

#

I see that iter_lines is a Generator[bytes, None, None]. Could you make a second generator of your own that yields an event on demand? Then you combine the two generators into one and iterate over that.

#

so your for loop will run either when the first generator yields a line, or when the second yields a stop message
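
One way to approximate this combined-generator idea in Python is a queue fed from two sides: a reader thread pushes lines from the blocking stream, and a stop call pushes a sentinel on demand. This is only a sketch under assumptions; StoppableLines and _STOP are hypothetical names, and any iterable of lines can play the role of the stream.

```python
import queue
import threading
from typing import Iterable, Iterator

_STOP = object()  # hypothetical sentinel meaning "stop requested" or "stream ended"


class StoppableLines:
    """Iterate over a blocking line source, but let stop() end the loop at once."""

    def __init__(self, lines: Iterable[str]) -> None:
        self._queue = queue.Queue()
        self._stopped = threading.Event()
        self._reader = threading.Thread(target=self._pump, args=(lines,), daemon=True)
        self._reader.start()

    def _pump(self, lines: Iterable[str]) -> None:
        # Runs in the reader thread: forward lines until the source ends or stop() was called.
        try:
            for line in lines:
                if self._stopped.is_set():
                    return
                self._queue.put(line)
        finally:
            self._queue.put(_STOP)

    def stop(self) -> None:
        # Safe to call from any thread: wakes the consumer immediately.
        self._stopped.set()
        self._queue.put(_STOP)

    def __iter__(self) -> Iterator[str]:
        while True:
            item = self._queue.get()  # blocks until a line or the sentinel arrives
            if item is _STOP:
                return
            yield item
```

The consuming for loop ends as soon as stop() is called. The reader thread itself stays blocked on the underlying stream until the next line arrives, so the old connection may linger for up to one keep-alive interval before it notices the stop request and exits.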

gleaming phoenix
#

I want to close the event stream of the previous user and immediately open a new stream for the new user.

Maybe keep a list of all streams for the previous user, then on user change, close all of them, wait for that to finish before opening new ones? (edited for clarity)
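
A rough sketch of this bookkeeping, assuming each stream runs in its own thread and checks a stop flag regularly (StreamRegistry and its methods are hypothetical names, not taken from the project):

```python
import threading
from typing import Callable, List, Optional, Tuple


class StreamRegistry:
    """Track running stream workers so they can all be stopped on a user change."""

    def __init__(self) -> None:
        self._workers: List[Tuple[threading.Thread, threading.Event]] = []

    def start(self, target: Callable[[threading.Event], None]) -> None:
        # Each worker receives its own stop flag and is expected to poll it.
        stop = threading.Event()
        thread = threading.Thread(target=target, args=(stop,), daemon=True)
        self._workers.append((thread, stop))
        thread.start()

    def active(self) -> int:
        # Number of workers that are still running.
        return sum(1 for thread, _ in self._workers if thread.is_alive())

    def close_all(self, timeout: Optional[float] = None) -> bool:
        """Signal every worker, wait for each one, and report whether all finished."""
        for _, stop in self._workers:
            stop.set()
        for thread, _ in self._workers:
            thread.join(timeout)
        self._workers = [w for w in self._workers if w[0].is_alive()]
        return not self._workers
```

On a user change, the caller would run close_all() and only open the new stream once it returns True. If a worker checks its flag only when a line arrives, the join can still take up to one keep-alive interval.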