springboot整合easy-es和canbal、kafka实现mysql数据同步,支持数据全量和增量同步

时间: 2023-07-15 19:13:13 浏览: 77
Spring Boot 整合 Easy-ES、Canal 和 Kafka 可以实现 MySQL 数据的全量和增量同步。下面简单介绍一下具体步骤: 1. 集成 Easy-ES (1)在 pom.xml 中添加 Easy-ES 依赖: ``` <dependency> <groupId>com.alibaba</groupId> <artifactId>easy-es-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency> ``` (2)在 application.yml 中配置 Easy-ES: ``` spring: elasticsearch: rest: uris: http://127.0.0.1:9200 easy-es: enabled: true index-prefix: my_index refresh-interval: 5s ``` 2. 集成 Canal (1)在 pom.xml 中添加 Canal 依赖: ``` <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> ``` (2)在 application.yml 中配置 Canal: ``` canal: client: # canal server的ip地址和端口号 servers: 127.0.0.1:11111 # 监听的实例名称,多个实例用逗号分隔 instance: my_instance # 连接 Canal server 的用户名和密码 username: password: destination: # 数据源名称 schema: my_db # 数据库连接信息 url: jdbc:mysql://127.0.0.1:3306/my_db?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8 username: root password: root filter: # 监听表的正则表达式,多个表用逗号分隔 include: .*\\..* ``` 3. 集成 Kafka (1)在 pom.xml 中添加 Kafka 依赖: ``` <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.4.RELEASE</version> </dependency> ``` (2)在 application.yml 中配置 Kafka: ``` spring: kafka: bootstrap-servers: 127.0.0.1:9092 producer: retries: 0 consumer: group-id: my_group_id auto-offset-reset: earliest properties: max.poll.interval.ms: 600000 ``` 4. 实现数据同步 (1)全量同步 全量同步可以通过 Easy-ES 的 `com.alibaba.easysearch.indexbuilder.IndexBuilderFactory` 类来实现。在应用启动时,通过监听 `ApplicationReadyEvent` 事件,获取 MySQL 数据并调用 `com.alibaba.easysearch.indexbuilder.IndexBuilderFactory.buildFullIndex()` 方法来创建索引,具体代码如下: ``` @Component public class FullIndexBuilder implements ApplicationListener<ApplicationReadyEvent> { @Autowired private IndexBuilderFactory indexBuilderFactory; @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { // 获取 MySQL 数据并创建索引 indexBuilderFactory.buildFullIndex(); } } ``` (2)增量同步 增量同步可以通过 Canal 和 Kafka 实现。Canal 监听 MySQL 数据库变化,将变化信息发送到 Kafka 中,然后在消费者中获取变化信息并更新索引。 首先创建一个 Canal 客户端: ``` @Component public class CanalClient { private static final Logger logger = LoggerFactory.getLogger(CanalClient.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${canal.client.servers}") private String servers; @Value("${canal.client.instance}") private String instance; @PostConstruct public void init() { CanalConnector connector = CanalConnectors.newClusterConnector(servers, instance, "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) { continue; } List<String> messages = new ArrayList<>(); for (CanalEntry.Entry entry : message.getEntries()) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { RowChange rowChange; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } String tableName = entry.getHeader().getTableName(); EventType eventType = rowChange.getEventType(); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { String messageJson = JSON.toJSONString(rowData.getAfterColumnsList() .stream() .collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue))); messages.add(tableName + "|" + eventType + "|" + messageJson); } } } if (!messages.isEmpty()) { kafkaTemplate.send("my_topic", StringUtils.join(messages, "\n")); } connector.ack(batchId); } } catch (Exception e) { logger.error("CanalClient error", e); } finally { connector.disconnect(); } } } ``` 然后创建一个 Kafka 消费者,获取变化信息并更新索引: ``` @Component public class IncrementIndexBuilder { private static final Logger logger = LoggerFactory.getLogger(IncrementIndexBuilder.class); @Autowired private IndexBuilderFactory indexBuilderFactory; @KafkaListener(topics = "my_topic") public void listen(ConsumerRecord<String, String> record) { String[] fields = StringUtils.split(record.value(), "|"); String tableName = fields[0]; String eventType = fields[1]; String messageJson = fields[2]; try { Map<String, Object> message = JSON.parseObject(messageJson, new TypeReference<Map<String, Object>>() {}); if ("INSERT".equals(eventType)) { indexBuilderFactory.buildIndex(tableName, message); } else if ("UPDATE".equals(eventType)) { indexBuilderFactory.updateIndex(tableName, message); } else if ("DELETE".equals(eventType)) { indexBuilderFactory.deleteIndex(tableName, message); } } catch (Exception e) { logger.error("IncrementIndexBuilder error", e); } } } ``` 到此为止,我们就实现了 Spring Boot 整合 Easy-ES、Canal 和 Kafka 实现 MySQL 数据的全量和增量同步。

