本文将介绍canal项目中client-adapter的使用,以及落地生产中需要考虑的可靠性、高可用与监控报警。(基于canal 1.1.4版本)
canal作为mysql的实时数据订阅组件,实现了对mysql binlog数据的抓取。
虽然阿里也开源了一个纯粹从mysql同步数据到mysql的项目otter(github.com/alibaba/otter,基于canal的),实现了mysql的单向同步、双向同步等能力。但是我们经常有从mysql同步数据到es、hbase等存储的需求,就需要用户自己用canal-client获取数据进行消费,比较麻烦。
从1.1.1版本开始,canal实现了一个配套的落地模块,实现对canal订阅的消息进行消费,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。
目前的最新稳定版1.1.4版本中,client-adapter已经实现了同步数据到RDS、ES、HBase的能力。
1. Client-Adapter基本能力
目前Adapter具备以下基本能力:
- 对接上游消息,包括kafka、rocketmq、canal-server
- 实现mysql数据的增量同步
- 实现mysql数据的全量同步
- 下游写入支持mysql、es、hbase
2.Client-Adapter架构
Adapter本质上是为了将canal-server订阅到的实时增量数据进行消费,所以必须有上游canal-server产生数据。
整体架构如下:
3. 迁移与同步配置(以Mysql为例)
官方文档地址:github.com/alibaba/canal/wiki/Sync-RDB
下面给出实践过程中的注意事项。
3.1 参数配置
1)总配置文件application.yml
说明:
- 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
- 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息
zookeeperHosts填了以后,可以支持分布式锁;如果对接Canal-Server为集群模式,那么还是需要填写的,具体原因见下面高可用部分。
2)对应任务的Adapter配置
同步到mysql去的任务配置在conf/rdb路径下,本文使用的任务配置文件名叫 mysql1.yml
注意!targetPk下面填的是源主键和目标主键的映射关系, srcPk:targetPk。
3)日志格式修改
logback.xml中默认日志等级为debug,线上使用时,记得改到info,否则日志会打爆
3.2 增量同步能力
1) DML 增量同步
完成上面的配置,启动后就能正常订阅增量数据了。Adapter能够接收到mq到信息,并在目标库投递成功。
具体会打出如下日志。
2)DDL同步
如果需要使用DDL同步能力,必须在rdb中配置mirroDb为true才可以。
3.3 全量同步能力
Adapter提供了全量同步的能力,具体操作可以参考官网 github.com/alibaba/canal/wiki/ClientAdapter中的3.2节。
这里我们使用命令
curl http://127.0.0.1:8081/etl/rdb/mysql1/mysql1.yml -X POST
输出结果如下
4. 动态配置
4.1 任务开关
curl http://127.0.0.1:8081/syncSwitch/dts-dbvtest-insertdata/on -X PUT
如果在application.yml里面配置了zk地址,那么会使用分布式开关,这个任务开关会注册到zk上,对任意机器执行开关,会把所有同样任务的机器进行启停。
相关源码实现如下:
- 获取zk上的任务开关状态信息
- 如果是false,就断开连接
4.2 配置变更
1)本地配置文件
adapter默认是读取本地的配置文件进行配置的。
有个比较意外的地方,就是修改配置文件,任务会自动刷新配置,实现了动态配置。
我们看下实现原理。
- 继承了FileAlterationListenerAdaptor
- 发现文件变更后
- 销毁目前的canalAdapterService
- 刷新contextRefresher
- sleep 2秒
- 重新初始化canalAdapterService
最终日志会打印
2)基于mysql的远程配置
如果配置了多个adapter,可以采用mysql存储配置信息,实现全局统一的配置。
这个的实现原理也比较简单:
- 本地异步线程轮训mysql
- 如果有更新就将更新的配置写入本地配置文件
- 动态更新
5. 数据可靠性分析
5.1 ack机制
Adapter的一个任务采用一个多线程模型。
- 主线程抓取mq的message写入队列queue,CountDownLatch等待
- 异步线程poll队列queue,投递下游
- 投递成功后,主线程释放latch,向mq返回ack
这里需要注意,这里同步的行为是重新执行一次,比如update一行数据,如果目标库由于某种原因没有这条数据的主键id,导致update返回0,也是认为消费成功了
5.2 重试机制
application.yml里面的retries参数用于从queue队列poll后,投递下游的重试。
需要仔细权衡一下,重试间隔0.5s,可以设置个x次,避免网络抖动丢失数据。
重试次数到了,会自动ack。所以这里在使用过程中需要注意采集失败的日志,及时报警提醒。
6. 性能问题分析
具体性能要求还是需要通过压测来得到结论。
这里给出两个从源码中看到的性能优化相关的点。
6.1 全量同步多线程
全量同步的时候,同步效率是一个值得考虑的问题。
adapter对全量数据同步效率做了一些设计,当全量同步数量大于1W会开多线程,代码如下所示:
但是这里有个mysql的深分页的问题,可以注意一下,会对源数据库造成比较大的性能压力。
6.2 全量同步select *
全量同步的另一个效率问题,就在于select * ,避免客户端内存被打爆。
看了下源码,果然也已经考虑了这个问题,开启了JDBC的流式查询。
7. 监控告警
如果要在生产使用,少不了监控告警的辅助。
虽然Adapter不像canal-server那样提供了监控指标的相关api,但是我们还是可以做一些辅助的监控告警。
1) mq的消息堆积告警
利用mq已有的topic下的堆积告警,如果Adapter出现故障,造成了mq的消息堆积,可以及时发现。
2) 日志异常告警
Adapter有自己的日志格式,可以跟已有监控系统确认下日志收集的配置方式与日志解析格式。
然后通过修改 conf/logback.xml的pattern来修改日志的打印格式,进行配置采集。
8. 高可用
通过源码阅读发现
tcp模式支持通过zk做HA(非自身高可用),mq模式不支持zk做HA
TCP模式可以HA,但是跟我们的HA理解又不太一样。
因为需要直接对接上游的Canal-Server,而Canal-Server的HA会导致ip变化,所以adapter的tcp模式的HA是为了支持这个,可以监听IP变化,对接不同的上游server,并不是自身的高可用架构。
而MQ模式本身是不支持HA的。
但是,我们如果我们对接上游MQ的模式,就可以做一个取巧的高可用。
目前从binlog抓取mq以后,只会投递到指定topic的一个队列中(即使哈希做了多队列,道理也一样),因此,mq消费时采用集群模式,就会只有一个client能够顺序消费对应队列中的消息。
这样,我们部署两台adapter,有两个mq的消费者同时运行,正常情况下,只会有一台机器在消费任务,一旦一台机器挂了,mq会自动用另一台机器的任务进行继续消费。做了一个简易的高可用。
缺点也比较明显,任务无法负载均衡,只能跑在一台机器上
因此,需要考虑分多个消费者组进行任务处理。
推荐阅读
看到这里了,原创不易,点个赞吧,你最好看了~
知识碎片重新梳理,构建Java知识图谱:https://github.com/saigu/JavaKnowledgeGraph (历史文章查阅非常方便)
转载:https://blog.csdn.net/u014730658/article/details/108399658