什么是canal
用途是基于 MySQL 数据库增量日志解析 ,提供增量数据订阅和消费 。
这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具 。
canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
但是canal的数据同步不是全量的,而是增量 。基于binary log增量订阅和消费,canal可以做:
数据库镜像
数据库实时备份
索引构建和实时维护
业务cache(缓存)刷新
带业务逻辑的增量数据处理
工作原理 mysql主备复制实现:
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
知识科普 mysql的Binlay Log介绍
简单点说:
mysql的binlog是多文件存储,定位一个LogEvent需要通过binlog filename + binlog position,进行定位
mysql的binlog数据格式,按照生成的方式,主要分为:statement-based、row-based、mixed。
1 2 3 4 5 6 7 mysql> show variables like 'binlog_format' ; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.00 sec)
目前canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)
如何搭建canal 首先有一个MySQL服务器 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
然后在MySQL中需要创建一个用户,并授权:
1 2 3 4 5 6 create user 'canal' @'%' identified by 'Canal@123456' ;grant SELECT , REPLICATION SLAVE, REPLICATION CLIENT on * .* to 'canal' @'%' identified by 'Canal@123456' ;12345
下一步在MySQL配置文件my.cnf设置如下信息:
1 2 3 4 5 6 7 8 [mysqld ] log-bin=mysql-bin binlog-format=ROW server_id=1 1234567
改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
查看binlog日志文件列表:
查看当前正在写入的binlog文件:
MySQL服务器这边就搞定了,很简单。
安装canal 去官网下载页面进行下载:https://github.com/alibaba/canal/releases
我这里下载的是1.1.4的版本:
解压canal.deployer-1.1.4.tar.gz ,我们可以看到里面有四个文件夹:
接着打开配置文件conf/example/instance.properties,配置信息如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 canal.instance.master.address =127.0.0.1:3306 canal.instance.master.journal.name =mysql-bin.000001 canal.instance.master.position =154 canal.instance.master.timestamp =canal.instance.master.gtid =canal.instance.dbUsername =canal canal.instance.dbPassword =Canal@123456 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid =false canal.instance.filter.regex =.*\\..* canal.instance.filter.black.regex =123456789101112131415161718192021222324252627
我这里用的是win10系统,所以在bin目录下找到startup.bat启动:
启动就报错,坑呀:
要修改一下启动的脚本startup.bat:
然后再启动脚本:
这就启动成功了。
Java客户端操作 首先引入maven依赖:
1 2 3 4 5 6 <dependency > <groupId > com.alibaba.otter</groupId > <artifactId > canal.client</artifactId > <version > 1.1.4</version > </dependency > 12345
然后创建一个canal项目,使用SpringBoot构建,如图所示: 在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 @Component public class CannalClient implements InitializingBean { private final static int BATCH_SIZE = 1000 ; @Override public void afterPropertiesSet () throws Exception { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1" , 11111 ), "example" , "" , "" ); try { connector.connect(); connector.subscribe(".*\\..*" ); connector.rollback(); while (true ) { Message message = connector.getWithoutAck(BATCH_SIZE); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0 ) { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(message.getEntries()); } connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } private static void printEntry (List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue ; } RowChange rowChage; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s" , entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); if (rowChage.getIsDdl()) { System.out.println("================》;isDdl: true,sql:" + rowChage.getSql()); } for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("------->; before" ); printColumn(rowData.getBeforeColumnsList()); System.out.println("------->; after" ); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn (List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
以上就完成了Java客户端的代码。这里不做具体的处理,仅仅是打印,先有个直观的感受。
最后我们开始测试,首先启动MySQL、Canal Server,还有刚刚写的Spring Boot项目。然后创建表:
1 2 3 4 5 6 7 8 9 CREATE TABLE `tb_commodity_info` ( `id` varchar (32 ) NOT NULL , `commodity_name` varchar (512 ) DEFAULT NULL COMMENT '商品名称' , `commodity_price` varchar (36 ) DEFAULT '0' COMMENT '商品价格' , `number` int (10 ) DEFAULT '0' COMMENT '商品数量' , `description` varchar (2048 ) DEFAULT '' COMMENT '商品描述' , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4 COMMENT= '商品信息表' ; 12345678
然后我们在控制台就可以看到如下信息:
如果新增一条数据到表中:
1 INSERT INTO tb_commodity_info VALUES ('3e71a81fd80711eaaed600163e046cc3' ,'叉烧包' ,'3.99' ,3 ,'又大又香的叉烧包,老人小孩都喜欢' );
控制台可以看到如下信息:
Demo2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class CannalClient { public static void main (String []) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1" , 11111 ), "example" , "" , "" ); while (true ) { connector.connect(); connector.subscribe("mydatabase.*" ); Message message = connector.get(100 ); List<cannalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0 ) { System.out.println("本次抓取没有数据,sleep 1s" ); Thread.sleep(1000 ); } else { for (CannelEntry.Entry entry : entries) { String tableName = entry.getHeader().getTableName(); Cannal.EntryType entryType = entry.getEntryType(); ByteString storeValue = entry.getStoreValue(); if (CannalEntry.EntryType.ROWDATA.equals(entryType)) { CannalEntry.RowChange rowChange = CannalEntry.ROwChange.parseForm(storeValue); CannalEntry.EventType eventType = rowChange.getEventType(); List<CannalEntry.RowData> rowDataList = rowChange.getRowDatasList(); for (ChannalEntry.RowData rowData : rowDataList) { JSONObject beforeData = new JSONObject(); List<CannalEntry.Column> beforeColumnList = rowData.getBeforeColumnsList(); for (CannalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CannalEntry.Column> afterColumnList = rowData.getAfterColumnsList(); for (CannalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println( "Table:" + tableName + ",eventType:" + eventType + ",before:" + beforeData + ",after:" + afterData ); } } else { System.out.println("当前操作类型为:" +entryType); } } } } } }
架构
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
总结 canal的好处在于对业务代码没有侵入 ,因为是基于监听binlog日志去进行同步数据的 。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。
实际项目我们是配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理 。
Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理。Canal还有一个简单的Web管理界面。