flink写入 mysql_flink sql实时计算当天pv写入mysql

时间: 2023-10-14 17:19:09 浏览: 44
要将Flink实时计算的结果写入MySQL数据库中,可以使用Flink的JDBC连接器。以下是一个将当天PV写入MySQL的示例代码: ```java // 定义MySQL连接信息 final String jdbcUrl = "jdbc:mysql://localhost:3306/test"; final String username = "root"; final String password = "root"; // 定义Flink数据流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据流 DataStream<String> input = env.socketTextStream("localhost", 9999); // 对数据流进行处理,计算当天PV DataStream<Tuple2<String, Integer>> result = input .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { // 解析日志数据,获取访问时间和页面URL String[] fields = value.split(","); String time = fields[0]; String url = fields[1]; // 计算当天日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String today = sdf.format(new Date()); // 判断日志时间是否在当天,如果是则返回页面URL和1,否则返回空 if (time.contains(today)) { return new Tuple2<>(url, 1); } else { return null; } } }) .filter(Objects::nonNull) .keyBy(0) .sum(1); // 将结果写入MySQL result.addSink(JdbcSink.sink( "insert into pv(url, pv, date) values (?, ?, ?)", new JdbcStatementBuilder<Tuple2<String, Integer>>() { @Override public void accept(PreparedStatement preparedStatement, Tuple2<String, Integer> t) throws SQLException { // 设置SQL参数 preparedStatement.setString(1, t.f0); preparedStatement.setInt(2, t.f1); preparedStatement.setDate(3, new Date(System.currentTimeMillis())); } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(jdbcUrl) .withUsername(username) .withPassword(password) .build())); // 启动数据流处理 env.execute(); ``` 该代码首先定义了MySQL的连接信息,然后创建了一个数据流环境,并从Socket读取数据流进行处理。处理过程中,首先解析日志数据,判断是否在当天,并计算PV。最后,使用JdbcSink将结果写入MySQL数据库中。 需要注意的是,JdbcSink需要引入以下依赖: ```xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.12.2</version> </dependency> ```

相关推荐

最新推荐

recommend-type

Flink实用教程_预览版_v1.pdf

最新Flink教程,基于Flink 1.13.2。书中所有示例和案例代码均为双语。这是预览版。 目录 第1 章Flink 架构与集群安装..............................................................................................
recommend-type

大数据之flink教程-TableAPI和SQL.pdf

1.1 什么是 Table API 和 Flink SQL 2 1.2 需要引入的依赖 2 1.3 两种 planner(old & blink)的区别 4 第二章 API 调用 5 2.1 基本程序结构 5 2.2 创建表环境 5 2.3 在 Catalog 中注册表 7 2.3.1 表(Table)的概念...
recommend-type

基于Flink构建实时数据仓库.docx

基于Flink SQL的扩展工作,构建实时数仓的应用案例,未来工作的思考和展望4个方面介绍了OPPO基于Flink构建实时数仓的经验和未来的规划。
recommend-type

Flink一线公司经验实战

该资料收集了国内外一线公司使用flink的一些实战经验,包括了为什么使用flink,以及在使用flink后遇到的一些技术难点是怎么去解决的。具有非常高的参考价值。
recommend-type

zigbee-cluster-library-specification

最新的zigbee-cluster-library-specification说明文档。
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

实现实时数据湖架构:Kafka与Hive集成

![实现实时数据湖架构:Kafka与Hive集成](https://img-blog.csdnimg.cn/img_convert/10eb2e6972b3b6086286fc64c0b3ee41.jpeg) # 1. 实时数据湖架构概述** 实时数据湖是一种现代数据管理架构,它允许企业以低延迟的方式收集、存储和处理大量数据。与传统数据仓库不同,实时数据湖不依赖于预先定义的模式,而是采用灵活的架构,可以处理各种数据类型和格式。这种架构为企业提供了以下优势: - **实时洞察:**实时数据湖允许企业访问最新的数据,从而做出更明智的决策。 - **数据民主化:**实时数据湖使各种利益相关者都可
recommend-type

list根据id查询pid 然后依次获取到所有的子节点数据

可以使用递归的方式来实现根据id查询pid并获取所有子节点数据。具体实现可以参考以下代码: ``` def get_children_nodes(nodes, parent_id): children = [] for node in nodes: if node['pid'] == parent_id: node['children'] = get_children_nodes(nodes, node['id']) children.append(node) return children # 测试数
recommend-type

JSBSim Reference Manual

JSBSim参考手册,其中包含JSBSim简介,JSBSim配置文件xml的编写语法,编程手册以及一些应用实例等。其中有部分内容还没有写完,估计有生之年很难看到完整版了,但是内容还是很有参考价值的。
recommend-type

"互动学习:行动中的多样性与论文攻读经历"

多样性她- 事实上SCI NCES你的时间表ECOLEDO C Tora SC和NCESPOUR l’Ingén学习互动,互动学习以行动为中心的强化学习学会互动,互动学习,以行动为中心的强化学习计算机科学博士论文于2021年9月28日在Villeneuve d'Asq公开支持马修·瑟林评审团主席法布里斯·勒菲弗尔阿维尼翁大学教授论文指导奥利维尔·皮耶昆谷歌研究教授:智囊团论文联合主任菲利普·普雷教授,大学。里尔/CRISTAL/因里亚报告员奥利维耶·西格德索邦大学报告员卢多维奇·德诺耶教授,Facebook /索邦大学审查员越南圣迈IMT Atlantic高级讲师邀请弗洛里安·斯特鲁布博士,Deepmind对于那些及时看到自己错误的人...3谢谢你首先,我要感谢我的两位博士生导师Olivier和Philippe。奥利维尔,"站在巨人的肩膀上"这句话对你来说完全有意义了。从科学上讲,你知道在这篇论文的(许多)错误中,你是我可以依