#Future cannot be sent between threads safely

1 messages · Page 1 of 1 (latest)

wheat ravine
#

Im trying to learn a bit more about tokio by making a scheduling engine, tho i run into an issue for the following code:

async fn refresh(self: &Arc<Self>) {
        if let Some(process) = self.task_process.swap(None) {
            process.abort()
        }
        if self.reference.lock().await.is_empty() {
            return;
        }
        let (id, task, time) = self.get_next().await.unwrap();
        let this = Arc::clone(self);
        self.task_process.swap(Some(Arc::new(
            tokio::spawn(async move {
                let now_chrono = Local::now();
                let now_tokio = Instant::now();

                let delta = time.signed_duration_since(now_chrono);
                let target_time = if delta.num_milliseconds() <= 0 {
                    now_tokio
                } else {
                    now_tokio + Duration::from_millis(delta.num_milliseconds() as u64)
                };
                sleep_until(target_time).await;
                {
                    let mut lock = task.lock().await;
                    lock.execute().await.unwrap();
                    let runs = lock.total_runs().await;
                    lock.set_total_runs(runs + 1).await;
                }

                // vv Panicks vv
                tokio::task::spawn_local(async move {
                    this.clone().refresh().await;
                });
            })
        )));
    }

the panic in question is:

spawn_local` called from outside of a `task::LocalSet` or LocalRuntime

If i try to make it a spawn, then i get a compile-time error:

error: future cannot be sent between threads safely
   --> ...
    |
185 | /                 tokio::spawn(async move {
186 | |                     this.clone().refresh().await;
187 | |                 });
    | |__________________^ future created by async block is not `Send`
#

I have to refresh, its not like i can avoid it, i also cannot make this function sync as reference is a tokio::Mutex and get_next() is async

#

which is

#
async fn get_next(&self) -> Option<(usize, Arc<Mutex<dyn Task>>, DateTime<Local>)> {
        let mut heap = self.earliest.lock().await;
        let tasks = self.reference.lock().await;

        while let Some(Reverse((when, id))) = heap.peek().cloned() {
            if let Some(task) = tasks.get(&id) {
                return Some((id, task.clone(), when));
            } else {
                heap.pop();
            }
        }
        None
    }
#

(it also has tokio::Mutex)

terse socket
wheat ravine
#

mhm

#

the full error (copied from rustrover)

future cannot be sent between threads safely
future created by async block is not `Send`
Note: cannot satisfy `impl futures::Future<Output = ()>: std::marker::Send`
Note: future is not `Send` as it awaits another future which is not `Send`
Note: required by a bound in `tokio::spawn`
terse socket
#

-errors

civic troutBOT
#

Run cargo check in a terminal

Note: If using rust analyzer you can click the "click for full compiler diagnostic" link in your editor.

Please post the full output of the above command, including the error title and any help or notes. An example of how this looks is:

error[E0308]: mismatched types
 --> src/main.rs:3:17
  |
3 | let foo: &i32 = bar;
  |          ----   ^^^ expected `&i32`, found integer
  |          |
  |          expected due to this
  |
help: consider borrowing here
  |
3 | let foo: &i32 = &bar;
  |                 +

When posting the error put it in a code block so it has nice formatting:

```rust
// error from cargo check here
```

Please do not post a screenshot. If the output is too long then use a paste tool like https://paste.rs/web

wheat ravine
#
 future cannot be sent between threads safely
   --> core/src/scheduler.rs:185:17
    |
185 | /                 tokio::spawn(async move {
186 | |                     this.clone().refresh().await;
187 | |                 });
    | |__________________^ future created by async block is not `Send`
    |
    = note: cannot satisfy `impl futures::Future<Output = ()>: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> core/src/scheduler.rs:186:21
    |
186 |                     this.clone().refresh().await;
    |                     ^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `impl futures::Future<Output = ()>`, which is not `Send`
note: required by a bound in `tokio::spawn`
   --> /Users/.../.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.47.1/src/task/spawn.rs:168:21
    |
166 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
167 |     where
168 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
terse socket
#

how is refresh defined?

