需求来源
跟踪表对比前后修改数据变化
方案
- AOP、拦截器优点: 实现简单缺点:
- 代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。
- 项目依赖麻烦,档案模块需要依赖对应模块api包进行序列化实现对比,业务模块需要实现查询接口。
- 模块适配插拔式实现困难
- sql解析困难复杂容易出错(拦截mapper层解析update语句)
- 日志采集方案优点:
- 只需要处理数据变化对比,可以直接集成三方插件如es查询,无代码入侵。后续扩展强,无论任何任何模块需要加入类似功能只需要中间件配置,添加log监听即可。
- 需要部署其他服务,增加运维成本,依赖于三方服务可能存在数据丢失
canal部署
环境
这里使用的是mac os 搭配docker部署,几乎和linux环境差不多
canal + rocketMQ + ES 实现
基础同步技术介绍
准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
快速启动参考:QuickStart
其中主要注意配置匹配数据库、表规则配置,可以选择匹配部分表做记录。 也可以通过配置不同的topic消费做消费实现不同的业务。参考文档:配置 主要关注Topic配置匹配规则
RoketMQ 搭建
docker 使用 foxiswho/rocketmq 镜像 参考博客
docker run -d -p 10911:10911 -p 10909:10909\ --name rmqbroker --link rmqserver:namesrv\ -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\ -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\ foxiswho/rocketmq:broker-4.5.1
使用上面docker命令拉取镜像,快速搭建
搭建canal
下载docker启动文件
linux命令
wget https://github.com/alibaba/canal/blob/master/docker/run.sh
mac 命令
curl -0 https://github.com/alibaba/canal/blob/master/docker/run.sh
配置canal配置项
主要注意配置
# 按需修改成自己的数据库信息 canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # mq config 这个配置使用动态topic就可以不配置,选其一 canal.mq.topic=example # 针对库名或者表名发送动态topic,参考配置 #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
sh run.sh -e canal.auto.scan=false \ -e canal.destinations=test \ -e canal.instance.master.address=172.17.0.7:3306 \ -e canal.instance.dbUsername=admin \ -e canal.instance.dbPassword=123456 \ -e canal.instance.defaultDatabaseName=mytest \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.mq.dynamicTopic=mytest:mytest.sg_pop_base_info \ -e canal.mq.partition=0 \ -e canal.serverMode=rocketmq \ -e canal.mq.servers=172.17.0.10:9876 \ -e canal.mq.retries=0 \ -e canal.mq.batchSize=16384 \ -e canal.mq.maxRequestSize=1048576 \ -e canal.mq.lingerMs=200 \ -e canal.mq.bufferMemory=33554432 \ -e canal.mq.canalBatchSize=50 \ -e canal.mq.canalGetTimeout=100 \ -e canal.mq.flatMessage=true \ -e canal.mq.compressionType=none \ -e canal.mq.acks=all \ -e canal.mq.transaction=false
mysql、mq如果使用的容器,注意添加link配置 或者 通过命令docker inspect [容器名] 查看IPAddress配置到配置中
docker查看启动成功
#运行 docker logs 容器名称 docker logs canal-server # 如下表示成功 DOCKER_DEPLOY_TYPE=VM ==> INIT /alidata/init/02init-sshd.sh ==> EXIT CODE: 0 ==> INIT /alidata/init/fix-hosts.py ==> EXIT CODE: 0 ==> INIT DEFAULT Generating SSH1 RSA host key: [ OK ] Starting sshd: [ OK ] Starting crond: [ OK ] ==> INIT DONE ==> RUN /home/admin/app.sh ==> START ... start canal ... start canal successful ==> START SUCCESSFUL ...
有兴趣的可以看一下run.sh这个文件,搞过服务器的肯定看得懂,就是各种变量然后组装docker 的run命令
查看MQ消息
修改监控表数据即可查看mq中消息,如果修改了数据没有消息,查看canal-server容器中日志报错
本地mq客户端查看消息:http://localhost:8180/#/message
实现技术通过canal获取日志拿到对比数据放入MQ,MQ消息数据如下:
新增消息
{ "data": [ { "id": "111", "name": "ad ", "create_date": "2020-04-15 14:31:03", "update_date": "2020-04-15 14:31:08", "create_user": "asdf ", "update_user": "sadf ", "is_deleted": "0" } ], "database": "grid_comm_center", "es": 1586932274000, "id": 66, "isDdl": false, "mysqlType": { "id": "bigint(64)", "name": "varchar(200)", "create_date": "timestamp", "update_date": "timestamp", "create_user": "varchar(32)", "update_user": "varchar(32)", "is_deleted": "tinyint(2) unsigned" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": -5, "name": 12, "create_date": 93, "update_date": 93, "create_user": 12, "update_user": 12, "is_deleted": -6 }, "table": "demo_id", "ts": 1586932275053, "type": "INSERT" }
修改消息
{ "data": [ { "id": "111", "name": "ad asdf ", "create_date": "2020-04-15 14:31:03", "update_date": "2020-04-15 14:31:54", "create_user": "asdf adf ", "update_user": "sadf asdf", "is_deleted": "0" } ], "database": "grid_comm_center", "es": 1586932314000, "id": 67, "isDdl": false, "mysqlType": { "id": "bigint(64)", "name": "varchar(200)", "create_date": "timestamp", "update_date": "timestamp", "create_user": "varchar(32)", "update_user": "varchar(32)", "is_deleted": "tinyint(2) unsigned" }, "old": [ { "name": "ad ", "update_date": "2020-04-15 14:31:08", "create_user": "asdf ", "update_user": "sadf " } ], "pkNames": [ "id" ], "sql": "", "sqlType": { "id": -5, "name": 12, "create_date": 93, "update_date": 93, "create_user": 12, "update_user": 12, "is_deleted": -6 }, "table": "demo_id", "ts": 1586932314983, "type": "UPDATE" }
删除消息
{ "data": [ { "id": "1111", "name": "123123", "create_date": "2020-03-17 17:48:23", "update_date": "2020-03-17 17:48:26", "create_user": "你好", "update_user": "你好", "is_deleted": "1" } ], "database": "grid_comm_center", "es": 1587366810000, "id": 1049, "isDdl": false, "mysqlType": { "id": "bigint(64)", "name": "varchar(200)", "create_date": "timestamp", "update_date": "timestamp", "create_user": "varchar(32)", "update_user": "varchar(32)", "is_deleted": "tinyint(2) unsigned" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": -5, "name": 12, "create_date": 93, "update_date": 93, "create_user": 12, "update_user": 12, "is_deleted": -6 }, "table": "demo_id", "ts": 1587366810548, "type": "DELETE" }
如上消息可以清楚拿到数据库、表详细变更,通过拿到变更数据解析插入数据库即可。
官方中还可以通过ClientAdapter 适配器入库到RDB、HBase、ES,这里根据需求做部署操作吧。目前这个里自己做档案模块进行消费写入档案数据,预计搜索可以放入es可以加快查询效率。
查看这些消息,然后做消费即可做自己想做的事情,至此已经完成了canal的基础搭建
消费消息写入ElasticSearch
maven配置
<elasticsearch.version>7.2.0</elasticsearch.version> <!-- Java High Level REST Client --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency>
mq使用的是公司封装的包,所以这里省略mq相关代码。直接集成mq消费相关消费做自己业务处理即可。