使用scala写出flink消费kafka中的topic
时间: 2023-07-23 20:42:52 浏览: 132
scala编写,以spark stream方式监听消费kafka的topic,同步写入elasticsearch
可以使用 Flink 的 Kafka Consumer API 来消费 Kafka 中的数据。下面是使用 Scala 代码实现 Flink 消费 Kafka 的示例:
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties
object FlinkKafkaConsumerExample {
def main(args: Array[String]) {
// 设置 Flink 执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka 的配置
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "test-group")
// 创建 Kafka Consumer
val consumer = new FlinkKafkaConsumer[String]("test-topic", new SimpleStringSchema(), props)
// 将 Consumer 添加到执行环境中
val stream = env.addSource(consumer)
// 打印 Kafka 中的数据
stream.print()
// 执行 Flink 任务
env.execute("Flink Kafka Consumer Example")
}
}
```
在上面的示例中,我们首先创建了 Flink 的 StreamExecutionEnvironment 对象,然后设置了 Kafka 的属性,包括 Kafka 的地址和 Consumer 的 Group ID。接着,我们使用 FlinkKafkaConsumer 创建了一个 Kafka Consumer,并将其添加到执行环境中。最后,我们使用 stream.print() 打印了从 Kafka 中消费到的数据,并调用 env.execute() 执行 Flink 任务。
需要注意的是,上面的示例中使用的是简单字符串序列化器 SimpleStringSchema。如果需要使用其他的序列化器,可以将其替换为对应的序列化器。同时,需要将 Flink 和 Kafka 的版本号保持一致,否则可能会出现版本不兼容的问题。
阅读全文