springboot sparkStreaming消费kafka代码示例
时间: 2023-07-23 21:26:10 浏览: 104
Spark踩坑记——SparkStreaming+Kafka
好的,这里是一个简单的 Spring Boot Spark Streaming 消费 Kafka 的代码示例,供参考:
1. 添加 Maven 依赖
首先,你需要添加以下 Maven 依赖来使用 Kafka 和 Spark Streaming:
```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
```
注意,这里使用了 Spark 2.11 版本和 Kafka 0.10 版本的依赖。
2. 创建 Kafka 消费者配置类
在 Spring Boot 中,你可以通过配置类来创建 Kafka 消费者的配置。以下是一个简单的 Kafka 消费者配置类示例:
```java
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
3. 创建 Spark Streaming 应用程序
接下来,你需要创建一个 Spark Streaming 应用程序来消费 Kafka 中的数据。以下是一个简单的 Spark Streaming 应用程序示例:
```java
@SpringBootApplication
public class SparkStreamingKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SparkStreamingKafkaApplication.class, args);
}
@Bean
public JavaStreamingContext javaStreamingContext() {
SparkConf sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
Collection<String> topics = Arrays.asList("test-topic");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()))
.print();
return jssc;
}
}
```
这个应用程序会每 5 秒钟从 Kafka 中消费一次数据,并将消费到的数据输出到控制台上。
在上面的示例中,我们使用了 Kafka 的 `createDirectStream` 方法来创建一个 DStream,该方法会直接从 Kafka 中消费数据。然后,我们使用 `mapToPair` 方法将消费到的数据转换成键值对的形式,并使用 `print` 方法输出到控制台上。
4. 运行应用程序
最后,你可以运行这个 Spring Boot Spark Streaming 应用程序,它会从 Kafka 中消费数据并输出到控制台上。
阅读全文