flink读取kafka数据,将读到的数据收集满100条往下游sink一次代码示例
时间: 2023-05-12 08:07:15 浏览: 238
FlinkTest_demo:来自kafka的fink处理数据的演示代码
以下是一个简单的 Flink 代码示例,用于从 Kafka 中读取数据并将其发送到下游 Sink:
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object KafkaToFlinkSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), properties))
val sink = new MySinkFunction()
stream.addSink(sink)
env.execute("Kafka to Flink Sink")
}
}
class MySinkFunction extends RichSinkFunction[String] {
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
// 将数据发送到下游 Sink
}
}
```
这个示例使用 Flink 的 Kafka 连接器从 Kafka 中读取数据,并将其发送到自定义的 Sink 函数中。在这个示例中,我们使用了一个简单的字符串模式,但你可以根据你的需求使用不同的模式。
阅读全文