scalaflink1.14消费kafka
时间: 2023-05-28 15:05:58 浏览: 115
消费 Kafka 可以使用 Flink 的 Kafka Consumer API。下面是一个简单的示例代码:
```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaConsumerExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
val kafkaConsumer = new FlinkKafkaConsumer[String]("test-topic", new SimpleStringSchema(), properties)
val stream = env.addSource(kafkaConsumer)
stream.print()
env.execute("Kafka Consumer Example")
}
}
```
在这个例子中,我们首先创建了一个 `StreamExecutionEnvironment` 对象。然后,我们创建了一个 `Properties` 对象,用于配置 Kafka consumer。我们设置了 Kafka broker 的地址,反序列化器的类名,以及使用的 consumer group ID。
接下来,我们创建了一个 `FlinkKafkaConsumer` 对象,它使用 `SimpleStringSchema` 将 Kafka 中的消息转换为字符串。我们还指定了要消费的主题名称。
然后,我们使用 `addSource` 方法将 Kafka consumer 添加到 Flink 程序中,并将返回的数据流打印出来。
最后,我们调用 `execute` 方法启动 Flink 程序。
阅读全文