# Chain iterator of streams and stop early if `None`

rugged dagger

I have a function `load_games_page` that loads a single page of a website and returns each "game" on that page as a boxed stream of futures.

I have another function, `load_games`, which I want to load all games (up to 5 pages) as a single boxed stream of futures. I'm stuck on how exactly to do this. I've posted my current code below, but I'm not sure what the right approach is. Any ideas?

/// Returns a stream of futures of `Game`s.
/// If no games are available on this page, `Ok(None)` is returned.
async fn load_games_page(
    &self,
    sport: Sport,
    page: u16,
) -> Result<Option<BoxStream<'_, BoxFuture<Result<Game>>>>> {
    // ...
}

async fn load_games(&self, sport: Sport) -> Result<BoxStream<'_, BoxFuture<Result<Game>>>> {
    let workers: FuturesOrdered<_> = (0..5) // pages 0 through 4, i.e. at most 5 pages
        .map(|page| self.load_games_page(sport, page))
        .collect();
    let pages: Vec<_> = workers
        .try_take_while(|res| future::ready(Ok(res.is_some()))) // stop at the first empty page
        .try_collect()
        .await?;

    Ok(pages...) // chain streams into one continuous stream
}
lofty gyro

The structure here is a little confusing. My first thought is that load_games should be called by load_games_page rather than vice versa.

Is the page number a fundamental part of the data here, or are you just taking the first n games and calling it a page?

rugged dagger
lofty gyro

Ahh, scraping? That explains why it seems backwards.

rugged dagger

Yeah, exactly. The code is acting as an HTTP client rather than a server.

lofty gyro

What's your main failure condition, then? Are you just catching network faults, or could there also be logic errors and/or valid empty responses?

rugged dagger

I'm just using the `?` operator on reqwest HTTP calls. That's pretty much it.
I also have a check that returns `None` if there are no games:

// fn load_games_page
if root.attachments.markets.is_empty() {
    return Ok(None);
}

The `load_games_page` function is done; my goal is just to load all pages and chain them into a single stream. But if one of the pages has no games, the iteration should stop early (so as not to load later pages, since we know we've reached the last page).

The body of `load_games` is just what I've got so far and can be completely rewritten; I just wanted to provide some code to get the idea across.
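
For reference, the intended control flow can be sketched synchronously with std iterators only. This is an analogy, not the async code itself: `fake_load_page` is a hypothetical stand-in for `load_games_page`, and games are plain `u32` placeholders. Because iterator adapters are lazy, `map_while` stops at the first empty page so later pages are never loaded, and `flatten` chains the remaining pages into one sequence.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for `load_games_page`: pages 0 to 2 hold two games
/// each, and every later page is empty (`None`). `calls` counts page loads.
fn fake_load_page(page: u16, calls: &Cell<u32>) -> Option<Vec<u32>> {
    calls.set(calls.get() + 1);
    if page < 3 {
        let base = u32::from(page) * 10;
        Some(vec![base, base + 1])
    } else {
        None
    }
}

/// Loads at most `max_pages` pages, stops at the first empty one, and chains
/// the games of the non-empty pages into a single sequence.
fn load_all(max_pages: u16, calls: &Cell<u32>) -> Vec<u32> {
    (0..max_pages)
        .map(|page| fake_load_page(page, calls)) // lazy: a page loads only when pulled
        .map_while(|page| page) // stop at the first `None`
        .flatten() // chain each page's games into one sequence
        .collect()
}

fn main() {
    let calls = Cell::new(0);
    let games = load_all(5, &calls);
    println!("games: {:?}, pages loaded: {}", games, calls.get());
}
```

One caveat when carrying this over to the async version: `FuturesOrdered` drives all of the futures it holds concurrently, so requests for pages after the empty one may already have been sent by the time the empty page is seen; stopping early only drops them. If later pages must not be requested at all, a sequential stream such as `futures::stream::iter(0..5).then(...)` matches this sketch more closely, at the cost of loading pages one at a time.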

lofty gyro

If I understand correctly, your use of `try_collect()` should handle early termination, as long as `load_games_page` returns `Err`.
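
The same short-circuiting exists for plain iterators, which makes it easy to check in isolation: collecting into `Result<Vec<_>, _>` stops at the first `Err` and returns it, much as `TryStreamExt::try_collect` resolves to the first error. A minimal std-only sketch, where `try_load_page` is a hypothetical loader that fails on page 2:

```rust
use std::cell::Cell;

/// Hypothetical page loader: page 2 fails, every other page holds one game.
/// `calls` counts how many pages were requested.
fn try_load_page(page: u16, calls: &Cell<u32>) -> Result<Vec<u32>, String> {
    calls.set(calls.get() + 1);
    if page == 2 {
        Err(format!("request for page {} failed", page))
    } else {
        Ok(vec![u32::from(page)])
    }
}

/// Collecting into `Result<Vec<_>, _>` returns the first `Err` and stops
/// pulling items, so pages after the failing one are never requested.
fn load_pages(max_pages: u16, calls: &Cell<u32>) -> Result<Vec<Vec<u32>>, String> {
    (0..max_pages)
        .map(|page| try_load_page(page, calls))
        .collect()
}

fn main() {
    let calls = Cell::new(0);
    let result = load_pages(5, &calls);
    println!("result: {:?}, pages requested: {}", result, calls.get());
}
```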

But I'm assuming you want to keep the results from the earlier requests if a later one fails?

rugged dagger

If any fail, I'm fine with returning an error early so that no results come through.

If the page doesn't exist, `load_games_page` shouldn't return an error; instead it returns `Ok(None)`, which is why I call `.try_take_while`.
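
The three outcomes (`Err`, `Ok(None)`, `Ok(Some(page))`) map onto a short std-only sketch: `transpose` turns `Ok(None)` into `None`, so `map_while` plays roughly the role of `try_take_while`, and collecting into `Result` plays the role of `try_collect`. This is a synchronous analogy, and the pages are hypothetical `Vec<u32>` placeholders.

```rust
/// One page load: `Err` = request failed, `Ok(None)` = page has no games,
/// `Ok(Some(games))` = a non-empty page. Games are placeholder `u32`s.
type PageResult = Result<Option<Vec<u32>>, String>;

/// Keeps pages up to (not including) the first `Ok(None)`, returns the first
/// `Err` seen before that point, and chains the kept pages into one `Vec`.
fn games_until_empty(pages: Vec<PageResult>) -> Result<Vec<u32>, String> {
    let kept: Vec<Vec<u32>> = pages
        .into_iter()
        // Ok(None) -> None (stop), Ok(Some(p)) -> Some(Ok(p)), Err(e) -> Some(Err(e))
        .map_while(|page| page.transpose())
        // Collecting into `Result` short-circuits on the first `Err`.
        .collect::<Result<Vec<Vec<u32>>, String>>()?;
    Ok(kept.into_iter().flatten().collect())
}

fn main() {
    let pages: Vec<PageResult> = vec![
        Ok(Some(vec![1, 2])),
        Ok(Some(vec![3])),
        Ok(None),
        Ok(Some(vec![9])),
    ];
    println!("{:?}", games_until_empty(pages));
}
```

As with `try_take_while`, anything after the first empty page, including an error, is never looked at.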

lofty gyro

Yep, okay, I get what you're saying. I'm not that familiar with the futures crate overall, but it sounds like you already have your solution. What's the issue?

It's the chaining part you're stuck on?

rugged dagger

So `let pages: Vec<_>` is actually a `Vec<Option<_>>`. I tried `.try_collect::<Option<Vec<_>>>()`, but apparently that's not supported.

And yes, I'm also not sure how to chain this `Vec` into a flattened stream.

rugged dagger
let workers = ...; // some stream of Result<Option<T>>

let items = workers
  .try_collect::<Vec<_>>()
  .await?
  .into_iter()
  .collect::<Option<Vec<_>>>();

Is there any way to avoid the `into_iter` here? I want to `try_collect` into an `Option<Vec<_>>`, but it seems that's not supported.
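
With std iterators this can be done in one pass, because the `FromIterator` impls for `Result` and `Option` nest: `Result<V, E>` can be collected from `Result<A, E>` items whenever `V` can be collected from `A`. `TryStreamExt::try_collect` works differently: it collects only the `Ok` values into a container bounded by `Default + Extend`, and `Option<Vec<_>>` does not implement `Extend`, which would explain why that call is rejected. Note the semantics too: collecting into `Option<Vec<_>>` yields `None` if any page is empty, discarding the earlier pages, rather than stopping at the empty page. A std-only sketch with placeholder `u32` pages:

```rust
/// Collects `Result<Option<T>, E>` items into `Result<Option<Vec<T>>, E>` in a
/// single pass. The `u32` values are placeholder pages.
fn collect_pages(pages: Vec<Result<Option<u32>, String>>) -> Result<Option<Vec<u32>>, String> {
    // `Result<V, E>: FromIterator<Result<A, E>>` whenever `V: FromIterator<A>`,
    // and `Option<Vec<T>>: FromIterator<Option<T>>`, so the two impls nest.
    pages.into_iter().collect()
}

fn main() {
    // All pages present: every value is kept.
    println!("{:?}", collect_pages(vec![Ok(Some(1)), Ok(Some(2))]));
    // One empty page: the whole result becomes `Ok(None)`.
    println!("{:?}", collect_pages(vec![Ok(Some(1)), Ok(None), Ok(Some(2))]));
}
```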

lofty gyro

Instead of collecting and then chaining, you can use `StreamExt::flatten()` to flatten your stream of streams. From a quick look around, though, I think you're actually dealing with a `TryStream`, so you'll need `into_stream()` before you can flatten it.

As I said before, I'm not that familiar with futures, so this is a guess, but I think the pattern you're looking for is something like:

Ok(
  workers
    .try_take_while(...)
    .await?
    .into_stream()
    .flatten()
)
rugged dagger

Okay, I'll try it now.

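// Needs `futures::stream::FuturesOrdered` and `futures::StreamExt` in scope.
async fn load_games(&self, sport: Sport) -> Result<BoxStream<'_, BoxFuture<Result<Game>>>> {
    // Pages 0 through 4, i.e. at most 5 pages, yielded in request order.
    let mut workers: FuturesOrdered<_> = (0..5)
        .map(|page| self.load_games_page(sport, page))
        .collect();

    // `next().await` yields `Option<Result<Option<page>>>`; `transpose()?`
    // returns early on the first error, and the loop ends either when the
    // stream is exhausted or at the first empty page (`Ok(None)`).
    let mut pages = Vec::new();
    while let Some(Some(page)) = workers.next().await.transpose()? {
        pages.push(page);
    }

    // Chain the per-page streams into one continuous stream.
    Ok(futures::stream::iter(pages).flatten().boxed())
}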