Flink stop时,自定义SourceFunction中的cancel方法不起作用
时间: 2023-11-22 08:55:40 浏览: 104
自定义Flink SourceFunction定时读取数据库 包括实现思路说明和java代码
在 Flink 中,SourceFunction 的 cancel 方法是用于停止数据源的方法。当调用 Flink 的 stop 方法时,会自动调用数据源中的 cancel 方法来停止数据源。但是,如果自定义的 SourceFunction 中的 cancel 方法没有起作用,可能是因为数据源中的主线程被阻塞了,导致无法响应 cancel 方法的调用。
解决方法是在 SourceFunction 的 run 方法中添加一个循环,在每次迭代中检查是否有 cancel 的信号到达。这样可以确保即使在主线程被阻塞的情况下,也能够及时响应 cancel 信号并停止数据源。
以下是一个示例代码:
```java
public class CustomSource implements SourceFunction<String> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
// 从数据源读取数据并发送到 Flink 中
String data = readData();
ctx.collect(data);
// 检查是否有 cancel 信号到达
if (Thread.currentThread().isInterrupted()) {
isRunning = false;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
private String readData() {
// 从数据源读取数据
// ...
return "data";
}
}
```
阅读全文