不用kafka 将db的数据同步到es
时间: 2023-09-18 19:01:41 浏览: 104
不使用Kafka将数据库的数据同步到Elasticsearch(ES)可以采用以下几种方法:
1. 使用数据库触发器(Triggers)实现同步:在数据库中创建触发器,当特定的表发生变化时,触发器将捕获变化并通过HTTP请求将变化的数据发送到ES进行索引更新。
2. 使用定时任务(Scheduled Tasks)实现同步:编写一个定时任务,定期查询数据库,将变化的数据通过HTTP请求发送到ES进行索引更新。可以使用诸如Cron等工具来执行定时任务。
3. 使用数据库的Change Data Capture(CDC)功能:一些数据库提供了CDC功能,可以捕获数据库的变化并将其发送到消息队列(如ActiveMQ、RabbitMQ等)中。然后,使用消息队列的消费者将数据发送到ES进行索引更新。
4. 使用专门的数据同步工具:有一些第三方工具可以帮助将数据库的数据同步到ES,例如Logstash、Debezium等。这些工具可以监控数据库的变化并将变化的数据发送到ES。
需要注意的是,虽然不使用Kafka可实现数据库到ES的数据同步,但Kafka作为一个高性能、分布式消息队列,具有很好的数据缓冲和并发处理能力,通常被广泛用于数据的异步传输和流式处理。因此,在某些情况下,Kafka仍然是一个优秀的选择来实现数据库到ES的数据同步。
相关问题
springboot整合easy-es和canbal、kafka实现mysql数据同步,支持数据全量和增量同步
Spring Boot 整合 Easy-ES、Canal 和 Kafka 可以实现 MySQL 数据的全量和增量同步。下面简单介绍一下具体步骤:
1. 集成 Easy-ES
(1)在 pom.xml 中添加 Easy-ES 依赖:
```
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easy-es-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
```
(2)在 application.yml 中配置 Easy-ES:
```
spring:
elasticsearch:
rest:
uris: http://127.0.0.1:9200
easy-es:
enabled: true
index-prefix: my_index
refresh-interval: 5s
```
2. 集成 Canal
(1)在 pom.xml 中添加 Canal 依赖:
```
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
```
(2)在 application.yml 中配置 Canal:
```
canal:
client:
# canal server的ip地址和端口号
servers: 127.0.0.1:11111
# 监听的实例名称,多个实例用逗号分隔
instance: my_instance
# 连接 Canal server 的用户名和密码
username:
password:
destination:
# 数据源名称
schema: my_db
# 数据库连接信息
url: jdbc:mysql://127.0.0.1:3306/my_db?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8
username: root
password: root
filter:
# 监听表的正则表达式,多个表用逗号分隔
include: .*\\..*
```
3. 集成 Kafka
(1)在 pom.xml 中添加 Kafka 依赖:
```
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.4.RELEASE</version>
</dependency>
```
(2)在 application.yml 中配置 Kafka:
```
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
retries: 0
consumer:
group-id: my_group_id
auto-offset-reset: earliest
properties:
max.poll.interval.ms: 600000
```
4. 实现数据同步
(1)全量同步
全量同步可以通过 Easy-ES 的 `com.alibaba.easysearch.indexbuilder.IndexBuilderFactory` 类来实现。在应用启动时,通过监听 `ApplicationReadyEvent` 事件,获取 MySQL 数据并调用 `com.alibaba.easysearch.indexbuilder.IndexBuilderFactory.buildFullIndex()` 方法来创建索引,具体代码如下:
```
@Component
public class FullIndexBuilder implements ApplicationListener<ApplicationReadyEvent> {
@Autowired
private IndexBuilderFactory indexBuilderFactory;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
// 获取 MySQL 数据并创建索引
indexBuilderFactory.buildFullIndex();
}
}
```
(2)增量同步
增量同步可以通过 Canal 和 Kafka 实现。Canal 监听 MySQL 数据库变化,将变化信息发送到 Kafka 中,然后在消费者中获取变化信息并更新索引。
首先创建一个 Canal 客户端:
```
@Component
public class CanalClient {
private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${canal.client.servers}")
private String servers;
@Value("${canal.client.instance}")
private String instance;
@PostConstruct
public void init() {
CanalConnector connector = CanalConnectors.newClusterConnector(servers, instance, "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
continue;
}
List<String> messages = new ArrayList<>();
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
String tableName = entry.getHeader().getTableName();
EventType eventType = rowChange.getEventType();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
String messageJson = JSON.toJSONString(rowData.getAfterColumnsList()
.stream()
.collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue)));
messages.add(tableName + "|" + eventType + "|" + messageJson);
}
}
}
if (!messages.isEmpty()) {
kafkaTemplate.send("my_topic", StringUtils.join(messages, "\n"));
}
connector.ack(batchId);
}
} catch (Exception e) {
logger.error("CanalClient error", e);
} finally {
connector.disconnect();
}
}
}
```
然后创建一个 Kafka 消费者,获取变化信息并更新索引:
```
@Component
public class IncrementIndexBuilder {
private static final Logger logger = LoggerFactory.getLogger(IncrementIndexBuilder.class);
@Autowired
private IndexBuilderFactory indexBuilderFactory;
@KafkaListener(topics = "my_topic")
public void listen(ConsumerRecord<String, String> record) {
String[] fields = StringUtils.split(record.value(), "|");
String tableName = fields[0];
String eventType = fields[1];
String messageJson = fields[2];
try {
Map<String, Object> message = JSON.parseObject(messageJson, new TypeReference<Map<String, Object>>() {});
if ("INSERT".equals(eventType)) {
indexBuilderFactory.buildIndex(tableName, message);
} else if ("UPDATE".equals(eventType)) {
indexBuilderFactory.updateIndex(tableName, message);
} else if ("DELETE".equals(eventType)) {
indexBuilderFactory.deleteIndex(tableName, message);
}
} catch (Exception e) {
logger.error("IncrementIndexBuilder error", e);
}
}
}
```
到此为止,我们就实现了 Spring Boot 整合 Easy-ES、Canal 和 Kafka 实现 MySQL 数据的全量和增量同步。
阅读全文