An example of consuming Kafka with Flink
Date: 2023-03-08 22:43:13
Apache Flink is a distributed stream-processing framework that can consume data from Apache Kafka. Below is a sample program that reads from a Kafka topic with Flink:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-group");

        // consume the topic named "topic", deserializing each record as a UTF-8 string
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);

        // add the consumer as a source and print each record to stdout
        env.addSource(consumer)
           .print();

        env.execute("Kafka Consumer Example");
    }
}
```
In the code above, we first create a `StreamExecutionEnvironment`, then configure the Kafka connection properties (such as `bootstrap.servers` and `group.id`), and finally create a `FlinkKafkaConsumer` that reads data from the Kafka topic named "topic".
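Note that `FlinkKafkaConsumer` is deprecated in newer Flink releases (1.14+) in favor of the `KafkaSource` API. A minimal sketch of the same pipeline using the newer API, assuming Flink 1.14 or later with the `flink-connector-kafka` dependency on the classpath, might look like this (not runnable without a Kafka broker at `localhost:9092`):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // build a KafkaSource with the same broker, topic, and group settings as above
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("topic")
                .setGroupId("test-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // attach the source (no event-time watermarks needed for this simple print job)
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .print();

        env.execute("Kafka Source Example");
    }
}
```

The `KafkaSource` builder also lets you control where consumption starts (`OffsetsInitializer.earliest()`, `latest()`, or committed offsets), which the older `FlinkKafkaConsumer` handled through separate setter methods.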