flink消费kafka的数据存到kafka java代码
时间: 2023-11-18 16:06:12 浏览: 227
以下是将Flink消费Kafka的数据存储到Kafka的Java代码示例:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Locale;
import java.util.Properties;
public class FlinkKafkaExample {

    /**
     * Reads string records from a Kafka source topic, upper-cases them, and writes
     * the results back to a Kafka sink topic with exactly-once delivery semantics.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EXACTLY_ONCE on the Kafka sink relies on Kafka transactions that are
        // committed when a checkpoint completes. Without checkpointing enabled,
        // no transaction is ever committed and consumers reading with
        // isolation.level=read_committed see no data.
        env.enableCheckpointing(60_000);

        // Kafka consumer configuration.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "localhost:9092");
        consumerProps.setProperty("group.id", "test-group");

        // Source: consume strings from the input topic.
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), consumerProps);
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Transformation: upper-case each record. Locale.ROOT gives
        // locale-independent casing (avoids e.g. the Turkish dotless-i issue
        // when the JVM runs under a non-English default locale).
        DataStream<String> processedStream = stream.map(str -> str.toUpperCase(Locale.ROOT));

        // Kafka producer configuration.
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");
        // Flink's default producer transaction timeout (1 hour) exceeds the
        // broker default transaction.max.timeout.ms (15 minutes); the
        // EXACTLY_ONCE producer fails at startup unless this is lowered to at
        // most the broker's maximum.
        producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000");

        // Sink: write results transactionally (exactly-once).
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "output-topic",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        processedStream.addSink(kafkaProducer);

        // Submit and run the job.
        env.execute("Flink Kafka Example");
    }
}
```
阅读全文