#How to close ShardEventStream?

narrow tundra
#
```rs
shard.close(CloseFrame::NORMAL).await?;
```
how would I do this with a `ShardEventStream`?

My first thought was doing something like:
```rs
for shard in shards.iter_mut() {
  shard.close(CloseFrame::NORMAL).await?;
}
```
but this has two problems: `shards` is already mutably borrowed, and this would wait for each shard to close before starting on the next.
ripe radish
#

You create a ShardSender

narrow tundra
#

Can I see an example? I can't find ShardSender on twilight's docs.

ripe radish
narrow tundra
#

To clarify, I have the Vec<Shard>s provided by stream::create_recommended() but I've borrowed it already via ShardEventStream::new(shards.iter_mut()).

ripe radish
#

Before you do that, do something along the lines of
```rust
let shard_closers: Vec<MessageSender> = shards.iter().map(Shard::sender).collect();
```

narrow tundra
#

And how would I use a MessageSender to close the shard exactly?

ripe radish
#

`sender.close(CloseFrame::NORMAL).ok();` on each of those senders should work

#

and then eat up your remaining messages
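
The pattern suggested here (take a handle per shard before anything else borrows the shards, request every close up front, then drain) can be sketched with plain std channels. This is only an analogy under stated assumptions: `Msg`, `FakeShard`, `drain` and `shutdown_all` are hypothetical names, the channel `Sender` merely plays the role of a `MessageSender`, and no twilight API is used.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a gateway message; not a twilight type.
#[derive(Debug)]
enum Msg {
    Event(u32),
    Close,
}

/// Hypothetical stand-in for a shard: it owns the receiving half of a channel.
struct FakeShard {
    inbox: Receiver<Msg>,
}

/// Consumes everything queued for one shard and stops at its close message.
fn drain(shard: FakeShard) -> Vec<u32> {
    let mut events = Vec::new();
    while let Ok(msg) = shard.inbox.recv() {
        match msg {
            Msg::Event(n) => events.push(n),
            Msg::Close => break,
        }
    }
    events
}

/// Creates `n` fake shards with `per_shard` queued events each, requests a
/// close on all of them, and returns the events each shard still delivered.
fn shutdown_all(n: usize, per_shard: u32) -> Vec<Vec<u32>> {
    let mut shards = Vec::new();
    let mut closers: Vec<Sender<Msg>> = Vec::new();
    for _ in 0..n {
        let (tx, rx) = mpsc::channel();
        for event in 0..per_shard {
            tx.send(Msg::Event(event)).unwrap();
        }
        closers.push(tx); // keep the handle before the shard is handed off
        shards.push(FakeShard { inbox: rx });
    }

    // From here on the shards are owned by their workers, much like the
    // stream holding the mutable borrow. Only the handles remain usable.
    let workers: Vec<_> = shards
        .into_iter()
        .map(|shard| thread::spawn(move || drain(shard)))
        .collect();

    // Request every close up front; `send` returns without waiting for the shard.
    for closer in &closers {
        closer.send(Msg::Close).ok();
    }

    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", shutdown_all(3, 2));
}
```

Because the close request is queued behind the events that were already pending, each fake shard still delivers those events before it stops, which is the "eat up your remaining messages" step.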

narrow tundra
#

One final question, how do I know when I'm finished eating up the remaining messages? stream.next().await is an endless loop right?

ripe radish
#

It will return None i believe

#

Though i am currently troubleshooting why it doesn't seem to be?

narrow tundra
#

My understanding was that isn't how it works. If there are no remaining messages it'll just sleep until there are more?

#

Which leaves me confused on how to know once I've consumed the remaining messages after disconnecting shards.

ripe radish
#

#1072244936549859518 message

ripe radish
#

i'm so confused lmfao

uncut ridge
#

Send a close over the message sender, then call mem::forget on the ShardRef once the stream yields either an error of type Io or a Message::Close/Event::GatewayClose

#

See the example on Shard::close
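
To illustrate why `mem::forget` is suggested, here is a small std-only model. It assumes a guard whose destructor hands its shard back to a queue, which is how the advice above reads; `Pool`, `Guard` and `remaining_after` are hypothetical names, and this is not twilight's actual implementation of `ShardRef`.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::mem;

/// Hypothetical stand-in for the stream's queue of shards that can be polled.
struct Pool {
    ready: RefCell<VecDeque<u32>>,
}

/// Hypothetical guard: it gives its shard back to the pool when dropped.
struct Guard<'a> {
    pool: &'a Pool,
    shard_id: u32,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.pool.ready.borrow_mut().push_back(self.shard_id);
    }
}

impl Pool {
    /// Hands out the next shard, or `None` once no shard is left.
    fn next(&self) -> Option<Guard<'_>> {
        let shard_id = self.ready.borrow_mut().pop_front()?;
        Some(Guard { pool: self, shard_id })
    }
}

/// Takes one guard from a pool of two shards and reports how many shards
/// the pool holds after the guard was either forgotten or dropped.
fn remaining_after(forget: bool) -> usize {
    let pool = Pool {
        ready: RefCell::new(VecDeque::from([0, 1])),
    };
    let guard = pool.next().expect("pool starts with two shards");
    if forget {
        mem::forget(guard); // destructor is skipped: shard 0 is not re-queued
    } else {
        drop(guard); // destructor runs: shard 0 goes back into the queue
    }
    let remaining = pool.ready.borrow().len();
    remaining
}

fn main() {
    println!("forgotten: {}", remaining_after(true));
    println!("dropped:   {}", remaining_after(false));
}
```

`mem::forget` is a safe function that only skips the destructor. In this model that is exactly what lets the pool run empty, so that `next` can eventually return `None`.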

narrow tundra
uncut ridge
#

Yes

narrow tundra
#
```rs
// Process already received events.
while let Some((shard, event)) = stream.next().await {
  match event {
    Ok(Event::GatewayClose(_)) => mem::forget(shard),
    Ok(event) => tracing::debug!(?event, shard = ?shard.id(), "Received event."),
    Err(source) if matches!(source.kind(), ReceiveMessageErrorType::Io) => mem::forget(shard),
    Err(err) => tracing::error!(?err, "Error receiving event.")
  };
}
```
so something along the lines of this would work?
uncut ridge
#

Yes, but don't run that as your normal event loop

narrow tundra
#

yea ofc

#

which reminds me, what does tokio::select! {} do to the normal event loop if a sigint/sigterm is received?

uncut ridge
#

Select drops all other futures once a branch completes. Shard::next_message and Shard::next_event are not cancel safe so it might drop some events now that I think about it
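
What "not cancel safe" means can be shown with a hand-polled future, using only std. This is a deliberately contrived sketch: `Recv` is a hypothetical future that moves an item out of a queue on its first poll and only returns it on the second, and `NoopWake` and `cancel_after_first_poll` are likewise made-up names. It says nothing about how `Shard::next_event` is implemented internally.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical receive future that is NOT cancel safe: the first poll moves
/// an item out of the queue into the future, the second poll returns it.
struct Recv<'a> {
    queue: &'a mut VecDeque<u32>,
    taken: Option<u32>,
}

impl Future for Recv<'_> {
    type Output = Option<u32>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(item) = self.taken.take() {
            return Poll::Ready(Some(item));
        }
        match self.queue.pop_front() {
            Some(item) => {
                // The item now lives only inside this future. A real future
                // would also arrange a wake-up; here we poll by hand.
                self.taken = Some(item);
                Poll::Pending
            }
            None => Poll::Ready(None),
        }
    }
}

/// Polls the future once, then drops it, as a select would when another
/// branch completes first. Returns what is left in the queue afterwards.
fn cancel_after_first_poll() -> Vec<u32> {
    let mut queue = VecDeque::from([1, 2, 3]);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Recv { queue: &mut queue, taken: None };
        let first = Pin::new(&mut fut).poll(&mut cx);
        assert!(first.is_pending());
        // `fut` is dropped here while still holding item 1.
    }
    queue.into_iter().collect()
}

fn main() {
    println!("left in queue: {:?}", cancel_after_first_poll());
}
```

Item 1 was neither returned to the caller nor left in the queue, which is the kind of loss described above when a half-finished receive future is dropped.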

narrow tundra
ripe radish
#

the point is to cancel the normal event loop

narrow tundra
#

they were talking about specifically my code

ripe radish
#

isn’t that just the example from Shard

#

with a minor edit for forget

narrow tundra
# ripe radish isn’t that just the example from Shard

I mean that my code has the normal event loop in a tokio::select! {} along with a sigint/sigterm handler. I then have a second event loop meant for cleaning up any events that were received in the meantime. The code above is that second event loop.

ripe radish
#

ah, okay

uncut ridge
# narrow tundra would it be possible to make it cancel safe on my end, is that a library concern...

Cancel safety is something Twilight must implement (though it's likely impossible with our current dependencies). You'd need to merge the event loops' match statements (forget on close, io error), probably with a helper static atomic bool SHUTDOWN, and then spawn the event loop in a new task and put the JoinHandle (which is cancel safe) in the select statement. Alternatives to select don't really fix the issue, which is that Shard*Stream always has pending, non cancel safe, futures running.
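
The shape of that suggestion (one merged loop, a static `SHUTDOWN` flag, and the caller waiting on a join handle instead of on the receive future) can be sketched with std threads and channels. Everything here is a stand-in under stated assumptions: a thread replaces the spawned task, `Msg`, `event_loop`, `replay` and `demo` are hypothetical names, and no twilight or tokio API is involved.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Process-wide shutdown flag; a `static` is visible from every thread.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);

/// Hypothetical stand-in for a gateway message; not a twilight type.
#[derive(Debug)]
enum Msg {
    Event(u32),
    Close,
}

/// Single merged loop: events are always handled, while a close only ends
/// the loop once shutdown has been requested.
fn event_loop(inbox: Receiver<Msg>) -> Vec<u32> {
    let mut handled = Vec::new();
    while let Ok(msg) = inbox.recv() {
        match msg {
            Msg::Event(n) => handled.push(n),
            Msg::Close if SHUTDOWN.load(Ordering::Acquire) => break,
            Msg::Close => {} // normal operation: keep going (a real shard would reconnect)
        }
    }
    handled
}

/// Replays the same three messages with the flag set or cleared and returns
/// the events the loop handled.
fn replay(shutting_down: bool) -> Vec<u32> {
    SHUTDOWN.store(shutting_down, Ordering::Release);
    let (tx, rx) = mpsc::channel();
    for msg in [Msg::Event(1), Msg::Close, Msg::Event(2)] {
        tx.send(msg).unwrap();
    }
    drop(tx); // no more messages: `recv` fails once the queue is empty

    // The loop runs on its own thread; the caller only waits on the handle.
    let worker = thread::spawn(move || event_loop(rx));
    worker.join().unwrap()
}

/// Runs the replay first in normal operation, then during shutdown.
fn demo() -> (Vec<u32>, Vec<u32>) {
    (replay(false), replay(true))
}

fn main() {
    println!("{:?}", demo());
}
```

In normal operation the close is passed over and both events are handled; with the flag set, the loop stops at the close and never asks for another message.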

ripe radish
#

would that prevent an erroneous gatewayclosed shard from reconnecting if you forgot it

uncut ridge
#

In the normal event loop, received close messages are ignored by the user and the shard auto reconnects. In the second one, received close messages are intercepted by the user and the shard's next_message method is not called again (as it would then reconnect)
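
The difference between the two loops can be made concrete with a toy model. It assumes, as described above, that asking a shard for another message after a close is what triggers the reconnect; `MockShard`, its `Event` enum and `reconnects` are hypothetical and only mimic that behaviour.

```rust
/// Hypothetical stand-in for gateway events; not twilight's `Event`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Dispatch(u32),
    GatewayClose,
}

/// Hypothetical shard model: polling it again after a close opens a new session.
struct MockShard {
    script: Vec<Event>,
    cursor: usize,
    closed: bool,
    reconnects: u32,
}

impl MockShard {
    fn new(script: Vec<Event>) -> Self {
        Self { script, cursor: 0, closed: false, reconnects: 0 }
    }

    /// Returns the next scripted event, reconnecting first if the previous
    /// event was a close.
    fn next_event(&mut self) -> Option<Event> {
        if self.closed {
            self.closed = false;
            self.reconnects += 1;
        }
        let event = *self.script.get(self.cursor)?;
        self.cursor += 1;
        if event == Event::GatewayClose {
            self.closed = true;
        }
        Some(event)
    }
}

/// Drives a shard through one dispatch and one close, and reports how often
/// it reconnected. `stop_on_close` selects the shutdown-style loop.
fn reconnects(stop_on_close: bool) -> u32 {
    let mut shard = MockShard::new(vec![Event::Dispatch(1), Event::GatewayClose]);
    while let Some(event) = shard.next_event() {
        if stop_on_close && event == Event::GatewayClose {
            break; // shutdown loop: never poll the closed shard again
        }
    }
    shard.reconnects
}

fn main() {
    println!("normal loop:   {} reconnect(s)", reconnects(false));
    println!("shutdown loop: {} reconnect(s)", reconnects(true));
}
```

A loop that keeps polling after the close ends up with a fresh session, while a loop that breaks on the close does not.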

ripe radish
#

i’m somewhat confused because i have been told a lot of different things, and this seems like a very complex solution to a very common usecase

#

a substantial chunk of my main file is just a clean shutdown

#

is there a definitive example of how to do this, complete with the standard rust boilerplate?

uncut ridge
uncut ridge
# ripe radish i’m somewhat confused because i have been told a lot of different things, and th...

There's multiple things going on. Before 0.15 it was impossible* to process remaining events after shutting down shards/clusters, now it is and the example on Shard::close / second event loop achieves this. Great! However, there's a small risk of some of these events being lost whilst transitioning from one event loop to the other when using the Shard*Stream types. Damn it! There is a way to solve this, but at that point it might just be simpler to totally avoid the Shard*Stream types (they're supposed to be convenience types) and implement an alternative yourself (perhaps based on the gateway-parallel example)

#

Note that the gateway-parallel example does not actually solve the problem if you just copy-paste its code, as it uses ShardEventStream (which is the source of the problem). But if you change it so that each spawned task has just one shard, it becomes very easy to use the Shard::close example as is

narrow tundra
#

What downsides if any does the gateway-parallel example have compared to the other method discussed here?

ripe radish
#

I mean, i quite like it when things are convenient

uncut ridge
#

I can code a quick example combining gateway-parallel and Shard::close that you guys can base your code upon.

ripe radish
#

I actually think gateway_parallel is a half-decent example

narrow tundra
#

that would help me greatly personally

ripe radish
#

there's just too much... parallel

uncut ridge
narrow tundra
#

I think I understand at a surface level exactly what the gateway-parallel code is doing, but the more I look at the details the more questions I have...

uncut ridge
ripe radish
#

Do you need an arc around an atomic?

uncut ridge
#

Think the watch channel example has some quirks

uncut ridge
ripe radish
uncut ridge
#

And you can imagine adding a break after _ = shard.close(), and then a second loop just as in the Shard::close example

ripe radish
#

oh, nvm you do

uncut ridge
#

Yeah it's a common misconception that atomics should be sharable by default, but they're exactly the same as other types (like you need Arc<Client> or Arc<InMemoryCache>)
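
A short std-only illustration of that point: an atomic can be mutated through a shared reference, but a spawned thread (or task) still needs to own or statically borrow it, so a locally created flag goes into an `Arc`. The function name `workers_that_saw_shutdown` is made up for this sketch.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `n` workers that spin until the shared flag is set, then returns
/// how many of them observed it.
fn workers_that_saw_shutdown(n: usize) -> usize {
    // The atomics are created locally, so each thread gets its own `Arc`
    // clone pointing at the same value.
    let shutdown = Arc::new(AtomicBool::new(false));
    let seen = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let shutdown = Arc::clone(&shutdown);
            let seen = Arc::clone(&seen);
            thread::spawn(move || {
                while !shutdown.load(Ordering::Acquire) {
                    thread::yield_now();
                }
                seen.fetch_add(1, Ordering::Relaxed);
            })
        })
        .collect();

    shutdown.store(true, Ordering::Release);
    for handle in handles {
        handle.join().unwrap();
    }
    seen.load(Ordering::Relaxed)
}

fn main() {
    println!("{} workers stopped", workers_that_saw_shutdown(4));
}
```

A `static` atomic is the other common option and needs no `Arc`, because it already lives for the whole program.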

ripe radish
ripe radish
#

nope. Sorry.

#

Amazing, it works now

#

thank you so much

uncut ridge
ripe radish
uncut ridge
#

Is the Event::GatewayClose event delayed?

ripe radish
#

Hmm. It might be?

narrow tundra
#

It'll be sitting there waiting for shard.next_event() and the conditional shutdown is only after that?

ripe radish
#

yeah

narrow tundra
#

this is also going to be a bigger problem for a bot like mine where the only event I even listen to is INTERACTION_CREATE... basically means it won't actually shut down until the next slash command used.

ripe radish
#

Yeah

#

I only listen for interaction and message create

ripe radish
#

Gateway is complex, http is simple

narrow tundra
#

does twilight support that style of interaction usage?

ripe radish
#

You literally just

#

`let interaction: twilight_model::application::interaction::Interaction = serde_json::from_slice(&body)?;`

#

ofc you also have to verify the body

#

but that's also not hard

narrow tundra
#

I'm also making this bot mainly as a practice run for another bot that will most definitely use more than just INTERACTION_CREATE. So with that in mind, I think I'll stick with the gateway implementation.

ripe radish
#

fair enough

uncut ridge
narrow tundra
#

BTW I'm not sure I like using mem::forget() at all here. It feels like a powerful tool intended for other purposes from what I've searched. Is there some other way to get the same result?

#

Or would it be possible to submit a PR that would create such a way, some way to un-register a shard from the ShardEventStream?

uncut ridge
narrow tundra
#

okay that makes me feel a lot better about using it, thank you!

ripe radish
narrow tundra
ripe radish
#

Yep

#

I figured that out right after i asked

#

Sorry

cold wing
#

i've been looking at the gateway example and notice it does not use ShardStream(), i want to use shard stream and somehow gracefully shutdown without spawning a new handler per task. How would i do that?

uncut ridge
#

You can use mem::forget on the exiting ShardRef

cold wing
#

i feel like there should be some kind of way for shard event stream to shutdown all shards

#

since if we can't gracefully shutdown there is no point of shard event stream

cold wing
cold wing
uncut ridge
#

Yes, just create the senders before the shardeventstream

cold wing
#
```rs
let mut shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();

let senders: Vec<_> = shards.iter().map(Shard::sender).collect();

let mut stream = ShardEventStream::new(shards.iter_mut());

let handler = tokio::spawn(async move {
    while let Some((shard, event)) = stream.next().await {
        let event = match event {
            Ok(event) => event,
            Err(error) if error.is_fatal() => {
                tracing::error!(?error, "fatal error while receiving event");
                break;
            }
            Err(error) => {
                tracing::warn!(?error, "error while receiving event");
                continue;
            }
        };
        tracing::info!(kind = ?event.kind(), shard = ?shard.id(), "received event");
    }
});

tokio::signal::ctrl_c().await?;

tracing::info!("shutting down");
for sender in senders {
    sender.close(CloseFrame::NORMAL).ok();
}

handler.await?;
```
#

this is what im trying out right now

#

wait is it due to me moving the value?

#

lemme extract it to a function

#

oh your example does not use ShardEventStream

#

hmmmm

#

im honestly just so fixated in ShardEventStream because i feel like it nicely brings everything into a single event loop

#

and because it felt like it exists for a reason :(

cold wing
#

ok so this works now

```rs
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let token = env::var("DISCORD_TOKEN").context("DISCORD_TOKEN env variable not set")?;

    let client = Client::new(token.clone());
    let config = Config::new(token.clone(), Intents::empty());

    let application = client.current_user_application().await?.model().await?;

    tracing::info!("logged as {} with ID {}", application.name, application.id);

    let shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();
    let senders: Vec<_> = shards.iter().map(Shard::sender).collect();

    for shard in shards {
        tokio::spawn(handle_events(shard));
    }

    tokio::signal::ctrl_c().await?;

    tracing::info!("shutting down");
    for sender in senders {
        sender.close(CloseFrame::NORMAL).ok();
    }

    Ok(())
}

async fn handle_events(mut shard: Shard) {
    loop {
        let event = match shard.next_event().await {
            Ok(event) => event,
            Err(error) if error.is_fatal() => {
                tracing::error!(?error, "fatal error while receiving event");
                break;
            }
            Err(error) => {
                tracing::warn!(?error, "error while receiving event");
                continue;
            }
        };
        tracing::info!(kind = ?event.kind(), shard = ?shard.id(), "received event");
    }
}
```
cold wing
#

I still don’t like this method though since I’m spawning for every shard. Is there a way to poll them all at once like ShardEventStream?

#

what if the code for ShardEventStream was rewritten so it doesn't require a mutable value like what we're doing here?

#

ill leave it as it is for now since it works

uncut ridge
#

But we're aware of this being a painpoint and are working on improvements

uncut ridge
# cold wing

This can be fixed while retaining ShardEventStream, I can post an example if you'd like

cold wing
cold wing
#
```rs
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let token = env::var("DISCORD_TOKEN").context("DISCORD_TOKEN env variable not set")?;

    let client = Client::new(token.clone());
    let config = Config::new(token.clone(), Intents::empty());

    let application = client.current_user_application().await?.model().await?;

    tracing::info!("logged as {} with ID {}", application.name, application.id);

    let shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();
    let senders: Vec<_> = shards.iter().map(Shard::sender).collect();

    let tracker = tokio_util::task::TaskTracker::new();
    for shard in shards {
        tracker.spawn(handle_events(shard));
    }

    tokio::signal::ctrl_c().await?;

    tracing::info!("shutting down");
    for sender in senders {
        sender.close(CloseFrame::NORMAL).ok();
    }
    tracker.wait().await;

    Ok(())
}
```
#

why did my shards respawn.....

true dune
#

theyre cats

#

they have 9 lives

cold wing
#

how do i fix this???

cold wing
#

Anyone know whats wrong with :(