#Custom opentelemetry context propagation RxJS

high mortar
#

Hello, I am implementing OpenTelemetry to get traces of my distributed system.
One of its core features is context propagation. I won't go into details here.
What I am having trouble with is extracting the incoming context in my server service so that it propagates through the rest of that service.
I am using a custom transporter to communicate between two NestJS services.
To try to propagate the context, I am using a global interceptor.

intercept(context: ExecutionContext, next: CallHandler) {
    try {
      const type = context.getType();

      const protectedKeys =
        this.reflector?.get<string[]>('keys', context.getHandler()) ?? [];

      let span: Span | null = null;
      if (type === 'rpc') {
        // fetch data
        const infosData = this.getTcpInfos(context, protectedKeys);
        const data = context.switchToRpc().getData();

        const activeContext = propagation.extract(
          OTContext.active(),
          data.headers,
        );
        // HERE: try to propagate opentelemetry context through RxJS Observable
        return OTContext.with(activeContext, () => {
          return next.handle();
        });
...

Using this code, the context does not propagate through the RxJS Observable.
What I do not understand is that if I put the exact same code in a controller, it works well.

Does anyone have an idea why the context does not propagate through the Observable? Am I missing something?
Maybe my logic should be moved somewhere else?

paper siren
#

Have you checked whether there's an existing RxJS instrumentation that you can use?

#

But I've had a similar problem with observables and AsyncLocalStorage (which OTEL uses, too) in my library nestjs-cls

high mortar
#

I didn’t find any instrumentation for RxJS

high mortar
#

Amazing, wrapping next.handle() in another Observable works great!
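
For later readers, here is a minimal sketch of why the wrapping probably helps. It deliberately avoids RxJS and OpenTelemetry so it runs on its own: `withContext` is a simplified, synchronous stand-in for `OTContext.with`, `LazyStream` is a stand-in for a cold Observable, and `handle` is a stand-in for `next.handle()`. All of these names are hypothetical and are not from the thread. The point it illustrates is that a cold stream does no work until it is subscribed, and the subscription happens after the `with` callback has already returned.

```typescript
// Simplified stand-in for a context manager: the value is "active" only
// while the callback passed to withContext is running.
let activeContext = 'ROOT';

function withContext<T>(ctx: string, fn: () => T): T {
  const previous = activeContext;
  activeContext = ctx;
  try {
    return fn();
  } finally {
    activeContext = previous; // restored as soon as fn returns
  }
}

// Minimal cold "observable": the producer runs only when subscribe() is called.
type Observer<T> = (value: T) => void;

class LazyStream<T> {
  private readonly producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }
}

// Hypothetical stand-in for next.handle(): the "handler" reads the active
// context at subscription time, not at creation time.
function handle(): LazyStream<string> {
  return new LazyStream<string>((observer) => observer(activeContext));
}

// Pattern from the question: only the creation of the stream is inside
// withContext, so the later subscription runs outside it.
function interceptNaive(ctx: string): LazyStream<string> {
  return withContext(ctx, () => handle());
}

// Pattern from the fix: return an outer stream that re-enters the context
// at subscription time and subscribes to the inner stream from there.
function interceptWrapped(ctx: string): LazyStream<string> {
  return new LazyStream<string>((observer) =>
    withContext(ctx, () => handle().subscribe(observer)),
  );
}

// Helper that subscribes (as the framework would, later) and returns the value.
function firstValue(stream: LazyStream<string>): string {
  let seen = '';
  stream.subscribe((value) => {
    seen = value;
  });
  return seen;
}

const naiveResult = firstValue(interceptNaive('EXTRACTED'));
const wrappedResult = firstValue(interceptWrapped('EXTRACTED'));
console.log(naiveResult, wrappedResult); // prints "ROOT EXTRACTED"
```

In the interceptor from the question, the equivalent would presumably be to return `new Observable((subscriber) => OTContext.with(activeContext, () => next.handle().subscribe(subscriber)))`. This is an assumption, since the thread does not show the final code. It would also fit the observation that the same code works in a controller, where the traced work likely runs synchronously inside the `with` callback.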