#Storing a stream as a member of a struct


marble estuary
#

Howdy!

I'm trying to grok async Rust, and I have come across yet another question: Why can't I keep a stream as a member of a struct?
Consider my first try:

use color_eyre::Result;
use futures::stream::StreamExt;
use tokio;

pub struct Book {}

impl Book {
    pub async fn fetch(id: usize) -> usize {
        id
    }
}

pub struct BookList {
    queue: Vec<usize>,
}

impl BookList {
    pub fn new(queue: Vec<usize>) -> Self {
        Self { queue }
    }
    pub fn fetch(&self) -> impl futures::Stream<Item = usize> {
        let mut work = vec![];
        for id in &self.queue {
            work.push(Book::fetch(*id));
        }

        futures::stream::iter(work).buffered(5)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let booklist = BookList::new(vec![1, 2, 3, 4, 5, 6, 7]);

    let a = booklist.fetch().next().await;
    dbg!(a);
    let b = booklist.fetch().next().await;
    dbg!(b);

    Ok(())
}

This will output

[examples\async.rs:13:5] a = Some(
    1,
)
[examples\async.rs:15:5] b = Some(
    1,
)

because my stream gets created from scratch every time I call fetch. That's fair enough.

Moving on, I tried this:

pub struct Book {}

impl Book {
    pub async fn fetch(id: usize) -> usize {
        id
    }
}

pub struct BookList {
    pub queue: Box<dyn futures::Stream<Item = usize>>,
}

impl BookList {
    pub fn new(items: Vec<usize>) -> Self {
        let mut work = vec![];
        for id in items {
            work.push(Book::fetch(id));
        }

        let queue = Box::new(futures::stream::iter(work).buffered(5));

        Self { queue }
    }
}

use color_eyre::Result;
use futures::stream::StreamExt;
use tokio;

#[tokio::main]
async fn main() -> Result<()> {
    let booklist = BookList::new(vec![1, 2, 3, 4, 5, 6, 7]);

    let a = booklist.queue.next();
    dbg!(a);
    let b = booklist.queue.next();
    dbg!(b);

    Ok(())
}
#

So, I tried storing the Stream as part of my struct. This gives me pin-related errors that I don't understand. Like:

error[E0277]: `(dyn Stream<Item = usize> + 'static)` cannot be unpinned
   --> examples\async.rs:16:28
    |
16  |     let a = booklist.queue.next();
    |                            ^^^^ the trait `Unpin` is not implemented for `(dyn Stream<Item = usize> + 'static)`
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope

In conclusion, it seems that, no matter what I do, I can't store a Stream in a struct.
I know that I can store a Stream as a variable inside the scope of a function. This could be used to fix my first example. But why can't I store it as a member variable of a struct?

rigid lantern
#

you need to use Pin<Box<dyn Stream<Item = usize>>> which you create using Box::pin(futures::stream::iter(work).buffered(5))

#

alternatively you can just call futures::stream::iter(work).buffered(5).boxed() which is a helper method on StreamExt to do just that
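
A minimal sketch of the Box::pin pattern described above. To stay dependency-free it swaps in std's Future for futures::Stream, so it is an analogy rather than the exact BookList fix; the names Holder, NoopWake and poll_once are hypothetical. The field type and the Box::pin call have the same shape as in the stream case.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for BookList: the field is a pinned, boxed trait object.
pub struct Holder {
    pub job: Pin<Box<dyn Future<Output = usize>>>,
}

impl Holder {
    pub fn new(id: usize) -> Self {
        // Box::pin puts the future on the heap and pins it there.
        Self { job: Box::pin(async move { id }) }
    }
}

// A waker that does nothing; enough for a future that is ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the stored future once and returns its output if it is ready.
pub fn poll_once(holder: &mut Holder) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // as_mut() turns &mut Pin<Box<T>> into Pin<&mut T>, which poll requires.
    match holder.job.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    let mut holder = Holder::new(7);
    println!("{:?}", poll_once(&mut holder));
}
```

With the futures crate, the equivalent field would be Pin<Box<dyn Stream<Item = usize>>>, and StreamExt::next can then be called on it because the pinned box is Unpin.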

marble estuary
#

Whoa, that did the trick. What kind of magic is that?

So, taking a look at Pin, it says:

Pin is a wrapper around some kind of pointer Ptr which makes that pointer "pin" its pointee value in place, thus preventing the value referenced by that pointer from being moved or otherwise invalidated at that place in memory unless it implements Unpin.

Does this mean that I would not be able to move my booklist into a function call now?

rigid lantern
#

you can see the relevant impls for Pin<Ptr> and Box<S> here: https://docs.rs/futures/latest/futures/prelude/trait.Stream.html#impl-Stream-for-Pin<P>

what you'll notice is that Box<S> only implements Stream if S: Stream + Unpin, that is the stream type S needs to itself be Unpin, but Pin<Ptr> implements Stream as long as Ptr is Unpin. so for Pin<Box<S>> for example, that means that Box<S> needs to be Unpin, but S itself only needs to implement Stream, not Unpin. and wouldn't you know it, Box<S> always implements Unpin regardless of what S is

#

Unpin essentially means "it's safe to move this type, even after it has been pinned". when you move a value, like let var2 = var1;, or by passing it to a function, the value may get a new address in memory. for most types that's not a problem, however for certain types it can be. in particular, some types are self-referential, so that you might have one field in a struct that points to another field within the same struct, and if the value changed address, that pointer would get invalidated. so if a type is !Unpin, it means that it's not safe to move it once it has been pinned, which has to happen before it can be polled

#

Box<S> is always Unpin because a box is just a pointer, and you're just moving the pointer not the data it's pointing to. it's always safe to move a pointer
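
The Unpin claims above can be checked at compile time. This is a rough sketch using only std; NotMovable and assert_unpin are hypothetical names, and NotMovable merely stands in for a self-referential stream type by opting out of Unpin via PhantomPinned.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// A type that opts out of Unpin, standing in for a self-referential stream.
#[allow(dead_code)]
struct NotMovable {
    _pin: PhantomPinned,
}

// Compiles only when T implements Unpin.
fn assert_unpin<T: Unpin>() {}

fn main() {
    assert_unpin::<u32>(); // plain data is Unpin
    assert_unpin::<Box<NotMovable>>(); // a Box is Unpin whatever it points to
    assert_unpin::<Pin<Box<NotMovable>>>(); // and so is a pinned Box
    // assert_unpin::<NotMovable>(); // would not compile: NotMovable is !Unpin
    println!("all Unpin checks compiled");
}
```

Uncommenting the last check should give an error of the same kind as the E0277 quoted earlier in the thread.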

marble estuary
#

Hey, that's cool!

I just tested the following:

async fn move_test(mut booklist: BookList) {
    let c = booklist.queue.next().await;
    dbg!(c);
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut booklist = BookList::new(vec![1, 2, 3, 4, 5, 6, 7]);

    let a = booklist.queue.next().await;
    dbg!(a);
    let b = booklist.queue.next().await;
    dbg!(b);

    move_test(booklist).await;

    Ok(())
}

From the quote I shared, I didn't expect it to work. But it does, and that makes sense given your explanation!

Thank you so much, I've spent the last couple of days running into async wall after async wall.

rigid lantern
#

yeah, it's important to remember that when you have a variable my_var which is Pin<Ptr> (for some pointer type, so for example Pin<Box<T>> or Pin<&mut T>), it's not preventing the variable my_var itself from being moved. what Pin does is it essentially serves as a promise - "the data that i am pointing to will not be moved for as long as it's alive". and that promise is something that unsafe code can rely on to know that you can't invalidate a value by moving it in safe rust
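
One way to see this is to compare the address of the pointee before and after the Pin<Box<T>> handle is moved into a function. A small sketch, assuming a plain u64 as the pinned value; pointee_addr, take and address_survives_move are hypothetical helper names.

```rust
use std::pin::Pin;

// Returns the heap address of the value behind the pinned box.
fn pointee_addr(p: &Pin<Box<u64>>) -> usize {
    &**p as *const u64 as usize
}

// Takes ownership, so the Pin<Box<_>> handle itself is moved into this call.
fn take(p: Pin<Box<u64>>) -> usize {
    pointee_addr(&p)
}

// True if the pointee stayed at the same address while the handle moved.
fn address_survives_move() -> bool {
    let pinned = Box::pin(42u64);
    let before = pointee_addr(&pinned);
    let after = take(pinned); // moves the handle, not the heap allocation
    before == after
}

fn main() {
    println!("pointee stayed put: {}", address_survives_move());
}
```

This mirrors the move_test experiment above: the BookList (and the pointer inside it) moves, while the stream on the heap stays where it is.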

#

pinning is really a lot less magical than it might seem at first glance. it's similar to eg NonZero<T>. just like NonZero<T> is a promise that the value it contains can't be zero, and it has an unsafe NonZero::new_unchecked method that allows unsafe code to say "i promise that this value is always non-zero", Pin<Ptr> is a promise that the value pointed to by Ptr will never be moved, and it has an unsafe Pin::new_unchecked that allows unsafe code to say "i promise that the value this pointer points to will never be moved"
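
The analogy can be put side by side in code. A sketch under a couple of assumptions: it uses NonZeroU32 (the u32 flavour of NonZero<T>) so it also builds on older toolchains, and the function names are hypothetical.

```rust
use std::num::NonZeroU32;
use std::pin::Pin;

// Safe constructor: checks the invariant at runtime, None for zero.
fn checked(n: u32) -> Option<NonZeroU32> {
    NonZeroU32::new(n)
}

// Unsafe constructor: the caller promises the value is not zero.
fn unchecked_five() -> NonZeroU32 {
    // SAFETY: 5 is not zero.
    unsafe { NonZeroU32::new_unchecked(5) }
}

// Unsafe constructor for Pin: the caller promises the pointee will not move.
fn read_through_pin() -> u32 {
    let mut value = 20u32;
    // SAFETY: `value` is not moved while `pinned` exists. u32 is Unpin, so the
    // safe Pin::new would also work; new_unchecked is used only for the analogy.
    let pinned: Pin<&mut u32> = unsafe { Pin::new_unchecked(&mut value) };
    *pinned
}

fn main() {
    println!("{:?}", checked(0));
    println!("{:?}", checked(5) == Some(unchecked_five()));
    println!("{}", read_through_pin());
}
```

In both cases the safe constructor enforces the invariant for you, and the unchecked one shifts that responsibility to the caller.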

marble estuary
#

That all makes sense the way you write it. I think I'll need to work quite a bit with these things to know all of this intuitively, though.