使用Flink1.14时用scala从Kafka中读取key为string,value为Array[Byte]的数据
时间: 2024-01-21 09:17:49 浏览: 135
在 Flink 中从 Kafka 中读取数据,可以使用 Flink Kafka Consumer。对于 key 为 string,value 为 Array[Byte] 的场景,可以按照如下方式进行配置:
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper
import java.nio.charset.StandardCharsets
// Kafka 主题和消费者组
val topic = "your-topic"
val groupId = "your-group-id"
// Kafka 连接信息
val props = new Properties()
props.setProperty("bootstrap.servers", "your-bootstrap-servers")
props.setProperty("group.id", groupId)
// 创建 Flink 流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 Kafka 消费者
val kafkaConsumer = new FlinkKafkaConsumer[(String, Array[Byte])](
topic,
new KafkaDeserializationSchema[(String, Array[Byte])] {
override def isEndOfStream(nextElement: (String, Array[Byte])): Boolean = false
override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, Array[Byte]) = {
val key = new String(record.key(), StandardCharsets.UTF_8)
val value = record.value()
(key, value)
}
override def getProducedType: TypeInformation[(String, Array[Byte])] = {
TypeInformation.of(new TypeHint[(String, Array[Byte])] {})
}
},
props
)
// 将 Kafka 消费者添加到 Flink 流处理环境中
val stream = env.addSource(kafkaConsumer)
// 打印数据
stream.print()
// 启动 Flink 流处理程序
env.execute("Kafka Consumer")
```
在上述代码中,我们通过 `KafkaDeserializationSchema` 来指定从 Kafka 中读取的数据格式。在 `deserialize` 方法中,我们将 key 转换为字符串类型,将 value 保持不变。在 `getProducedType` 方法中,我们指定了输出数据的类型为 `(String, Array[Byte])`。
阅读全文