#RxJS StartWith + debounce

16 messages · Page 1 of 1 (latest)

ebon kestrel
#

I got streams of data represented by data1$, data2$ & data3$. When a stream emits I need to execute a costly function.
I'm struggling to create the right RxJs stream with theses contraints :

  • Because the data streams can emit multiple times I need to add a debounceTime to prevent the execution of the costyle function
  • The final stream should emit a state (represented by the DataState type). The state should be 'Loading' when a data stream emits then it should be 'Ok' when the costly function has been done
nimble crag
#

Please post your code as text, so that we can copy and paste part of your code.

somber masonBOT
#

Hi @ebon kestrel, you can use the following snippet to place your code in a syntax highlighted codeblock. Replace ts with the language you need (i.e. html, js, css, etc.)

```ts
// your code goes here
```

This example:
**```ts
const greet = (name: string) => console.log(`Hi ${name}, welcome to the Angular Community Discord!`);

greet("Tom"); // "Hi Tom, welcome to the Angular Community Discord!"
```**

will appear in a post like this:

const greet = (name: string) => console.log(`Hi ${name}, welcome to the Angular Community Discord!`);

greet("Tom"); // "Hi Tom, welcome to the Angular Community Discord!"
ebon kestrel
#

Oh ok sorry for that

#
import {
  BehaviorSubject,
  combineLatest,
  tap,
  debounceTime,
  startWith,
  map,
} from 'rxjs';

let data1$ = new BehaviorSubject<number>(0);
let data2$ = new BehaviorSubject<number>(0);
let data3$ = new BehaviorSubject<number>(0);

function costlyProcess(data1: number, data2: number, data3: number) {}

type DataState<T> =
  | { state: 'Loading' }
  | { state: 'Error'; error: Error }
  | { state: 'Ok'; data: T };

let updateState$ = combineLatest({
  data1: data1$,
  data2: data2$,
  data3: data3$,
}).pipe(
  debounceTime(1000),
  tap((val) => {
    costlyProcess(val.data1, val.data2, val.data3);
  }),
  map(() => {
    return {
      state: 'Ok',
      data: undefined,
    } as DataState<undefined>;
  }),
  startWith({ state: 'Loading' } as DataState<undefined>)
);

updateState$.subscribe();
nimble crag
#

If your costlyProcess is really something that is costly in terms of CPU and needs to be run in the background in order not to freeze the UI, then RxJS is not what will help you. You need to either do it on the server, or to use a web worker.
If it's costly because it makes one or several requests to a backend, then it should return an Observable, and you can then use something like this:

import {
  BehaviorSubject,
  combineLatest,
  tap,
  debounceTime,
  startWith,
  map, switchMap, of, delay,
} from 'rxjs';

let data1$ = new BehaviorSubject<number>(0);
let data2$ = new BehaviorSubject<number>(0);
let data3$ = new BehaviorSubject<number>(0);

function costlyProcess(data1: number, data2: number, data3: number) {
  return of('result').pipe(delay(3000));
}

type DataState<T> =
  | { state: 'Loading' }
  | { state: 'Error'; error: Error }
  | { state: 'Ok'; data: T };

let updateState$ = combineLatest({
  data1: data1$,
  data2: data2$,
  data3: data3$,
}).pipe(
  debounceTime(1000),
  switchMap(({data1, data2, data3}) =>
    costlyProcess(data1, data2, data3).pipe(
      map(() => {
        return {
          state: 'Ok',
          data: undefined,
        } as DataState<undefined>;
      }),
      startWith({ state: 'Loading' } as DataState<undefined>)
    )
  )
);

updateState$.subscribe();
ebon kestrel
#

costlyProcess is basicaly drawing some shapes on a map using openlayer library. And I don't want this process to be executed multiple times if the data changes very fast

nimble crag
#

OK. But if it's synchronous, then there's no way your loading state can ever have any effect. So you can forget about the loading state completely.

ebon kestrel
#

It is synchronous but I can convert it to an async function because it can take some time and I don't want to freeze

#

I just tried your solution but I don't understand something. When some new data is emitted the same keeps the value "Ok" instead of becoming "Loading" during the execution of costlyProcess then becoming "Ok"

#

Maybe I should create an observable that emit a loading state, then execute costlyProcess then emits state "ok" and finally ends

#

🤔

#

and use this observable in the switchMap after debounceTime

nimble crag
#

If your costlyFunction is still synchronous, then as I said earlier, there's no way your loading state can ever have any effect.

#

It is synchronous but I can convert it to an async function because it can take some time and I don't want to freeze
Making it an async function won't preven your UI to freeze. If you execute something that takes 2 seconds, then nothing else can happen during those two seconds.

ebon kestrel
#

Oh you're right my bad