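3. Data (single numbers) flows continuously into port 10000 on host node1. Implement the following: (1) Read the data from port 10000 on node1 into Flink. (2) Use `OutputTag` and `process` to split the stream into odd and even numbers. Flink code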
Date: 2024-02-17 08:04:08 Views: 48
Sure. Based on your requirements, here is Flink code for the task:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitOddEvenStream {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (1) Read text lines from port 10000 on host node1 into Flink
        DataStream<String> input = env.socketTextStream("node1", 10000);

        // (2) Side-output tags; the trailing {} creates an anonymous subclass
        // so that the generic type Integer is preserved at runtime
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};
        final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};

        // Parse each line and route the number to the matching side output.
        // Nothing is emitted to the main output, so only the side outputs carry data.
        SingleOutputStreamOperator<Integer> routed = input
            .process(new ProcessFunction<String, Integer>() {
                @Override
                public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {
                    // Throws NumberFormatException if the line is not a single integer
                    int number = Integer.parseInt(value.trim());
                    if (number % 2 == 0) {
                        ctx.output(evenTag, number);
                    } else {
                        ctx.output(oddTag, number);
                    }
                }
            });

        // Retrieve the two split streams from the side outputs
        DataStream<Integer> oddNumbers = routed.getSideOutput(oddTag);
        DataStream<Integer> evenNumbers = routed.getSideOutput(evenTag);

        // The prefix makes it easy to tell the two streams apart in the output
        oddNumbers.print("odd");
        evenNumbers.print("even");

        env.execute("Splitting odd and even numbers from input stream");
    }
}
```
This code reads the data stream from port 10000 on host node1, splits the numbers in the stream into odd and even using `OutputTag` and `process`, and finally prints the resulting odd and even streams.
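The routing decision at the core of the split can be checked without a running Flink job. The following is only a minimal sketch in plain Java: `ParityRouter` and its `route` method are hypothetical names introduced here for illustration and are not part of Flink or of the answer above. It also assumes that lines which are not integers should be skipped rather than fail, which the task statement does not specify.

```java
import java.util.Optional;

// Hypothetical helper, not a Flink class: decides which tag name a line belongs to.
public class ParityRouter {

    /** Returns "odd" or "even" for a line holding one integer, or empty if it is not an integer. */
    public static Optional<String> route(String line) {
        try {
            int n = Integer.parseInt(line.trim());
            // In Java, n % 2 is -1 for negative odd numbers,
            // so compare against 0 instead of testing for == 1.
            return Optional.of(n % 2 == 0 ? "even" : "odd");
        } catch (NumberFormatException e) {
            // Assumption: malformed input is skipped instead of failing
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String line : new String[] {"7", "10", "-3", "abc"}) {
            System.out.println(line + " -> " + route(line).orElse("skipped"));
        }
    }
}
```

Inside a `ProcessFunction`, the same check would pick the `OutputTag` passed to `ctx.output(...)`. Comparing the remainder against 0 keeps negative odd numbers in the odd stream.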