wheat ravine
#

i just switch spawn_local with spawn to get the compile-time error

terse socket
#

ohh sorry missed that it's calling itself recursively

#

are you sure this wouldn't be better expressed as a loop instead of a recursive call?

wheat ravine
#

hmm

#

i've thought about it

#

that i will hit recurssion errors

#

tho how would i go about making it a loop?

#

i call it in 3 places

#

(when removing, starting and pushing)

terse socket
#

is that because you want to both refresh at regular intervals, and when a refresh is explicitly requested?

#

in any case what i would do is have a dedicated task that loops forever, refreshing whenever you need to, something like:

async fn refresh_loop(mut tasks: tokio::sync::mpsc::Receiver<Task>) {
    while let Some(task) = tasks.recv().await {
        // do refresh logic here
    }
}
#

idk exactly what the logic of your refresh code is, when you need to refresh, how tasks are added etc, but if you use an mpsc channel anyone can schedule another task to refresh by sending it via the channel

#

and you'd start this by just doing something like

let (sender, receiver) = tokio::sync::mpsc::channel(buffer_size);
tokio::task::spawn(refresh_loop(receiver));
wheat ravine
#

once this chunk is done:

sleep_until(target_time).await;
{
  let mut lock = task.lock().await;
  lock.execute().await.unwrap();
  let runs = lock.total_runs().await;
  lock.set_total_runs(runs + 1).await;
}
#

and when explicitly requested

terse socket
#

well either way your refresh function always waits for get_next to complete first, so you can just let the channel play that same role. so assuming you have some kind of queue of tasks, then just send the task to the channel whenever you queue up the task, and the loop will receive it and process it. and if you need to request an explicit refresh, again just push the task that needs refreshing to the channel

wheat ravine
#

kinda don't get it

wheat ravine
terse socket
#

so, correct me if i'm wrong but the core of your logic is:

  • new tasks get scheduled (somehow)
  • whenever a new task is scheduled, your get_next function will return the next task to refresh
  • you perform whatever logic needed to refresh
  • once finished, you start waiting for get_next again

with my refresh_loop suggestion, instead of calling get_next, you would use a channel. so:

  • new tasks get scheduled
  • the scheduling function sends the task that needs to be refreshed via the channel
  • refresh_loop receives the task to refresh
  • refresh_loop performs the logic needed to refresh
  • once finished, refresh_loop starts waiting for the next channel message again
#

in other words tasks.recv().await is equivalent to your self.get_next().await, and the logic of your refreshing would go inside the while loop

wheat ravine
#

not by insertion order

terse socket
#

you think?

wheat ravine
#

im trying to understand

terse socket
#

did you not write the code?

wheat ravine
#

i wrote it

terse socket
#

you don't understand the code you wrote?

wheat ravine
#

no no

#

your take

#

i mean

#

ok so

#

the reason i refresh a task when i insert/start/delete, is there is a chance that say Task A is inserted and executes in 15 minutes

#

at some arbirtary point

#

Task B might be inserted

#

which executes in 2 minutes

#

so the earliest task was A but now B is

terse socket
#

if you're dealing with some kind of priority queue, the way i'd write it is something like

async fn run_tasks(mut new_tasks: Receiver<Task>) {
    let mut current_tasks = BinaryHeap::new(); // or whatever you use to sort tasks
    loop {
        let Some(next_task) = match current_tasks.pop() {
            None => new_tasks.recv().await,
            v => v,
        } else {
            // queue was empty and there are no senders left
            return;
        };

        select! {
            _ => tokio::time::sleep(next_task.time_until()) => {
                // run task here
            }
            Some(new_task) = new_tasks.recv() => {
                current_tasks.push(next_task); // didn't execute task, put it back in the queue
                current_tasks.push(new_task);
            }
        }
    }
}
wheat ravine
#

wait this can be simplified

#

okie

terse socket
#

(forgot to readd next_task back onto the queue in the event that it's not executed, fixed it now)

#

in any case a loop + channel + select! should do what you want

wheat ravine
#

im ditching the priority queue

wheat ravine
#

actually i found the solution