golang 消费/生产 kafka代码示例
时间: 2024-08-23 21:01:33 浏览: 90
在 Go 语言中,使用 Confluent 的 `github.com/confluentinc/kafka-go` 库可以轻松地进行 Kafka 生产者(Producer)和消费者(Consumer)的操作。以下是简化的代码示例:
**生产者示例**:
```go
package main
import (
"fmt"
"log"
"time"
"github.com/confluentinc/kafka-go/kafkacfg"
"github.com/confluentinc/kafka-go/producer"
)
func main() {
config := kafkacfg.NewConfig()
err := config.Use("localhost:9092").BootstrapBrokers().Build()
if err != nil {
log.Fatal(err)
}
p, err := producer.New(config)
if err != nil {
log.Fatal(err)
}
defer p.Close()
msg := []byte("Hello, Kafka!")
err = p.SendMessage(&producer.Message{
Value: msg,
Key: []byte("test-key"),
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
fmt.Println("Message sent to Kafka.")
time.Sleep(time.Second) // 阻塞一会儿以确认发送完成
}
```
**消费者示例**:
```go
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/confluentinc/kafka-go/kafkacfg"
"github.com/confluentinc/kafka-go/consumer"
)
func main() {
config := kafkacfg.NewConfig()
err := config.Use("localhost:9092").GroupID("my-group-id").Topic("my-topic").Build()
if err != nil {
log.Fatal(err)
}
c, err := consumer.New(config)
if err != nil {
log.Fatal(err)
}
defer c.Close()
go func() {
for msg := range c.Messages() {
fmt.Printf("Received message: %s\n", string(msg.Value))
os.Stdout.Write(msg.Value)
}
}()
select {}
}
```
注意:这里的 `localhost:9092` 是 Kafka 服务的地址,`my-topic` 是你想要消费或生产的主题,`my-group-id` 是消费者组的ID,用于分区均衡。
阅读全文