#What's the best way to wake-up async tasks?
64 messages · Page 1 of 1 (latest)
#rust-help-1 message
it's also not really the behavior i'm looking for, i'm essentially trying to cleanup when the client is closed. let me provide u and idea of what i'm trying to do
async fn main() {
let mut close_now = false;
let raw_buff_recv = rt::spawn(async move {
loop {
if close_now { break; }
}
});
let conn_processor = rt::spawn(async move {
loop {
if close_now { break; }
} //
};
// some time in the distant future
*close_now = true;
}
obviously i'm aware this won't compile, but it's a very dumbed down example of what i'm trying to acheive
I'm currently doing this:
pub async fn close(&self) {
self.update_state(ConnectionState::Disconnecting).await;
self.closed
.store(true, std::sync::atomic::Ordering::Relaxed);
let mut tasks = self.tasks.lock().await;
for task in tasks.drain(..) {
#[cfg(feature = "async_std")]
task.cancel().await;
#[cfg(feature = "async_tokio")]
task.abort();
}
}
each task has an expression equivelant to this:
if closed.load(std::sync::atomic::Ordering::Relaxed) == true {
rakrs_debug!("[CLIENT] (recv_task) The internal network recieve channel has been killed.");
break;
}
However this doesn't work, because the task might be sleeping and is not notified of this change
That's basically what a CondVar does.
In practice it'll be easier to use a channel.
condvar requires me to use a mutex though doesn't it?
Yes. Hence why I recommended to use a channel.
A channel is essentially a Convar + Mutex already bundled for you
i could be mis-understanding it though
a channel?
like mspc?
if so, i'm already doing that,
Well, mspc would work but it's probably overkill. There are special channel implementations that are designed for this. mspc is more designed for constant data transfer.
im aware, you're probably talking about something like oneshot
Yes
but the issue with this, is it's not compat with async-std, and it doesn't really work the way i want it to
async-std itself has channels. THey don't work?
it doesn't have oneshot
afaik
which is what i would need
im going to try some magic with condvar again
You can create a bounded channel with capacity 1.
but i think im going to run into the same issue
condvar should be created within an arc right?
Both the mutex and the CondVar need to, yes
i mean im assuming i dont really need to use a mutex if im simply just notifying all tasks though?
Afaik CondVar API requires a mutex.
I'm not an expert on how to manually use this. There is an episode of Crust of Rust about this.
also i would need to use something like select right, if i have two blocking futures
Exactly
You can abuse channels for this:
use async_std::task;
use std::sync::Arc;
use async_std::channel;
pub fn main() {
task::block_on(start())
}
async fn start() {
let (send, recv) = channel::bounded::<()>(1);
println!("STARTING...");
let recv0 = recv.clone();
let raw_buff_recv = rt::spawn(async move {
println!("STARTED raw_buff_recv");
loop {
if recv0.recv() == TryRecvError::Closed {
break;
}
}
println!("DONE raw_buff_recv");
});
let recv1 = recv.clone();
let conn_processor = rt::spawn(async move {
println!("STARTED conn_processor");
loop {
if recv1.try_recv() !== TryRecvError::Closed {
continue;
}
break;
}
println!("DONE conn_processor");
});
println!("ENDING...");
drop(send);
println!("ENDED");
}
Abusing the channel closing is interesting.
But you also can just send something and use a successfull try_recv as the signal.
If you're listening in multiple tasks you'll have to drop it. Don't think async-std has a broadcast channel unfortunately.
I'm still not really sure why using like a tokio one-shot or broadcast channel is not possible.
In theory these should be executor-agnostic.
@rose plinth the issue is, cleanup may happen for an external reason
like the server closing the connection (which it does not have access to the socket directly)
here's my implementation: https://github.com/NetrexMC/RakNet/blob/v3/src/client/mod.rs#L237-L249
also im not sure if i really wanna abuse channels for this if there's a better way to implement it lol
although it has crossed my mind
also it turns out a convar is not compatible with futures::future::select 😔 due to pinning on the guard
Using channels for this is not abusing anything, it's the standard.
im ngl the only reason I didn't want to go that route is just it looks messy lmao
You can wrap the channel in an API that's cleaner and conveys what you're trying to do better.
pub struct SignalSender(Option<Sender>);
pub struct SignalReceiver(Receiver);
And just have SignalSender drop the Sender if you want
Use a tokio broadcast channel and call try_recv in the task loops. It should just work.
yeah i was probably going to go that route
i guess while we're on this topic, would this be better as a thread? I dont really want the tick to be paused
https://github.com/NetrexMC/RakNet/blob/v3/src/client/mod.rs#L607
tokio (at least when i use console-subscriber) seems to be pausing this for long periods of time (over 50ms)
not really a big issue but
You have that sleep at the top of the loop
If you sleep 50 milliseconds, then it will indeed be paused for 50 milliseconds
yes but the main thing im annoyed about is when the network recieve thread takes longer than 50ms, the operation is suspended longer, altough this might be fixed as it's been awhile since i've even looked at that, and code definitely has changed
You're going to want to use select to wait for the close signal and for messages from your queues at the same time
yeah im aware of that
that's fine, i'll probably use futures::future::select for that
If you just want to prevent 100% CPU you should use yield_now instead of sleep.
i mean the only issue with it is client & server tick desync
but it's not really that big of an issue
It's basically impossible to keep two ends of a connection in perfect sync using sleep, sleep is not that accurate.