Canal 客户端获取 MySQL 5.6 数据的 Demo
时间: 2023-08-14 14:05:44 浏览: 27
以下是一个使用 Canal 客户端获取 MySQL 5.6 数据的 Java Demo:
```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Minimal Canal client demo.
 *
 * <p>Connects to a Canal server at {@code 127.0.0.1:11111} (destination {@code example}),
 * subscribes with the filter {@code .*\..*}, and prints every row-data entry it receives
 * to standard output until the thread is interrupted or an error occurs.
 */
public class CanalDemo {

    /** Maximum number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 1000;

    /** Pause, in milliseconds, between polls when a fetch returns nothing to process. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    private CanalDemo() {
        // Static entry point only; not meant to be instantiated.
    }

    /**
     * Runs the fetch / print / acknowledge loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the connection (empty user name and password).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                List<Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    // Nothing to process in this fetch; back off before polling again.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } else {
                    try {
                        printEntries(entries);
                    } catch (RuntimeException e) {
                        // Processing failed: roll this batch back rather than acknowledging
                        // it, so the entries are not silently dropped.
                        connector.rollback(batchId);
                        throw e;
                    }
                }
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread, then shut down.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints a header line and the column values for every {@code ROWDATA} entry;
     * entries of any other type are skipped.
     *
     * @param entries entries of one fetched batch
     * @throws IllegalStateException if an entry's stored value cannot be parsed as a
     *         {@link RowChange}; the parse error is attached as the cause
     */
    private static void printEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Surface the failure to the caller instead of swallowing it, so the
                // batch is not acknowledged as if it had been processed.
                throw new IllegalStateException(
                        "Failed to parse row change at binlog["
                                + entry.getHeader().getLogfileName() + ":"
                                + entry.getHeader().getLogfileOffset() + "]",
                        e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // DELETE: print the before-image columns only.
                    printColumns(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // INSERT: print the after-image columns only.
                    printColumns(rowData.getAfterColumnsList());
                } else {
                    // Any other event type: print both images.
                    System.out.println("-------> before");
                    printColumns(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumns(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value, and its "updated" flag.
     *
     * @param columns columns to print
     */
    private static void printColumns(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
这个 Demo 连接到本地的 Canal Server(127.0.0.1:11111,destination 为 example),并订阅了所有数据库表的数据变更事件。每次获取到数据变更事件后,它会打印出相应的信息。你可以根据自己的需要修改代码,以适应你的应用场景。
相关推荐















