pyflink读取kafka示例代码
时间: 2023-03-14 13:37:30 浏览: 151
PyFlink提供了一个简单的API来从Kafka中读取数据,可以使用以下示例代码实现:from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.descriptors import Kafka, OldCsv, Schemaenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)t_env.connect(Kafka()
.topic("topic")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "test")) \
.with_format(OldCsv()
.field("word", "STRING")
.field("frequency", "INT")) \
.with_schema(Schema()
.field("word", "STRING")
.field("frequency", "INT")) \
.in_append_mode() \
.register_table_source("source")t_env.scan("source") \
.insert_into("sink")env.execute("kafka source")
阅读全文