#Multiple Kafka consumers
14 messages · Page 1 of 1 (latest)
hi, have you had a change to find out the asnwer?
Neither one can be done with the native implementation. Last time we needed that on a project, we had to create a custom adapter for it. Sadly it's proprietary code so I can't share
It's currently 1.5 years later. Does anyone know if it is still not possible to have multiple Kafka consumers with different groupIds in one application?
Edit:
I'd like this because my app is small enough to be a modular monolith, however, I have some events which are slow to process, which delay the rest of the application
perhaps similar to this issue?
https://github.com/nestjs/nest/issues/13421
There has been this change recently - https://github.com/nestjs/nest/pull/14606
You should now be able to register the kafka transport multiple times with different transport ids (that you have to use in the respective decorators), which should allow you to have multiple consumer groups in a single application
PR Checklist
Please check if your PR fulfills the following requirements:
The commit message follows our guidelines: https://github.com/nestjs/nest/blob/master/CONTRIBUTING.md
Tests for the chan...
That really sounds like what I need. Is there a way to find out if its already released? I don't see any of it in the online documentation yet.
Thanks for the help. It does not resolve the issue though. Upon inspection, it allows me to set:
@EventPattern('identical-topic-name', Transport.KAFKA) and
@EventPattern('identical-topic-name', Transport.NATS)
But not
@EventPattern('identical-topic-name', KAFKA_1) and
@EventPattern('identical-topic-name', KAFKA_2)
However, in my situation, i have a modular monolith, where it should make sense to have separated consumers for the separated modules.
The former was always supported, The latter should be enabled by that feature, if I understand it correctly. I have not tested it myself though.
The latter can also be supported by extending ServerKarka and forcefully overriding the transport id (that's what I've done in the past and it worked as per the second example).
class KafkaConsumer extends ServerKafka {
constructor(protected readonly options: Required<KafkaOptions>['options']) {
super(options);
this.transportId = Transport.KAFKA;
}
}
export const consumer1 = new KafkaConsumer({
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer1',
},
});
export const consumer2 = new KafkaConsumer({
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer2',
},
});
@Controller()
export class NotificationsController {
private readonly logger = new Logger(NotificationsController.name);
constructor(private readonly socketGateway: SocketGateway) {}
@EventPattern('topic', consumer2)
handleNotification(
@Payload() data: { userId: string; ... },
) {
this.socketGateway.emitMessageToUser(data.userId, 'topic');
}
I am not sure if im understanding. The above is what I did understand from what you explained