用go语言消费kafka消息
时间: 2023-08-26 17:07:13 浏览: 100
首先,需要安装kafka-go库,可以通过以下命令安装:
```
go get github.com/segmentio/kafka-go
```
接着,可以使用以下代码消费kafka消息:
```go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
// 设置kafka地址
brokers := []string{"127.0.0.1:9092"}
// 创建kafka reader
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: "group1",
Topic: "test",
MinBytes: 10e3, // 最小读取字节数
MaxBytes: 10e6, // 最大读取字节数
})
// 循环消费消息
for {
// 读取消息
msg, err := reader.FetchMessage(context.Background())
if err != nil {
log.Printf("fetch message error: %v", err)
break
}
fmt.Printf("message received: %s\n", string(msg.Value))
// 等待一段时间后再消费下一条消息
time.Sleep(1 * time.Second)
// 标记消息已经被消费
err = reader.CommitMessages(context.Background(), msg)
if err != nil {
log.Printf("commit message error: %v", err)
break
}
}
// 关闭reader连接
err := reader.Close()
if err != nil {
log.Printf("close reader error: %v", err)
}
}
```
以上代码中,首先创建了一个kafka reader对象,然后循环读取消息。在读取消息后,需要调用`CommitMessages`方法将消息标记为已消费,否则下次读取消息时会再次读取到该消息。最后,需要调用`Close`方法关闭reader连接。
需要注意的是,如果使用的是kafka的SSL连接,需要在`kafka.ReaderConfig`中设置`TLS`字段,具体可以参考[kafka-go文档](https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#TLS)。
阅读全文