goka NumPartitions example
时间: 2024-02-18 18:00:38 浏览: 252
以下是一个使用 `NumPartitions` 属性设置输入 Topic partition 数量的示例代码:
```
package main
import (
"context"
"fmt"
"log"
"github.com/lovoo/goka"
)
func main() {
// 创建一个新的 Group
g := goka.DefineGroup("my-group",
goka.Input("my-input-topic", new(myInputCodec), func(ctx goka.Context, msg interface{}) {
// 处理输入消息
fmt.Printf("Received message: %v\n", msg)
}),
)
// 创建一个新的 Processor,并指定输入 Topic 的 partition 数量为 4
p, err := goka.NewProcessor([]string{"localhost:9092"}, g,
goka.WithTopicConfig("my-input-topic", goka.TopicConfig{
NumPartitions: 4,
}),
)
if err != nil {
log.Fatalf("Error creating processor: %v", err)
}
// 启动 Processor
if err := p.Run(context.Background()); err != nil {
log.Fatalf("Error running processor: %v", err)
}
}
```
上述代码将创建一个名为 `my-input-topic` 的输入 Topic,并将其 partition 数量设置为 4。在 `goka.DefineGroup` 函数中定义了一个名为 `my-group` 的 Group,并将 `my-input-topic` 作为其输入 Topic。在 `goka.NewProcessor` 函数中,通过传递 `WithTopicConfig` 参数来指定 `my-input-topic` 的 partition 数量为 4。在 Processor 启动后,每个 partition 都将被一个独立的 Consumer 处理。
阅读全文