Spring Boot with Easy-Es, Canal, and Kafka: syncing MySQL data to ES with support for full and incremental refresh
To sync MySQL data to Elasticsearch, you can combine Easy-Es with Canal. Easy-Es is an open-source Elasticsearch framework that makes document create/read/update/delete operations straightforward, while Canal is Alibaba's open-source tool for parsing database incremental logs: it captures MySQL binlog row changes in real time, which is what makes real-time MySQL-to-Elasticsearch synchronization possible.
The concrete steps are as follows:
1. First, add the Canal client and Easy-Es dependencies to your Spring Boot project:
```xml
<!-- Canal client for reading MySQL binlog events -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- Easy-Es Spring Boot starter (official coordinates of the 1.x line) -->
<dependency>
    <groupId>cn.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>1.1.1</version>
</dependency>
```
2. Configure Canal and Elasticsearch in application.properties:
```properties
# Canal configuration
canal.server.ip=127.0.0.1
canal.server.port=11111
canal.server.destination=test
canal.server.username=
canal.server.password=
# Elasticsearch configuration
spring.elasticsearch.rest.uris=http://localhost:9200/
spring.elasticsearch.rest.username=
spring.elasticsearch.rest.password=
```
3. Create a CanalClient that listens to MySQL's incremental binlog and writes each change to Elasticsearch (this sketch injects Spring's Environment for the Canal properties and uses Spring Data Elasticsearch's ElasticsearchOperations for the writes):
```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class CanalClient {

    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
    @Autowired
    private Environment environment;

    private final ObjectMapper objectMapper = new ObjectMapper();

    @PostConstruct
    public void init() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(environment.getProperty("canal.server.ip"),
                        Integer.parseInt(environment.getProperty("canal.server.port"))),
                environment.getProperty("canal.server.destination"),
                environment.getProperty("canal.server.username"),
                environment.getProperty("canal.server.password"));
        new Thread(() -> {
            try {
                connector.connect();
                // Subscribe to all tables in all schemas
                connector.subscribe(".*\\..*");
                while (true) {
                    // Fetch a batch without auto-ack so we can ack or roll back explicitly
                    Message message = connector.getWithoutAck(1000);
                    long batchId = message.getId();
                    if (batchId == -1 || message.getEntries().isEmpty()) {
                        continue;
                    }
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                            continue;
                        }
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        String tableName = entry.getHeader().getTableName();
                        String databaseName = entry.getHeader().getSchemaName();
                        // One ES index per table, e.g. "mydb_user"
                        IndexCoordinates index = IndexCoordinates.of(databaseName + "_" + tableName);
                        switch (rowChange.getEventType()) {
                            case INSERT:
                            case UPDATE:
                                List<IndexQuery> queries = new ArrayList<>();
                                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                    Map<String, Object> data = new HashMap<>();
                                    String id = null;
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        data.put(column.getName(), column.getValue());
                                        if (column.getIsKey()) {
                                            id = column.getValue(); // use the primary key as the document id
                                        }
                                    }
                                    queries.add(new IndexQueryBuilder()
                                            .withId(id)
                                            .withSource(objectMapper.writeValueAsString(data))
                                            .build());
                                }
                                elasticsearchOperations.bulkIndex(queries, index);
                                break;
                            case DELETE:
                                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                                        if (column.getIsKey()) {
                                            // Delete the document whose id is the row's primary key
                                            elasticsearchOperations.delete(column.getValue(), index);
                                        }
                                    }
                                }
                                break;
                            default:
                                break;
                        }
                    }
                    // Confirm the batch only after every entry has been applied
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                connector.rollback();
            } finally {
                connector.disconnect();
            }
        }).start();
    }
}
```
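Note that Canal can only deliver these events if the MySQL server writes row-format binlogs and exposes them to the Canal server. A minimal my.cnf sketch of that prerequisite (the server-id value is illustrative):
```properties
# MySQL server settings required by Canal
log-bin=mysql-bin
binlog-format=ROW
server-id=1
```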
4. To support a full data sync, you can recreate the index and bulk-index everything currently in MySQL (shown here with Spring Data Elasticsearch's ElasticsearchOperations, consistent with the listener above):
```java
@Autowired
private ElasticsearchOperations elasticsearchOperations;
@Autowired
private JdbcTemplate jdbcTemplate;

@PostConstruct
public void init() {
    // Drop and recreate the index so the full load starts from a clean state
    IndexOperations indexOps = elasticsearchOperations.indexOps(User.class);
    indexOps.delete();
    indexOps.create();
    indexOps.putMapping();
    // Read every row from MySQL and bulk-save it into Elasticsearch
    List<User> userList = jdbcTemplate.query("SELECT * FROM user",
            BeanPropertyRowMapper.newInstance(User.class));
    elasticsearchOperations.save(userList);
}
```
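The full and incremental snippets assume a User entity mapped for Spring Data Elasticsearch; a minimal sketch (the index name and field set are assumptions for illustration):
```java
import java.sql.Timestamp;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

// Illustrative entity backing the "user" index
@Document(indexName = "user")
public class User {
    @Id
    private Long id;
    private String name;
    private Timestamp updateTime; // populated from the update_time column by BeanPropertyRowMapper

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Timestamp getUpdateTime() { return updateTime; }
    public void setUpdateTime(Timestamp updateTime) { this.updateTime = updateTime; }
}
```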
5. Incremental sync can also be done by polling instead of (or alongside) the binlog: add a scheduled task in CanalClient that periodically queries MySQL for recently updated rows and pushes them to Elasticsearch:
```java
// High-water mark of the last synced row; assumes the table has an update_time column
private Timestamp lastUpdateTime = new Timestamp(0);

@Scheduled(fixedRate = 5000)
public void syncData() {
    // Fetch rows modified since the last run, oldest first, and push them to Elasticsearch
    List<User> userList = jdbcTemplate.query(
            "SELECT * FROM user WHERE update_time > ? ORDER BY update_time",
            BeanPropertyRowMapper.newInstance(User.class), lastUpdateTime);
    if (!userList.isEmpty()) {
        elasticsearchOperations.save(userList);
        lastUpdateTime = userList.get(userList.size() - 1).getUpdateTime();
    }
}
```
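For @Scheduled to fire, task scheduling must be enabled once in the application, e.g.:
```java
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

// Turns on Spring's scheduled-task processing so the syncData job runs every 5 seconds
@Configuration
@EnableScheduling
public class SchedulingConfig {
}
```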
With the steps above, MySQL data is synced to Elasticsearch with support for both full and incremental refresh. To bring Kafka into the pipeline, add a producer to CanalClient that publishes each change event to Kafka, and let a downstream consumer apply it to Elasticsearch; a sketch follows.
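A minimal sketch of that hand-off with spring-kafka's KafkaTemplate (the spring-kafka dependency, the canal-events topic name, and the payload format are assumptions, not part of the setup above):
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class CanalEventProducer {

    // Requires spring-kafka on the classpath and spring.kafka.bootstrap-servers configured
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publish one row-change event; call this from CanalClient's switch branches
    public void send(String database, String table, String eventType, String rowJson) {
        // Keying by "database.table" keeps changes to one table in a single partition, preserving order
        String key = database + "." + table;
        kafkaTemplate.send("canal-events", key, eventType + ":" + rowJson);
    }
}
```
A downstream consumer can then read from the topic and apply the same index/delete logic to Elasticsearch.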