package pipline // 输入通道 type Inlet interface { In() chan<- interface{} } // 输出通道 type Outlet interface { Out() <-chan interface{} } // Source步骤 type Source interface { Outlet // 设置回调接口 Via(Flow) Flow } // Flow步骤 type Flow interface { Inlet Outlet Via(Flow) Flow To(Target) } // Target步骤 type Target interface { Inlet // 确认操作完毕 } type Pipeline struct { source Source flow Flow target Target } func Connect(source Source, flow Flow, target Target) *Pipeline { source.Via(flow) flow.To(target) return &Pipeline{ source: source, flow: flow, target: target, } } 增加target处理一条数据完毕就与source确认某条数据操作完毕
时间: 2024-03-05 08:49:10 浏览: 212
您可以在`Target`步骤中添加一个`Done()`方法,用于确认某条数据的操作已经完成。修改后的代码如下:
```go
package pipeline
// 输入通道
type Inlet interface {
In() chan<- interface{}
}
// 输出通道
type Outlet interface {
Out() <-chan interface{}
}
// Source步骤
type Source interface {
Outlet
// 设置回调接口
Via(Flow) Flow
}
// Flow步骤
type Flow interface {
Inlet
Outlet
Via(Flow) Flow
To(Target)
}
// Target步骤
type Target interface {
Inlet
// 确认操作完毕
Done()
}
type Pipeline struct {
source Source
flow Flow
target Target
}
func Connect(source Source, flow Flow, target Target) *Pipeline {
source.Via(flow)
flow.To(target)
return &Pipeline{
source: source,
flow: flow,
target: target,
}
}
// 在Target步骤中添加Done()方法
func (p *Pipeline) Done() {
p.source.Out()
}
```
在`Target`步骤中,当一条数据的操作完成后,可以调用`Done()`方法,将结果返回给上一个步骤,表示该条数据已经处理完成。这样,上一个步骤就可以继续处理下一条数据。