没有合适的资源?快使用搜索试试~ 我知道了~
首页flink 流式表自定义StreamTableSource、RetractStreamSink,并使用flink kafkaDataStream.pdf
flink 流式表自定义StreamTableSource、RetractStreamSink,并使用flink kafkaDa...
需积分: 50 16 下载量 78 浏览量
更新于2023-03-03
评论 2
收藏 348KB PDF 举报
本文件是自己根据flink1.8官网文档https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#using-a-tablesource里实现的自定义StreamTableSource,并且输入流也用的是kafkaStream(更贴近实际应用),由于官网的文档过于简单,并且有漏洞,你如果按照官网文档编写是会运行不成功的。附件文档的代码是亲测完全可以运行的,里面也自己实现了RetractStreamSink,可以供大家参考。
资源详情
资源评论
资源推荐
官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#using-a-
tablesource给的例子如果你照抄的话,是不能运行的。需要先按照下述方法进行改造
使用flink自定义事件流
/**
* procTime
处
理
时
间
流
*/
class
ProcTimeUserActionAttribute
implements
StreamTableSource<Row>, DefinedProctimeAttribute
{
private
String[]
names
=
new
String[]{
"name"
,
"data"
,
"UserActionTime"
};
private
TypeInformation[]
types
=
new
TypeInformation[]{Types.
STRING
, Types.
INT
,
Types.
SQL_TIMESTAMP
};
@Nullable
@Override
public
String getProctimeAttribute() {
return "UserActionTime"
;
}
@Override
public
DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
Properties properties =
new
Properties();
properties.setProperty(ConsumerConfig.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties.setProperty(ConsumerConfig.
GROUP_ID_CONFIG
,
"flink-test-group"
);
FlinkKafkaConsumer<String> flinkKafkaConsumer =
new
FlinkKafkaConsumer<>(
"test-
flink"
,
new
SimpleStringSchema(), properties);
DataStreamSource<String> flinkDataStream = execEnv.addSource(flinkKafkaConsumer);
SingleOutputStreamOperator<Row> outputStream = flinkDataStream.map(
new
MapFunction<String, Row>() {
@Override
public
Row map(String value)
throws
Exception {
String[] splitArray = value.split(SymbolConstants.
SYMBOL_DH
);
String name = splitArray[0];
Integer data = Integer.
parseInt
(splitArray[1]);
Long timeMills = System.
currentTimeMillis
();
//这里需要将处理时间定义,虽然是系统的处理时间可以自动生成,但是在flink1.8中,
这个时间还是需要显著给出
Timestamp timestamp =
new
Timestamp(timeMills);
return
Row.
of
(name, data, timestamp);
}
}).returns(Types.
ROW_NAMED
(
names
,
types
));
bestchenwu123
- 粉丝: 5
- 资源: 18
上传资源 快速赚钱
- 我的内容管理 收起
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
会员权益专享
最新资源
- RTL8188FU-Linux-v5.7.4.2-36687.20200602.tar(20765).gz
- c++校园超市商品信息管理系统课程设计说明书(含源代码) (2).pdf
- 建筑供配电系统相关课件.pptx
- 企业管理规章制度及管理模式.doc
- vb打开摄像头.doc
- 云计算-可信计算中认证协议改进方案.pdf
- [详细完整版]单片机编程4.ppt
- c语言常用算法.pdf
- c++经典程序代码大全.pdf
- 单片机数字时钟资料.doc
- 11项目管理前沿1.0.pptx
- 基于ssm的“魅力”繁峙宣传网站的设计与实现论文.doc
- 智慧交通综合解决方案.pptx
- 建筑防潮设计-PowerPointPresentati.pptx
- SPC统计过程控制程序.pptx
- SPC统计方法基础知识.pptx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0