用go写一段消费kakfa的代码
时间: 2024-03-17 20:40:54 浏览: 128
好的,以下是使用 Go 语言消费 Kafka 的示例代码:
```go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/segmentio/kafka-go"
)
func main() {
// 设置 Kafka 读取信息的参数
topic := "my-topic"
partition := 0
offset := kafka.LastOffset
// 创建 Kafka 连接
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("Failed to dial leader:", err)
}
defer conn.Close()
// 设置 Kafka 消费者
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
Partition: partition,
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 1 * time.Second,
GroupID: "my-group",
StartOffset: offset,
})
defer r.Close()
// 启动消费者
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-sigchan:
log.Println("Received termination signal, closing Kafka connection")
return
default:
// 读取消息
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println("Failed to read message:", err)
continue
}
// 处理消息
log.Println("Received message:", string(m.Value))
}
}
}
```
这段代码使用 `kafka-go` 库连接 Kafka,创建一个消费者,然后循环读取消息。当收到终止信号时,关闭 Kafka 连接并退出循环。注意这只是个简单的示例代码,实际使用时需要根据业务需求进行修改和完善。
阅读全文