This is my example code:
// main.ts
/**
 * Application entry point.
 *
 * Creates the Nest application from `AppModule`, attaches the two Kafka
 * consumers, sets up Swagger, starts every connected microservice and finally
 * starts the HTTP listener on `PORT` (falling back to 3000 when it is unset).
 *
 * NOTE(review): both connected microservices use the Kafka transport but
 * different consumer groups. Message handlers do not appear to be scoped to
 * one connected microservice, so a handler such as `@MessagePattern('topic')`
 * is presumably bound to both consumers and runs once per consumer group for
 * every published message — confirm against the NestJS microservices docs if
 * a single invocation is expected.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);
  const configService = app.get<ConfigService>(ConfigService);

  app.connectMicroservice<MicroserviceOptions>(KAFKA_CONSUMER1_OPTION);
  app.connectMicroservice<MicroserviceOptions>(KAFKA_CONSUMER2_OPTION);

  setUpSwagger(app);

  const port = configService.get<string>('PORT') ?? 3000;

  // Microservices are started before the HTTP listener, as in the original order.
  await app.startAllMicroservices();
  await app.listen(port);
}

// Do not leave the startup promise floating: report the failure and exit with
// a non-zero status instead of relying on an unhandled rejection.
bootstrap().catch((error: unknown) => {
  console.error('Application failed to start', error);
  process.exit(1);
});
// kafka.options.ts
/**
 * Kafka microservice options for the consumer in group `consumer1`.
 *
 * Connects over SSL with SASL SCRAM-SHA-512, taking the credentials from the
 * `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` environment variables.
 */
export const KAFKA_CONSUMER1_OPTION: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      ssl: true,
      sasl: {
        mechanism: 'scram-sha-512',
        // The non-null assertions only silence the compiler; nothing here
        // checks at runtime that the variables are actually set.
        username: process.env.KAFKA_SASL_USERNAME!,
        password: process.env.KAFKA_SASL_PASSWORD!,
      },
      // The client expects a list of broker addresses, not a bare string.
      brokers: ['broker1-address'],
    },
    consumer: {
      groupId: 'consumer1',
      allowAutoTopicCreation: false,
    },
    // NOTE(review): presumably set to '' so Nest does not append its default
    // suffix to the client and group ids — confirm against the NestJS docs.
    postfixId: '',
  },
};
/**
 * Kafka microservice options for the consumer in group `consumer2`.
 *
 * Connects over SSL with SASL SCRAM-SHA-512, taking the credentials from the
 * `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` environment variables.
 *
 * NOTE(review): because the group id is distinct, this consumer presumably
 * receives its own copy of every message on the topics it subscribes to,
 * independently of any consumer in another group — confirm this is intended.
 */
export const KAFKA_CONSUMER2_OPTION: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      ssl: true,
      sasl: {
        mechanism: 'scram-sha-512',
        // The non-null assertions only silence the compiler; nothing here
        // checks at runtime that the variables are actually set.
        username: process.env.KAFKA_SASL_USERNAME!,
        password: process.env.KAFKA_SASL_PASSWORD!,
      },
      // The client expects a list of broker addresses, not a bare string.
      brokers: ['broker1-address'],
    },
    consumer: {
      groupId: 'consumer2',
      allowAutoTopicCreation: false,
    },
    // NOTE(review): presumably set to '' so Nest does not append its default
    // suffix to the client and group ids — confirm against the NestJS docs.
    postfixId: '',
  },
};
// kafka.controller.ts
...
/**
 * Handler for messages matching the `'topic'` pattern.
 *
 * Logs the received payload and resolves to `1`.
 *
 * NOTE(review): the handler is registered by pattern only. If the application
 * connects more than one Kafka microservice, it is presumably invoked once per
 * connected consumer group for each published message — verify.
 * NOTE(review): with `@MessagePattern` the return value is presumably treated
 * as a reply; `@EventPattern` may fit better if no reply is needed — confirm.
 *
 * @param message The decoded payload, as injected by `@Payload()`.
 * @returns Always `1`.
 */
@MessagePattern('topic')
async consume(@Payload() message: unknown): Promise<number> {
  const result = 1;
  console.log(message);
  return result;
}
Then I start the server and publish a message to "topic".
The console.log(message) call runs twice for that single message.