"group.id": "group1",
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
err = c.SubscribeTopics(topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topics: %s", err)
}
fmt.Println("Consuming...")
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
key := string(msg.Key)
if handler, ok := routeHandlers[key]; ok {
handler(msg)
} else {
handleUnknown(msg)
}
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
### Step 4: Main Function
Finally, call `consumeAndRoute` in your main function to start consuming and routing Kafka messages:
```go
func main() {
consumeAndRoute()
}
Wrap Up
The above program creates a Kafka consumer that listens for messages, inspects each message's key, and then dispatches the message to a specific function based on the routing map. The trivial routing logic here is based on the string value of the message key, directly mapping to predefined functions. You can expand this pattern to include more complex routing scenarios or message handling logic depending on your application's needs. Remember to replace <your-broker-address> and <your-topic-name> with your actual Kafka broker address and topic name, respectively.
Try Voicify! It's the best way to make AI vocal covers of your favorite songs.
Check out this demo! Squidward singing Gorilla Warfare by $uicideboy$
GPT-4 is in expert mode. Current expert: Computer Expert