PyFlink Kafka connection example code
Date: 2023-03-08 09:15:11
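The following example shows how to connect PyFlink to Kafka. It uses the descriptor-based connect() API, which is deprecated and was removed in Flink 1.14, so it should be run against Flink 1.11 to 1.13:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Kafka, Json, Schema

# Create the stream execution environment
env = StreamExecutionEnvironment.get_execution_environment()
# Create the table environment
t_env = StreamTableEnvironment.create(env)

# Register the Kafka data source.
# 'zookeeper.connect' is omitted: only the legacy Kafka 0.8 connector needs it.
t_env.connect(Kafka()
              .version('universal')
              .topic('news')
              .start_from_latest()
              .property('bootstrap.servers', 'localhost:9092')
              .property('group.id', 'testGroup')) \
    .with_format(Json()
                 .schema(DataTypes.ROW([
                     DataTypes.FIELD('word', DataTypes.STRING()),
                     DataTypes.FIELD('count', DataTypes.BIGINT())]))) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('news')

# Process the data with SQL. `count` is a reserved keyword, so it is quoted with backticks.
t_env.sql_query("SELECT word, SUM(`count`) AS `count` FROM news GROUP BY word").execute().print()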
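On Flink 1.14 and later the connect() descriptors are no longer available, and a Kafka table is usually declared with a SQL DDL statement instead. The sketch below is one possible way to express the same source and query in that style; it is not part of the original answer. The helper names kafka_source_ddl and run_word_count and the constant WORD_COUNT_QUERY are hypothetical, and the topic, broker address, and group id are simply carried over from the example above.

```python
def kafka_source_ddl(table_name, topic, bootstrap_servers, group_id):
    """Build a CREATE TABLE statement for a JSON-encoded Kafka topic.

    The column list (word, count) mirrors the schema used in the example above.
    """
    return f"""
        CREATE TABLE {table_name} (
            `word` STRING,
            `count` BIGINT
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{topic}',
            'properties.bootstrap.servers' = '{bootstrap_servers}',
            'properties.group.id' = '{group_id}',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """


# `count` is a reserved keyword in Flink SQL, so it is quoted with backticks.
WORD_COUNT_QUERY = "SELECT word, SUM(`count`) AS `count` FROM news GROUP BY word"


def run_word_count():
    """Register the Kafka table and print the running sums (needs PyFlink and a broker)."""
    # Imported lazily so the DDL helper can be inspected without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Declare the source table, then run the aggregation and stream results to stdout.
    t_env.execute_sql(kafka_source_ddl('news', 'news', 'localhost:9092', 'testGroup'))
    t_env.execute_sql(WORD_COUNT_QUERY).print()
```

Calling run_word_count() submits the job. This assumes a Kafka broker is reachable at localhost:9092 and that the Kafka SQL connector JAR (flink-sql-connector-kafka) matching your Flink version is on the classpath.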