相关推荐

最新推荐

OGG实现ORACLE数据到大数据平台KFAKF的实时同步到KUDU数据库

该文档是根据真实项目,搭建的一套OGG实时同步oracle数据到kafka集群,文档主要介绍OGG的安装和进程配置。文档最后附带整个数据处理的流程图。

python3实现从kafka获取数据,并解析为json格式,写入到mysql中

今天小编就为大家分享一篇python3实现从kafka获取数据,并解析为json格式,写入到mysql中,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

kafka-python批量发送数据的实例

今天小编就为大家分享一篇kafka-python批量发送数据的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

Java实现批量向mysql写入数据的方法

主要介绍了Java实现批量向mysql写入数据的方法,涉及java基于JDBC连接mysql数据库及写入数据的相关操作技巧,非常简单实用,需要的朋友可以参考下

Spring Boot集群管理工具KafkaAdminClient使用方法解析

主要介绍了Spring Boot集群管理工具KafkaAdminClient使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

ChatGPT的工作原理-2023最新版

ChatGPT 是一种能够生成文本的AI模型,它可以自动生成看起来非常像人类写的文字。尽管这让人感到惊讶,但它的工作原理其实并不复杂。在本文中,我们将深入探讨 ChatGPT 的内部结构和运行原理,解释为什么它如此成功地生成有意义的文本。 首先,我们需要了解概率是怎么产生的。概率在AI系统中起着至关重要的作用,通过统计数据和模式识别来预测下一个可能的事件。在 ChatGPT 中,概率被用来生成各种不同的文本形式。 接下来,我们将探讨模型的概念。在AI领域,模型是指一种数学和统计工具,用于解决复杂的问题。ChatGPT 就是一个基于神经网络的模型,它可以学习和理解大量的文本数据,并生成类似的内容。 神经网络是 ChatGPT 的核心组成部分,它模拟了人类大脑的工作方式,并通过多层次的神经元相互连接来处理信息。通过机器学习和神经网络的训练,ChatGPT 可以不断改进其生成文本的质量和准确性。 在 ChatGPT 的训练过程中,嵌入是一个重要的概念。嵌入是将单词或短语转换为向量形式的技术,它有助于模型更好地理解和处理文本数据。 随着 ChatGPT 不断进行基本训练,其能力也在不断提升。但是真正让 ChatGPT 发挥作用的是意义空间和语义运动法则。这些概念帮助模型更好地理解文本的含义和语境,从而生成更加准确和有意义的文本。 此外,语义语法和计算语言的力量也在 ChatGPT 的工作原理中扮演着重要角色。这些工具和技术帮助 ChatGPT 更好地理解文本结构和语法规则,生成更加流畅和自然的文本。 最后,我们将探讨 ChatGPT 对于普通人的影响和机会。作为一种能够生成文本的工具,ChatGPT 可以帮助人们更高效地处理信息和进行沟通,为个人和企业带来更多的机会和发展空间。 综上所述,ChatGPT 是一种非常先进的AI模型,其工作原理基于概率、模型、神经网络和机器学习等技术。通过不断的训练和优化,ChatGPT 能够生成高质量、有意义的文本,为人们的工作和生活带来便利和价值。ChatGPT 的成功离不开对概率、神经网络和语义理解等方面的深入研究,它的影响和机会也将继续扩大,为未来的人工智能发展开辟新的可能性。

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire

