1、docker compose部署 canal mysql8.0.30同步到rabbitmq 多队列; 2、Java代码实现将rabbitmq中的消息消费并写入clickhouse
时间: 2024-03-27 08:36:43 浏览: 84
很高兴回答你的问题。让我们逐个解答。
1. docker compose部署canal mysql8.0.30同步到rabbitmq多队列:
首先,我们需要在docker-compose.yml中定义canal、mysql、rabbitmq容器的参数,如下所示:
```
version: '3'
services:
canal:
image: canal/canal-server:v1.1.4
ports:
- 11111:11111
environment:
- canal.instance.master.address=mysql:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.filter.regex=.*\\..*
- canal.instance.filter.black.regex=mysql\\.slave_.*
- canal.instance.filter.black.regex=mysql\\.binlog_.*
- canal.instance.filter.black.regex=mysql\\.innodb_.*
mysql:
image: mysql:8.0.30
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_DATABASE=test
- MYSQL_USER=canal
- MYSQL_PASSWORD=canal
volumes:
- ./mysql/data:/var/lib/mysql
- ./mysql/conf:/etc/mysql/conf.d
rabbitmq:
image: rabbitmq:3-management
ports:
- 5672:5672
- 15672:15672
```
上述配置中,我们定义了canal、mysql、rabbitmq三个服务。其中,canal服务监听mysql的binlog,并将数据发送到rabbitmq。mysql服务提供数据源,rabbitmq服务作为数据接收方。
接下来,我们需要在canal中配置同步规则,以便将数据发送到rabbitmq。可以通过canal的管理页面进行配置。具体步骤如下:
1. 访问canal的管理页面:http://localhost:11111/manager/index
2. 点击“添加实例”,填写mysql的相关参数,保存。
3. 点击“添加通道”,填写通道名称、选择数据源、选择消息队列类型(这里选择rabbitmq),保存。
4. 点击“添加同步规则”,填写规则名称、选择通道、选择数据表、选择操作类型(insert/update/delete),保存。
至此,canal的同步规则已经配置完成。启动docker-compose服务,canal会开始监听mysql的binlog,并将数据发送到rabbitmq。
2. Java代码实现将rabbitmq中的消息消费并写入clickhouse:
我们可以使用Spring AMQP来消费rabbitmq中的消息。具体步骤如下:
1. 引入Spring AMQP依赖:
```
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.3.11.RELEASE</version>
</dependency>
```
2. 配置连接信息:
```
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private int port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.queue-name}")
private String queueName;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public Queue queue() {
return new Queue(queueName);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setQueue(queueName);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(new MessageListenerAdapter(new RabbitMQListener()));
return container;
}
}
```
上述代码中,我们定义了RabbitMQ的连接信息,包括主机、端口、用户名、密码以及队列名称。同时,我们也定义了消息监听器,用于接收并处理消息。
3. 编写消息监听器:
```
public class RabbitMQListener {
@Autowired
private ClickHouseService clickHouseService;
public void handleMessage(String message) {
// 将消息写入clickhouse
clickHouseService.insert(message);
}
}
```
上述代码中,我们定义了一个消息监听器,用于接收从rabbitmq中获取的消息,并将消息写入clickhouse。
4. 编写clickhouse服务:
```
@Service
public class ClickHouseService {
private static final String INSERT_SQL = "INSERT INTO test_table (id, name) VALUES (?, ?)";
@Autowired
private JdbcTemplate jdbcTemplate;
public void insert(String message) {
// 解析消息
JSONObject json = JSON.parseObject(message);
int id = json.getInteger("id");
String name = json.getString("name");
// 插入数据
jdbcTemplate.update(INSERT_SQL, id, name);
}
}
```
上述代码中,我们定义了一个clickhouse服务,用于将消息插入clickhouse中。在这里,我们使用JdbcTemplate来操作clickhouse。
至此,我们已经完成了将rabbitmq中的消息消费并写入clickhouse的Java代码实现。
阅读全文