#Multiple Kafka consumers


ebon gull
#

How can I create Kafka consumers with different consumer group IDs in one NestJS application?
Also, how can I consume messages as a batch?

zenith harbor
#

Hi, have you had a chance to find out the answer?

full gulch
#

Neither one can be done with the native implementation. Last time we needed that on a project, we had to create a custom adapter for it. Sadly it's proprietary code so I can't share

merry zealot
#

It's currently 1.5 years later. Does anyone know if it is still not possible to have multiple Kafka consumers with different groupIds in one application?

Edit:
I'd like this because my app is small enough to be a modular monolith. However, some events are slow to process, and they delay the rest of the application.

shrewd basin
full gulch
#

There has been this change recently - https://github.com/nestjs/nest/pull/14606
You should now be able to register the Kafka transport multiple times with different transport IDs (which you have to use in the respective decorators). That should allow you to have multiple consumer groups in a single application.


merry zealot
#

That really sounds like what I need. Is there a way to find out if it's already released? I don't see any of it in the online documentation yet.

full gulch
merry zealot
#

Thanks for the help. It does not resolve the issue though. Upon inspection, it allows me to set:

@EventPattern('identical-topic-name', Transport.KAFKA) and
@EventPattern('identical-topic-name', Transport.NATS)

But not
@EventPattern('identical-topic-name', KAFKA_1) and
@EventPattern('identical-topic-name', KAFKA_2)

However, in my situation, I have a modular monolith, where it makes sense to have separate consumers for the separate modules.

full gulch
#

The former was always supported. The latter should be enabled by that feature, if I understand it correctly. I have not tested it myself though.

#

The latter can also be supported by extending ServerKafka and forcefully overriding the transport ID (that's what I've done in the past, and it worked as per the second example).

merry zealot
#
class KafkaConsumer extends ServerKafka {
  constructor(protected readonly options: Required<KafkaOptions>['options']) {
    super(options);

    this.transportId = Transport.KAFKA;
  }
}

export const consumer1 = new KafkaConsumer({
  client: {
    brokers: ['localhost:9092'],
  },
  consumer: {
    groupId: 'consumer1',
  },
});

export const consumer2 = new KafkaConsumer({
  client: {
    brokers: ['localhost:9092'],
  },
  consumer: {
    groupId: 'consumer2',
  },
});


@Controller()
export class NotificationsController {
  private readonly logger = new Logger(NotificationsController.name);

  constructor(private readonly socketGateway: SocketGateway) {}

  @EventPattern('topic', consumer2)
  handleNotification(
    @Payload() data: { userId: string; ... },
  ) {
    this.socketGateway.emitMessageToUser(data.userId, 'topic');
  }
}

I am not sure if I'm understanding this correctly. The above is what I understood from your explanation.

full gulch
#

You have to write this.transportId = "KAFKA_1" and also static transportId = "KAFKA_1" IIRC

#

If it complains with type errors, cast it to any, it will work
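
To show why the IDs need to differ per consumer, here is a toy model of the matching rule as I understand it: a handler is bound to a server only when the decorator's transport equals the server's transportId, or when one of the two is unset. This is a simplified sketch and not NestJS source code. The types, the handler names and the 'KAFKA' stand-in value are all hypothetical.

```typescript
// Toy model only: hypothetical stand-ins, not NestJS classes.
type TransportId = string | number | symbol;

interface Handler {
  pattern: string;
  transport?: TransportId; // the second argument passed to @EventPattern, if any
  name: string;
}

interface ServerLike {
  groupId: string;
  transportId?: TransportId;
}

// A handler is bound to a server when either side leaves the id unset
// or when both ids are strictly equal.
function handlersFor(server: ServerLike, handlers: Handler[]): string[] {
  return handlers
    .filter(
      (h) =>
        h.transport === undefined ||
        server.transportId === undefined ||
        h.transport === server.transportId,
    )
    .map((h) => h.name);
}

// Two handlers on the same topic, each tagged with its own transport id.
const handlers: Handler[] = [
  { pattern: 'topic', transport: 'KAFKA_1', name: 'ordersHandler' },
  { pattern: 'topic', transport: 'KAFKA_2', name: 'notificationsHandler' },
];

const consumer1: ServerLike = { groupId: 'consumer1', transportId: 'KAFKA_1' };
const consumer2: ServerLike = { groupId: 'consumer2', transportId: 'KAFKA_2' };

// Distinct ids: each consumer only gets its own handler.
const boundToConsumer1 = handlersFor(consumer1, handlers);
const boundToConsumer2 = handlersFor(consumer2, handlers);

// One shared id everywhere (stand-in for using Transport.KAFKA on every
// consumer and decorator): every consumer gets every handler.
const SHARED_ID = 'KAFKA';
const sharedHandlers = handlers.map((h) => ({ ...h, transport: SHARED_ID }));
const boundToShared = handlersFor(
  { groupId: 'consumer1', transportId: SHARED_ID },
  sharedHandlers,
);

console.log(boundToConsumer1, boundToConsumer2, boundToShared);
```

In this model, setting this.transportId = Transport.KAFKA on both consumers gives every consumer the same set of handlers, so the ID has to be unique per consumer and the same value has to be passed to the decorator.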