#How to avoid Arc<Mutex<FnMut() -> BoxFuture>>


median rivet
#
use std::future::Future; // in the prelude on edition 2024, needed on earlier editions
use std::sync::Arc;

use futures::future::BoxFuture;

pub struct Callback {
    pub callback: Box<dyn Fn() -> BoxFuture<'static, ()>>,
}

impl Callback {
    pub async fn call(&self) {
        (self.callback)().await
    }
}
pub trait BlockingIfSync<Marker> {
    fn to_callback(self) -> Callback;
}

impl<F: Fn() + Send + Sync + 'static> BlockingIfSync<()> for F {
    fn to_callback(self) -> Callback {
        let cb = Arc::new(self);
        Callback {
            callback: Box::new(move || {
                let cb = Arc::clone(&cb);
                Box::pin(async move {
                    _ = tokio::task::spawn_blocking(move || (cb)()).await;
                })
            }),
        }
    }
}

struct AsyncMarker;
impl<F, Fut> BlockingIfSync<AsyncMarker> for F
where
    F: Fn() -> Fut + Send + Sync + 'static,
    // BoxFuture only needs the future to be Send; Output = () is already Send.
    Fut: Future<Output = ()> + Send + 'static,
{
    fn to_callback(self) -> Callback {
        let cb = Arc::new(self);
        Callback {
            callback: Box::new(move || {
                let cb = Arc::clone(&cb);
                Box::pin(async move { (cb)().await })
            }),
        }
    }
}

pub fn schedule<Marker, MaybeAsync: BlockingIfSync<Marker>>(cb: MaybeAsync) -> Callback {
    cb.to_callback()
}

#[cfg(test)]
mod tests {
    use tokio::sync::mpsc::channel;

    use super::*;

    async fn test_async() {
        println!("testing");
    }

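    // A plain `fn`, so it goes through the spawn_blocking impl.
    fn test_sync() {
        println!("testing");
    }

    #[tokio::test]
    async fn test_async_callback() {
        let (tx, mut rx) = channel::<()>(1);

        {
            let tx = tx.clone();
            let cb = schedule(move || {
                let tx = tx.clone();
                async move {
                    // tokio's Sender::send is async and does nothing unless awaited.
                    _ = tx.send(()).await;
                }
            });
            cb.call().await;
            assert!(rx.recv().await.is_some());
        }

        {
            let tx = tx.clone();
            let cb = schedule(move || {
                // This closure runs inside spawn_blocking, so use the blocking send.
                _ = tx.blocking_send(());
            });
            cb.call().await;
            assert!(rx.recv().await.is_some());
        }

        schedule(test_sync).call().await;
        schedule(test_async).call().await;
    }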
}

I would like to remove the usage of Arc. I would also like to support FnMut instead, but I don't see how I can make this work without Arc<Mutex> due to the nested closures and the nature of FnMut. These callbacks are intended to be run from inside one task in a single loop, so there won't be multiple threads calling a given cb at the same time.

dim crow
worthy walrus
#

should it ever be possible for the same callback to be run again before its previous run has finished?

worthy walrus
#

i had a play around with this and arrived at this solution: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=8af85ba96da69c13b89368f49a26ca6c

the key change here is that when a callback is run, it consumes self, giving the task full ownership of the closure. when the task is done, it hands the callback back to the main thread so it can be rescheduled

so the way this might work is that you might have a queue of callbacks, and whenever you need to run one, you remove it from the queue and call it to get a future. at this point, the callback has been moved and can no longer be used from the scheduler. but when the future resolves, it will resolve with the original callback, so you can add it back to the queue
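A minimal std-only sketch of that consume-and-return idea, assuming sync `FnMut` callbacks. `BoxedCallback`, `run_and_return` and `run_next` are hypothetical names (this is not the linked playground code), and `std::thread::spawn` stands in for `tokio::task::spawn_blocking` so it runs without dependencies; the ownership story is the same:

```rust
use std::collections::VecDeque;
use std::thread;

/// Hypothetical alias: a stateful callback that can move between threads.
type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Moves `cb` onto a worker thread, runs it once, and hands it back.
fn run_and_return(mut cb: BoxedCallback) -> BoxedCallback {
    thread::spawn(move || {
        cb(); // the worker owns `cb`, so calling an FnMut needs no lock
        cb // returned to the caller through the JoinHandle
    })
    .join()
    .expect("callback panicked; it was dropped along with the worker thread")
}

/// Takes the next callback out of the queue, runs it, and re-queues it.
fn run_next(queue: &mut VecDeque<BoxedCallback>) {
    if let Some(cb) = queue.pop_front() {
        // While it is running, the callback is not in the queue at all.
        queue.push_back(run_and_return(cb));
    }
}
```

Because the worker owns the closure outright, any state it captures (e.g. a counter in `move || { n += 1; }`) survives between runs without `Arc` or `Mutex`. The weak spot is the `expect`: if the callback panics, it is dropped with the worker instead of coming back.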

#

by moving ownership of the callback to the task thread itself, we get around the need for Arc

median rivet
median rivet
worthy walrus
#

but if you can avoid it it'll make synchronisation and lifetimes a lot easier to deal with

median rivet
#

Sweet. I do wish I didn’t have to resort to returning the callback to avoid the mutex, but it is quite the clever way to get around the issue

worthy walrus
#

it's probably a bit more common to use a similar setup but with oneshot channels instead to move the owned data between threads, but in this case it's simple enough to just use move closures and pass it back as the return value
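For comparison, a sketch of the channel variant, again assuming sync `FnMut` callbacks. A bounded `std::sync::mpsc::sync_channel(1)` stands in for a oneshot channel such as `tokio::sync::oneshot`, `std::thread::spawn` stands in for `spawn_blocking`, and `spawn_with_return` is a hypothetical name:

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical alias: a stateful callback that can move between threads.
type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Runs `cb` on a worker and returns a receiver that yields it back once.
fn spawn_with_return(mut cb: BoxedCallback) -> Receiver<BoxedCallback> {
    // Capacity 1, so the worker's single send never blocks.
    let (back_tx, back_rx) = mpsc::sync_channel(1);
    thread::spawn(move || {
        cb();
        // An error here only means the scheduler stopped listening.
        let _ = back_tx.send(cb);
    });
    back_rx
}
```

If the callback panics, `back_tx` is dropped without sending, so `recv()` returns an error: the scheduler can notice the failure, but the callback itself is gone.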

median rivet
#

and if it were sync-only (no async), I think you could probably do that? Seems like the inner async move makes things dicey

#

yeah, for example this works fine:

pub struct Testing {
    pub callback: Box<dyn FnMut()>,
}

pub trait TestingCallback {
    fn to_callback(self) -> Testing;
}

impl<F: FnMut() + 'static> TestingCallback for F {
    fn to_callback(mut self) -> Testing {
        Testing {
            callback: Box::new(move || {
                (self)();
            }),
        }
    }
}
#

maybe the new async closures could help?
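A sketch of what async closures (stable since Rust 1.85) do allow, assuming the callback is awaited in place on the scheduler's own task. `AsyncFnMut` lets the closure keep mutable state without `Arc`, but each call's future borrows the closure, so it is not `'static` and can't be handed to `tokio::spawn`. `run_n_times` and the busy-polling `block_on` are hypothetical helpers so the example runs without tokio:

```rust
use std::future::Future;
use std::ops::AsyncFnMut;
use std::pin::pin;
use std::task::{Context, Poll, Waker};

/// Awaits an `AsyncFnMut` callback `n` times, one call at a time.
/// Each call's future mutably borrows `f`, so calls can never overlap.
async fn run_n_times<F: AsyncFnMut()>(n: usize, mut f: F) {
    for _ in 0..n {
        f().await;
    }
}

/// Minimal busy-polling executor for the sketch; only suitable for
/// futures that never wait on I/O or timers.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let mut cx = Context::from_waker(Waker::noop());
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

So async closures help with the FnMut-plus-async shape, but not with moving the work to another task, which is where the `'static` requirement bites.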

worthy walrus
#

problem with FnMut is that it requires a mutable reference to the captured data, so you're not allowed to have any other references to the same data while the closure is running (unless they're protected by a mutex). so eg your callback queue can't keep a reference to it

#

and it's particularly difficult with eg spawn_blocking since it requires that the closure you pass to it is 'static. that means the closure can't hold any references to temporary data, which includes the callback function if you don't move it into the closure you pass to spawn_blocking

median rivet
#

true, the triple moves for the sync version would make that more difficult

worthy walrus
#

if you don't mind running all tasks, including the sync ones, on the main thread, things would be a bit simpler

median rivet
#

I don't think that will be possible, as this is supposed to be an event scheduler for a service framework that will trigger user-registered callbacks whenever an event fires, so the scheduler can't run on the main thread, since that is where user code will be running

worthy walrus
#

yup that's what i figured

median rivet
#

damn yeah, the 'static requirement doesn't make things easier for sure.

#
            callback: Box::new(move || {
                let cb = Arc::clone(&cb);
                Box::pin(async move { (cb)().await })
            }),

This is cloning the cb arc every time the callback is called, isn't it?

worthy walrus
#

yeah it is, though arc clones are pretty cheap

median rivet
#

and if I did want to put a mutex around it, I suppose MOST of the time the contention would be zero
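A sketch of that `Arc<Mutex<...>>` variant for `FnMut`, with `SharedCallback` and `run_shared` as hypothetical names and `std::thread::spawn` standing in for `spawn_blocking`. The scheduler keeps its handle; each run clones the `Arc` and locks inside the worker, so an overlapping run simply waits for the lock:

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical shared form of a stateful callback.
type SharedCallback = Arc<Mutex<Box<dyn FnMut() + Send + 'static>>>;

/// Runs the callback on a worker thread while the scheduler keeps its handle.
fn run_shared(cb: &SharedCallback) -> JoinHandle<()> {
    let cb = Arc::clone(cb); // a reference-count bump, not a copy of the closure
    thread::spawn(move || {
        // An overlapping run blocks here until the previous one releases the lock.
        let mut guard = cb.lock().unwrap();
        let f: &mut Box<dyn FnMut() + Send> = &mut guard;
        f();
    })
}
```

One thing to watch: if a callback panics while holding the lock, the `Mutex` is poisoned and every later `lock().unwrap()` panics too.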

#

as I think most people would schedule periodic events a few seconds apart

#

If I decide they do need to be able to call the callback again before the first execution finishes, that is

#

I was debating spawning the user-registered functions in a new task, so that they don't hold up the scheduler thread in case someone puts long-running code in there. Ideally nobody would do that, or the scheduler would instead use these callbacks to send events out and let users call their own functions in response.

worthy walrus
#

one approach that's similar to what i suggested but which doesn't require you to remove the callback from the main queue, is to store the closure within the callback in an Arc<Mutex<Option<F>>>. then the task could arc.lock().unwrap().take() the closure to move it into the task, leaving a temporary None value for the closure while it's running, and then arc.lock().unwrap().insert(cb) when it's done to put it back

#

but that requires handling the case when a task accidentally tries take()ing the closure while it's None
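A sketch of the `Arc<Mutex<Option<F>>>` take/put-back idea, assuming sync `FnMut` callbacks; `Slot` and `try_run` are hypothetical names and `std::thread::spawn` stands in for `spawn_blocking`. Here the `None` case is handled by refusing to start a second run:

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Holds the callback while idle; `None` while a run is in progress.
#[derive(Clone)]
struct Slot(Arc<Mutex<Option<BoxedCallback>>>);

impl Slot {
    fn new(cb: BoxedCallback) -> Self {
        Slot(Arc::new(Mutex::new(Some(cb))))
    }

    /// Starts a run, or returns `None` if the callback is already running.
    fn try_run(&self) -> Option<JoinHandle<()>> {
        // The lock is held only for the `take`, not while the callback runs.
        let mut cb = self.0.lock().unwrap().take()?;
        let slot = self.clone();
        Some(thread::spawn(move || {
            cb();
            // Put it back so the next `try_run` finds it.
            *slot.0.lock().unwrap() = Some(cb);
        }))
    }
}
```

Since the lock isn't held during the call, a panicking callback doesn't poison the mutex, but it does leave the slot permanently `None`.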

median rivet
#

hmm yeah that could also be worth considering. Not sure, but since I wasn't told the full extent / features, I'll probably present several of the ideas Monday and see what's best

#

Technically I just needed to figure out accepting sync and async callbacks, now I'm trying to deep dive on this

worthy walrus
#

welcome to the rabbit hole!

west crystal
#

tokio::fs::File uses this strategy to pass a buffer into a blocking task and back, but it seems to assume std::fs::File::{read, write, seek} will never panic
see File::poll_read for example

worthy walrus
#

very true. maybe instead of moving the callback directly into the closure, wrap it in a DerefMut panic guard which sends it back via a oneshot channel in Drop when done

#

although sending on a oneshot channel consumes the sender, which doesn't work very well with Drop sadly (since it takes &mut self). might just need to use an mpsc channel for that
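A sketch of that guard, assuming sync `FnMut` callbacks and `std::thread::spawn` in place of `spawn_blocking`; `ReturnOnDrop` and `run_guarded` are hypothetical names. It uses `std::sync::mpsc`, whose `Sender::send` takes `&self`, so calling it from `drop` is straightforward:

```rust
use std::ops::{Deref, DerefMut};
use std::sync::mpsc::Sender;
use std::thread::{self, JoinHandle};

type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Derefs to the callback and sends it back when dropped,
/// including a drop that happens while a panic unwinds.
struct ReturnOnDrop {
    cb: Option<BoxedCallback>, // `Some` until `drop` takes it
    back: Sender<BoxedCallback>,
}

impl Deref for ReturnOnDrop {
    type Target = BoxedCallback;
    fn deref(&self) -> &BoxedCallback {
        self.cb.as_ref().expect("callback is present until drop")
    }
}

impl DerefMut for ReturnOnDrop {
    fn deref_mut(&mut self) -> &mut BoxedCallback {
        self.cb.as_mut().expect("callback is present until drop")
    }
}

impl Drop for ReturnOnDrop {
    fn drop(&mut self) {
        if let Some(cb) = self.cb.take() {
            // An error only means the scheduler stopped listening.
            let _ = self.back.send(cb);
        }
    }
}

/// Runs `cb` on a worker thread; it comes back on `back` even if it panics.
fn run_guarded(cb: BoxedCallback, back: Sender<BoxedCallback>) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut guard = ReturnOnDrop { cb: Some(cb), back };
        let cb: &mut BoxedCallback = &mut guard; // goes through DerefMut
        cb();
    })
}
```

This relies on panics unwinding (the default); with `panic = "abort"` the process exits and nothing comes back.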

west crystal
#

although if the scheduler needs to tolerate callbacks panicking, someone will need to catch_unwind eventually

worthy walrus
#

well the catch_unwind version does have the benefit that you don't need to keep track of both messages passed over the channel and the future returned by call()

#

not that a simple join!() couldn't deal with that
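For comparison, a sketch of the `catch_unwind` route under the same assumptions (`run_catching` is a hypothetical name, `std::thread::spawn` stands in for `spawn_blocking`). The callback and the outcome of the run come back together in one return value, so there is no separate channel to keep track of:

```rust
use std::panic::{self, AssertUnwindSafe};
use std::thread;

type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Runs `cb` on a worker and always returns it, along with whether it panicked.
fn run_catching(mut cb: BoxedCallback) -> (BoxedCallback, thread::Result<()>) {
    thread::spawn(move || {
        // AssertUnwindSafe: we accept that the closure's state may be
        // half-updated after a panic and choose to reuse it anyway.
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| cb()));
        (cb, outcome)
    })
    .join()
    .expect("panic escaped catch_unwind")
}
```

The panic hook still prints the panic message, and `catch_unwind` cannot help if the program is built with `panic = "abort"`.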

median rivet
worthy walrus
median rivet
#

But wouldn’t using the scoped spawn not move the callback into the task, meaning I don’t need to send it back over a channel?

worthy walrus
#

ah i see what you mean

median rivet
#

Yeah, a regular task wouldn’t work because of the 'static requirement, but a scoped one should be able to call the callback through a mutable reference without taking ownership

worthy walrus
#

in that case maybe even consider just using std::thread::scope for your blocking tasks instead of tokio's spawn_blocking
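A sketch of the scoped-thread version, assuming sync `FnMut` callbacks (`run_all_scoped` is a hypothetical name). Note that `std::thread::scope` blocks the calling thread until every spawned thread has finished, so from async code it would itself have to run somewhere blocking is allowed, e.g. inside `spawn_blocking`:

```rust
use std::thread;

type BoxedCallback = Box<dyn FnMut() + Send + 'static>;

/// Runs every callback once, each on its own scoped thread, borrowing them
/// mutably instead of moving them: no `'static`, `Arc`, or hand-back needed.
fn run_all_scoped(callbacks: &mut [BoxedCallback]) {
    thread::scope(|s| {
        for cb in callbacks.iter_mut() {
            // `cb` is a `&mut BoxedCallback`; the scope guarantees the
            // thread finishes before that borrow ends.
            s.spawn(move || cb());
        }
    }); // returns only after every spawned thread has been joined
}
```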

#

iirc scoped async tasks have... issues...

#

(in the soundness department)

#

although std::thread::scope will make it harder to actually wait for the task to complete... so that would probably just be moving the problem elsewhere so nevermind

median rivet
#

Yeah that’s what I was about to say, since I’m joining everything and not letting them run concurrently

#

I’ll try and research the issues with scoped async tasks. However, seems like that could be the solution to the unwind problem

worthy walrus
#

i think any sound scoped async task implementations also just block until they finish to avoid the soundness issues

median rivet
#

That sounds kinda pointless

#

To use them I mean

#

If they block, kinda anti async at that point

worthy walrus
#

such is life sadly. what they enable you to do is run asynchronous tasks in parallel to each other while still giving them access to temporary data, but you can't use them to run tasks in parallel to the main thread

#

it's basically the same issue as with the original scoped threads api that people realised was unsound right before the release of Rust 1.0

median rivet
#

Guess I’ll be manually catching the unwind then

worthy walrus
#

don't remember the exact details but it's something along the lines of: if you don't block, it's possible to leak/forget the tasks in a way that allows them to continue running while the parent function returns and deallocates the data
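A tiny illustration of the leak problem (the `JoinOnDrop` type and `leak_guard` function are hypothetical): safe code may call `std::mem::forget`, so a destructor is never guaranteed to run. That is roughly what sank the pre-1.0 `thread::scoped` API, whose join guard could be forgotten, and why `std::thread::scope` instead takes a closure and joins everything before it returns:

```rust
use std::mem;

/// A guard whose destructor is supposed to "join" the work it guards;
/// here it just flips a flag so the effect is observable.
struct JoinOnDrop<'a> {
    joined: &'a mut bool,
}

impl Drop for JoinOnDrop<'_> {
    fn drop(&mut self) {
        *self.joined = true;
    }
}

/// Entirely safe code that skips the destructor, so no "join" happens.
fn leak_guard(joined: &mut bool) {
    let guard = JoinOnDrop { joined };
    mem::forget(guard);
}
```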

#

maybe if we had async drop

median rivet
#

Well, block in what way, though? Like block in a way that stalls the executor, or block as in my scheduler thread wouldn’t progress because I was awaiting the scope?

worthy walrus
#

former

#

or uh

#

depends on what you mean by blocking the executor

median rivet
#

It literally says it in the first paragraph on the Tokio scoped docs lol I’m dumb

#

Be aware, that when a Scope exits it will block until every future spawned by the Scope completes. Therefore, one should take caution when created scopes within an asynchronous context, such as from within another spawned future.

worthy walrus
#

yup

#

it'll block the thread it's running on so it won't be usable until all tasks finish

#

which may also stall tokio's runtime in some cases, esp if you're using a single-threaded runtime

median rivet
#

So when would you even use this, tbh?

worthy walrus
#

whenever you basically want to do std::thread::scope + tokio::runtime::Runtime::block_on but without having to start a new tokio executor in every single thread

#

(i assume at least)

median rivet
#

Interesting

median rivet
#

I’ve heard there are also performance issues with running async code in a main method without wrapping it in another task. Is something like that true as well?

worthy walrus
#

hmm that's not something i'm familiar with personally at least, but i'm also not super clued in on the intricacies of async performance in rust

median rivet
#

Cool, thanks for your help on this

west crystal