Canal 安装配置与基本使用
Canal 安装配置与基本使用
前置知识
在开始本教程之前,建议您具备以下基础知识:
- Java 开发环境
- MySQL 数据库基础
- 基本的 Linux 命令
- Maven 项目管理
环境准备
在安装 Canal 之前,我们需要先准备好 MySQL 环境,并进行相关配置。
MySQL 配置
当前的 Canal 支持源端 MySQL 版本包括 5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。
1. 开启 Binlog
首先,我们需要在 MySQL 的配置文件 my.cnf
或 my.ini
中开启 Binlog,并设置为 ROW 模式:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
修改配置文件后,需要重启 MySQL 服务。
2. 创建 Canal 用户并授权
为了让 Canal 能够读取 Binlog,我们需要在 MySQL 中创建一个专门的用户,并授予相应的权限:
-- 创建用户 canal,密码为 Canal@123456
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
-- 授权 canal 用户访问所有库的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
3. 验证 Binlog 是否开启
可以通过以下命令验证 Binlog 是否已经开启:
SHOW VARIABLES LIKE 'log_bin';
如果返回的值为 ON
,则表示 Binlog 已经开启。
可以通过以下命令查看 Binlog 文件列表:
SHOW BINARY LOGS;
可以通过以下命令查看当前正在写入的 Binlog 文件:
SHOW MASTER STATUS;
Canal 服务端安装
1. 下载 Canal
首先,从 Canal 的 GitHub 仓库下载最新的发布版本:
# 下载地址:https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
2. 解压安装包
tar -zxvf canal.deployer-1.1.5.tar.gz
解压后,我们可以看到以下目录结构:
bin/ # 启动停止脚本
conf/ # 配置文件
lib/ # 依赖的 jar 包
logs/ # 日志文件
plugin/ # 插件目录
3. 配置 Canal
Canal 的配置主要分为两部分:全局配置和实例配置。
全局配置
编辑 conf/canal.properties
文件,主要配置如下:
# canal 服务器唯一标识
canal.id = 1
# canal 服务器地址,默认为本机 IP
canal.ip =
# canal 服务器端口
canal.port = 11111
# canal 实例列表,多个实例用逗号分隔
canal.destinations = example
# 自动扫描实例目录,发现新的实例配置会自动加载
canal.auto.scan = true
# 自动扫描间隔,单位:毫秒
canal.auto.scan.interval = 5000
实例配置
编辑 conf/example/instance.properties
文件,主要配置如下:
# MySQL 数据库地址
canal.instance.master.address = 127.0.0.1:3306
# binlog 日志名称,可以通过 SHOW MASTER STATUS 查看
canal.instance.master.journal.name = mysql-bin.000001
# binlog 偏移量,可以通过 SHOW MASTER STATUS 查看
canal.instance.master.position = 4
# MySQL 账号密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# 订阅的表,支持正则表达式,多个用逗号分隔
# 格式:schema.table,例如:test\..*表示test库下的所有表
canal.instance.filter.regex = .*\\..*
更多配置选项
# 是否开启 druid 密码加密
canal.instance.enableDruid = false
# 黑名单,不订阅的表,多个用逗号分隔
canal.instance.filter.black.regex =
# 是否开启 GTID 模式
canal.instance.gtidon = false
4. 启动 Canal
sh bin/startup.sh
启动后,可以通过查看日志文件 logs/canal/canal.log
和 logs/example/example.log
来确认 Canal 是否启动成功。
5. 停止 Canal
sh bin/stop.sh
Canal 客户端使用
1. 添加依赖
在 Maven 项目的 pom.xml
文件中添加 Canal 客户端依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
2. 编写客户端代码
下面是一个简单的 Canal 客户端示例,用于监听 MySQL 数据库的变更:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class SimpleCanalClient {
public static void main(String[] args) {
// 创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
try {
// 连接 Canal 服务器
connector.connect();
// 订阅数据库表,格式:schema.table
connector.subscribe(".*\\..*");
// 回滚到未进行 ack 的地方,下次 fetch 的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
// 没有数据
System.out.println("没有数据,休息一会儿...");
Thread.sleep(1000);
continue;
}
// 处理数据
printEntries(message.getEntries());
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
connector.disconnect();
}
}
private static void printEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String schema = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
String eventType = rowChange.getEventType().toString();
System.out.println(String.format("================> binlog[%s:%s], name[%s,%s], eventType: %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
schema, table, eventType));
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
printColumns(rowData.getBeforeColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
printColumns(rowData.getAfterColumnsList());
} else {
System.out.println("--------> before");
printColumns(rowData.getBeforeColumnsList());
System.out.println("--------> after");
printColumns(rowData.getAfterColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void printColumns(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
代码解析
上面的代码主要做了以下几件事:
- 创建 Canal 连接器,连接到 Canal 服务器
- 订阅所有数据库表的变更
- 循环获取数据库变更消息
- 解析变更消息,打印变更的详细信息
- 确认消息处理完成
在实际应用中,你可以根据自己的需求,对获取到的数据库变更进行处理,例如同步到其他数据库、更新缓存、发送到消息队列等。
3. 使用 Spring Boot 集成 Canal
在实际项目中,我们通常会使用 Spring Boot 来集成 Canal。下面是一个简单的示例:
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
创建 Canal 客户端
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() {
// 创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
// 创建一个独立线程处理数据库变更
new Thread(() -> process(connector)).start();
}
private void process(CanalConnector connector) {
try {
// 连接 Canal 服务器
connector.connect();
// 订阅数据库表,格式:schema.table
connector.subscribe(".*\\..*");
// 回滚到未进行 ack 的地方
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
// 没有数据
Thread.sleep(1000);
continue;
}
// 处理数据
processEntries(message.getEntries());
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
connector.disconnect();
}
}
private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String schema = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
// 根据不同的操作类型进行处理
switch (eventType) {
case INSERT:
handleInsert(schema, table, rowChange.getRowDatasList());
break;
case UPDATE:
handleUpdate(schema, table, rowChange.getRowDatasList());
break;
case DELETE:
handleDelete(schema, table, rowChange.getRowDatasList());
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInsert(String schema, String table, List<CanalEntry.RowData> rowDatasList) {
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
// 处理插入操作
System.out.println("Insert into " + schema + "." + table);
printColumns(afterColumnsList);
}
}
private void handleUpdate(String schema, String table, List<CanalEntry.RowData> rowDatasList) {
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
// 处理更新操作
System.out.println("Update " + schema + "." + table);
System.out.println("Before:");
printColumns(beforeColumnsList);
System.out.println("After:");
printColumns(afterColumnsList);
}
}
private void handleDelete(String schema, String table, List<CanalEntry.RowData> rowDatasList) {
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
// 处理删除操作
System.out.println("Delete from " + schema + "." + table);
printColumns(beforeColumnsList);
}
}
private void printColumns(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " = " + column.getValue());
}
}
}
测试 Canal
1. 创建测试数据库和表
-- 创建测试数据库
CREATE DATABASE canal_test;
-- 使用测试数据库
USE canal_test;
-- 创建测试表
CREATE TABLE user (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
age INT,
email VARCHAR(50),
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);
2. 执行 DML 操作
-- 插入数据
INSERT INTO user (name, age, email) VALUES ('张三', 25, 'zhangsan@example.com');
-- 更新数据
UPDATE user SET age = 26 WHERE name = '张三';
-- 删除数据
DELETE FROM user WHERE name = '张三';
3. 观察客户端输出
执行上述 SQL 语句后,我们可以在 Canal 客户端的控制台看到相应的数据变更信息。
常见问题
1. Canal 启动失败
问题:Canal 启动时报错 java.io.IOException: No such file or directory
。
解决方案:检查 canal.instance.master.journal.name
和 canal.instance.master.position
配置是否正确,可以通过 SHOW MASTER STATUS
查看当前的 binlog 文件和位置。
2. 无法连接到 MySQL
问题:Canal 报错 com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
。
解决方案:
- 检查 MySQL 是否正常运行
- 检查 Canal 用户是否有正确的权限
- 检查防火墙是否允许 Canal 连接 MySQL
3. 无法解析 Binlog
问题:Canal 报错 com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed
。
解决方案:
- 检查 MySQL 的 binlog_format 是否设置为 ROW
- 检查 binlog 文件是否损坏
总结
本文详细介绍了 Canal 的安装配置和基本使用方法,包括 MySQL 的配置、Canal 服务端的安装配置以及 Canal 客户端的使用。通过本文的学习,您应该能够搭建一个基本的 Canal 环境,并实现数据库变更的监听和处理。
在下一篇文章中,我们将介绍 Canal 与其他系统的集成,例如与 Kafka、Redis、Elasticsearch 等系统的集成,以实现更复杂的数据同步场景。
下一步学习
- 学习 Canal 与 Kafka 的集成
- 了解 Canal 与 Redis 的集成
- 探索 Canal 与 Elasticsearch 的集成
希望这篇文章对您有所帮助!如果您有任何问题,欢迎在评论区讨论。