#Reactive programming with NestJs

29 messages · Page 1 of 1 (latest)

tawdry blade
tranquil ibex
#

For those of us who haven't used it, would you like to explain how WebFlux from SpringBoot works?

rough temple
#

@tawdry blade - you can do reactive programming in Nest. It most widely common in a Websocket Gateway. https://docs.nestjs.com/websockets/gateways. When you use RxJS observables in your handlers, that is basically Reactive programming. Not sure if that is what WebFlux does though.

#

Also, because of Node/ JavaScript, Nest is inherently asynchronous, which I believe is a key reason for WebFlux ( after reading about it some).

tawdry blade
tranquil ibex
#

You can return an observable and Nest will get the last value emitted from it. That already exists. Am I missing something?

tawdry blade
tranquil ibex
#

Again, can you explain what WebFlux is for those of us (like myself) who have not used it?

tawdry blade
tranquil ibex
#

And what do these reactive controllers do? Why is it useful?

#

And if you say "It allows for reactive programming" then I'm afraid you don't understand the question I'm asking

tawdry blade
# tranquil ibex And what do these reactive controllers do? Why is it useful?

In usual HTTP response you have a response and that's it the end (blocking)!
But when you use Reactive endpoints (non-blocking) you can receive a stream (example : 1 then 2 then 3 then 4 then complete and this is the end)
This is useful because you can use Reactive endpoints (use HTTP protocol) instead of using WebSocket (other protocol ws & wss)

tranquil ibex
#

Okay, so for a non-webclient HTTP call to a WebFlux endpoint, just a standard curl or xh request from the command line, what do the responses look like? Is it several responses or a single response to them?

#

If you really need data streaming (which seems to be along the idea of what you're looking for) then there is the chapter on Streaming Files, which, while originally written for the sake of streaming file data, it can be used for generalized streams as well, and intermixed with RxJS

tawdry blade
#

This an example (Springboot with WebFlux), the controller :

#

The controller with the endPoint : "host/example/all" will emit a number every 1 second

#

In the browser what i see (every 1 second the end point emit an item):

#

Header response content :

tranquil ibex
#

Okay, give me a moment because I think Nest can handle this with streaming. I just need to set up a test first

tawdry blade
#

okay

tranquil ibex
#

Okay, I was having issues doing it with a streamable file, unfortunately, as that interface expects the file to be fully read and/or readable and not written to in the process. I was able to write an implementation that mimicked that of WebFlux, at least in terms of streaming out the responses, and it might be possible to make this something that Nest can handle, but I'd need to play around with it more. But the simple implementation would look something like this:

import { Controller, Get, Res } from '@nestjs/common';
import { AppService } from './app.service';
import { interval, take } from 'rxjs';
import type { Response } from 'express';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  @Get('example/all')
  getAll(@Res() res: Response) {
    res.writeHead(200, {
      'content-type': 'application/json',
      'transfer-encoding': 'chunked',
    });
    res.write('[');
    const sub = interval(1000)
      .pipe(take(10))
      .subscribe({
        next: (val) => {
          console.log(`Writing ${val} to buffer`);
          res.write(val.toString());
          if (val !== 9) {
            res.write(',');
          }
        },
        error: () => sub.unsubscribe(),
        complete: () => {
          console.log('Ending the stream');
          res.end(']');
        },
      });
    sub.add(() => sub.unsubscribe());
  }
}
tawdry blade
tranquil ibex
#

And again, it might be possible to make Nest act this way, but I'd need to come up with a solution for that, as currently it doesn't exist. It's an interesting concept, and one I'm not super familiar with, nor do I see often in the JS world as most interactions server-side (in JS) are non-blocking, due to Node's asynchronous nature

#

I can look into creating an implementation for it

rough temple
# tawdry blade The controller with the endPoint : "host/example/all" will emit a number every 1...

If you ask me, this is a problem that doesn't need solving with NestJS. I've been researching the reasoning behind the creation of WebFlux and it is to offer asynchronicity via an event loop (go figure) over the usual MVC thread-per-request klunkiness of how Springboot generally works. This isn't a problem needing solving with Node/ JavaScript because the stack is asynchronous at its core.

In the example code given, from what I'm understanding in my research, the updating of the chunked encoding is working, but is a terrible situation in general for the web server, because it is a blocked HTTP connection, as if you were downloading a file, the connection won't close until the file transfer is finished i.e. the last bracket in the json output is sent. This is terribly bad for a web backend for sure.

WebFlux should know to upgrade the connection to at least SSE, to avoid this bad chunked transfer situation. The example code given should look something more like this, to get the SSE upgrade:

@GetMapping(value = "/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Long> streamNumbers() {
    return Flux.interval(Duration.ofMillis(1000));
}

This gives a response back every 1 second over SSE (which is the idea behind all this, i.e. update the client with data "semi-reactively") is also already possible with Nest with SSE too (along with Websockets). Btw, WebFlus uses the Netty webserver, which strongly supports websocket connection. The "streaming" bit is just how WebFlux handles the data in the example. Take that out of the equation and you can reach the same solution with Nest's own SSE solution, which I forgot to mention is also a possibility for reactive programming in Nest.

This is why your issue got closed on Github and you were asked to discuss it here. So, welcome to the wonderful world of Nest. 🙂

#

This is how the same effect can be achieved with Nest and SSE, assuming we can agree that a streaming set of chunks ovver HTTP is a bad scenario for any webserver:

import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { interval, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Controller('example')
export class ExampleController {

 // 1. The @Sse decorator automatically sets:
 //    Content-Type: text/event-stream
 //    Transfer-Encoding: chunked
 //    Connection: keep-alive
 @Sse('all')
 getAllNumbers(): Observable<MessageEvent> {
   
   // 2. interval(1000) is the RxJS equivalent of Flux.interval(Duration.ofMillis(1000))
   return interval(1000).pipe(
     
     // 3. We map the raw number (0, 1, 2) into the SSE format object
     map((num) => ({
       data: { number: num } 
     })),
   );
 }
}

If anyone caught it, I also wrote "semi-reactively" above. That is because I also learned something too. That a pure bi-directional transfer with SSE or websockets isn't completely reactive. A reactive connection to the client can also "push back" when the client is overwhelmed. I haven't yet looked into how that can be handled with Nest and am not sure how it can be handled with WebFlux either.

tranquil ibex
#

Pretty much the same conclusions I came to as well. Wanted to show that it could be done with Observables and the res, but I do agree that generally it shouldn't be necessary in the Node.js world, which already sends everything as transfer-encoded: chunked and has a very strong event loop to work off of

tawdry blade
#

Looks like NestJs handle the "reactive controllers" ..
The problem is solved then ..
I will search for the performance of using SSE with HTTP protocol ('Content-Type: text/event-stream' and 'transfer-encoding:chunked') and webSocket with WS protocol
But i think it's confusing to use a decorater '@Sse' instead of '@Get' and what about '@Post', ... ? I think a configution in 'main.ts' much better, this configuration will indicate if you want the default behaviour (discrabed in nestJs documentation) or "reactive return" when the return is an Observable stream.