#RxJS: how to merge two streams and preserve asynchronous behavior.


unkempt acorn
#

I am using a library called observable-worker to conceptually turn a web worker into an RxJS observable. The worker performs an intensive calculation that can take anywhere from 300 ms to 10 minutes, depending on user input. The observable emits objects of a type I defined called BuildRecipe. This was working fine until I wanted to expand the functionality a bit.

I wanted to display a number that increases with each step of the calculation. With the observable-worker package, however, my only access to the object containing the algorithm is through the observable it returns. So I decided to create two streams, one of type BuildRecipe and one of type number, merge them, and return the merged stream through the observable-worker functions. The number stream is called counter$ and the BuildRecipe stream is called build$. getBuildStream returns an Observable<BuildRecipe>. workUnit is a method of an interface I get from observable-worker; it is the function that runs on the created web worker.

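workUnit(input: InputChainData): Observable<BuildRecipe | number> {
    // Emits this.counter after a 1 s delay, repeated 100 times
    const counter$ = of(this.counter).pipe(delay(1000), repeat(100))
    // Stream of BuildRecipe results produced by the calculation
    const build$ = this.getBuildStream(input)
    return merge(counter$, build$)
}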

this.counter is a number I increment at various points while this.getBuildStream() runs. The class implementing the observable-worker interface lives in my builder.worker file, while I display the information from the worker in my builder.component.ts file.
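
To illustrate how counter$ reads this.counter: the argument to of() is evaluated once, when workUnit runs, so repeat() re-emits that same captured number rather than the latest value. The following is only a minimal sketch, assuming RxJS 7.2+ (operators imported from 'rxjs'). Progress, capturedCounter and liveCounter are hypothetical names introduced for illustration, and the delay is left out so the example runs synchronously.

```typescript
import { Observable, defer, of, repeat } from 'rxjs';

// Hypothetical stand-in for the worker class that owns the counter.
class Progress {
  counter = 0;
}

// Same shape as counter$ in workUnit (minus the delay): the argument to of()
// is evaluated once, so every repetition emits the value captured here.
function capturedCounter(p: Progress, times: number): Observable<number> {
  return of(p.counter).pipe(repeat(times));
}

// defer() runs its factory on every (re)subscription, so each repetition
// reads whatever p.counter holds at that moment.
function liveCounter(p: Progress, times: number): Observable<number> {
  return defer(() => of(p.counter)).pipe(repeat(times));
}
```

If p.counter is incremented between emissions, capturedCounter keeps emitting the number it started with, whereas liveCounter emits the updated values.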

#

I encountered two problems:

  1. merge() is deprecated, but the replacements recommended in the documentation don't seem to do what I want.
  2. When I subscribe to the merged observable, I only get output from the build$ stream until it has finished.
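
On the first point, a small sketch under the assumption that RxJS 7.2+ is in use. As far as I can tell from the RxJS 7 documentation, the deprecation notice applies to the merge operator exported from 'rxjs/operators' (replaced by mergeWith) and to the overloads that take a scheduler argument, while the merge creation function imported from 'rxjs' is still supported. Both forms below combine a number stream with a stream of another type; the string values are placeholders standing in for BuildRecipe.

```typescript
import { Observable, merge, mergeWith, of } from 'rxjs';

// Placeholder streams: numbers for the counter, strings in place of BuildRecipe.
const numbers$ = of(1, 2);
const recipes$ = of('recipe-a');

// Creation function: merge(a$, b$)
const viaCreation$: Observable<number | string> = merge(numbers$, recipes$);

// Operator form: a$.pipe(mergeWith(b$))
const viaOperator$: Observable<number | string> = numbers$.pipe(mergeWith(recipes$));
```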

I want the counter$ stream to begin emitting this.counter immediately and to emit the latest value of this.counter every second until build$ has finished, and I want this to happen while build$ performs its calculations. Finally, I would really like to accomplish this without completely scrapping observable-worker, but I'm beginning to think that may be impossible.
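
The behavior described above could be sketched roughly as follows. This is not a confirmed fix, only an illustration assuming RxJS 7.2+. withProgress, readCounter and tick$ are hypothetical names that are not part of observable-worker or of my code. The tick source is a parameter so it can be swapped out, and it defaults to timer(0, 1000), which emits once right away and then every second.

```typescript
import { Observable, Subject, defer, finalize, map, merge, takeUntil, timer } from 'rxjs';

// Hypothetical helper: merges build$ with a stream that samples a counter on
// every tick, and stops sampling as soon as build$ completes or errors.
function withProgress<T>(
  build$: Observable<T>,
  readCounter: () => number,
  tick$: Observable<unknown> = timer(0, 1000),
): Observable<T | number> {
  return defer(() => {
    // Fires once when the build stream is torn down (complete, error, unsubscribe).
    const done$ = new Subject<void>();

    // Read the counter lazily on each tick instead of capturing it up front.
    const counter$ = tick$.pipe(
      map(() => readCounter()),
      takeUntil(done$),
    );

    // Signal done$ when build$ finishes so the merged stream can complete.
    const notifyingBuild$ = build$.pipe(finalize(() => done$.next()));

    return merge(counter$, notifyingBuild$);
  });
}
```

Inside workUnit this might be used as return withProgress(this.getBuildStream(input), () => this.counter). One caveat: timers inside a worker share its single thread with the calculation, so if getBuildStream does its work synchronously without yielding, the ticks cannot fire until it finishes. That may be what the second problem above is showing, though I can't confirm it from the code posted here.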

#

Here is the code that calls workUnit from within builder.component.ts, along with my subscription to the observable it returns.

let userInput: UserInputData = this.getUserInput()
fromWorkerPool<UserInputData, BuildRecipe | number>(
    () =>
        new Worker(
            new URL('./builder.worker', import.meta.url),
            {
                type: 'module',
            }
        ),
    userInput
).subscribe({
    next: (data) => {
        if (typeof data === 'number') {
            this.counter = data
        } else {
            this.buildsSource.data.push(data)
            //forces table to rerender
            this.buildsSource.data = this.buildsSource.data
        }
    },
    error: (error) => {
        this.stopWebWorker()  
        throw error
    },
    complete: () => {
        this.stopWebWorker()
    }
})