springboot 整合 canal
时间: 2023-10-22 19:08:57 浏览: 164
Canal 是阿里巴巴的一款开源的数据库同步工具,支持 MySQL、Oracle、PostgreSQL 等多种数据库,可以实时捕获数据库变更,包括 DDL 和 DML 操作,然后将这些变更信息通过消息队列的方式发送给消费者,以实现数据同步等功能。
Spring Boot 是一个快速开发框架,它提供了很多开箱即用的组件和工具,方便开发人员快速构建应用程序。在 Spring Boot 中整合 Canal,可以更加方便地进行数据库同步的开发。
以下是使用 Spring Boot 整合 Canal 的步骤:
1. 添加 Canal 相关依赖
在 pom.xml 文件中添加 Canal 相关依赖:
```
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
```
2. 配置 Canal 客户端
在 application.yml 文件中添加 Canal 客户端的配置:
```
canal:
client:
host: 127.0.0.1
port: 11111
destination: example
username: canal
password: canal
```
其中,host 和 port 分别指定 Canal 服务器的地址和端口号,destination 指定 Canal 实例的名称,username 和 password 分别指定 Canal 客户端的用户名和密码。
3. 创建 Canal 客户端
在 Spring Boot 应用程序中创建 Canal 客户端,通过监听 Canal 的 binlog 变更事件来实现数据同步。可以通过继承 AbstractCanalListener 类来实现自定义的监听器。
```
@Component
public class CanalClient {
@Autowired
private CanalConfig canalConfig;
private CanalConnector canalConnector;
private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
private static final int BATCH_SIZE = 1000;
@PostConstruct
public void init() {
canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
canalConfig.getDestination(),
canalConfig.getUsername(),
canalConfig.getPassword());
new Thread(() -> {
while (true) {
try {
canalConnector.connect();
canalConnector.subscribe(".*\\..*");
while (true) {
Message message = canalConnector.getWithoutAck(BATCH_SIZE, 1000L, TimeUnit.MILLISECONDS);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000L);
} else {
printEntries(message.getEntries());
}
canalConnector.ack(batchId);
}
} catch (Exception e) {
LOGGER.error("Canal client error: {}", e.getMessage(), e);
try {
Thread.sleep(1000L);
} catch (InterruptedException ex) {
LOGGER.error("Thread sleep error: {}", ex.getMessage(), ex);
}
} finally {
canalConnector.disconnect();
}
}
}).start();
}
private void printEntries(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
LOGGER.error("Parsing row change error: {}", e.getMessage(), e);
return;
}
EventType eventType = rowChange.getEventType();
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumns(rowData.getBeforeColumnsList(), tableName, eventType);
} else if (eventType == EventType.INSERT) {
printColumns(rowData.getAfterColumnsList(), tableName, eventType);
} else {
printColumns(rowData.getBeforeColumnsList(), tableName, eventType);
printColumns(rowData.getAfterColumnsList(), tableName, eventType);
}
}
}
}
private void printColumns(List<Column> columns, String tableName, EventType eventType) {
StringBuilder builder = new StringBuilder();
builder.append(tableName).append(": ");
builder.append(eventType == EventType.DELETE ? "delete" : eventType == EventType.INSERT ? "insert" : "update").append(" ");
for (Column column : columns) {
builder.append(column.getName()).append("=").append(column.getValue()).append(";");
}
LOGGER.info(builder.toString());
}
}
```
在 init() 方法中,创建 Canal 连接器,并订阅所有的数据库表。然后在一个死循环中,不断地从 Canal 连接器中获取变更事件,解析并处理这些事件。
4. 自定义 Canal 监听器
在应用程序中创建自定义的 Canal 监听器,继承 AbstractCanalListener 类,实现自己的业务逻辑。
```
@Component
@CanalEventListener
public class MyCanalListener extends AbstractCanalListener {
@Autowired
private UserService userService;
@Override
public void onInsert(RowChange rowChange) {
String tableName = rowChange.getTable();
if ("user".equals(tableName)) {
for (RowData rowData : rowChange.getRowDatasList()) {
User user = new User();
user.setId(Long.parseLong(rowData.getAfterColumns("id").getValue()));
user.setName(rowData.getAfterColumns("name").getValue());
user.setAge(Integer.parseInt(rowData.getAfterColumns("age").getValue()));
userService.addUser(user);
}
}
}
@Override
public void onUpdate(RowChange rowChange) {
String tableName = rowChange.getTable();
if ("user".equals(tableName)) {
for (RowData rowData : rowChange.getRowDatasList()) {
User user = new User();
user.setId(Long.parseLong(rowData.getAfterColumns("id").getValue()));
user.setName(rowData.getAfterColumns("name").getValue());
user.setAge(Integer.parseInt(rowData.getAfterColumns("age").getValue()));
userService.updateUser(user);
}
}
}
@Override
public void onDelete(RowChange rowChange) {
String tableName = rowChange.getTable();
if ("user".equals(tableName)) {
for (RowData rowData : rowChange.getRowDatasList()) {
Long id = Long.parseLong(rowData.getBeforeColumns("id").getValue());
userService.deleteUserById(id);
}
}
}
}
```
在这个监听器中,实现了对 user 表的 INSERT、UPDATE 和 DELETE 操作的监听,并将这些操作同步到数据库中。
总结
通过上述步骤,我们可以很容易地在 Spring Boot 中整合 Canal,实现数据库的实时同步。当然,这只是一个简单的示例,实际的应用场景可能更为复杂,需要根据实际情况进行调整和优化。
阅读全文