# Are there any pitfalls when using .parallel() from the Stream API when dealing with blocking requests?

stone lagoon
#

Hi guys, some context for the question. I have a Client class that makes a blocking request to an external API, and I'm using RestClient in my project. Due to constraints, I'd like to stay with RestClient for this task. I know WebClient would be a good fit for this situation, but I'd like to know whether there is a good solution to this problem with RestClient.

This is the code snippet in which I'd like to insert .parallel():

public List<GithubRepoResponseDto> getUserReposWithoutForks(String user) {
    List<GithubRepo> allRepos = githubClient.getUserRepos(user);

    List<GithubRepoResponseDto> result = allRepos.stream()
            .filter(repo -> !repo.fork())
            .parallel()
            .map(this::mapToRepoResponse)
            .toList();

    return result;
}

The method "mapToRepoResponse" makes another external request to fetch the missing data, which is only available after the first request.

stone lagoon
#

In case it matters, here is the method:

private GithubRepoResponseDto mapToRepoResponse(GithubRepo repo) {
    log.trace("Fetching details for repo: {}", repo.name());

    // One more blocking request per repo: fetch its branches
    List<GithubBranchResponseDto> branches = githubClient.getRepoBranches(repo).stream()
            .map(branch -> new GithubBranchResponseDto(branch.name(), branch.lastCommitSha().sha()))
            .toList();

    return new GithubRepoResponseDto(
            repo.name(),
            repo.owner().login(),
            branches
    );
}

I did think about using virtual threads for this, but it seems to accomplish the same thing as .parallel() (unless there are pitfalls), with less clear code and a few more lines. My take with virtual threads:

    private static final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    List<CompletableFuture<GithubRepoResponseDto>> futures = allRepos.stream()
                .filter(repo -> !repo.fork())
                .map(repo -> CompletableFuture.supplyAsync(() -> mapToRepoResponse(repo), executor))
                .toList();

    return futures.stream()
                .map(CompletableFuture::join)
                .toList();

My question is: is using .parallel() here good practice? My understanding is that it could starve the thread pool, since parallel streams run on the shared common ForkJoinPool of OS (platform) threads rather than on JVM-scheduled virtual threads. So if there are more requests than available threads, everything waits while the app blocks on responses from the API. But I also don't like how the code with virtual threads reads, and it seems there could be things in it that I'm missing and that might create an issue down the line.
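
To make the starvation concern concrete: a parallel stream runs its work on the calling thread plus the shared common ForkJoinPool, whose default size follows the CPU count, not the number of pending requests. A minimal sketch, assuming a hypothetical `blockingCall` that sleeps in place of the real HTTP request (it is not part of the original code), records which threads actually do the work:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class ParallelStreamThreads {

    // Hypothetical stand-in for a blocking HTTP call such as mapToRepoResponse.
    static String blockingCall(int id, Set<String> seenThreads) {
        seenThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(50); // simulate waiting on the network
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "repo-" + id;
    }

    // Runs `tasks` blocking calls through a parallel stream and returns
    // the names of the threads that executed them.
    static Set<String> threadsUsed(int tasks) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks)
                .parallel()
                .mapToObj(i -> blockingCall(i, seen))
                .toList();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("threads used for 32 calls: " + threadsUsed(32));
    }
}
```

Every other parallel stream in the same JVM shares those workers by default, so long blocking calls in one place can delay unrelated parallel work elsewhere.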

mystic valve
#

My experience is that if you can't test it, can't benchmark it, and can't see a significant real-world improvement, then don't use #parallel. It's almost always a bad choice.

stone lagoon
#

Do you mean in general, or .parallel() of the Stream API specifically?

mystic valve
#

just the .parallel method of the Stream API

#

It's usually best to forget it exists.

stone lagoon
#

That's interesting

mystic valve
#

Maybe that will change in the future but since it came out, it's been shit.

stone lagoon
#

So given the constraints here of using RestClient, is there a better way to make the requests in parallel?

#

Doing it with virtual threads the way I did here is, I guess, one way, but just as I didn't know about the problems with .parallel(), it might have its own quirks that I'm not aware of.

mystic valve
#

Since you're pummeling GitHub, I assume there's probably a rate limit anyway. But virtual threads are the answer to this kind of IO.

stone lagoon
#

Yeah, GitHub is just serving as dummy data here. I'm mostly interested in the idea overall. In another scenario I might hit my own endpoint, which has no rate limiting by design, so I'm thinking about how to handle those situations.
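
On the case of an endpoint without rate limiting: a virtual-thread-per-task executor puts no cap on how many requests are in flight, so one common way to protect the downstream service is a `Semaphore`. A minimal sketch, assuming Java 21+; `mapBounded` and `maxConcurrent` are hypothetical names that are not in the original code, and the `mapper` argument stands in for `mapToRepoResponse`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

class BoundedVirtualThreads {

    // Starts one virtual thread per input, but lets at most
    // `maxConcurrent` mapper calls run at the same time.
    static <I, O> List<O> mapBounded(List<I> inputs, Function<I, O> mapper, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<O>> futures = new ArrayList<>();
            for (I input : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    permits.acquireUninterruptibly(); // parks the virtual thread until a permit is free
                    try {
                        return mapper.apply(input);
                    } finally {
                        permits.release();
                    }
                }, executor));
            }
            // Join in submission order so results line up with inputs.
            List<O> results = new ArrayList<>();
            for (CompletableFuture<O> future : futures) {
                results.add(future.join());
            }
            return results;
        }
    }

    public static void main(String[] args) {
        List<String> out = mapBounded(List.of("a", "b", "c", "d"), s -> s.toUpperCase(), 2);
        System.out.println(out);
    }
}
```

The executor is created per call here only to keep the sketch self-contained; a shared executor like the static field in the snippet would work the same way.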

#

I'm not really that familiar with using virtual threads, would this implementation be somewhat acceptable?

private static final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

private ....() {
    List<CompletableFuture<GithubRepoResponseDto>> futures = allRepos.stream()
            .filter(repo -> !repo.fork())
            .map(repo -> CompletableFuture.supplyAsync(() -> mapToRepoResponse(repo), executor))
            .toList();

    return futures.stream()
            .map(CompletableFuture::join)
            .toList();
}

I did some reading about it, and it seems there is also a quirk with exceptions: they are not propagated as-is but wrapped in a CompletionException? Say, for example, the API request inside getRepoBranches throws a custom exception for a 404.
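
The wrapping can be seen directly: when the supplier passed to `supplyAsync` throws, `join()` throws a `CompletionException` whose cause is the original exception. A minimal sketch, where `RepoNotFoundException`, `fetchBranches`, and `joinUnwrapped` are hypothetical names used for illustration (not from the original code), that rethrows the original exception for callers:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinUnwrapDemo {

    // Hypothetical custom exception standing in for "404 from the API".
    static class RepoNotFoundException extends RuntimeException {
        RepoNotFoundException(String message) {
            super(message);
        }
    }

    // Hypothetical client call that always fails, to trigger the wrapping.
    static String fetchBranches(String repo) {
        throw new RepoNotFoundException("404 for " + repo);
    }

    // join() wraps failures in CompletionException; rethrow the original
    // unchecked cause so callers can catch the custom type directly.
    static <T> T joinUnwrapped(CompletableFuture<T> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof RuntimeException cause) {
                throw cause;
            }
            throw e; // checked causes and Errors stay wrapped
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> fetchBranches("demo"));
        try {
            joinUnwrapped(future);
        } catch (RepoNotFoundException e) {
            System.out.println("caught original exception: " + e.getMessage());
        }
    }
}
```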

mystic valve
#

That I don't know. Don't be throwing exceptions while using streams.

mystic valve
#

I'm not really that familiar with using virtual threads, would this implementation be somewhat acceptable?

Sorry, didn't see this. Yes, this seems fine. With streams and network calls there's always a chance of an exception, so either handle it in the supplier or return a Result type that holds either the value you want or an error/exception message.
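
One way the "Result type" suggestion could look, as a rough sketch assuming Java 21+: the `Result`, `Success`, and `Failure` types and the `mapAll` helper are hypothetical, not part of any library or of the original code. The supplier catches the failure itself, so `join()` never throws for a failed request and the caller decides what to do with each outcome:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ResultDemo {

    // Hypothetical Result type: either the mapped value or the error that prevented it.
    sealed interface Result<T> {}
    record Success<T>(T value) implements Result<T> {}
    record Failure<T>(String input, Exception error) implements Result<T> {}

    // Runs the mapper on one virtual thread per input and collects
    // a Result per input, in the original order.
    static <I, O> List<Result<O>> mapAll(List<I> inputs, Function<I, O> mapper) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Result<O>>> futures = inputs.stream()
                    .map(input -> CompletableFuture.supplyAsync(() -> attempt(input, mapper), executor))
                    .toList();
            return futures.stream()
                    .map(CompletableFuture::join)
                    .toList();
        }
    }

    // Handles the exception inside the supplier instead of letting it escape.
    private static <I, O> Result<O> attempt(I input, Function<I, O> mapper) {
        try {
            return new Success<>(mapper.apply(input));
        } catch (RuntimeException e) {
            return new Failure<>(String.valueOf(input), e);
        }
    }

    public static void main(String[] args) {
        List<Result<Integer>> results = mapAll(List.of("1", "x", "3"), s -> Integer.parseInt(s));
        results.forEach(System.out::println);
    }
}
```

With this shape, one failing repo no longer aborts the whole batch; the trade-off is that every caller has to inspect the results instead of relying on an exception.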

stone lagoon
#

Thanks a lot! Will take a look at it and explore it more