goka.NewProcessor NewProcessor
时间: 2024-03-01 22:30:10 浏览: 223
在 Goka 中,你可以使用 `goka.NewProcessor` 函数或 `goka.NewProcessorBuilder` 函数创建一个新的 Processor。这两个函数的区别在于,`goka.NewProcessor` 函数直接返回一个 `goka.Processor` 对象,而 `goka.NewProcessorBuilder` 函数返回一个 `goka.ProcessorBuilder` 对象,该对象可以用于配置 Processor 的各种参数。
以 `goka.NewProcessor` 函数为例,下面是一个简单的使用示例:
```go
import "github.com/lovoo/goka"
func processCallback(ctx goka.Context, msg interface{}) {
// 处理消息的逻辑
}
processor := goka.NewProcessor([]string{"my-input-topic", "my-output-topic"}, // 指定 Input Topic 和 Output Topic
new(codec.String), // 指定消息的编解码器
processCallback, // 指定处理消息的回调函数
goka.WithConsumerGroup("my-consumer-group"), // 指定 Consumer Group 的名称
goka.WithOutput(goka.NewOutput("my-output-topic", new(codec.String))), // 指定 Output Topic
goka.WithOutputMaxAttempts(10), // 指定 Output Topic 的最大重试次数
goka.WithOutputRetryTimeout(time.Second), // 指定 Output Topic 的重试间隔时间
goka.WithCommitInterval(time.Second), // 指定 Commit 间隔时间
goka.WithLeaseDuration(time.Minute), // 指定 Lease 的持续时间
)
```
在这个例子中,我们使用 `goka.NewProcessor` 函数创建了一个新的 Processor,该 Processor 的 Input Topic 和 Output Topic 分别为 "my-input-topic" 和 "my-output-topic",消息编解码器为 `codec.String`,处理消息的回调函数为 `processCallback`。我们还使用了一些可选参数来配置 Processor 的各种参数,例如指定 Consumer Group 的名称、指定 Output Topic、指定 Output Topic 的最大重试次数和重试间隔时间、指定 Commit 间隔时间以及指定 Lease 的持续时间等。
需要注意的是,`goka.NewProcessor` 函数创建的 Processor 是一个独立的进程,需要通过调用 `processor.Run()` 方法启动 Processor。同时,Processor 会自动创建 Input Topic 和 Output Topic(如果它们不存在),无需手动创建。
阅读全文