#What's the best way to wake-up async tasks?


viscid zenith
#

I know tokio has tokio::sync::Notify, but are there any std alternatives to this? I'm looking to wake a task up when I destroy the underlying structure that the task was created in. I'm currently using async-std and I haven't found a decent way to do this yet. Maybe someone can give me some pointers on where to start?

viscid zenith
#

#rust-help-1 message
it's also not really the behavior i'm looking for, i'm essentially trying to clean up when the client is closed. let me give you an idea of what i'm trying to do

#
async fn main() {
  let mut close_now = false; 
  let raw_buff_recv = rt::spawn(async move {
    loop {
      if close_now { break; }
    }
  });
  let conn_processor = rt::spawn(async move {
    loop {
      if close_now { break; }
    }
  });

  // some time in the distant future
  *close_now = true;
}
#

obviously i'm aware this won't compile, but it's a very dumbed down example of what i'm trying to achieve

#

I'm currently doing this:

  pub async fn close(&self) {
      self.update_state(ConnectionState::Disconnecting).await;
      self.closed
          .store(true, std::sync::atomic::Ordering::Relaxed);
      let mut tasks = self.tasks.lock().await;
      for task in tasks.drain(..) {
          #[cfg(feature = "async_std")]
          task.cancel().await;
          #[cfg(feature = "async_tokio")]
          task.abort();
      }
  }

each task has an expression equivalent to this:

if closed.load(std::sync::atomic::Ordering::Relaxed) {
    rakrs_debug!("[CLIENT] (recv_task) The internal network recieve channel has been killed.");
    break;
}

However, this doesn't work, because the task might be sleeping and is not notified of the change.

vivid wolf
#

That's basically what a Condvar does.
In practice it'll be easier to use a channel.

viscid zenith
#

condvar requires me to use a mutex though doesn't it?

vivid wolf
#

Yes. Hence why I recommended to use a channel.
A channel is essentially a Condvar + Mutex already bundled for you
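
For reference, here is a minimal sketch of the Condvar + Mutex pairing mentioned above, using only std. It assumes plain OS threads rather than async tasks, because `std::sync::Condvar::wait` blocks the calling thread. The function name `run_workers` is made up for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns `n` worker threads that sleep until shutdown is signalled,
/// then returns how many of them woke up and exited.
fn run_workers(n: usize) -> usize {
    // The bool is the "closed" flag; the Condvar wakes threads waiting on it.
    let shared = Arc::new((Mutex::new(false), Condvar::new()));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let (lock, cvar) = &*shared;
                let mut closed = lock.lock().unwrap();
                // Re-check the flag in a loop to guard against spurious wakeups.
                while !*closed {
                    closed = cvar.wait(closed).unwrap();
                }
                1usize
            })
        })
        .collect();

    // Flip the flag under the lock, then wake every waiting thread.
    {
        let (lock, cvar) = &*shared;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("{} workers shut down", run_workers(3));
}
```

The flag lives inside the mutex so a worker can never check it, miss the notification, and then go to sleep forever. That is the part a channel handles for you.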

viscid zenith
#

i could be mis-understanding it though

#

a channel?

#

like mpsc?

#

if so, i'm already doing that,

vivid wolf
#

Well, mpsc would work but it's probably overkill. There are special channel implementations that are designed for this. mpsc is designed more for constant data transfer.

viscid zenith
#

im aware, you're probably talking about something like oneshot

vivid wolf
#

Yes

viscid zenith
#

but the issue with this, is it's not compat with async-std, and it doesn't really work the way i want it to

vivid wolf
#

async-std itself has channels. They don't work?

viscid zenith
#

it doesn't have oneshot

#

afaik

#

which is what i would need

#

im going to try some magic with condvar again

vivid wolf
#

You can create a bounded channel with capacity 1.

viscid zenith
#

but i think im going to run into the same issue

#

condvar should be created within an arc right?

vivid wolf
#

Both the mutex and the Condvar need to be, yes

viscid zenith
#

i mean im assuming i dont really need to use a mutex if im simply just notifying all tasks though?

vivid wolf
#

Afaik the Condvar API requires a mutex.

#

I'm not an expert on how to manually use this. There is an episode of Crust of Rust about this.

viscid zenith
#

also i would need to use something like select right, if i have two blocking futures

vivid wolf
#

Exactly

rose plinth
#

You can abuse channels for this:

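use async_std::channel::{self, TryRecvError};
use async_std::task;

pub fn main() {
    task::block_on(start())
}

async fn start() {
    // Nothing is ever sent. Dropping `send` closes the channel,
    // and every cloned receiver observes that.
    let (send, recv) = channel::bounded::<()>(1);

    println!("STARTING...");
    let recv0 = recv.clone();
    let raw_buff_recv = task::spawn(async move {
        println!("STARTED raw_buff_recv");
        // `recv()` suspends the task until a message arrives or the channel closes.
        while recv0.recv().await.is_ok() {}
        println!("DONE raw_buff_recv");
    });

    let recv1 = recv.clone();
    let conn_processor = task::spawn(async move {
        println!("STARTED conn_processor");
        loop {
            // Non-blocking check, for loops that have other work to do.
            if matches!(recv1.try_recv(), Err(TryRecvError::Closed)) {
                break;
            }
            // Give other tasks a chance to run instead of spinning.
            task::yield_now().await;
        }
        println!("DONE conn_processor");
    });

    println!("ENDING...");
    drop(send);
    // Wait for both tasks to notice the close before returning.
    raw_buff_recv.await;
    conn_processor.await;
    println!("ENDED");
}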
vivid wolf
#

Abusing the channel closing is interesting.
But you can also just send something and use a successful try_recv as the signal.

rose plinth
#

If you're listening in multiple tasks you'll have to drop it. Don't think async-std has a broadcast channel unfortunately.

vivid wolf
#

I'm still not really sure why using like a tokio one-shot or broadcast channel is not possible.
In theory these should be executor-agnostic.

viscid zenith
#

@rose plinth the issue is, cleanup may happen for an external reason

#

like the server closing the connection (which it does not have access to the socket directly)

#

also im not sure if i really wanna abuse channels for this if there's a better way to implement it lol

#

although it has crossed my mind

viscid zenith
# vivid wolf Exactly

also it turns out a condvar is not compatible with futures::future::select 😔 due to pinning on the guard

vivid wolf
#

Using channels for this is not abusing anything, it's the standard.

viscid zenith
#

im ngl the only reason I didn't want to go that route is just it looks messy lmao

rose plinth
#

You can wrap the channel in an API that's cleaner and conveys what you're trying to do better.

pub struct SignalSender(Option<Sender>);
pub struct SignalReceiver(Receiver);

And just have SignalSender drop the Sender if you want
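
As a rough illustration of that API shape, the sketch below keeps the `SignalSender` / `SignalReceiver` names but swaps the wrapped channel for a std-only flag plus a list of wakers, so it compiles without any runtime crate. Everything else (`signal`, `Closed`, `block_on`, `wake_two_tasks`) is a hypothetical name, and the tiny `block_on` only stands in for a real executor such as async-std or tokio.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Shared state: the closed flag plus the wakers of tasks waiting on it.
struct Shared {
    closed: bool,
    wakers: Vec<Waker>,
}

pub struct SignalSender {
    shared: Arc<Mutex<Shared>>,
}

#[derive(Clone)]
pub struct SignalReceiver {
    shared: Arc<Mutex<Shared>>,
}

pub fn signal() -> (SignalSender, SignalReceiver) {
    let shared = Arc::new(Mutex::new(Shared { closed: false, wakers: Vec::new() }));
    (SignalSender { shared: Arc::clone(&shared) }, SignalReceiver { shared })
}

impl Drop for SignalSender {
    // Dropping the sender is the close signal: set the flag, wake everyone.
    fn drop(&mut self) {
        let mut state = self.shared.lock().unwrap();
        state.closed = true;
        let wakers = std::mem::take(&mut state.wakers);
        drop(state); // release the lock before waking
        for waker in wakers {
            waker.wake();
        }
    }
}

impl SignalReceiver {
    pub fn is_closed(&self) -> bool {
        self.shared.lock().unwrap().closed
    }

    /// Future that resolves once the sender has been dropped.
    pub fn closed(&self) -> Closed<'_> {
        Closed { receiver: self }
    }
}

pub struct Closed<'a> {
    receiver: &'a SignalReceiver,
}

impl Future for Closed<'_> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.receiver.shared.lock().unwrap();
        if state.closed {
            Poll::Ready(())
        } else {
            // Sketch only: a repeatedly polled future registers duplicate wakers.
            state.wakers.push(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Minimal executor stand-in: park the thread until the waker unparks it.
struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Two waiters sleep on the signal; dropping the sender wakes both.
fn wake_two_tasks() -> usize {
    let (tx, rx) = signal();
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || {
                block_on(rx.closed());
                1usize
            })
        })
        .collect();
    drop(tx);
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("woken: {}", wake_two_tasks());
}
```

Because the receiver is `Clone` and the close is signalled by dropping the sender, every listening task sees it, which is the same property the closed-channel trick relies on.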

vivid wolf
#

Use a tokio broadcast channel and call try_recv in the task loops. It should just work.

viscid zenith
#

tokio (at least when i use console-subscriber) seems to be pausing this for long periods of time (over 50ms)

#

not really a big issue but

rose plinth
#

You have that sleep at the top of the loop

vivid wolf
#

If you sleep 50 milliseconds, then it will indeed be paused for 50 milliseconds

viscid zenith
#

yes but the main thing im annoyed about is when the network receive thread takes longer than 50ms, the operation is suspended longer, although this might be fixed as it's been a while since i've even looked at that, and the code has definitely changed

rose plinth
#

You're going to want to use select to wait for the close signal and for messages from your queues at the same time
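
To show what "waiting on both at once" means, here is a hand-rolled two-way `select` built only on std. It is a sketch of the idea, not how the `futures` crate implements `futures::future::select`. The names `Either`, `select`, `block_on` and `demo` are illustrative, and `ready(())` / `pending()` stand in for a close signal that has already fired and a queue that never produces a message.

```rust
use std::future::{pending, poll_fn, ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Polls both futures each time the task is woken and returns whichever
/// finishes first. `a` is polled first, so it wins ties.
async fn select<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(|cx| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
    .await
}

// Minimal executor stand-in so the example runs without a runtime crate.
struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn demo() -> &'static str {
    let close_signal = ready(()); // stand-in: the close signal already fired
    let next_message = pending::<u8>(); // stand-in: no message ever arrives
    match block_on(select(close_signal, next_message)) {
        Either::Left(()) => "closed",
        Either::Right(_byte) => "message",
    }
}

fn main() {
    println!("{}", demo());
}
```

In a task loop the `closed` arm would break out and the `message` arm would process the payload, so the task never has to sleep on a timer just to re-check a flag.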

viscid zenith
#

yeah im aware of that

#

that's fine, i'll probably use futures::future::select for that

vivid wolf
#

If you just want to prevent 100% CPU you should use yield_now instead of sleep.

viscid zenith
#

i mean the only issue with it is client & server tick desync

#

but it's not really that big of an issue

vivid wolf
#

It's basically impossible to keep two ends of a connection in perfect sync using sleep, sleep is not that accurate.

viscid zenith
#

well yeah

#

it's more of an issue when the internal packet processing takes longer than 50 ms

#

due to a large payload

#

the tick will be suspended

#

but again, its not really that big of an issue, more of a curiosity i had

#

i appreciate your time and help marc and artentus :)