Canal与消息队列集成实战
Canal与消息队列集成实战
前置知识
在开始本教程之前,建议您具备以下基础知识:
- Canal的基本原理和使用方法
- 消息队列(Kafka/RocketMQ)的基本概念
- Java开发环境
- Spring Boot基础
为什么需要消息队列
在前面的文章中,我们介绍了Canal的基本原理和使用方法,通过Canal客户端直接消费binlog数据。但在实际生产环境中,这种方式存在一些局限性:
- 耦合性高:Canal客户端与业务逻辑耦合在一起,不利于系统解耦
- 扩展性差:当需要多个应用消费同一份数据时,需要每个应用都实现一个Canal客户端
- 可靠性问题:如果Canal客户端处理数据时出现异常,可能导致数据丢失
- 性能瓶颈:当数据量大时,单个Canal客户端可能成为性能瓶颈
引入消息队列可以解决上述问题:
- 解耦:Canal服务端将数据发送到消息队列,业务应用从消息队列消费数据,实现系统解耦
- 扩展性:多个应用可以同时消费同一个主题的消息
- 可靠性:消息队列提供消息持久化和重试机制,保证数据不丢失
- 负载均衡:消息队列可以实现负载均衡,提高系统吞吐量
Canal与消息队列集成架构
整体架构
Canal与消息队列的集成架构如下图所示:
MySQL Binlog --> Canal Server --> 消息队列(Kafka/RocketMQ) --> 消费者应用
在这个架构中:
- MySQL产生binlog日志
- Canal Server模拟MySQL slave,解析binlog
- Canal Server将解析后的数据发送到消息队列
- 消费者应用从消息队列订阅并消费数据
Canal支持的消息队列
Canal目前支持以下几种消息队列:
- Kafka:高吞吐量的分布式发布订阅消息系统
- RocketMQ:阿里巴巴开源的分布式消息中间件
- RabbitMQ:实现了高级消息队列协议(AMQP)的开源消息代理软件
- Pulsar:雅虎开源的分布式发布订阅消息系统
本文将重点介绍Canal与Kafka、RocketMQ的集成方案。
Canal与Kafka集成
1. 环境准备
安装Kafka
首先,我们需要安装Kafka。可以从Kafka官网下载最新版本。
# 下载Kafka
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
# 解压
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
启动Zookeeper和Kafka
# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
bin/kafka-server-start.sh config/server.properties &
创建Topic
# 创建用于接收Canal数据的Topic
bin/kafka-topics.sh --create --topic canal-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
2. 配置Canal Server
修改Canal Server的配置,使其将数据发送到Kafka。
修改canal.properties
# 修改canal.properties文件,设置serverMode为kafka
canal.serverMode = kafka
# 设置kafka相关配置
canal.mq.servers = 127.0.0.1:9092
# 设置flatMessage,扁平化消息
canal.mq.flatMessage = true
# 设置批量获取数据的大小
canal.mq.batchSize = 50
# 设置获取数据的超时时间
canal.mq.fetchTimeout = 100
修改instance.properties
# 修改instance.properties文件,设置消息队列相关配置
# 设置消息主题,可以为每个instance设置不同的主题
canal.mq.topic = canal-topic
# 设置分区,-1表示随机分区,也可以指定具体分区
canal.mq.partition = -1
# 设置消息队列的过滤规则,与canal的过滤规则一致
canal.instance.filter.regex = .*\..*
3. 启动Canal Server
# 启动Canal Server
sh bin/startup.sh
4. 验证Kafka消息
启动Canal Server后,我们可以使用Kafka控制台消费者来验证消息是否正确发送到Kafka:
# 消费Kafka消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal-topic --from-beginning
如果一切正常,当MySQL数据库发生变更时,我们应该能看到相应的消息输出。
Canal消息格式解析
Canal发送到消息队列的消息有两种格式:
- 非扁平化消息:默认格式,与Canal客户端接收的消息格式一致
- 扁平化消息:通过设置
canal.mq.flatMessage = true
启用,更易于处理
非扁平化消息格式
非扁平化消息是一个JSON对象,包含以下字段:
{
"data": [ // 数据变更内容
{"id": "1", "name": "张三", "age": "28"}
],
"database": "test", // 数据库名
"es": 1589373515000, // 事件时间戳(毫秒)
"id": 5, // 消息ID
"isDdl": false, // 是否是DDL语句
"mysqlType": { // 字段的MySQL类型
"id": "int(11)",
"name": "varchar(50)",
"age": "int(11)"
},
"old": null, // 变更前的数据(仅在UPDATE操作时有值)
"pkNames": ["id"], // 主键名称
"sql": "", // 原始SQL语句(仅在isDdl=true时有值)
"sqlType": { // 字段的Java类型
"id": 4,
"name": 12,
"age": 4
},
"table": "user", // 表名
"ts": 1589373515477, // 消息产生时间戳(毫秒)
"type": "INSERT" // 操作类型: INSERT/UPDATE/DELETE/CREATE/ALTER/ERASE...
}
扁平化消息格式
扁平化消息将每一行变更转换为一个独立的消息,更适合直接处理:
{
"data": [{"id": "1", "name": "张三", "age": "28"}],
"database": "test",
"es": 1589373515000,
"id": 5,
"isDdl": false,
"mysqlType": {"id": "int(11)", "name": "varchar(50)", "age": "int(11)"},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {"id": 4, "name": 12, "age": 4},
"table": "user",
"ts": 1589373515477,
"type": "INSERT"
}
开发Kafka消费者
接下来,我们将开发一个Spring Boot应用,用于消费Canal发送到Kafka的消息。
1. 创建Spring Boot项目
首先,创建一个Spring Boot项目,并添加以下依赖:
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2. 配置Kafka消费者
在application.yml
中添加Kafka配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: canal-consumer
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 创建消息实体类
创建Canal消息的实体类:
package com.example.canal.model;
import lombok.Data;
import java.util.List;
import java.util.Map;
@Data
public class CanalMessage {
private List<Map<String, String>> data; // 数据变更内容
private String database; // 数据库名
private Long es; // 事件时间戳(毫秒)
private Integer id; // 消息ID
private Boolean isDdl; // 是否是DDL语句
private Map<String, String> mysqlType; // 字段的MySQL类型
private List<Map<String, String>> old; // 变更前的数据(仅在UPDATE操作时有值)
private List<String> pkNames; // 主键名称
private String sql; // 原始SQL语句(仅在isDdl=true时有值)
private Map<String, Integer> sqlType; // 字段的Java类型
private String table; // 表名
private Long ts; // 消息产生时间戳(毫秒)
private String type; // 操作类型: INSERT/UPDATE/DELETE/CREATE/ALTER/ERASE...
}
4. 实现Kafka消费者
创建Kafka消费者,用于消费Canal发送的消息:
Kafka消费者完整代码
package com.example.canal.consumer;
import com.alibaba.fastjson.JSON;
import com.example.canal.model.CanalMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CanalKafkaConsumer {
@KafkaListener(topics = "canal-topic")
public void onMessage(String message) {
log.info("Received message: {}", message);
try {
// 解析Canal消息
CanalMessage canalMessage = JSON.parseObject(message, CanalMessage.class);
// 根据操作类型处理消息
switch (canalMessage.getType()) {
case "INSERT":
handleInsert(canalMessage);
break;
case "UPDATE":
handleUpdate(canalMessage);
break;
case "DELETE":
handleDelete(canalMessage);
break;
default:
log.info("Unsupported operation type: {}", canalMessage.getType());
}
} catch (Exception e) {
log.error("Error processing message: {}", message, e);
}
}
private void handleInsert(CanalMessage message) {
log.info("Processing INSERT: database={}, table={}", message.getDatabase(), message.getTable());
message.getData().forEach(row -> {
log.info("Insert data: {}", row);
// 在这里实现插入数据的业务逻辑
// 例如:更新缓存、发送通知等
});
}
private void handleUpdate(CanalMessage message) {
log.info("Processing UPDATE: database={}, table={}", message.getDatabase(), message.getTable());
for (int i = 0; i < message.getData().size(); i++) {
log.info("Update data, before: {}, after: {}",
message.getOld() != null && i < message.getOld().size() ? message.getOld().get(i) : null,
message.getData().get(i));
// 在这里实现更新数据的业务逻辑
// 例如:更新缓存、发送通知等
}
}
private void handleDelete(CanalMessage message) {
log.info("Processing DELETE: database={}, table={}", message.getDatabase(), message.getTable());
message.getData().forEach(row -> {
log.info("Delete data: {}", row);
// 在这里实现删除数据的业务逻辑
// 例如:更新缓存、发送通知等
});
}
}
5. 启动应用
创建Spring Boot应用的主类:
package com.example.canal;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CanalKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(CanalKafkaApplication.class, args);
}
}
Canal与RocketMQ集成
1. 环境准备
安装RocketMQ
首先,我们需要安装RocketMQ。可以从RocketMQ官网下载最新版本。
# 下载RocketMQ
wget https://downloads.apache.org/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
# 解压
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-4.9.4
启动RocketMQ
# 启动NameServer
nohup sh bin/mqnamesrv &
# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
2. 配置Canal Server
修改Canal Server的配置,使其将数据发送到RocketMQ。
修改canal.properties
# 修改canal.properties文件,设置serverMode为rocketMQ
canal.serverMode = rocketMQ
# 设置RocketMQ相关配置
canal.mq.servers = 127.0.0.1:9876
# 设置flatMessage,扁平化消息
canal.mq.flatMessage = true
# 设置批量获取数据的大小
canal.mq.batchSize = 50
# 设置获取数据的超时时间
canal.mq.fetchTimeout = 100
修改instance.properties
# 修改instance.properties文件,设置消息队列相关配置
# 设置消息主题,可以为每个instance设置不同的主题
canal.mq.topic = canal-topic
# 设置消息标签,可以为每个instance设置不同的标签
canal.mq.tag = canal-tag
# 设置消息队列的过滤规则,与canal的过滤规则一致
canal.instance.filter.regex = .*\..*
3. 启动Canal Server
# 启动Canal Server
sh bin/startup.sh
4. 开发RocketMQ消费者
接下来,我们将开发一个Spring Boot应用,用于消费Canal发送到RocketMQ的消息。
添加依赖
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
配置RocketMQ消费者
在application.yml
中添加RocketMQ配置:
rocketmq:
name-server: localhost:9876
consumer:
group: canal-consumer
实现RocketMQ消费者
创建RocketMQ消费者,用于消费Canal发送的消息:
RocketMQ消费者完整代码
package com.example.canal.consumer;
import com.alibaba.fastjson.JSON;
import com.example.canal.model.CanalMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "canal-topic",
consumerGroup = "canal-consumer",
selectorExpression = "canal-tag"
)
public class CanalRocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Received message: {}", message);
try {
// 解析Canal消息
CanalMessage canalMessage = JSON.parseObject(message, CanalMessage.class);
// 根据操作类型处理消息
switch (canalMessage.getType()) {
case "INSERT":
handleInsert(canalMessage);
break;
case "UPDATE":
handleUpdate(canalMessage);
break;
case "DELETE":
handleDelete(canalMessage);
break;
default:
log.info("Unsupported operation type: {}", canalMessage.getType());
}
} catch (Exception e) {
log.error("Error processing message: {}", message, e);
}
}
private void handleInsert(CanalMessage message) {
log.info("Processing INSERT: database={}, table={}", message.getDatabase(), message.getTable());
message.getData().forEach(row -> {
log.info("Insert data: {}", row);
// 在这里实现插入数据的业务逻辑
// 例如:更新缓存、发送通知等
});
}
private void handleUpdate(CanalMessage message) {
log.info("Processing UPDATE: database={}, table={}", message.getDatabase(), message.getTable());
for (int i = 0; i < message.getData().size(); i++) {
log.info("Update data, before: {}, after: {}",
message.getOld() != null && i < message.getOld().size() ? message.getOld().get(i) : null,
message.getData().get(i));
// 在这里实现更新数据的业务逻辑
// 例如:更新缓存、发送通知等
}
}
private void handleDelete(CanalMessage message) {
log.info("Processing DELETE: database={}, table={}", message.getDatabase(), message.getTable());
message.getData().forEach(row -> {
log.info("Delete data: {}", row);
// 在这里实现删除数据的业务逻辑
// 例如:更新缓存、发送通知等
});
}
}
实战案例:数据同步应用场景
通过Canal与消息队列的集成,我们可以实现多种数据同步应用场景:
1. 数据库同步
将MySQL数据变更同步到其他数据库,如PostgreSQL、Oracle等。
2. 缓存更新
实时更新Redis缓存,保持缓存与数据库的一致性。
3. 搜索引擎索引更新
将数据库变更同步到Elasticsearch,实现实时搜索。
4. 数据仓库同步
将业务数据库的变更实时同步到数据仓库,用于实时分析。
实现思路
无论是哪种应用场景,实现思路基本相似:
- 配置Canal将数据发送到消息队列
- 开发消费者应用,从消息队列消费数据
- 根据业务需求处理数据,如更新缓存、更新索引等
在下一篇文章中,我们将详细介绍Canal与Redis的集成,实现数据库与缓存的实时同步。
## 最佳实践
### 消息队列选择
在选择消息队列时,需要考虑以下因素:
1. **性能**:Kafka在高吞吐量场景下表现更好
2. **可靠性**:RocketMQ提供更强的消息可靠性保证
3. **功能**:RocketMQ提供更丰富的功能,如消息轨迹、事务消息等
4. **运维**:Kafka的运维相对简单,生态更丰富
::: tip 建议
- 对于高吞吐量场景,推荐使用Kafka
- 对于需要强可靠性保证的场景,推荐使用RocketMQ
- 对于已有Kafka或RocketMQ集群的场景,可以直接使用现有集群
:::
### 消息处理策略
在处理Canal消息时,需要注意以下几点:
1. **幂等性**:消息可能会重复消费,需要保证处理逻辑的幂等性
2. **异常处理**:处理消息时可能会出现异常,需要有完善的异常处理机制
3. **顺序性**:某些场景下需要保证消息的顺序性,可以通过分区或队列保证
4. **性能优化**:批量处理消息可以提高性能
::: warning 注意事项
1. 消息处理逻辑应该是幂等的,避免重复消费导致数据不一致
2. 消息处理应该是异步的,避免阻塞消费线程
3. 消息处理应该有完善的异常处理机制,避免因为一条消息的异常导致整个消费者停止工作
:::
## 总结
本文详细介绍了Canal与消息队列的集成方案,包括与Kafka和RocketMQ的集成。通过引入消息队列,我们可以实现Canal与业务系统的解耦,提高系统的可扩展性和可靠性。
我们还通过一个实际案例,展示了如何使用Canal+Kafka实现MySQL数据库与Elasticsearch的实时同步。这种架构可以应用于各种实时数据同步场景,如缓存更新、搜索引擎索引更新、数据仓库同步等。
::: info 下一步学习
- 学习Canal与Redis的集成
- 了解Canal的高级特性和最佳实践
- 探索Canal在微服务架构中的应用
:::
希望这篇文章对您有所帮助!如果您有任何问题,欢迎在评论区讨论。