SingleOutputStreamOperator 写入hdfs样例代码
时间: 2023-07-03 19:05:36 浏览: 103
以下是将数据写入 HDFS 的示例代码:
```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WriteToHdfsExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 构建数据源,例如从Kafka读取数据
SingleOutputStreamOperator<String> dataStream = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
// 将数据转换为需要写入 HDFS 的格式,并指定 HDFS 的路径
SingleOutputStreamOperator<String> hdfsDataStream = dataStream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value + "\n"; // 在每个记录后添加换行符
}
})
.setParallelism(1) // 将并行度设置为1,确保写入 HDFS 的数据是有序的
.writeAsText("hdfs://namenode:9000/path/to/hdfs/file", FileSystem.WriteMode.OVERWRITE);
env.execute("Write to HDFS Example");
}
}
```
在上述代码中,我们通过 `writeAsText` 方法将数据写入 HDFS。其中,第一个参数指定了 HDFS 的路径,第二个参数指定了写入 HDFS 的模式,`OVERWRITE` 表示如果文件已经存在,则覆盖原文件。
需要注意的是,由于 Flink 的并行计算模型,同一个分区的数据可能会被不同的 Task 处理,因此在写入 HDFS 时需要将并行度设置为1,以确保写入 HDFS 的数据是有序的。
阅读全文