docker 部署canal 并且在java客户端消费
时间: 2024-03-24 08:37:04 浏览: 156
好的,我来回答你的问题。
首先,我们需要了解一下什么是 Canal。Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析与同步的组件,可以将 MySQL 数据库的增量日志解析出来,再进行消费和传输。它可以支持多种消费者,其中包括 Java 客户端。
接下来,我们来讲一下如何在 Docker 中部署 Canal。
1. 首先,我们需要拉取 Canal 的 Docker 镜像。可以使用以下命令:
```
docker pull canal/canal-server
```
2. 接着,我们需要创建一个配置文件,用于指定 Canal 的配置信息。例如,我们可以创建一个名为 `canal.properties` 的文件,内容如下:
```
canal.instance.master.address=mysql服务器ip:3306
canal.instance.dbUsername=数据库用户名
canal.instance.dbPassword=数据库密码
canal.instance.defaultDatabaseName=需要同步的数据库名称
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=true
canal.instance.gtidon=false
canal.instance.enableDruid=false
```
需要注意的是,`canal.instance.master.address` 需要填写你的 MySQL 服务器的 IP 地址和端口号,`canal.instance.dbUsername` 和 `canal.instance.dbPassword` 需要填写你的 MySQL 数据库的用户名和密码,`canal.instance.defaultDatabaseName` 需要填写你需要同步的数据库名称。
3. 然后,我们需要创建一个名为 `instance.yml` 的文件,内容如下:
```
canal.instance:
# 主节点信息
master:
address: mysql服务器ip:3306
jmx: false
# mq配置,不需要可以不配置
mq:
enabled: false
# 配置同步的数据库实例信息
dbUsername: 数据库用户名
dbPassword: 数据库密码
connectionCharset: UTF-8
# 配置需要同步的数据库信息
positionInfo:
name: example
position:
entries:
filter:
# 不需要同步的表
- example.ignore
```
需要注意的是,`canal.instance.master.address`、`canal.instance.dbUsername` 和 `canal.instance.dbPassword` 的值需要和上面的 `canal.properties` 文件中的值保持一致。
4. 最后,我们可以使用以下命令启动 Canal 服务:
```
docker run --name canal-server -v /path/to/instance.yml:/home/admin/canal-server/conf/instance.yml -v /path/to/canal.properties:/home/admin/canal-server/conf/canal.properties -d -p 11111:11111 canal/canal-server
```
需要注意的是,`/path/to/instance.yml` 和 `/path/to/canal.properties` 需要替换成你自己的文件路径。
至此,我们已经成功在 Docker 中部署了 Canal。
接下来,我们来讲一下如何在 Java 客户端消费 Canal。
1. 首先,我们需要在 pom.xml 文件中添加 Canal 客户端的依赖:
```
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
```
2. 然后,我们可以使用以下代码来消费 Canal:
```
// 创建 Canal 连接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");
// 连接 Canal
connector.connect();
// 订阅需要同步的表
connector.subscribe("example.*");
// 循环消费数据
while (true) {
// 获取数据
Message message = connector.getWithoutAck(100);
// 获取批次 ID
long batchId = message.getId();
// 获取当前批次的数据条数
int size = message.getEntries().size();
// 处理数据
for (CanalEntry.Entry entry : message.getEntries()) {
// 判断数据类型
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 获取表名
String tableName = entry.getHeader().getTableName();
// 获取操作类型
CanalEntry.EventType eventType = entry.getHeader().getEventType();
// 获取行数据
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
// 处理行数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 处理插入操作
if (eventType == CanalEntry.EventType.INSERT) {
// 获取列数据
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
// 处理列数据
for (CanalEntry.Column column : columns) {
String name = column.getName();
String value = column.getValue();
System.out.println("INSERT: " + tableName + "." + name + " = " + value);
}
}
// 处理更新操作
if (eventType == CanalEntry.EventType.UPDATE) {
// 获取列数据
List<CanalEntry.Column> beforeColumns = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumns = rowData.getAfterColumnsList();
// 处理列数据
for (int i = 0; i < beforeColumns.size(); i++) {
String name = beforeColumns.get(i).getName();
String beforeValue = beforeColumns.get(i).getValue();
String afterValue = afterColumns.get(i).getValue();
System.out.println("UPDATE: " + tableName + "." + name + " = " + beforeValue + " -> " + afterValue);
}
}
// 处理删除操作
if (eventType == CanalEntry.EventType.DELETE) {
// 获取列数据
List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
// 处理列数据
for (CanalEntry.Column column : columns) {
String name = column.getName();
String value = column.getValue();
System.out.println("DELETE: " + tableName + "." + name + " = " + value);
}
}
}
}
}
// 提交批次
connector.ack(batchId);
}
```
至此,我们已经成功在 Java 客户端消费了 Canal。
阅读全文