spring-boot-kafka-consumer-example
spring-boot-kafka-consumer-example is a consumer example built with the Spring Boot framework and the Apache Kafka message queue. It demonstrates how to use Spring Kafka on the consumer side to receive messages from Kafka.
In this example, you first add the Spring Kafka and Apache Kafka dependencies, then write the consumer code: a method annotated with @KafkaListener declares the topic(s) to consume and the consumer group.
When a message arrives, the annotated method is invoked automatically with the message passed in as a parameter, and the required business logic is implemented by processing that message.
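A minimal sketch of such a listener (the topic name my-topic and group my-group are placeholder values, not part of the original example):
```java
@Component
public class ExampleConsumer {

    // Invoked automatically for each record received from the "my-topic" topic
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        // Replace with the business logic required for the received message
        System.out.println("Received: " + message);
    }
}
```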
In the Spring Boot application itself, the startup class is annotated with @SpringBootApplication, and @EnableKafka enables Kafka support. The connection settings, such as the Kafka broker address and port, are configured through properties in application.properties.
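For example, a minimal application.properties might look like the following (the broker address and group id are placeholders):
```properties
# Address of the Kafka broker(s) to connect to
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group id used by @KafkaListener methods
spring.kafka.consumer.group-id=my-group
# Start from the earliest offset when no committed offset exists
spring.kafka.consumer.auto-offset-reset=earliest
```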
With spring-boot-kafka-consumer-example you can get started with Kafka consumer development quickly, and thanks to Spring Boot's auto-configuration the development process is more efficient and the resulting application more reliable.
Related questions
Spring Boot integration of easy-es-boot-starter, Canal, and Kafka to build a MySQL data-sync interface with support for manually refreshing the index
The implementation steps are as follows:
1. Add dependencies
Add the easy-es-boot-starter, Canal, and Kafka dependencies to pom.xml:
```xml
<dependency>
    <groupId>com.github.xiaolyuh</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>${easy-es-boot-starter.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>${canal.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>
```
2. Configure Canal
Configure Canal in application.yml:
```yaml
canal:
  host: 127.0.0.1
  port: 11111
  destination: example
  username: canal
  password: canal
```
3. Configure Kafka
Configure Kafka in application.yml:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: group1
      auto-offset-reset: earliest
      enable-auto-commit: true
```
4. Create the data-sync listener
Create a listener that receives Canal change events and publishes the changed rows to Kafka:
```java
@Component
public class DataSyncListener implements CanalEventListener {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public DataSyncListener(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    @Override
    public void onEvent(CanalEntry.Entry entry) {
        // Only row-data entries carry actual data changes
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return;
        }
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parse event has an error, data: " + entry, e);
        }
        // Skip events that do not represent data changes
        EventType eventType = rowChange.getEventType();
        if (eventType == EventType.QUERY || eventType == EventType.HEARTBEAT) {
            return;
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            String tableName = entry.getHeader().getTableName();
            // Collect the column values after the change
            Map<String, Object> data = new HashMap<>();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                data.put(column.getName(), column.getValue());
            }
            String json;
            try {
                json = objectMapper.writeValueAsString(data);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            // Publish the changed row to a Kafka topic named after the table
            kafkaTemplate.send(tableName, json);
        }
    }
}
```
5. Create the index-refresh endpoint
Create a REST endpoint for refreshing the index manually:
```java
@RestController
@RequestMapping("/index")
public class IndexController {

    private final ElasticSearchService elasticSearchService;

    public IndexController(ElasticSearchService elasticSearchService) {
        this.elasticSearchService = elasticSearchService;
    }

    // GET /index/refresh triggers a manual refresh of the Elasticsearch index
    @GetMapping("/refresh")
    public void refresh() {
        elasticSearchService.refreshIndex();
    }
}
```
6. Implement the index-refresh service
Implement an ElasticSearchService that refreshes the index:
```java
@Service
public class ElasticSearchService {

    private final RestHighLevelClient restHighLevelClient;
    private final ElasticsearchProperties elasticsearchProperties;

    public ElasticSearchService(RestHighLevelClient restHighLevelClient, ElasticsearchProperties elasticsearchProperties) {
        this.restHighLevelClient = restHighLevelClient;
        this.elasticsearchProperties = elasticsearchProperties;
    }

    public void refreshIndex() {
        // Ask Elasticsearch to refresh the configured index so recent writes become searchable
        RefreshRequest request = new RefreshRequest(elasticsearchProperties.getIndex());
        try {
            restHighLevelClient.indices().refresh(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```
This gives you an endpoint for refreshing the index manually, while Canal watches MySQL for data changes and publishes them to Kafka, from where a Kafka consumer can index the changed data into Elasticsearch.
Spring Boot integration of easy-es, Canal, and Kafka to sync MySQL data to ES, with code supporting both full and incremental refresh
Below is a simple example showing how to integrate Easy-ES, Canal, and Kafka with Spring Boot to sync MySQL data to Elasticsearch, supporting both full and incremental refresh.
1. Add dependencies
Add the following dependencies to pom.xml:
```xml
<!-- Easy-ES -->
<dependency>
    <groupId>com.github.a2619388896</groupId>
    <artifactId>easy-es-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>
<!-- Canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>
```
2. Configuration
Configure Easy-ES, Canal, and Kafka in application.yml:
```yaml
spring:
  data:
    elasticsearch:
      cluster-name: elasticsearch
      cluster-nodes: localhost:9300
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      properties:
        max.poll.interval.ms: 600000
canal:
  host: 127.0.0.1
  port: 11111
  destination: example
  username: canal
  password: canal
  filter:
    include:
      - .*\\..*
    exclude:
      - example\\..*
```
3. Canal client
Use the Canal client to connect to the Canal server, listen for MySQL change events, and publish them to Kafka:
```java
@Component
public class CanalClient {

    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    @Autowired
    private CanalConnector canalConnector;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void start() {
        // Poll Canal in a background thread so application startup is not blocked
        new Thread(() -> {
            int batchSize = 1000;
            try {
                canalConnector.connect();
                canalConnector.subscribe(".*\\..*");
                canalConnector.rollback();
                while (true) {
                    // Fetch a batch of binlog entries without acknowledging them yet
                    Message message = canalConnector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        List<CanalEntry.Entry> entries = message.getEntries();
                        for (CanalEntry.Entry entry : entries) {
                            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                                String database = entry.getHeader().getSchemaName();
                                String table = entry.getHeader().getTableName();
                                // Skip events that do not represent data changes
                                EventType eventType = rowChange.getEventType();
                                if (eventType == EventType.QUERY || eventType == EventType.HEARTBEAT) {
                                    continue;
                                }
                                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                    // Collect the column values after the change
                                    Map<String, Object> data = new HashMap<>();
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        data.put(column.getName(), column.getValue());
                                    }
                                    // Publish the row to a topic named "<database>.<table>"
                                    String json = new ObjectMapper().writeValueAsString(data);
                                    kafkaTemplate.send(database + "." + table, json);
                                }
                            }
                        }
                        // Acknowledge the batch only after all entries have been forwarded
                        canalConnector.ack(batchId);
                    }
                }
            } catch (Exception e) {
                logger.error("Canal client error", e);
            } finally {
                canalConnector.disconnect();
            }
        }).start();
    }
}
```
4. Kafka consumer
Use a Kafka consumer to read the change events from Kafka and write them into Elasticsearch:
```java
@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // Subscribe to all "<database>.<table>" topics produced by the Canal client
    @KafkaListener(topicPattern = ".*\\..*")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // The topic name is "<database>.<table>"; use the table name as the index name
            String[] topicParts = record.topic().split("\\.");
            String indexName = topicParts[1];
            String json = record.value();
            Map<String, Object> data = new ObjectMapper().readValue(json, new TypeReference<Map<String, Object>>() {});
            String id = (String) data.get("id");
            data.remove("id");
            // Upsert: update the document if it exists, otherwise index it
            IndexRequest indexRequest = new IndexRequest(indexName);
            indexRequest.id(id);
            indexRequest.source(data, XContentType.JSON);
            UpdateRequest updateRequest = new UpdateRequest(indexName, id);
            updateRequest.doc(data, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            // Apply the upsert through the Elasticsearch high-level REST client
            restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            logger.error("Kafka consumer error", e);
        }
    }
}
```
5. Done
Start the application and incremental changes in MySQL will now be synced to Elasticsearch. For a full refresh, read the existing rows from MySQL and bulk-index them into Elasticsearch, as sketched below.
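A minimal sketch of such a full refresh, assuming a hypothetical user table with a matching user index (the table name, index name, and id column are illustrative assumptions, not part of the original example):
```java
@Service
public class FullSyncService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // Full refresh: copy every row of the hypothetical "user" table into the "user" index
    public void fullSync() throws IOException {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList("SELECT * FROM user");
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> row : rows) {
            // Use the primary key as the document id so re-runs overwrite instead of duplicating
            String id = String.valueOf(row.remove("id"));
            bulkRequest.add(new IndexRequest("user").id(id).source(row, XContentType.JSON));
        }
        if (bulkRequest.numberOfActions() > 0) {
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }
    }
}
```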