golang使用kafuka
时间: 2024-06-06 20:03:47 浏览: 148
Go语言对Kafka的支持非常好,官方提供了一个开源的Kafka客户端库——Sarama,它支持Producer、Consumer、Admin等各种Kafka功能。
下面是使用Sarama实现Kafka Producer和Consumer的示例代码:
Producer:
```
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("message sent to partition %d at offset %d\n", partition, offset)
}
```
Consumer:
```
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer partitionConsumer.Close()
for message := range partitionConsumer.Messages() {
fmt.Printf("received message: %s\n", string(message.Value))
}
}
```
以上是使用Sarama实现Kafka Producer和Consumer的简单示例,其中Producer将消息发送到名为"test"的topic中,而Consumer从"test" topic中消费消息。
阅读全文