# Why Isn't Kafka Consumer Updating Code as Expected?


manic wren

I am trying to update a campaign's status from "Not Started" to "In Progress" through Apache Kafka once the current time reaches the campaign's start time, but the status is not being updated. I have debugged the code and confirmed that the date values are correct, yet I still cannot see why the status does not change.

The producer code is:

```
// Types assumed to come from kafkajs, based on the API used below.
import { Kafka, Producer } from 'kafkajs';

export class CampaignProducer {
    private readonly producer: Producer;
    private readonly campaignTopic = CAMPAIGN_TOPIC.campaignTopic;

    constructor(private readonly kafka: Kafka) {
        this.producer = this.kafka.producer();
    }

    // Publishes the campaign ID and its date window as one JSON message.
    async sendCampaignMessage(campaignId: string, startDate: Date, endDate: Date): Promise<void> {
        await this.producer.connect();
        await this.producer.send({
            topic: this.campaignTopic,
            messages: [{ value: JSON.stringify({ campaignId, startDate, endDate }) }],
        });
    }
}
```
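For reference, the dates in the payload pass through `JSON.stringify`, which serializes a `Date` as a UTC ISO 8601 string, and the consumer rebuilds them with `new Date(...)`. The sketch below isolates that round trip without a broker. The names `CampaignPayload`, `encodeCampaign`, `decodeCampaign`, and the sample values are hypothetical and only illustrate the shape of the message.

```typescript
// Hypothetical shape of the message value after JSON.parse.
interface CampaignPayload {
  campaignId: string;
  startDate: string; // ISO 8601 string produced by Date.prototype.toJSON
  endDate: string;
}

// Mirrors the JSON.stringify call in sendCampaignMessage.
function encodeCampaign(campaignId: string, startDate: Date, endDate: Date): string {
  return JSON.stringify({ campaignId, startDate, endDate });
}

// Mirrors the JSON.parse + new Date(...) steps in the consumer.
function decodeCampaign(raw: string): { campaignId: string; startDate: Date; endDate: Date } {
  const parsed = JSON.parse(raw) as CampaignPayload;
  return {
    campaignId: parsed.campaignId,
    startDate: new Date(parsed.startDate),
    endDate: new Date(parsed.endDate),
  };
}

// Sample values (assumptions for illustration only).
const sampleStart = new Date("2024-01-01T09:00:00.000Z");
const sampleEnd = new Date("2024-01-31T09:00:00.000Z");

const rawMessage = encodeCampaign("c-1", sampleStart, sampleEnd);
const decodedMessage = decodeCampaign(rawMessage);

console.log(rawMessage);
// {"campaignId":"c-1","startDate":"2024-01-01T09:00:00.000Z","endDate":"2024-01-31T09:00:00.000Z"}
console.log(decodedMessage.startDate.getTime() === sampleStart.getTime()); // true
```

The round trip preserves the instant exactly, which is consistent with the date values looking correct in the debugger.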


The consumer code:

```
async consumeCampaignMessages(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: this.campaignTopic, fromBeginning: true });

    await this.consumer.run({
        // Runs once per message, at the time the message is consumed.
        eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
            const campaignMessage = JSON.parse(message.value.toString());

            const { campaignId, startDate, endDate } = campaignMessage;
            const campaignStartDate = new Date(startDate);
            const campaignEndDate = new Date(endDate);
            const currentDate = new Date();

            const campaign = await this.campaignService.getCampaign(campaignId);

            if (!campaign) {
                console.error(`Campaign not found: ${campaignId}`);
                return;
            }

            if (currentDate >= campaignStartDate && currentDate <= campaignEndDate) {
                // Inside the campaign window: mark the campaign as in progress.
                await this.campaignService.updateStatusToInProgress(campaignId);
            } else if (currentDate > campaignEndDate) {
                // Past the end date: close the campaign.
                await this.campaignService.updateStatusToClosed(campaignId);
            }
        },
    });
}
```
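To look at the date check on its own, the branching inside `eachMessage` can be reduced to a pure function that takes the current time as a parameter. This is only a sketch under assumptions: `decideStatus`, the `StatusUpdate` type, the "Closed" label (inferred from `updateStatusToClosed`), and the sample dates are hypothetical and not part of the original service.

```typescript
// Hypothetical result type: which update the handler would trigger, if any.
type StatusUpdate = "In Progress" | "Closed" | null;

// Same comparisons as the consumer's handler, expressed on epoch milliseconds,
// with "now" injected so each case can be checked without Kafka.
function decideStatus(now: Date, start: Date, end: Date): StatusUpdate {
  const t = now.getTime();
  if (t >= start.getTime() && t <= end.getTime()) {
    return "In Progress";
  }
  if (t > end.getTime()) {
    return "Closed";
  }
  // now is before start: neither branch of the handler runs.
  return null;
}

// Sample campaign window (assumption for illustration only).
const windowStart = new Date("2024-01-01T09:00:00.000Z");
const windowEnd = new Date("2024-01-31T09:00:00.000Z");

const beforeStart = decideStatus(new Date("2023-12-31T09:00:00.000Z"), windowStart, windowEnd);
const atStart = decideStatus(new Date("2024-01-01T09:00:00.000Z"), windowStart, windowEnd);
const afterEnd = decideStatus(new Date("2024-02-01T09:00:00.000Z"), windowStart, windowEnd);

console.log(beforeStart); // null
console.log(atStart);     // In Progress
console.log(afterEnd);    // Closed
```

Because the handler runs once per message at the moment it is consumed, a message handled before the start date falls into the `null` case and triggers no update; nothing in the consumer shown above evaluates the same message again later.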