嵌入式系统设计:单片机与外设模块的接口设计与优化

# 1. 嵌入式系统设计基础 嵌入式系统是一种专用计算机系统,通常用于控制、监视或执行特定功能。其特点包括紧凑、低功耗、实时性要求高等。与通用计算机系统相比,嵌入式系统更专注于特定应用领域,硬件资源有限、软件定制化程度高。 在嵌入式系统架构中,单片机架构常用于资源受限的场景,外设模块扩展了系统功能。处理器的选择需兼顾性能与功耗,并优化功耗管理策略。 设计嵌入式系统时,需要考虑单片机的选择与接口设计,保证系统稳定可靠。外设模块的选择与接口设计也至关重要,要确保数据传输高效可靠。最后,设计优化技巧如电路布局、供电系统设计、软硬件协同优化能提升系统性能与稳定性。 # 2. 单片机的选择与应用

halcon控件中点击区域选中已存在区域

如果你想在Halcon控件中点击已存在的区域以选中它,你可以使用`set_check`函数来实现。以下是一个示例代码: ```c++ HWindow hWnd; // Halcon窗口句柄 HObject image; // Halcon图像对象 HObject region; // 已存在的区域对象 // 读取图像到image对象中 ReadImage(&image, "image.jpg"); // 生成一个示例的区域对象 GenRectangle1(&region, 100, 100, 300, 300); // 显示图像和已存在的区域到Halcon窗口 DispObj(imag

毕业论文jsp714学生管理系统 带论坛ssh.doc

本文是关于一个JSP714学生管理系统带论坛的毕业论文。论文包括了摘要、背景意义、论文结构安排、开发技术介绍、需求分析、可行性分析、功能分析、业务流程分析、数据库设计、ER图、数据字典、数据流图、详细设计、系统截图、测试、总结、致谢和参考文献。 在毕业论文中,作者首先对学生管理系统的背景和意义进行了阐述,指出了学生管理系统的重要性和实用价值。接着作者详细介绍了论文的结构安排,包括各章节的内容和组织方式。在开发技术介绍中,作者说明了使用的技术和工具,为后续开发工作做好准备。 需求分析部分详细描述了学生管理系统的功能需求和性能需求,为系统设计和开发提供了指导。可行性分析则对系统的可行性进行了评估,包括技术可行性、经济可行性和实施可行性等方面。功能分析部分对系统的主要功能进行了梳理,明确了系统需要实现的功能模块和功能点。 在业务流程分析中,作者对学生管理系统的业务流程进行了建模和分析,确保系统设计符合实际业务需求。数据库设计部分包括ER图和数据字典,详细说明了系统数据库的结构和字段定义。数据流图则展示了系统的数据流向和处理过程,为系统设计提供了参考。 详细设计部分包括系统架构设计、模块设计和界面设计等内容,为系统开发提供了具体步骤和方法。系统截图展示了系统的界面和功能,使读者可以直观地了解系统的运行情况。测试部分对系统进行了全面的测试,确保系统的稳定性和可靠性。 总结部分对整个毕业论文进行了总结和归纳,指出了系统的优点和不足之处,并提出了改进和发展的方向。致谢部分感谢了所有支持和帮助过作者的人,表达了作者的诚挚感激之情。参考文献列出了作者在研究过程中所参考的文献和资料。 整个毕业论文详细而全面地介绍了一个JSP714学生管理系统带论坛的设计与开发过程,具有很高的实用价值和参考意义。通过本文的阅读,读者可以了解到学生管理系统的设计方法和开发过程,对相关领域的研究和实践起到了有效的指导作用。