Spring Boot with Easy-ES, Canal, and Kafka: syncing MySQL data to Elasticsearch with full and incremental refresh
Below is a simple example showing how to integrate Easy-ES, Canal, and Kafka in a Spring Boot application to synchronize MySQL data to Elasticsearch, supporting both full and incremental refreshes.
1. Add dependencies
Add the following dependencies to pom.xml:
```xml
<!-- Easy-ES -->
<dependency>
    <groupId>com.github.a2619388896</groupId>
    <artifactId>easy-es-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>
<!-- Canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>
```
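The consumer in step 4 also relies on the Elasticsearch client beans provided by Spring Data Elasticsearch. If that module is not already on the classpath transitively, adding the starter below is assumed to be necessary (let Spring Boot's dependency management pick the version):
```xml
<!-- Spring Data Elasticsearch (assumed; supplies the Elasticsearch client used in step 4) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
```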
2. Configuration
Configure Easy-ES, Canal, and Kafka in application.yml:
```yaml
spring:
  data:
    elasticsearch:
      cluster-name: elasticsearch
      cluster-nodes: localhost:9300
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      properties:
        max.poll.interval.ms: 600000

canal:
  host: 127.0.0.1
  port: 11111
  destination: example
  username: canal
  password: canal
  filter:
    include:
      - .*\\..*
    exclude:
      - example\\..*
```
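Note that the `canal` block above is a custom property group rather than a built-in Spring Boot namespace, and the client in step 3 autowires a `CanalConnector` bean that the snippets never define. A minimal configuration sketch is shown below; the class names and the property binding are assumptions, not part of the original code. The `filter` lists are left unbound here because the client in step 3 subscribes with its own pattern.
```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(CanalConfig.CanalProperties.class)
public class CanalConfig {

    // Binds the custom canal.* properties from application.yml
    @ConfigurationProperties(prefix = "canal")
    public static class CanalProperties {
        private String host;
        private int port;
        private String destination;
        private String username;
        private String password;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
        public String getDestination() { return destination; }
        public void setDestination(String destination) { this.destination = destination; }
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
    }

    // Exposes the CanalConnector that the CanalClient in step 3 autowires
    @Bean
    public CanalConnector canalConnector(CanalProperties props) {
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(props.getHost(), props.getPort()),
                props.getDestination(), props.getUsername(), props.getPassword());
    }
}
```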
3. Canal client
Use a Canal client to connect to the Canal server, listen for MySQL change events, and publish them to Kafka:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class CanalClient {

    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private CanalConnector canalConnector;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void start() {
        new Thread(() -> {
            int batchSize = 1000;
            try {
                canalConnector.connect();
                // Subscribe to all databases and tables
                canalConnector.subscribe(".*\\..*");
                canalConnector.rollback();
                while (true) {
                    // Fetch a batch without auto-ack so it can be acknowledged after publishing
                    Message message = canalConnector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        List<CanalEntry.Entry> entries = message.getEntries();
                        for (CanalEntry.Entry entry : entries) {
                            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                                CanalEntry.RowChange rowChange =
                                        CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                                String database = entry.getHeader().getSchemaName();
                                String table = entry.getHeader().getTableName();
                                CanalEntry.EventType eventType = rowChange.getEventType();
                                logger.debug("Received {} on {}.{}", eventType, database, table);
                                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                    // Use the after-image of the row; note that DELETE events
                                    // only carry a before-image and are not forwarded here
                                    Map<String, Object> data = new HashMap<>();
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        data.put(column.getName(), column.getValue());
                                    }
                                    String json = objectMapper.writeValueAsString(data);
                                    // Topic naming convention: "database.table"
                                    kafkaTemplate.send(database + "." + table, json);
                                }
                            }
                        }
                        canalConnector.ack(batchId);
                    }
                }
            } catch (Exception e) {
                logger.error("Canal client error", e);
            } finally {
                canalConnector.disconnect();
            }
        }).start();
    }
}
```
4. Kafka consumer
Use a Kafka consumer to read the change events from Kafka and write them to Elasticsearch:
```java
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    // The high-level REST client is used directly because ElasticsearchOperations
    // does not accept the low-level IndexRequest/UpdateRequest objects built below
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // Subscribe by pattern so every "database.table" topic produced in step 3 is consumed
    @KafkaListener(topicPattern = ".*\\..*")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // Topic is named "database.table"; use the table name as the index name
            String indexName = record.topic().split("\\.")[1];
            Map<String, Object> data = objectMapper.readValue(
                    record.value(), new TypeReference<Map<String, Object>>() {});
            // Canal delivers column values as strings, so the primary key can be cast directly
            String id = (String) data.get("id");
            data.remove("id");
            // Upsert: update the document if it exists, otherwise index it
            IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(data);
            UpdateRequest updateRequest = new UpdateRequest(indexName, id)
                    .doc(data)
                    .upsert(indexRequest);
            restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            logger.error("Kafka consumer error", e);
        }
    }
}
```
5. Done
Start the application and changes in MySQL will be synchronized to Elasticsearch incrementally. For a full refresh, read the existing rows out of MySQL and bulk-index them into Elasticsearch before the incremental pipeline takes over, as sketched below.
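A sketch of such a full refresh follows. It assumes `JdbcTemplate` is available (spring-boot-starter-jdbc plus the MySQL driver), that each table has a numeric primary key column named `id`, and that the target index shares the table's name; the class and method names are illustrative, not part of the original code:
```java
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class FullRefreshService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Copies every row of the given MySQL table into the Elasticsearch index
     * of the same name, paging by primary key to keep memory usage bounded.
     */
    public void refreshAll(String table) throws Exception {
        int pageSize = 1000;
        long lastId = 0;
        while (true) {
            // Keyset pagination on the primary key instead of a growing OFFSET
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(
                    "SELECT * FROM " + table + " WHERE id > ? ORDER BY id LIMIT ?",
                    lastId, pageSize);
            if (rows.isEmpty()) {
                break;
            }
            BulkRequest bulk = new BulkRequest();
            for (Map<String, Object> row : rows) {
                String id = String.valueOf(row.get("id"));
                bulk.add(new IndexRequest(table).id(id).source(row));
                lastId = Long.parseLong(id);
            }
            restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT);
        }
    }
}
```
Running `refreshAll` once per table before (or shortly after) starting the Canal client gives the initial full load; the Canal/Kafka pipeline then keeps the index up to date incrementally.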