#Tokio task weird error

hasty sparrow
#
fn spawn_synchronisation_task(
    daemon: PompaDaemon,
    location: crate::models::Location,
    expected_dates: Vec<Range>,
) {
    tokio::task::spawn(daemon.sync_data_for(location, expected_dates));
}

for some reason this produces a cryptic error:

error: implementation of `FnOnce` is not general enough
   --> prometeusz/src/pompa_states.rs:426:5
    |
426 |     tokio::task::spawn(daemon.sync_data_for(location, expected_dates));
    |     ^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
    |
    = note: closure with signature `fn(&'0 models::Product) -> impl futures::Future<Output = std::result::Result<HashSet<(NaiveDateTime, NaiveDateTime)>, ErrReport>>` must implement `FnOnce<(&models::Product,)>`, for any lifetime `'0`...
    = note: ...but it actually implements `FnOnce<(&models::Product,)>`
orchid axle
#

Show us the implementation of PompaDaemon::sync_data_for please

hasty sparrow
#

ok

#
    async fn sync_data_for(self, location: Location, expected: Vec<Range>) -> Result<()> {
        use futures::{
            StreamExt,
            TryStreamExt,
        };
        let missing = missing_sales_entries_slots_for_location(
            self.db.clone(),
            location.clone(),
            expected.clone(),
        )
        .await?;
        let _res: Vec<_> = futures::stream::iter(missing)
            .map(|range| self.clone().load_data_for(location.clone(), range))
            .buffer_unordered(1)
            .try_collect()
            .await
            .wrap_err_with(|| format!("synchronising sales from pompa for {location:?}"))?;
        if !expected.is_empty() {
            tracing::info!(
                "successfully synchronised {} days of sales entries from pompa's {location:?}",
                expected.len()
            );
        }
        Ok(())
    }

#

if i delete spawn_synchronisation_task() function it compiles just fine

orchid axle
#

Wow that's indeed a cryptic error ferrisThonk

hasty sparrow
#

yup

#

I'm actually clueless for the first time in a long time writing rust

#

xD

orchid axle
#

Try:

tokio::task::spawn(async move { daemon.sync_data_for(location, expected_dates) });
#

I'm shooting in the dark here

hasty sparrow
#

it was like this before but let's try

#

but I'd need to .await right?

orchid axle
#

yes

hasty sparrow
#

same ;/

orchid axle
#

Very strange, do you have any idea what closure the compiler is talking about?

hasty sparrow
#

well

#

there is one

#

in the entire module

#
pub async fn with_connection<R: Send + 'static>(
    db: Db,
    f: impl FnOnce(&mut diesel::PgConnection) -> eyre::Result<R> + Send + 'static,
) -> eyre::Result<R> {
    tokio::task::spawn_blocking(move || -> eyre::Result<_> {
        let mut conn = db.get().wrap_err("connection failed")?;
        f(&mut conn)
    })
    .await
    .wrap_err("thread crashed")?
    .wrap_err("query failed")
}

orchid axle
#

uhh but what is up with the closure of type fn(&'0 models::Product) -> impl futures::Future<Output = std::result::Result<HashSet<(NaiveDateTime, NaiveDateTime)>, ErrReport>>
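
For context, the note in the error describes a higher-ranked bound: the closure has to accept a reference of any lifetime, not one particular lifetime. A minimal std-only sketch of what such a requirement looks like when written out explicitly. `call_with_local` and `add_one` are hypothetical names used only for illustration; they are not from the thread or from any library.

```rust
// Hypothetical helper: `F` must accept a reference of *any* lifetime `'a`.
// This is the kind of requirement the note "for any lifetime `'0`" refers to.
fn call_with_local<F>(f: F) -> i32
where
    F: for<'a> FnOnce(&'a i32) -> i32,
{
    // `local` lives only inside this function, so the caller cannot name
    // its lifetime; `f` has to work for whatever lifetime it is given.
    let local = 41;
    f(&local)
}

fn add_one() -> i32 {
    // Annotating the parameter as `&i32` makes the closure generic over the
    // reference's lifetime, so it satisfies the higher-ranked bound.
    call_with_local(|x: &i32| *x + 1)
}
```

Calling `add_one()` passes a closure that is general enough. The error in the thread says the compiler could not prove the same property for the closure given to `.map`.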

hasty sparrow
#

hmm

#

let me look for it

#

oooh...

#
async fn missing_sales_entries_slots_for_location(
    db: Db,
    location: Location,
    ranges: Vec<Range>,
) -> Result<HashSet<Range>> {
    use futures::{
        StreamExt,
        TryStreamExt,
    };
    let products = products(db.clone(), location.clone()).await?;
    let missing: Vec<HashSet<Range>> = futures::stream::iter(&products)
        .map(|product| { // <-- this closure
            missing_sales_entries_slots_for_location_product(
                db.clone(),
                location.clone(),
                product.clone(),
                ranges.clone(),
            )
        })
        .buffer_unordered(2)
        .try_collect()
        .await
        .wrap_err("combining missing entries for all products")?;
    let missing: HashSet<_> =
        tokio::task::spawn_blocking(move || missing.into_iter().flatten().collect()).await?;
    Ok(missing)
}

#

changing &products to products fixes everything
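
A rough std-only sketch of the shape of that fix: hand the closure owned items so its argument is no longer a reference. This is an analogy using plain iterators, not the `futures` stream API, and it is not claimed to reproduce the original error. `Product`, `missing_for`, `missing_for_all`, `assert_send`, `block_on` and `run` are all hypothetical stand-ins.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `models::Product`.
struct Product {
    id: u32,
}

// Hypothetical stand-in for the per-product async query.
async fn missing_for(product: Product) -> u32 {
    product.id * 10
}

// Iterating the Vec by value: the closure's signature is
// `fn(Product) -> impl Future`, with no reference lifetime in it.
async fn missing_for_all(products: Vec<Product>) -> Vec<u32> {
    let mut out = Vec::new();
    for fut in products.into_iter().map(|product| missing_for(product)) {
        out.push(fut.await);
    }
    out
}

// Compile-time check that the value is `Send`; returns it unchanged.
fn assert_send<T: Send>(t: T) -> T {
    t
}

// Tiny std-only executor so the sketch runs without tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn run() -> Vec<u32> {
    let products = vec![Product { id: 1 }, Product { id: 2 }];
    // The future passes the `Send` check and can then be driven to completion.
    block_on(assert_send(missing_for_all(products)))
}
```

The trade-off is that the collection is consumed. If it is still needed afterwards, cloning it before iterating is one option.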

#

thank you

#

but still I thought rust functions are evaluated individually

#

why would the function itself be ok but some deeply nested function cause a parent function not to be usable in a tokio context...

orchid axle
#

I have literally no idea...

#

But glad you got it resolved :D

orchid axle
#

But it's not immediately obvious from the signature of your functions that that's not the case
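
On the question of functions being checked individually, here is a sketch of how a detail deep in a body can surface at the spawn call. An `async fn` signature never states whether its future is `Send`. The compiler works that out from the body and only checks it where something demands it, as `tokio::task::spawn` does. The std-only example below uses hypothetical names (`requires_send`, `shares_arc`, `block_on`) and uses `Rc` as a simple stand-in for a non-`Send` cause. In this thread the failed `Send` proof looks more like a compiler limitation than a genuinely non-`Send` value.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the kind of bound `tokio::task::spawn`
// places on the future it is given.
fn requires_send<F>(fut: F) -> F
where
    F: Future + Send + 'static,
{
    fut
}

// Nothing in this signature mentions `Send`; the compiler derives it
// from what the body keeps alive across `.await`.
async fn shares_arc() -> usize {
    let shared = Arc::new(vec![1, 2, 3]);
    std::future::ready(()).await;
    shared.len()
}

// Same signature shape, but holding an `Rc` across `.await` makes the
// future `!Send`. The function alone still compiles; only a call such as
// `requires_send(shares_rc())` is rejected, and the error is reported there.
// async fn shares_rc() -> usize {
//     let shared = std::rc::Rc::new(vec![1, 2, 3]);
//     std::future::ready(()).await;
//     shared.len()
// }

// Tiny std-only executor so the sketch runs without tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

That is why deleting the function that spawns makes the error disappear: nothing else asks for the `Send` proof.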

coral mirage
#

that looks like a bug in rustc

#

on nightly the error message is different:
```rs
error: higher-ranked lifetime error
 --> src/lib.rs:5:5
  |
5 |     tokio::task::spawn(bar());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: could not prove impl futures::Future<Output = Result<(), anyhow::Error>>: std::marker::Send
```

#

I don't know if it applies in the same way to your code

hasty sparrow
#

I'm on nightly

coral mirage
#

an earlier one than the playground

#

?play
```rs
use anyhow::Result;
use futures::prelude::*;

fn foo() {
    tokio::task::spawn(baz());
}

async fn baz() -> Result<Vec<i32>> {
    let ints = [1, 2, 3];
    futures::stream::iter(&ints)
        .map(|i| quux(i.clone()))
        .buffer_unordered(2)
        .try_collect()
        .await
}

async fn quux(s: i32) -> Result<i32> {
    Ok(s)
}
```

lucid kestrel BOT
#
error: higher-ranked lifetime error
 --> src/main.rs:6:5
  |
6 |     tokio::task::spawn(baz());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: could not prove `impl futures::Future<Output = Result<Vec<i32>, anyhow::Error>>: std::marker::Send`
coral mirage
#

?play channel=stable
```rs
use anyhow::Result;
use futures::prelude::*;

fn foo() {
    tokio::task::spawn(baz());
}

async fn baz() -> Result<Vec<i32>> {
    let ints = [1, 2, 3];
    futures::stream::iter(&ints)
        .map(|i| quux(i.clone()))
        .buffer_unordered(2)
        .try_collect()
        .await
}

async fn quux(s: i32) -> Result<i32> {
    Ok(s)
}
```

lucid kestrel BOT
#
error: implementation of `FnOnce` is not general enough
 --> src/main.rs:6:5
  |
6 |     tokio::task::spawn(baz());
  |     ^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
  |
  = note: closure with signature `fn(&'0 i32) -> impl futures::Future<Output = Result<i32, anyhow::Error>>` must implement `FnOnce<(&i32,)>`, for any lifetime `'0`...
  = note: ...but it actually implements `FnOnce<(&i32,)>`
coral mirage
#

if it was actually Send that was sus, then it makes sense that it doesn't trigger without calling it in spawn

#

yep, it triggers with
```rs
fn foo() {
    fn assert_send<T: Send>(_: T) {}
    assert_send(baz());
}
```

#

but with that error message you'd never find the culprit ferrisSweat

#

at least fn(&'0 i32) -> impl futures::Future<Output = Result<i32, anyhow::Error>> says something