Integrating Spark Streaming with Kafka in Java
Spark Streaming can be integrated with Kafka to process streaming data in real time. Below is a simple Java example:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class KafkaSparkStreamingExample {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "localhost:9092";
        String groupId = "group1";
        String topics = "topic1";

        // Create context with a 2-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkStreamingExample");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topicsSet = Arrays.asList(topics.split(","));

        // Create a direct Kafka stream; each element is a ConsumerRecord
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );

        // Extract the message value from each record
        JavaDStream<String> messages = stream.map(ConsumerRecord::value);

        // Process each message in the stream: print it to the console
        messages.foreachRDD(rdd -> rdd.foreach(message -> System.out.println(message)));

        // Start the computation and wait for it to terminate
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
```
In this example, we first define the Kafka broker address, the consumer group ID, and the topics to consume. We then create a Spark Streaming JavaStreamingContext with a batch interval of 2 seconds.
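Note that the example sets only the application name, so it expects the master URL to be supplied by spark-submit. For a quick local test you could also set it in code; a minimal sketch (the local[2] value is an assumption for local testing, not part of the original example):

```java
import org.apache.spark.SparkConf;

// Assumption: local-mode configuration for testing only.
// "local[2]" runs the driver with two threads so batches can actually be processed;
// when submitting to a cluster, leave the master to spark-submit instead.
SparkConf sparkConf = new SparkConf()
        .setAppName("KafkaSparkStreamingExample")
        .setMaster("local[2]");
```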
Next, we build the Kafka parameter map and the set of topics to subscribe to, and use KafkaUtils.createDirectStream() to create a JavaInputDStream of ConsumerRecord objects, mapping each record to its value. Because enable.auto.commit is set to false, Kafka will not commit the group's offsets automatically.
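With auto-commit disabled, offsets must be committed explicitly if you want Kafka to track the group's progress. The spark-streaming-kafka-0-10 API exposes this through HasOffsetRanges and CanCommitOffsets; here is a sketch following the pattern in the official Spark Kafka integration guide, where the processing step is a placeholder:

```java
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Sketch: commit offsets back to Kafka after each batch has been processed.
// `stream` is the JavaInputDStream<ConsumerRecord<String, String>> from the example above.
stream.foreachRDD(rdd -> {
    // The Kafka offset ranges covered by this batch
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreach(record -> System.out.println(record.value())); // placeholder processing

    // Asynchronously commit the offsets once the batch's output is done
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
```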
Each message in every batch is then printed to the console. Finally, we start the streaming context and block until the computation is terminated.
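Printing is just a stand-in for real processing: the value stream supports the usual DStream transformations. As a hypothetical example building on the `messages` JavaDStream<String> from the code above, a per-batch word count could look like this:

```java
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;
import java.util.Arrays;

// Hypothetical word count over each 2-second batch, using the `messages`
// JavaDStream<String> from the example above.
JavaPairDStream<String, Integer> wordCounts = messages
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // split each line into words
        .mapToPair(word -> new Tuple2<>(word, 1))                   // pair each word with a count of 1
        .reduceByKey(Integer::sum);                                 // sum the counts per word within the batch

wordCounts.print(); // print the first elements of each batch's result
```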
This is only a simple example; you can modify and extend it to fit your own needs.