package pipline // source部分 type ChanSource struct { out chan interface{} } // 定义一个新的channel源,包含状态收集,是否停止,和停止chan func NewSource(out chan interface{}) *ChanSource { return &ChanSource{ out: out, } } // Via streams data through the given flow func (cs *ChanSource) Via(flow Flow) Flow { go cs.DoStream(cs, flow) return flow } // DoStream 发送数据 outlet to inlet. func (cs *ChanSource) DoStream(outlet Outlet, inlet Inlet) { defer close(inlet.In()) for element := range outlet.Out() { inlet.In() <- element // fmt.Println("Source: ", element) } } // Out returns an output channel for sending data func (cs *ChanSource) Out() <-chan interface{} { return cs.out } 增加一个cancel功能
时间: 2024-03-22 21:42:57 浏览: 153
可以在 ChanSource 结构体中增加一个 cancel chan bool 类型的字段,用于控制是否停止数据流。同时,在 NewSource() 函数中初始化 cancel chan,然后在 DoStream() 函数中检查 cancel 变量,如果为 true 则退出循环。最后,可以增加一个 Cancel() 函数,用于设置 cancel 变量为 true,从而停止数据流。修改后的代码如下:
```
package pipline
// source部分
type ChanSource struct {
out chan interface{}
cancel chan bool
}
// 定义一个新的channel源,包含状态收集,是否停止,和停止chan
func NewSource(out chan interface{}) *ChanSource {
return &ChanSource{
out: out,
cancel: make(chan bool),
}
}
// Via streams data through the given flow
func (cs *ChanSource) Via(flow Flow) Flow {
go cs.DoStream(cs, flow)
return flow
}
// DoStream 发送数据 outlet to inlet.
func (cs *ChanSource) DoStream(outlet Outlet, inlet Inlet) {
defer close(inlet.In())
for element := range outlet.Out() {
select {
case <-cs.cancel:
return
default:
inlet.In() <- element
// fmt.Println("Source: ", element)
}
}
}
// Out returns an output channel for sending data
func (cs *ChanSource) Out() <-chan interface{} {
return cs.out
}
// Cancel sets cancel variable to true, stopping the data flow
func (cs *ChanSource) Cancel() {
cs.cancel <- true
}
```
阅读全文