DefaultRollingPolicy<String, String> defaultRollingPolicy = DefaultRollingPolicy.builder() .withInactivityInterval(TimeUnit.SECONDS.toMillis(10)) //10s空闲,就滚动写入新的文件 .withRolloverInterval(TimeUnit.SECONDS.toMillis(30)) //不论是否空闲,超过30秒就写入新文件,默认60s。这里设置为30S .withMaxPartSize(1024 * 1024 * 1024) // 设置每个文件的最大大小 ,默认是128M。这里设置为1G .build(); final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path("hdfs://192.168.88.151:8020/user/hive/warehouse/basic.db/monitor_car_data/"), new SimpleStringEncoder<String>("UTF-8"))//设置文件路径,以及文件中的编码格式 .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMdd", ZoneId.of("Asia/Shanghai")))//设置自定义分桶 .withRollingPolicy(defaultRollingPolicy)//设置文件滚动条件 .withBucketCheckInterval(40)//设置检查点 .build();
时间: 2024-04-22 19:21:41 浏览: 120
完整版度量转换.rar
这段代码是Flink中使用`StreamingFileSink`将数据写入HDFS的代码片段。从代码上看,没有明显的语法错误,但是需要根据实际情况来检查是否有问题。
其中,`DefaultRollingPolicy`是用于控制Flink写入HDFS文件的滚动策略,可以根据需要进行调整。`StreamingFileSink`是Flink提供的用于将数据写入HDFS的Sink,可以设置文件路径、编码格式、分桶策略、滚动条件等。
需要注意的是,文件路径需要设置正确,并且对应的HDFS目录需要有写权限。另外,分桶策略和滚动条件也需要根据实际需求进行设置。
阅读全文