shard.close(CloseFrame::NORMAL).await?;
``` how would I do this with a `ShardEventStream`?
My first thought was doing something like:
```rs
for shard in shards.iter_mut() {
    shard.close(CloseFrame::NORMAL).await?;
}
```
but this has two problems: `shards` is already mutably borrowed, and this would wait for each shard to close before starting on the next.
#How to close ShardEventStream?
You create a ShardSender
Can I see an example? I can't find ShardSender on twilight's docs.
if you call the sender method on a shard, you'll get one
My problem right now is that I can't even get access to the shard instances though? I could really use an example here.
To clarify, I have the Vec<Shard>s provided by stream::create_recommended() but I've borrowed it already via ShardEventStream::new(shards.iter_mut()).
Before you do that, do something along the lines of
```rust
let shard_closers: Vec<MessageSender> = shards.iter().map(Shard::sender).collect();
```
And how would I use a MessageSender to close the shard exactly?
sender.close(CloseFrame::NORMAL).ok(); should work
and then eat up your remaining messages
One final question, how do I know when I'm finished eating up the remaining messages? stream.next().await is an endless loop right?
It will return None i believe
Though i am currently troubleshooting why it doesn't seem to be?
My understanding was that isn't how it works. If there are no remaining messages it'll just sleep until there are more?
Which leaves me confused on how to know once I've consumed the remaining messages after disconnecting shards.
wait, now i'm confused too. Give me a moment, I'll find my old convo
you would have to cause a break when you get a normal close
i'm so confused lmfao
Send a close over the message sender and then call mem::forget on the ShardRef once it returns an error type of Io or Message::Close/Event::GatewayClose
See the example on Shard::close
how does this work with a ShardEventStream? Once all of the shards have been forgotten then would stream.next().await return None?
Yes
```rust
// Process already received events.
while let Some((shard, event)) = stream.next().await {
    match event {
        Ok(Event::GatewayClose(_)) => mem::forget(shard),
        Ok(event) => tracing::debug!(?event, shard = ?shard.id(), "Received event."),
        Err(source) if matches!(source.kind(), ReceiveMessageErrorType::Io) => mem::forget(shard),
        Err(err) => tracing::error!(?err, "Error receiving event.")
    };
}
```
so something along the lines of this would work?
Yes, but don't run that as your normal event loop
yea ofc
which reminds me, what does tokio::select! {} do to the normal event loop if a sigint/sigterm is received?
Select drops all other futures once a branch completes. Shard::next_message and Shard::next_event are not cancel safe so it might drop some events now that I think about it
would it be possible to make it cancel safe on my end, is that a library concern, or is tokio::select! {} simply not the best option in that case?
what do you mean?
the point is to cancel the normal event loop
they were talking about specifically my code
I mean that my code has the normal event loop in a tokio::select! {} along with a sigint/sigterm handler. I then have a second event loop meant for cleaning up any events that were received in the meantime. The code above is that second event loop.
ah, okay
Cancel safety is something Twilight must implement (though it's likely impossible with our current dependencies). You'd need to merge the event loops' match statements (forget on close, io error), probably with a helper static atomic bool SHUTDOWN, and then spawn the event loop in a new task and put the JoinHandle (which is cancel safe) in the select statement. Alternatives to select don't really fix the issue, which is that Shard*Stream always has pending, non-cancel-safe futures running.
would that prevent an erroneous gatewayclosed shard from reconnecting if you forgot it
The whole idea behind a second event loop is that it wants to eventually shut down the bot for good, i.e. not reconnecting upon receiving a close message. I guess that was not clear from the Shard::close example?
In the normal event loop, received close messages are ignored by the user and the shard auto reconnects. In the second one, received close messages are intercepted by the user and the shard's next_message method is not called again (as it would then reconnect)
i’m somewhat confused because i have been told a lot of different things, and this seems like a very complex solution to a very common usecase
a substantial chunk of my main file is just a clean shutdown
is there a definitive example of how to do this, complete with the standard rust boilerplate?
i found https://github.com/twilight-rs/twilight/blob/main/examples/gateway-parallel.rs, but it seems completely unrelated to what i was told last time i asked this question
Honestly, https://github.com/twilight-rs/twilight/blob/main/examples/gateway-parallel.rs might be simpler as no Shard*Stream => no cancel "unsafety"
There's multiple things going on. Before 0.15 it was impossible* to process remaining events after shutting down shards/clusters, now it is and the example on Shard::close / second event loop achieves this. Great! However, there's a small risk of some of these events being lost whilst transitioning from one event loop to the other when using the Shard*Stream types. Damn it! There is a way to solve this, but at that point it might just be simpler to totally avoid the Shard*Stream types (they're supposed to be convenience types) and implement an alternative yourself (perhaps based on the gateway-parallel example)
Note that the gateway-parallel example actually does not solve the problem if you just copy-paste its code, as it uses ShardEventStream (which is the source of the problem), but if you change it so that each spawned task has just one shard, it becomes very easy to use the Shard::close example as is
What downsides if any does the gateway-parallel example have compared to the other method discussed here?
I mean, i quite like it when things are convenient
Not really any, as the alternative is even more complex
I can code a quick example combining gateway-parallel and Shard::close that you guys can base your code upon.
I actually think gateway_parallel is a half-decent example
that would help me greatly personally
there's just too much... parallel
But normally it can be irritating to send state between shards (as you're crossing task bounds (Send + 'static), instead of having it all in the same task)
I think I understand at a surface level exactly what the gateway-parallel code is doing, but the more I look at the details the more questions I have...
This should work, let me know if there's a problem or something I could explain better
Actually, let's just use an Arc<AtomicBool> instead of a watch channel.
Do you need an arc around an atomic?
Think the watch channel example has some quirks
That or as a static variable, e.g. static SHUTDOWN: AtomicBool = AtomicBool::new(false)
I don't think you have to do either?
And you can imagine adding a break after _ = shard.close(), and then a second loop just as in the Shard::close example
oh, nvm you do
Yeah it's a common misconception that atomics should be sharable by default, but they're exactly the same as other types (like you need Arc<Client> or Arc<InMemoryCache>)
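To make the Arc vs. static point concrete, here's a minimal std-only sketch. It uses plain threads instead of tokio tasks, and the names `wait_until_set`, `run_with_arc` and `run_with_static` are made up for illustration, not anything from twilight. A spawned thread (like a spawned task) needs `'static` data, so the flag is either moved in through an `Arc` clone or declared as a `static`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Process-wide flag; a `static` lives for `'static`, so no `Arc` is needed.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);

/// Spins (yielding to the scheduler) until the flag is set.
fn wait_until_set(flag: &AtomicBool) {
    while !flag.load(Ordering::Acquire) {
        thread::yield_now();
    }
}

/// Shares one heap-allocated flag through `Arc` clones.
/// Returns how many workers exited cleanly after the flag was set.
fn run_with_arc(workers: usize) -> usize {
    let flag = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each worker owns its own clone of the pointer, not of the bool.
            let flag = Arc::clone(&flag);
            thread::spawn(move || wait_until_set(&flag))
        })
        .collect();
    flag.store(true, Ordering::Release);
    handles.into_iter().map(|h| h.join()).filter(Result::is_ok).count()
}

/// Same idea, but every worker reads the `static` directly.
fn run_with_static(workers: usize) -> usize {
    let handles: Vec<_> = (0..workers)
        .map(|_| thread::spawn(|| wait_until_set(&SHUTDOWN)))
        .collect();
    SHUTDOWN.store(true, Ordering::Release);
    handles.into_iter().map(|h| h.join()).filter(Result::is_ok).count()
}
```

Calling `run_with_arc(3)` spawns three workers that all stop once the flag flips. The static variant behaves the same, but the flag is global and stays set for the rest of the process.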
i am now stuck in a loop of reconnecting
Did you do the above?
FYI: I also figured out a somewhat easier way to make this all work with ShardEventStream https://github.com/vilgotf/voice-pruner/blob/9b1ca3cb3a7ef110180cce45701c2d7303eb4243/src/main.rs#L102-L144 (replace loop with while let Some, break with mem::forget, and sender with a vec of senders)
Created a tracking issue for documenting all this: https://github.com/twilight-rs/twilight/issues/2151
slight problem
my bot doesn’t shut down until it gets an event
Is the Event::GatewayClose event delayed?
Hmm. It might be?
my assumption is that's because of this part here?
It'll be sitting there waiting for shard.next_event() and the conditional shutdown is only after that?
yeah
this is also going to be a bigger problem for a bot like mine where the only event I even listen to is INTERACTION_CREATE... basically means it won't actually shut down until the next slash command used.
Pro tip: if you only listen for interactions, using HTTP interactions is a very good idea
Gateway is complex, http is simple
does twilight support that style of interaction usage?
Yes
You literally just
let interaction: twilight_model::application::interaction::Interaction = serde_json::from_slice(&body)?;
ofc you also have to verify the body
but that's also not hard
I'm also making this bot mainly as a practice run for another bot that will most definitely use more than just INTERACTION_CREATE. So with that in mind, I think I'll stick with the gateway implementation.
fair enough
Yeah I didn't think about that, send a closeframe on a sender after setting the shutdown boolean and then merge the two match statements into one where you check Event::GatewayClose if shutdown == true. Here's the commit where I fixed this in my bot if you want an example https://github.com/vilgotf/voice-pruner/commit/fd1888f4505dc558eac4cad80271b8d1fa834df3
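The "merge the two match statements" idea can be sketched without any gateway at all. This is only an illustration: `Event`, `Action`, `decide` and `run` are hypothetical stand-ins rather than twilight types, and the real loop would call `next_event()` and check an atomic flag instead of an index.

```rust
/// Hypothetical, simplified stand-in for gateway events.
#[derive(Debug, PartialEq)]
enum Event {
    GatewayClose,
    Other,
}

/// What the single merged loop does with an event.
#[derive(Debug, PartialEq)]
enum Action {
    Handle,
    Reconnect,
    Stop,
}

/// A close only ends the loop once shutdown was requested;
/// otherwise it is treated like any unexpected close and the shard reconnects.
fn decide(event: &Event, shutting_down: bool) -> Action {
    match event {
        Event::GatewayClose if shutting_down => Action::Stop,
        Event::GatewayClose => Action::Reconnect,
        Event::Other => Action::Handle,
    }
}

/// Feeds `events` through `decide`; the shutdown flag counts as set
/// from index `shutdown_at` onwards. Stops at the first `Action::Stop`.
fn run(events: &[Event], shutdown_at: usize) -> Vec<Action> {
    let mut actions = Vec::new();
    for (index, event) in events.iter().enumerate() {
        let action = decide(event, index >= shutdown_at);
        let stop = action == Action::Stop;
        actions.push(action);
        if stop {
            break;
        }
    }
    actions
}
```

With the events `[GatewayClose, Other, GatewayClose, Other]` and shutdown requested from index 2, the first close reconnects, the second one stops the loop, and the trailing event is never polled. Sending the close frame yourself is what guarantees that second close actually arrives instead of the loop waiting for the next unrelated event.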
BTW I'm not sure I like using mem::forget() at all here. It feels like a powerful tool intended for other purposes from what I've searched. Is there some other way to get the same result?
Or would it be possible to submit a PR that would create such a way, some way to un-register a shard from the ShardEventStream?
It is indeed very powerful and can cause problems if used in the wrong way. However, the pattern is explicitly documented on ShardRef https://docs.rs/twilight-gateway/latest/twilight_gateway/stream/struct.ShardRef.html and we'll be careful not to break that promise
Guard dereferencing to the shard that produced an event or message.
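For anyone wondering why `mem::forget` removes a shard from the stream, here's a rough std-only model. It assumes the guard gives its shard back to the stream when it is dropped, as the ShardRef docs linked above describe. `Pool`, `Guard` and `drain` are hypothetical names, and the real stream is async and holds actual shards, not IDs.

```rust
use std::mem;

/// Hypothetical stand-in for the stream; holds shard IDs only.
struct Pool {
    idle: Vec<u32>,
}

/// Hypothetical stand-in for `ShardRef`: hands its shard back on drop.
struct Guard<'a> {
    pool: &'a mut Pool,
    id: u32,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        // Re-insert the shard so it gets polled again.
        self.pool.idle.push(self.id);
    }
}

impl Pool {
    /// Yields a guard for some shard, or `None` once no shards remain.
    fn next(&mut self) -> Option<Guard<'_>> {
        let id = self.idle.pop()?;
        Some(Guard { pool: self, id })
    }
}

/// Forgets every guard, so nothing is re-inserted and the loop ends on its own.
/// Returns the shard IDs in the order they were seen.
fn drain(pool: &mut Pool) -> Vec<u32> {
    let mut seen = Vec::new();
    while let Some(guard) = pool.next() {
        seen.push(guard.id);
        // Skips `Drop`, which is the only thing that would put the shard back.
        mem::forget(guard);
    }
    seen
}
```

Dropping a guard normally puts the ID back, so a plain `while let` over `next()` would never finish. Forgetting each guard is what lets `next()` eventually return `None`.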
okay that makes me feel a lot better about using it, thank you!
do i have to build said closeframe manually?
no, you can just do sender.close(CloseFrame::NORMAL)
i've been looking at the gateway example and notice it does not use ShardStream(), i want to use shard stream and somehow gracefully shutdown without spawning a new handler per task. How would i do that?
You can use mem::forget on the exiting ShardRef
ok so we can't gracefully shutdown if we use ShardEventStream() since it takes ownership...
i feel like there should be some kind of way for shard event stream to shutdown all shards
since if we can't gracefully shutdown there is no point of shard event stream
i guess ill have to use this method
So i was looking at this code: https://github.com/randomairborne/experienced/blob/prod/xpd-gateway/src/main.rs. And really liked how they split the senders, which would also allow them to bypass the ownership problems. Would i be able to do this using ShardEventStream?
Yes, just create the senders before the shardeventstream
i got an error from clippy saying shards does not live long enough
```rust
let mut shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();
let senders: Vec<_> = shards.iter().map(Shard::sender).collect();
let mut stream = ShardEventStream::new(shards.iter_mut());
let handler = tokio::spawn(async move {
    while let Some((shard, event)) = stream.next().await {
        let event = match event {
            Ok(event) => event,
            Err(error) if error.is_fatal() => {
                tracing::error!(?error, "fatal error while receiving event");
                break;
            }
            Err(error) => {
                tracing::warn!(?error, "error while receiving event");
                continue;
            }
        };
        tracing::info!(kind = ?event.kind(), shard = ?shard.id(), "received event");
    }
});
tokio::signal::ctrl_c().await?;
tracing::info!("shutting down");
for sender in senders {
    sender.close(CloseFrame::NORMAL).ok();
}
handler.await?;
```
this is what im trying out right now
wait is it due to me moving the value?
lemme extract it to a function
oh your example does not use ShardEventStream
hmmmm
im honestly just so fixated in ShardEventStream because i feel like it nicely brings everything into a single event loop
and because it felt like it exists for a reason :(
ok so this works now
```rust
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let token = env::var("DISCORD_TOKEN").context("DISCORD_TOKEN env variable not set")?;
    let client = Client::new(token.clone());
    let config = Config::new(token.clone(), Intents::empty());
    let application = client.current_user_application().await?.model().await?;
    tracing::info!("logged as {} with ID {}", application.name, application.id);
    let shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();
    let senders: Vec<_> = shards.iter().map(Shard::sender).collect();
    for shard in shards {
        tokio::spawn(handle_events(shard));
    }
    tokio::signal::ctrl_c().await?;
    tracing::info!("shutting down");
    for sender in senders {
        sender.close(CloseFrame::NORMAL).ok();
    }
    Ok(())
}

async fn handle_events(mut shard: Shard) {
    loop {
        let event = match shard.next_event().await {
            Ok(event) => event,
            Err(error) if error.is_fatal() => {
                tracing::error!(?error, "fatal error while receiving event");
                break;
            }
            Err(error) => {
                tracing::warn!(?error, "error while receiving event");
                continue;
            }
        };
        tracing::info!(kind = ?event.kind(), shard = ?shard.id(), "received event");
    }
}
```
I still don't like this method though, since I'm spawning a task for every shard. Is there a way to poll them all at once like ShardEventStream?
what if the code for ShardEventStream was rewritten so it doesn't require a mutable value like what we're doing here?
ill leave it as it is for now since it works
Spawning each shard in a task lets tokio manage scheduling, i.e. better performance, at a (slight) complexity increase
But we're aware of this being a pain point and are working on improvements
This can be fixed while retaining ShardEventStream, I can post an example if you'd like
Yes please
Well i mean it definitely wont have any problem with cache invalidation and in hindsight it does seem better.
```rust
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let token = env::var("DISCORD_TOKEN").context("DISCORD_TOKEN env variable not set")?;
    let client = Client::new(token.clone());
    let config = Config::new(token.clone(), Intents::empty());
    let application = client.current_user_application().await?.model().await?;
    tracing::info!("logged as {} with ID {}", application.name, application.id);
    let shards: Vec<_> = stream::create_recommended(&client, config, |_shard, builder| builder.build()).await?.collect();
    let senders: Vec<_> = shards.iter().map(Shard::sender).collect();
    let tracker = tokio_util::task::TaskTracker::new();
    for shard in shards {
        tracker.spawn(handle_events(shard));
    }
    tokio::signal::ctrl_c().await?;
    tracing::info!("shutting down");
    for sender in senders {
        sender.close(CloseFrame::NORMAL).ok();
    }
    tracker.wait().await;
    Ok(())
}
```
why did my shards respawn.....
how do i fix this???
Anyone know whats wrong with :(