阿里巴巴 Canal 产品配置 RocketMQ 流转和测试
后知后觉 暂无评论

Canal 将 MySQL 数据库变化传送至 RocketMQ 进行数据流转。

MySQL 相关

配置 BINLOG

修改配置文件 /etc/mysql/mysql.conf.d/mysqld.cnf 这是官方 5.7.42 版本配置文件路径,在 [mysqld] 标签下增加配置

log-bin          = mysql-bin
binlog-format    = ROW
server_id        = 1

修改后配置文件大概这样:

[mysqld]
pid-file         = /var/run/mysqld/mysqld.pid
socket           = /var/run/mysqld/mysqld.sock
datadir          = /var/lib/mysql
log-error        = /var/log/mysql/error.log
log-bin          = mysql-bin
binlog-format    = ROW
server_id        = 1
bind-address     = 127.0.0.1
symbolic-links   = 0

重启数据库

sudo systemctl restart mysql

检查主从状态

检查配置结果

SHOW VARIABLES LIKE '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.00 sec)

Canal 相关

配置数据库

创建 Canal 主从角色

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

创建测试库

CREATE DATABASE analytic CHARACTER SET utf8mb4;
SHOW DATABASES;

创建测试表

CREATE TABLE `user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户 ID',
  `username` varchar(50) DEFAULT NULL COMMENT '用户名',
  `password` varchar(50) DEFAULT NULL COMMENT '密码',
  `email` varchar(45) DEFAULT NULL COMMENT '邮箱',
  `phone` varchar(15) DEFAULT NULL COMMENT '手机号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;

插入测试数据

INSERT INTO `analytic`.`user` ( `id`, `username`, `password`, `email`, `phone` )
VALUES
    ( 1, '张三', '275&553/7', '3rb95f7kb98ws7nre@qq.com', '18124578941' ),
    ( 2, '李四', '148309#1=', '4fsdng27fp3q55au2@qq.com', '13197387591' ),
    ( 3, '王五', '77=0&6923', 'wf4189dbm7yfmqw73@qq.com', '18638390876' ),
    ( 4, '赵六', '0872_~191', 'je7h008x475klwnbp@qq.com', '13039809384' ),
    ( 5, '钱七', '264.4+939', '6n20g95qw2hswp4ep@qq.com', '18898052943' );

配置 Canal 对接 MQ

安装 Canal Deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
sudo mkdir /opt/canal/deployer
sudo tar xf canal.deployer-1.1.6.tar.gz -C /opt/canal/deployer
sudo chown -R $USER:$USER /opt/canal/deployer

修改 Canal 配置文件 deployer/conf/canal.properties

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp

改成 RocketMQ

##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

一般来说只需改以下两行

rocketmq.producer.group = canal-producer
rocketmq.namesrv.addr = 127.0.0.1:9876

修改配置 deployer/conf/example/instance.properties

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=canal-default-topic
小贴士:这里 canal.mq.topic 为默认主题,如果启用动态主题的话,MQ会根据分类自动将不同表的消息写入不同的主题,如果没有匹配的会写入到默认主题,如果没有配置默认主题,任务会截止后报错,canal.mq.dynamicTopiccanal.instance.filter.regex配合

修正项目文件权限

find . -type d | xargs chmod 755
find . -type f | xargs chmod 644
chmod +x bin/*.sh

重启 Canal

bin/stop.sh
bin/startup.sh

然后查看堆栈

jps -ml
2403 com.alibaba.otter.canal.deployer.CanalLauncher

Canal 工作验证

使用 Dashboard 查看数据库更新

可以使用 WebUI 查看数据库更新的详情

Dashboard1

Dashboard2

Dashboard3

主题

修改 MySQL 数据

UPDATE `analytic`.`user` SET `username` = '余罪' WHERE `id` = 5

如果安装了 RocketMQ-Dashboard 可以登录查看 Topic 中已经出现名为 canal-topic 主题。

Java 测试代码

尝试使用示例代码

git clone https://github.com/vndroid/java-canal-mq.git

然后修改文件 src/main/java/cn/butterfly/canal/listener/UserCanalListener.java 的 #19 为实际使用的主题名

diff --git a/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java b/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
index 6c0359a..2203386 100644
--- a/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
+++ b/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
@@ -16,7 +16,7 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 @RocketMQMessageListener(
-        topic = "canal_topic",
+        topic = "canal-topic",
         consumerGroup = "canal_group"
 )
 public class UserCanalListener implements RocketMQListener<CanalMessage<User>> {

配置 MQ 地址 src/main/resources/application-dev.yml

然后构建项目

mvn clean package -Dmaven.test.skip=true

启动项目,监听在 10086 端口

java -jar -Dserver.port=10086 target/canal-0.0.1-SNAPSHOT.jar

另一边可以修改数据库数据,即时查看更新日志

2023-08-18 17:45:58.780  INFO 22428 --- [MessageThread_2] c.b.canal.listener.UserCanalListener     : 
====================
Database.table: analytic.user
Type of operation: INSERT
User(id=5, username=钱七, password=264.4+939, email=6n20g95qw2hswp4ep@qq.com, phone=18898052943)

Canal Admin

为了方便 Canal 程序管理,官方推出了一个 WebUI 项目,可以在网页中查看状态

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
sudo mkdir /opt/canal/admin
sudo tar xf canal.admin-1.1.6.tar.gz -C /opt/canal/admin
sudo chown -R $USER:$USER /opt/canal/admin/

初始化数据库,使用 root 登录数据库,执行 conf/canal_manager.sql 初始化数据库。

CREATE DATABASE canal_manager;
GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;
USE canal_manager;
source conf/canal_manager.sql;

附录

参考链接

如果遇到问题或者对文章内容存疑,请在下方留言,博主看到后将及时回复,谢谢!
禁用 / 当前已拒绝评论,仅可查看「历史评论」。