#Pass async callback to looping function


slender cobalt
#
pub struct Watch<V: LiveValue> {
    pub rx: tokio::sync::broadcast::Receiver<Update<V>>,
}

impl<V: LiveValue> Watch<V> {
    /// Watch for changes to the subscribed value.
    /// The provided closure will run whenever the value is updated.
    ///
    /// To stop watching for changes, return `Control::Break` to close the
    /// receiver.
    pub fn on_update<F, Fut>(mut self, f: F) 
    where
        F: Fn(Update<V>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output=Control> + Send + Sync + 'static
    {
        tokio::spawn(async move {
            loop {
                match self.rx.recv().await {
                    Ok(update) => {
                        if let Control::Break = (f)(update).await {
                            return;
                        }
                    },
                    Err(RecvError::Closed) => return,
                    _ => {}
                }
            }
        });
    }
}
error: async closure does not implement `Fn` because it captures state from its environment
   --> src/lib.rs:829:23
    |
829 |         sub.on_update(async move |update| {
    |             --------- ^^^^^^^^^^^^^^^^^^^
    |             |
    |             required by a bound introduced by this call
    |
note: required by a bound in `Watch::<V>::on_update`
   --> src/lib.rs:757:12
    |
755 |     pub fn on_update<F, Fut>(mut self, f: F) 
    |            --------- required by a bound in this associated function
756 |     where
757 |         F: Fn(Update<V>) -> Fut + Send + Sync + 'static,
    |            ^^^^^^^^^^^^^^^^^^^^ required by this bound in `Watch::<V>::on_update`

Hello! I am having trouble passing an async closure into a function that .awaits it in a loop. I understand it says it captures state in its environment, but I am cloning any captured values before moving them into the closure, so I'm not sure why I'm seeing that.

glossy perch
#

you should consider removing : LiveValue on the struct

slender cobalt
#

Yes, I made the LiveValue trait. It enforces Clone + Send + Sync + 'static

glossy perch
#

i see

#

but you should just put it on relevant method unless there is another specific issue this solves

#

either not the issue

#

have you considered FnMut instead of Fn ?

#

is there a reason you don't want to use it ?

slender cobalt
#

I've tried a lot of things, including AsyncFn and AsyncFnMut. I'll try removing the bound like you said.

#

I can't remove the bound, it's required for Update<V>

#

I get the same error with FnMut as above

#
    #[test]
    fn subscribe() {
        let store = LiveStore::<Foo>::new(StoreOptions::default());
        let key = store.init(Foo { bar: 0, baz: 1 });
        let sub = store.watch(&key).unwrap();

        let bar_lock = Arc::new(Mutex::new(0));
        let baz_lock = Arc::new(Mutex::new(0));

        let bar_lock2 = bar_lock.clone();
        let baz_lock2 = baz_lock.clone();

        sub.on_update(async move |update| {
            if let Update::Mutate { mutn, .. } = update {
                match mutn {
                    FooMutation::SetBar(bar) => *bar_lock2.lock().unwrap() = bar,
                    FooMutation::SetBaz(baz) => *baz_lock2.lock().unwrap() = baz,
                }
            }

            Control::Continue
        });
    }

This is the test that's failing on async move

glossy perch
#

not the methods or impls or functions

#

except clone or stuff like that

glossy perch
#

can you copy to check ?

slender cobalt
#

no, replace Fn with FnMut

#
error: async closure does not implement `FnMut` because it captures state from its environment
   --> src/lib.rs:849:23
    |
849 |         sub.on_update(async move |update| {
    |             --------- ^^^^^^^^^^^^^^^^^^^
    |             |
    |             required by a bound introduced by this call
    |
note: required by a bound in `Watch::<V>::on_update`
   --> src/lib.rs:777:12
    |
775 |     pub fn on_update<F, Fut>(mut self, mut f: F) 
    |            --------- required by a bound in this associated function
776 |     where
777 |         F: FnMut(Update<V>) -> Fut + Send + Sync + 'static,
    |            ^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Watch::<V>::on_update`
glossy perch
#

oh yeah am dum

slender cobalt
#

I can't remove V: LiveValue from Update<V> because LiveValue has associated types that Update uses

#

this is the repo

glossy perch
#

oh i missed Update

#

i see

#

then yeah that is when you need to put it on the struct

#

the only good reason i know

#

anyway

#

i think i got the solution

#

wait

#

why is it async

#

if it has 0 await

slender cobalt
#

Idk, it's intended to be used with async so I included it

#
error[E0277]: `Control` is not a future
   --> src/lib.rs:852:13
    |
852 |         sub.on_update(move |update| {
    |             ^^^^^^^^^ `Control` is not a future
    |
    = help: the trait `std::future::Future` is not implemented for `Control`
note: required by a bound in `Watch::<V>::on_update`
   --> src/lib.rs:778:14
    |
775 |     pub fn on_update<F, Fut>(mut self, mut f: F) 
    |            --------- required by a bound in this associated function
...
778 |         Fut: Future<Output=Control> + Send + Sync + 'static
    |              ^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Watch::<V>::on_update`

This is returned if I remove it
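
For a callback that has nothing to await yet, one option is to keep the `Fut: Future<Output = Control>` bound and return `std::future::ready(...)` from a plain closure. The sketch below assumes a simplified `Control` enum and a hypothetical `for_each_update` helper standing in for `on_update` (no tokio, no channel); `block_on` is a tiny busy-polling executor included only so the example runs with the standard library alone.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for the thread's `Control` type (assumption).
#[derive(Debug, PartialEq)]
enum Control {
    Continue,
    Break,
}

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for `on_update`: awaits the callback for each update
// and stops at the first `Control::Break`. Returns the updates it delivered.
async fn for_each_update<F, Fut>(updates: Vec<u32>, mut f: F) -> Vec<u32>
where
    F: FnMut(u32) -> Fut,
    Fut: Future<Output = Control>,
{
    let mut seen = Vec::new();
    for u in updates {
        seen.push(u);
        if f(u).await == Control::Break {
            break;
        }
    }
    seen
}

fn run_ready_demo() -> Vec<u32> {
    block_on(for_each_update(vec![1, 2, 3, 4], |u| {
        // Synchronous body: wrap the result in an already-completed future.
        ready(if u >= 3 { Control::Break } else { Control::Continue })
    }))
}

fn main() {
    println!("{:?}", run_ready_demo());
}
```

The same signature still accepts a closure that returns an `async move { ... }` block once real awaiting (for example, writing to a websocket) is needed.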

glossy perch
#

you can change its definition

#

right ?

slender cobalt
#

But I'll need it to be async in the future to write to websockets

glossy perch
#

ok

#
    #[test]
    fn subscribe() {
        let store = LiveStore::<Foo>::new(StoreOptions::default());
        let key = store.init(Foo { bar: 0, baz: 1 });
        let sub = store.watch(&key).unwrap();

        let bar_lock = Arc::new(Mutex::new(0));
        let baz_lock = Arc::new(Mutex::new(0));


        sub.on_update(move |update| {
            
            let bar_lock2 = bar_lock.clone();
            let baz_lock2 = baz_lock.clone();
            async move {
              if let Update::Mutate { mutn, .. } = update {
                  match mutn {
                      FooMutation::SetBar(bar) => *bar_lock2.lock().unwrap() = bar,
                      FooMutation::SetBaz(baz) => *baz_lock2.lock().unwrap() = baz,
                  }
              }
  
              Control::Continue
            }
        });
    }

this should prob work

glossy perch
#

this ?

    #[test]
    fn subscribe() {
        let store = LiveStore::<Foo>::new(StoreOptions::default());
        let key = store.init(Foo { bar: 0, baz: 1 });
        let sub = store.watch(&key).unwrap();

        let bar_lock = Arc::new(Mutex::new(0));
        let baz_lock = Arc::new(Mutex::new(0));


        sub.on_update(async move |update| {
            let bar_lock2 = bar_lock.clone();
            let baz_lock2 = baz_lock.clone();

            if let Update::Mutate { mutn, .. } = update {
                match mutn {
                    FooMutation::SetBar(bar) => *bar_lock2.lock().unwrap() = bar,
                    FooMutation::SetBaz(baz) => *baz_lock2.lock().unwrap() = baz,
                }
            }

            Control::Continue
        });
    }
slender cobalt
#

yes

glossy perch
#

yes what

slender cobalt
#

it gives the error

glossy perch
#

is sub used anywhere ?

#

ah yeah

#

mb

#

am blind

#

ok

slender cobalt
#

I wonder if the captured variables are dropped after the first call to the closure?

glossy perch
#

no they shouldn't that's not how closures work usually

#

can you try to make a minimal reproducible example on the playground ?

#

that way i can toy with it directly

slender cobalt
#

For anybody in the future, you need something like this:

   read_until_empty(rx, move |v| {
        let val = val.clone();
        async move {
            println!("{}", v + *val);
            true
        }
    });
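
A self-contained sketch of that pattern, using only the standard library. Here `read_until_empty` is a hypothetical stand-in that takes a `Vec` instead of a channel receiver, and `block_on` is a tiny busy-polling executor included only so the example runs without tokio. The outer `move` closure owns the `Arc`, clones it on every call, and hands the clone to a fresh `async move` block, so each returned future owns its data instead of borrowing from the closure. As far as I can tell, that borrowing is what made the `async move |update|` form fail the `Fn`/`FnMut` bound.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical looping function: awaits the callback once per item and stops
// when the callback's future resolves to `false`. Returns the number of calls.
async fn read_until_empty<F, Fut>(items: Vec<i32>, f: F) -> usize
where
    F: Fn(i32) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut calls = 0;
    for v in items {
        calls += 1;
        if !f(v).await {
            break;
        }
    }
    calls
}

fn run_demo() -> (usize, i32) {
    let total = Arc::new(Mutex::new(0));
    let shared = total.clone();

    let calls = block_on(read_until_empty(vec![1, 2, 3], move |v| {
        // Clone per call, so the future below owns its own handle.
        let shared = shared.clone();
        async move {
            *shared.lock().unwrap() += v;
            true
        }
    }));

    let sum = *total.lock().unwrap();
    (calls, sum)
}

fn main() {
    let (calls, total) = run_demo();
    println!("calls = {calls}, total = {total}");
}
```

With a real `tokio::spawn` inside the looping function, the bounds would also need `Send + 'static` on both `F` and `Fut`, as in the original `on_update` signature.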
opal kayak
#

did you figure it out or do you still want me to look at it

slender cobalt
#

I was able to figure it out

#

Your effort was much appreciated though 🙂