#Problem with async thread and bevy returning data


wind solstice
#

I have this system which needs to run in Bevy and update game state variables that are stored on the Solana blockchain. Because it needs to wait for a response from the blockchain, I need to use async to make the calls. The problem for me is how to get the data out of the async thread so I can update the resource. I don't seem to be able to alter the lifetime for Commands. Here is the code:

fn get_game_state_variables(mut commands: Commands, time: Res<Time>, mut timer: ResMut<QueryTimer>) {
    if timer.0.tick(time.delta()).just_finished() {
        let thread_pool = AsyncComputeTaskPool::get();
        let task = thread_pool.spawn(async move {
            let client: WasmClient = WasmClient::new_with_commitment("http://127.0.0.1:8899", CommitmentConfig::confirmed());
            let version = client.get_version().await.unwrap();
            let storage = Pubkey::from_str("PUBKEY-HERE").unwrap();
            let game_state_account = client.get_account(&storage).await.unwrap();
            let account_data = game_state_account.data;
            let decoded = ResultSchema::try_from_slice(&account_data[8..18]).unwrap();

            console::log_1(&decoded.wheel_speed.into());
            console::log_1(&version.solana_core.into());
            console::log_1(&"Test".into());
            let tests = decoded.wheel_speed;
        });

        commands.insert_resource(ResultSchema { time_of_last_update: 0, wheel_speed: tests, current_state: 0 });
    }
}
#

I need to get the value of "tests" outside the async block to call commands.insert_resource. If I call it from within the async block I get lifetime issues.

tidal island
#

What if you use a channel to send the value of tests from the async block, and have a system that checks whether there's a value on the receiver (via a resource) and calls insert_resource if one is available?

#

Basically, instead of trying to use one system, have two systems - one that starts the async work, and one that can tell when the async work is complete, and make the async work use a sharable data type (eg. channel, atomic value, Arc<Mutex<Whatever>>, etc).

wind solstice
#

I can try that. I have to think it through. Async is breaking my brain. Seems like the arc/mutex path keeps coming up.

#

How would the second one be able to tell when the async work is complete?

tidal island
#

You would need some data value that can change and be queried, like an Option or Vec.
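A minimal sketch of that idea, assuming a plain `std::thread` as a stand-in for the async task (this is not Bevy code, and threads are not available in the WASM setup discussed later). The names `Slot`, `start_work`, and `poll_slot` are hypothetical. The slot holds `None` while the work is pending and `Some(value)` once it has finished, so the second system can tell completion by checking it each frame:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical shared slot: `None` while pending, `Some(value)` once done.
type Slot = Arc<Mutex<Option<i8>>>;

/// Stand-in for the first system: start the background work and give it
/// its own clone of the slot.
fn start_work(slot: &Slot) -> thread::JoinHandle<()> {
    let slot = Arc::clone(slot);
    thread::spawn(move || {
        // Placeholder for the value that would come back from the network.
        *slot.lock().unwrap() = Some(7);
    })
}

/// Stand-in for the second system: called repeatedly, it takes the value
/// out of the slot only if one has been written.
fn poll_slot(slot: &Slot) -> Option<i8> {
    slot.lock().unwrap().take()
}
```

Using `take()` empties the slot again, so each result is consumed once.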

woven crane
#

You just need to store the task in a resource, and then in another system you poll it to see if it's done, like jdm mentioned.
I did this with the IoTaskPool in my bevy_slippy_tiles plugin, which downloads map tiles from the web asynchronously: https://github.com/edouardpoitras/bevy_slippy_tiles/blob/main/src/systems.rs


#

The polling itself I did this way:

pub fn download_slippy_tiles_completed(
    ...,
    mut slippy_tile_download_tasks: ResMut<SlippyTileDownloadTasks>,
    ...)
{
    for task in slippy_tile_download_tasks.0.iter_mut() {
        if let Some(SlippyTileDownloadTaskResult { path }) =
            future::block_on(future::poll_once(task))
        {
            // Do stuff with `path` (the result of the async task).
#

task here is the async task I scheduled and threw into the SlippyTileDownloadTasks resource.
When it completes, it yields a SlippyTileDownloadTaskResult { path: PathBuf } (what is returned from my async fn).

#

This is using the futures_lite library for the polling
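For readers wondering what "poll once" means here: the future is polled a single time, yielding `Some(output)` if it is already finished and `None` otherwise, so the system never blocks waiting on it. The sketch below imitates that behaviour using only the standard library. It is not the futures_lite implementation, and `NoopWake` and `poll_once_now` are hypothetical names:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a single manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` exactly once. Returns `Some(output)` if it is ready,
/// or `None` if it is still pending.
fn poll_once_now<F: Future + Unpin>(fut: &mut F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(fut).poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

A future that has returned `Some` should not be polled again, which is why a completed task is normally removed from the list afterwards.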

wind solstice
# woven crane Actually, the bevy examples are way cleaner: https://github.com/bevyengine/bevy/...

Thank you for the response. I am trying to implement this example. BTW: I might have failed to mention I am trying to get this working in WASM, which is adding layers of doubt everywhere.
I am encountering a struct returned by AsyncComputeTaskPool that is called FakeTask. There is no information anywhere on what this is. It’s causing mismatched types at commands.spawn(ComputeTransform(task)); Do you know what that is?

woven crane
#

It's entirely possible WASM doesn't yet support multiple threads (the task pools)

#

The proposal is to create something like a wasm_task_pool::{Scope, TaskPool, TaskPoolBuilder} but there doesn't seem to be any movement on it recently.

#

From the code:
```
/// Spawns a static future onto the JS event loop. For now it is returning FakeTask
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
/// so some proxy future is needed. Moreover currently we don't have long-living
/// LocalExecutor here (above spawn implementation creates temporary one)
/// But for typical use cases it seems that current implementation should be sufficient:
/// caller can spawn long-running future writing results to some channel / event queue
/// and simply call detach on returned Task (like AssetServer does) - spawned future
/// can write results to some channel / event queue.
```

woven crane
#

Yup, see comments above. Essentially what jdm mentioned regarding channels

#

Sorry for the wild goose chase

#

From what I gather: 1) Spawn task and detach, 2) Have task write to a channel stored in a resource, 3) Have another system listen for events on that channel
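The three steps above can be sketched with the standard library alone. This assumes a detached `std::thread` in place of the detached Bevy task and `std::sync::mpsc` in place of whichever channel crate is used. `GameState`, `spawn_fetch`, and `apply_pending` are hypothetical names, and the value `42` is a placeholder for the fetched data:

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

/// Hypothetical game state that the receiving side updates.
struct GameState {
    wheel_speed: i8,
}

/// Steps 1 and 2: start the work, detach it, and let it report through the channel.
fn spawn_fetch(tx: Sender<i8>) {
    // Dropping the JoinHandle detaches the thread; nothing waits on it.
    let _ = thread::spawn(move || {
        let fetched: i8 = 42; // placeholder for the real network result
        // If the receiver is gone there is nobody to tell, so ignore the error.
        let _ = tx.send(fetched);
    });
}

/// Step 3: a non-blocking check, suitable for running every frame.
/// Returns true if at least one new value was applied.
fn apply_pending(rx: &Receiver<i8>, state: &mut GameState) -> bool {
    let mut updated = false;
    // try_recv never blocks; it returns Err when the queue is empty.
    while let Ok(speed) = rx.try_recv() {
        state.wheel_speed = speed;
        updated = true;
    }
    updated
}
```

The important property is that the receiving side uses `try_recv` rather than `recv`, so a frame never stalls waiting for the network.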

wind solstice

It's all good. Async is really adding a weird element to all of this, plus I don’t fully grasp Bevy yet, so I’m somewhat in the weeds. I just keep banging away and eventually this thing will compile and run lol. I don’t really understand the channels yet. I need a good example. So for channels do I just completely walk away from the async task pool?

wind solstice
#

Oh so don’t walk away from Async task pool. Just detach

woven crane
#

I'm not sure if there's a way to do it without the task pool. Yeah, you need async, and the task pool is currently the easiest way to do that.

wind solstice
#

Easier said than done, but at least it’s somewhere to dig. I’ll keep working and if I succeed will report back.

woven crane
#

Here's what I think: You want to create your channel transmitter/receiver once, then store them in a resource. One of your systems creates a task, that task uses the transmitter to send an event when completed with the results. Another system uses the receiver and polls for events.

#

There are some subtleties there with regard to ownership - I forget if you can just clone the transmitter before moving it into the async task... hopefully that's the case

woven crane
#

Yeah, I think you can just clone the transmitter:
```
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
```
And that should keep it happy.
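A small, self-contained check of that claim, assuming `std::sync::mpsc` and a thread in place of the async task (`send_from_two_producers` is a hypothetical name). The clone is moved into the worker while the original sender stays usable where it was created:

```rust
use std::sync::mpsc;
use std::thread;

/// Sends one value from a worker (through a cloned sender) and one from the
/// caller (through the original sender), then collects everything received.
fn send_from_two_producers() -> Vec<i8> {
    let (tx, rx) = mpsc::channel::<i8>();
    let tx1 = tx.clone();

    // The clone is moved into the worker; `tx` itself is untouched.
    let worker = thread::spawn(move || {
        tx1.send(1).unwrap();
    });
    worker.join().unwrap();

    // The original sender is still valid here.
    tx.send(2).unwrap();

    // Once every sender is dropped, the iterator below ends.
    drop(tx);
    rx.iter().collect()
}
```

Because the worker is joined before the second send, the values arrive in the order `1`, then `2`.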
woven crane
#

No problem, good luck 👍

wind solstice
#

I am having trouble passing the tx into the rust system. I’m sure it’s just syntax.

woven crane
#

Share a minimal example if you get a chance, maybe someone will spot something

wind solstice
#

I started another thread specifically on that issue and I keep getting the same advice, which is to insert the tx and rx as system resources and pass them into the system functions. But I am having trouble doing that. It would all seem so straightforward if it weren't for the async. The documentation is so thin, which is a bit of a downside to Bevy, but on the other hand I really enjoy the hard-earned learning that comes from fighting these kinds of issues. So anyway, I can create the tx/rx any number of ways... It's just getting it into the system that is killing me.

tidal island
#

What problems do you have when you try to follow the advice?

wind solstice
#

Hey thanks for responding... I guess now is as good a time as ever.... So I'll send over some excerpts from my code.

#

I've got these structs:

#

#[derive(Resource, Clone)]
pub struct GameStateData {
    pub wheel_speed: Arc<Mutex<i8>>,
}

impl Default for GameStateData {
    fn default() -> Self {
        let wheel_speed = Arc::new(Mutex::new(1));
        Self { wheel_speed }
    }
}

#

and then I call these in the app

#

.init_resource::<GameStateData>()

#

.add_startup_system(|| { let (tx, rx) = crossbeam_channel::unbounded::<GameStateData>();})

#

and this is essentially where I am hung up....... Because I don't know how to make tx and rx callable in a Bevy system....

#

I want to pass that wheel_speed i8 from the async block where it is acquired to the sync system...

#

I'd like to be able to pass a clone of crossbeam::sender into the async block and then use it.

#

The receive side I haven't even started to consider, but I hope it will be easier once I get the tx in place.

#

I've been trying things left and right, so it's been good for helping me understand the system more.

tidal island
#

I'm seeing a mixture of different solutions here. Ideally, your resource contains the tx and rx values, and you use commands.insert_resource(game_state_data) inside your startup system. That allows other systems to use Res<GameStateData> arguments and access the tx and rx values that are stored within it.

#

If you're using channels then I would get rid of the arc/mutex.

wind solstice
#

Are tx and rx stored in GameStateData?

#

Ok on getting rid of arc and mutex... I just wanted to be sure

tidal island
#

And your channel should send the data that needs to be communicated, not the GameStateData resource

wind solstice
#

It's being able to call tx inside the system function that I can't quite get

#

I can simplify to only channels and the basic data....

tidal island
#

fn system(data: Res<GameStateData>) { data.tx.send(5); }

wind solstice
#

It's the calling tx that seems to be the rub for me... I can't pass the sender clone into the async block, because I can't call it in the system function

tidal island
#

You should be able to clone tx and move the clone into the async block.
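This is also why the original attempt ran into lifetime errors while this approach does not: a spawned task has to own everything it uses, so a borrowed system parameter cannot be moved into it, but a cloned sender is an owned value. A rough sketch, assuming a plain struct and `std::thread` in place of the Bevy resource and task pool (`AsyncDataChannel` here is a hypothetical plain struct, not a real resource):

```rust
use std::sync::mpsc::Sender;
use std::thread;

/// Hypothetical stand-in for a resource that holds the sending half.
struct AsyncDataChannel {
    tx: Sender<i8>,
}

/// Stand-in for a system: it only borrows the resource.
fn start_fetch(channel: &AsyncDataChannel) -> thread::JoinHandle<()> {
    // `channel` is a borrow, so it cannot be moved into a 'static task.
    // A clone of the sender is owned, so it can.
    let tx = channel.tx.clone();
    thread::spawn(move || {
        // Placeholder value standing in for the fetched wheel speed.
        tx.send(5).unwrap();
    })
}
```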

wind solstice
#

ok wait. If it's that simple

#

lol

#

So the tx get associated with GameStateData when I create it?

#

Is the closure ok for creating it?

tidal island
#

Yep!

#

You may want to separate your game state data from the channels, using two different resources

wind solstice
#

wow.. ok I am praying this works.... and thank you. I will be happy to put this async piece behind me and move on to actual Bevy stuff

tidal island
#

GameStateData and AsyncDataChannel or something

wind solstice
#

How would I associate those?

tidal island
#

Systems that want to read the current data use Res<GameStateData>, systems that want to use the channels use Res<AsyncDataChannel>, and the receiver system uses ResMut<GameStateData> to update the stored values
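A sketch of that split using plain structs and functions in place of Bevy resources and systems, where `&T` plays the role of `Res<T>` and `&mut T` the role of `ResMut<T>`. The function names are hypothetical. One caveat: `std::sync::mpsc::Receiver` is not `Sync`, which a real Bevy resource requires, whereas the crossbeam_channel types used elsewhere in this thread are.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// The data other systems read.
struct GameStateData {
    wheel_speed: i8,
}

/// The plumbing, kept separate from the data.
struct AsyncDataChannel {
    tx: Sender<i8>,
    rx: Receiver<i8>,
}

impl AsyncDataChannel {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self { tx, rx }
    }
}

/// Reader: only needs the data (like Res<GameStateData>).
fn read_speed(state: &GameStateData) -> i8 {
    state.wheel_speed
}

/// Producer: only needs the channel (like Res<AsyncDataChannel>).
fn send_speed(chan: &AsyncDataChannel, speed: i8) {
    let _ = chan.tx.send(speed);
}

/// Receiver: reads the channel and mutates the data
/// (like Res<AsyncDataChannel> plus ResMut<GameStateData>).
fn receive_speed(chan: &AsyncDataChannel, state: &mut GameStateData) {
    while let Ok(speed) = chan.rx.try_recv() {
        state.wheel_speed = speed;
    }
}
```

Only the receiver needs mutable access to the game state, so readers of the data are not affected by the channel at all.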

wind solstice
#

ah I see...

#

Ok.. well I'm going to try that now.

#

error[E0609]: no field tx on type bevy::prelude::Res<'_, GameStateData>

tidal island
#

What's your GameStateData struct?

#

If it doesn't have tx and rx fields, it's not going to work 🙂

wind solstice
#

#[derive(Resource, Clone)]
pub struct GameStateData {
    pub wheel_speed: Arc<Mutex<i8>>,
}

impl Default for GameStateData {
    fn default() -> Self {
        let wheel_speed = Arc::new(Mutex::new(1));
        Self { wheel_speed }
    }
}

#

it's this

#

now sans the arc and mutex

#

back to simple i8

tidal island
#

struct AsyncChannelData {
    tx: Sender<i32>,
    rx: Receiver<i32>,
}

wind solstice
#

ok.... but not sender<i8> in this case?

tidal island
#

Sure, i8 is fine

wind solstice
#

and then when this

#

.insert_resource(GameStateData { wheel_speed : 1 })

#

how do I define the tx and rx in that?

tidal island
#

.insert_resource(AsyncChannelData { tx, rx })

#

After you've done let (tx, rx) = channel::<i8>()

wind solstice
#

Ok . I think this is it....

tidal island
#

Your startup system needs a mut commands: Commands argument so you can call insert_resource though.

wind solstice
tidal island
#

AsyncChannelData needs to derive Resource
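Why the derive matters: `insert_resource` only accepts types that implement the `Resource` trait, and the derive is what provides that implementation. The toy model below illustrates the mechanism with the standard library only. It is not Bevy's code: the `Resource` trait, `Store`, and the `placeholder` field are hypothetical stand-ins.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical stand-in for Bevy's `Resource` marker trait.
trait Resource: Send + Sync + 'static {}

/// Toy resource store keyed by type, accepting only `Resource` types.
#[derive(Default)]
struct Store {
    items: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Store {
    fn insert_resource<R: Resource>(&mut self, value: R) {
        self.items.insert(TypeId::of::<R>(), Box::new(value));
    }

    fn get<R: Resource>(&self) -> Option<&R> {
        self.items.get(&TypeId::of::<R>())?.downcast_ref::<R>()
    }
}

/// The real struct would hold tx and rx; a placeholder field keeps this short.
struct AsyncDataChannel {
    placeholder: i8,
}

// Roughly what deriving the trait supplies. Without this impl, passing an
// AsyncDataChannel to insert_resource fails with a "trait bound is not
// satisfied" error.
impl Resource for AsyncDataChannel {}
```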

#

But I'm just guessing at the problem without actual errors

wind solstice
#

What does that mean?

#

oh hold on

#

error[E0277]: the trait bound AsyncDataChannel: bevy::prelude::Resource is not satisfied
   --> src/main.rs:112:30
    |
112 |     commands.insert_resource(AsyncDataChannel { tx , rx });
    |              --------------- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait bevy::prelude::Resource is not implemented for AsyncDataChannel
    |              |
    |              required by a bound introduced by this call
    |
    = help: the following other types implement trait bevy::prelude::Resource:
              AmbientLight
              AppTypeRegistry
              Assets<T>
              Audio<Source>
              Axis<T>
              ComponentUniforms<C>
              DefaultImageSampler
              Diagnostics
            and 105 others
note: required by a bound in bevy::prelude::Commands::<'w, 's>::insert_resource

#

What does AsynChannelData needs to drive the resource mean?

#

oh derive

#

yeah

#

that's silly if I didn't do that, let me see

#

ok.. It seems to be sending... I have no receiver so not 100% but this has been enlightening to the max

wind solstice
#

so the bevy and channel problem has definitely been solved by this help.. Now I still have a wasm issue, but things seem clear so maybe I can work through that.
