使用scala写出flink消费kafka中的topic,将topic中的数据读取出来
时间: 2023-10-08 22:10:17 浏览: 184
好的,下面是使用 Scala 编写 Flink 消费 Kafka 中的 Topic 并将数据读取出来的代码:
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties
object KafkaConsumer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test-group")
val kafkaSource = new FlinkKafkaConsumer[String]("topic-name", new SimpleStringSchema(), properties)
val stream = env.addSource(kafkaSource)
stream.print()
env.execute("Kafka Consumer")
}
}
```
在这段代码中,我们首先创建了一个 Flink 的 StreamExecutionEnvironment 对象,然后创建了一个 Properties 对象来设置 Kafka 的相关属性,包括 bootstrap.servers 和 group.id。
接着,我们创建了一个 FlinkKafkaConsumer 对象,用来消费 Kafka 中的 Topic。这里需要传入 Topic 的名称、序列化和反序列化的 Schema 以及 Kafka 的属性配置。
最后,我们将消费到的数据通过 print() 方法输出到控制台上,并调用 env.execute() 方法来启动整个 Flink 程序。
阅读全文