package pipline // source部分 type ChanSource struct { out chan interface{} } // 定义一个新的channel源,包含状态收集,是否停止,和停止chan func NewSource(out chan interface{}) *ChanSource { return &ChanSource{ out: out, } } // Via streams data through the given flow func (cs *ChanSource) Via(flow Flow) Flow { go cs.CheckOsInterpreter() go cs.DoStream(cs, flow) return flow } func (cs *ChanSource) CheckOsInterpreter(){ } // DoStream 发送数据 outlet to inlet. func (cs *ChanSource) DoStream(outlet Outlet, inlet Inlet) { defer close(inlet.In()) for element := range outlet.Out() { inlet.In() <- element // fmt.Println("Source: ", element) } } // Out returns an output channel for sending data func (cs *ChanSource) Out() <-chan interface{} { return cs.out } 当检查到退出信号,关闭输入通道
时间: 2024-03-23 07:36:15 浏览: 111
Spark Streaming SQL Pipline 框架提供Kafka2HBase+源代码+文档说明
可以在 CheckOsInterpreter 方法中检查操作系统的退出信号,例如 Linux 下的 SIGINT 或 SIGTERM 信号。如果检测到退出信号,则调用 ChanSource 的 Stop 方法来关闭输入通道。修改后的代码如下:
```
package pipeline
import (
"os"
"os/signal"
"syscall"
)
type ChanSource struct {
out chan interface{}
stop chan struct{}
signal chan os.Signal
}
func NewSource(out chan interface{}) *ChanSource {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
return &ChanSource{
out: out,
stop: make(chan struct{}),
signal: signalChan,
}
}
func (cs *ChanSource) Via(flow Flow) Flow {
go cs.CheckOsInterpreter()
go cs.DoStream(cs, flow)
return flow
}
func (cs *ChanSource) CheckOsInterpreter() {
<-cs.signal
cs.Stop()
}
func (cs *ChanSource) DoStream(outlet Outlet, inlet Inlet) {
defer close(inlet.In())
for {
select {
case element, ok := <-outlet.Out():
if !ok {
return
}
inlet.In() <- element
case <-cs.stop:
return
}
}
}
func (cs *ChanSource) Out() <-chan interface{} {
return cs.out
}
func (cs *ChanSource) Stop() {
close(cs.stop)
}
```
这样,在程序接收到 SIGINT 或 SIGTERM 信号时,会调用 ChanSource 的 Stop 方法来关闭输入通道。
阅读全文