flink读取kafka数据,数据达到100条往下游sink一次代码示例
时间: 2023-05-12 13:07:16 浏览: 188
my-flink-demo:flink-1.7.0的各种示例代码
以下是一个简单的 Flink 代码示例,用于从 Kafka 中读取数据并将其发送到下游 sink:
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "my-group")
val kafkaConsumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), kafkaProps)
val stream = env.addSource(kafkaConsumer)
stream
.map(record => record.toUpperCase())
.addSink(new MySinkFunction())
env.execute("Kafka to Sink Example")
```
在这个示例中,我们使用 FlinkKafkaConsumer 从 Kafka 中读取数据,并将其转换为流。然后,我们将流中的每个记录转换为大写字母,并将其发送到自定义的下游 sink 函数 MySinkFunction 中。最后,我们使用 env.execute() 启动 Flink 作业。
阅读全文