#Scoped async with named tasks

18 messages · Page 1 of 1 (latest)

austere silo
#

Hi all, I'm looking to use scoped async such that I can take a mutable borrow into a spawn. I'm currently using async-scoped and that works, but I also want to give the spawned tasks names. I see that I can use tokio::task::Builder for this, but the type signature for Builder.spawn is incompatible for async-scoped.

Using Builder.spawn directly doesn't work because of lifetime issues with shards, and I can't wrap shards in an Arc<RwLock<Vec<...>>> because of lifetime issues with Builder.spawn.

I've also looked at using FuturesUnordered, but that doesn't appear to support creating named tasks either.

For reference, I want to give my tasks names so they show up in tokio-console for debugging purposes.

My code looks something like this:

async fn main() -> anyhow::Result<()> {
    ...
    run_events(&mut shards);
    shard_resume::dump_resumes(&shards, cluster).await?;
}

fn run_events(
    shards: &mut Vec<Shard<Queue>>,
    cluster: Arc<Cluster>,
) {
    let shutdown_token = CancellationToken::new();

    async_scoped::TokioScope::scope_and_block(|scope| {
        for shard in shards {
            scope.spawn(shard_runner(shard, shutdown_token));
        }
        scope.spawn(kill_shards_on_exit(cluster, shutdown_token));
    });
}
timber ermine
#

I would not recommend using async-scoped at all, it requires blocking the thread in order to be safe and you should never block the tokio thread.

#

cross-task data communication should be done with channels or a shared mutex

#

the latter 'lifetime issue' should not exist, please post the code and the error

austere silo
#

Why not block the outer main thread? I want to run the last async function after the function above completes, and the first wants mutable access to the vector, but I can't spawn using the contents of the vector due to a static lifetime issue

timber ermine
#

yes you can. you can use a shared mutex. in this case Shard is the thing that is shared, so it is the thing that needs to be wrapped in an Arc. presumably that is what you are missing, that you instead need Vec<Arc<...>>

#

the main thread is not unique among the tokio threads you should not block. blocking the tokio thread can lead to resource starvation or deadlocks

lucid urchin
#

that's not quite true. there is no technical reason not to block inside of a tokio block_on() (and #[tokio::main] is a block_on() under the hood) unless you are using the current-thread executor. however it may be confusing.

timber ermine
lucid urchin
#

what do you mean by "a new tokio context"?

#

certainly they share the same runtime, as in Runtime::enter(), but block_on() does not cause spawned tasks to run on the current thread (unless you are using the current-thread runtime)

austere silo
#

I feel using block_on is simpler, and I still get issues when using Arc<RwLock>

timber ermine
austere silo
# timber ermine please post the code and the error
async fn run_events(
    shards: Vec<Arc<RwLock<Shard<NQNQueue>>>>,
    ...
) {
    let shutdown_token = CancellationToken::new();
    for shard in shards {
        let mut shard = shard.write().await;
        tokio::spawn(
            shard_runner(
                &mut shard,
                shutdown_token.clone(),
                ...
            )
        );
    }
}

Error:

error[E0597]: `shard` does not live long enough
   --> src\main.rs:321:25
    |
320 |       for shard in shards {
    |           ----- binding `shard` declared here
321 |           let mut shard = shard.write().await;
    |                           ^^^^^ borrowed value does not live long enough
322 | /         tokio::spawn(
323 | |             shard_runner(
324 | |                 &mut shard,
325 | |                 shutdown_token.clone(),
...   |
333 | |         );
    | |_________- argument requires that `shard` is borrowed for `'static`
334 |       }
    |       - `shard` dropped here while still borrowed
timber ermine
#

you still have to call clone

#
let shard = shard.clone();
let shutdown_token = shutdown_token.clone(); // also required to put outside
tokio::spawn(async move {
    let mut shard = shard.write().await;
    shard_runner(&mut shard, shutdown_token);
});
#

you perform the handle clones outside, and move the new reference-free values into the task