小言_互联网的博客

CDC系列(三)、Debezium 监控MySQL操作日志实时同步到Kafka(对比canal,maxwell)

1310人阅读  评论(0)

目录

前言

准备工作

配置Connector

注册启动Connector

验证CDC

尾巴


CDC系列:

CDC系列(一)、Canal 集群部署及使用(带WebUI)

CDC系列(二)、Maxwell_v1.27.1 监控MySQL操作日志实时同步到Kafka


前言

前两篇我们介绍了CDC工具,以及利用Canal和Maxwell监控mysql操作日志并写入kafka,本篇我们来讲解另一个CDC工具:debezium。debezium不仅仅支持mysql的监控,目前最新版已支持mysql,postgre,mongodb,oracle,sqlserver,db2,cassandra等,它依赖于kafka connect 服务,通过kafka connect,我们可以很方便的将debezium采集到的数据库日志输出到各类目标端。

本篇我们演示如何利用kafka connect启动debezium mysql connector监控mysql 数据库操作日志并写入kafka内

准备工作

上面提到debezium依赖于kafka connect ,而且kafka connect是在0.9版本之后才由confluent团队贡献出来,因此我们必须要先安装好kafka0.9+版本,如果没有的话也可以参考我之前的文章:

由于我们要监控mysql的binlog,因此还需要一个可以访问mysql binlog权限的用户,这个操作在前面两篇有介绍就不在此赘述了。

配置Connector

1. 下载,到官方网址下载最新1.2 版debezium-mysql-connector:https://debezium.io/releases/1.2/

2. 配置kafka connect服务


  
  1. 1. 在kafka目录下新建一个文件夹
  2. mkdir /opt/app/lib_conn
  3. 2. 将下载的debezium插件上传至服务器并解压至刚才创建的目录
  4. tar -zxvf debezium-connector-mysql-1.2.5.Final-plugin.tar.gz -C /opt/app/lib_conn
  5. 3. 修改kafka connect 配置文件
  6. vim /opt/app/kafka/connect-standalone.properties
  7. #----------------------
  8. #修改下面的几个参数,其他的不变
  9. bootstrap.servers=wykd:9092
  10. plugin.path=/opt/app/kafka_2.11-2.4.1/lib_conn #指向刚才放置debezium插件的目录
  11. #这两个参数表示connect消息格式,可选json或avro,在消费的时候也需要使用对应的格式,本次我们使用json格式
  12. key.converter=org.apache.kafka.connect.json.JsonConverter
  13. value.converter=org.apache.kafka.connect.json.JsonConverter

3. 配置debezium mysql connector

创建debezium mysql connector插件配置文件,核心参数如下,其他参数请参考官方手册:mysql-connector-configuration-properties_debezium


  
  1. #修改kafka connect 配置文件
  2. vim /opt/app/kafka/connect-debezium-mysql-wyk-csdn.properties
  3. #----------------------
  4. name=wyk_csdn_mysql_debezium_connector #connector 名称,唯一
  5. connector.class=io.debezium.connector.mysql.MySqlConnector #connector 插件
  6. database.hostname=wykd #数据库host
  7. database.port=3306 #数据库port
  8. database.user=root #数据库user 有复制binlog权限
  9. database.password=123456 #数据库password
  10. database.server.id=9999 # server id 同一个数据库的serverId不能有重复的
  11. database.server.name=wyk_sandbox # server name [记录DDL动作的topic名称,给消费端使用的]
  12. database.whitelist=testschema # schema白名单,只监控白名单库下的,和blacklist参数互相冲突
  13. database.history.kafka.bootstrap.servers=wykd:9092 # kafka
  14. database.history.kafka.topic=wyk_csdn_debezium_mysql #记录所有schema变动记录的topic[给连接器使用的]
  15. include.schema.changes= true # 监控schema变动,写入一个名称和server name一样的topic内
  16. include.query= true # 将执行的sql也记录下来,需要mysql端的参数binlog_rows_query_log_events=ON

注意:

配置文件中每行参数前后不能有空格;

如果需要在消息内记录执行的sql,需要:

  • mysql:set global binlog_rows_query_log_events=ON;
  • debezium mysql connect配置文件:include.query=true

需要特别关注的几个参数:whitelist、blacklist、include.query、include.schema.changes、snapshot.mode

注册启动Connector

如果当前kafka没有启动kafka connect服务,则使用下面的命令启动并注册connect到kafka:


  
  1. cd $KAFKA_HOME
  2. bin/connect-standalone.sh config/connect-standalone.properties config/connect-debezium-mysql-wyk-csdn2.properties

启动成功后,会一并启动kafka rest服务,我们可以使用restful api的方式查看或管理connectors:


  
  1. GET /Connectors:返回活跃的 Connector 列表
  2. POST /Connectors:创建一个新的 Connector;请求的主体是一个包含字符串name字段和对象 config 字段(Connector 的配置参数)的 JSON 对象。
  3. GET /Connectors/{name}:获取指定 Connector 的信息
  4. GET /Connectors/{name}/config:获取指定 Connector 的配置参数
  5. PUT /Connectors/{name}/config:更新指定 Connector 的配置参数
  6. GET /Connectors/{name}/status:获取 Connector 的当前状态,包括它是否正在运行,失败,暂停等。
  7. GET /Connectors/{name}/tasks:获取当前正在运行的 Connector 的任务列表。
  8. GET /Connectors/{name}/tasks/{taskid}/status:获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
  9. PUT /Connectors/{name}/pause:暂停连接器和它的任务,停止消息处理,直到 Connector 恢复。
  10. PUT /Connectors/{name}/resume:恢复暂停的 Connector(如果 Connector 没有暂停,则什么都不做)
  11. POST /Connectors/{name}/restart:重启 Connector(Connector 已故障)
  12. POST /Connectors/{name}/tasks/{taskId}/restart:重启单个任务 (通常这个任务已失败)
  13. DELETE /Connectors/{name}:删除 Connector, 停止所有的任务并删除其配置

在浏览器查看当前活跃的connectors: ip:8083/connectors

例:此时我们想再注册一个debezium mysql connector服务的话就不能用刚刚的命令启动了,会报错提示Failed to bind to 0.0.0.0/0.0.0.0:8083,Caused by: java.net.BindException: Address already in use。这里的8083端口就是kafka connect服务,因此我们只能使用下面的命令来注册新的connector:


  
  1. # 这里的 connector 配置需要使用json格式
  2. curl -s http://10.1.174.10:8083/connectors -X POST -H "Content-Type: application/json" --data \
  3. '{"name": "wyk_csdn_mysql_debezium_connector2","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "10.1.174.10", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "9988", "database.server.name": "wyk_sandbox", "database.whitelist": "canal_manager", "database.history.kafka.bootstrap.servers": "10.1.174.10:9092 ", "database.history.kafka.topic": "wyk_csdn_debezium_mysql2", "include.schema.changes": "true","include.query":"true" }}'

例:使用rest api管理kafka connectors:


  
  1. # 查看所有活跃的connectors
  2. curl -s http://10.1.174.10:8083/connectors -X GET
  3. # 注册一个新的connector
  4. curl -s http://10.1.174.10:8083/connectors -X POST -H "Content-Type: application/json" --data \
  5. '{"name": "wyk_csdn_mysql_debezium_connector2","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "10.1.174.10", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "9988", "database.server.name": "wyk_sandbox", "database.whitelist": "canal_manager", "database.history.kafka.bootstrap.servers": "10.1.174.10:9092 ", "database.history.kafka.topic": "wyk_csdn_debezium_mysql", "include.schema.changes": "true","include.query":"true" }}'
  6. # 删除指定的connector
  7. curl -s http://10.1.174.10:8083/connectors/wyk_csdn_mysql_debezium_connector2 -X DELETE

验证CDC

此时我们已经启动了debezium mysql connect来监控指定的mysql并将数据写入了kafka,和maxwell,canal不同的是,debezium监控的消息会将每张表对应一个topic,且该topic只包含该表的增删改(insert/delete/update)操作,DDL操作如create/drop/truncate/rename/alter等命令会统一写入以配置文件中的database.server.name 参数的值为名称的topic内。

Topic 命名方式:对于同一个connector 监控的数据范围,一张表对应一个topic,该范围内所有库所有表的DDL操作记录在同一个topic内:

  • DDL topic:serverName
  • DML topic:serverName.databaseName.tableName

默认情况下,为了提升性能及保证数据一致性,在第一次启动时并不会从binlog回放表的每个操作来同步数据,而是通过一个全局读锁将表的快照直接同步到对应的topic内,这个过程很快,在同步完成后才会继续监控binlog内的操作达到初始化时全量切实时的目的。该快照模式的参数在connector配置文件中为 snapshot.mode,有下面几个可选的值:

  • initial:默认值,第一次启动connector时同步数据快照,完成后再从记录的binlog位置开始同步,但当关闭connector很久以后再次启动时若上次记录的Offset在binlog内已经被删除的时候就会启动失败(建议mysql保留全部binlog);
  • when_needed:在有必要的任何时候执行快照任务,和initial类似,不同的是当遇到故障时如binlog丢失时会重新发起一个快照同步;
  • never:永不执行快照同步,从Binlog记录的位置回放;
  • schema_only:从第一次启动后记录当前schema,然后从binlog位置开始同步,仅同步schema变动而不同步数据;
  • schema_only_recovery:仅同步schema变更而不同步数据,只能配置于已有的connector之上,不是从当前启动时记录schema而是从已存在的connector上次记录的位置开始;

消息体: 分为两种,一个记录DDL,一个记录DML

DDL:schema_change_topic_structure

DML:change_event_values ,默认会使用primary key 或unique key作为消息体的key,也可自定义key。

DML中insert对应的op为c,update对应为u,delete对应为d。 

例:

DDL:


  
  1. {
  2. "schema": {
  3. "type": "struct",
  4. "fields": [
  5. {
  6. "type": "struct",
  7. "fields": [
  8. {
  9. "type": "string",
  10. "optional": false,
  11. "field": "version"
  12. },
  13. {
  14. "type": "string",
  15. "optional": false,
  16. "field": "connector"
  17. },
  18. {
  19. "type": "string",
  20. "optional": false,
  21. "field": "name"
  22. },
  23. {
  24. "type": "int64",
  25. "optional": false,
  26. "field": "ts_ms"
  27. },
  28. {
  29. "type": "string",
  30. "optional": true,
  31. "name": "io.debezium.data.Enum",
  32. "version": 1,
  33. "parameters": {
  34. "allowed": "true,last,false"
  35. },
  36. "default": "false",
  37. "field": "snapshot"
  38. },
  39. {
  40. "type": "string",
  41. "optional": false,
  42. "field": "db"
  43. },
  44. {
  45. "type": "string",
  46. "optional": true,
  47. "field": "table"
  48. },
  49. {
  50. "type": "int64",
  51. "optional": false,
  52. "field": "server_id"
  53. },
  54. {
  55. "type": "string",
  56. "optional": true,
  57. "field": "gtid"
  58. },
  59. {
  60. "type": "string",
  61. "optional": false,
  62. "field": "file"
  63. },
  64. {
  65. "type": "int64",
  66. "optional": false,
  67. "field": "pos"
  68. },
  69. {
  70. "type": "int32",
  71. "optional": false,
  72. "field": "row"
  73. },
  74. {
  75. "type": "int64",
  76. "optional": true,
  77. "field": "thread"
  78. },
  79. {
  80. "type": "string",
  81. "optional": true,
  82. "field": "query"
  83. }
  84. ],
  85. "optional": false,
  86. "name": "io.debezium.connector.mysql.Source",
  87. "field": "source"
  88. },
  89. {
  90. "type": "string",
  91. "optional": false,
  92. "field": "databaseName"
  93. },
  94. {
  95. "type": "string",
  96. "optional": false,
  97. "field": "ddl"
  98. }
  99. ],
  100. "optional": false,
  101. "name": "io.debezium.connector.mysql.SchemaChangeValue"
  102. },
  103. "payload": {
  104. "source": {
  105. "version": "1.2.2.Final",
  106. "connector": "mysql",
  107. "name": "wyk_sandbox",
  108. "ts_ms": 1601016725000,
  109. "snapshot": "false",
  110. "db": "testschema",
  111. "table": "wyk_csdn",
  112. "server_id": 1,
  113. "gtid": null,
  114. "file": "mysql-bin.000016",
  115. "pos": 93212,
  116. "row": 0,
  117. "thread": null,
  118. "query": null
  119. },
  120. "databaseName": "testschema",
  121. "ddl": "create table wyk_csdn(id int,name varchar(20),ins_ts timestamp)"
  122. }
  123. }

DML:


  
  1. {
  2. "schema": {
  3. "type": "struct",
  4. "fields": [
  5. {
  6. "type": "struct",
  7. "fields": [
  8. {
  9. "type": "int32",
  10. "optional": true,
  11. "field": "id"
  12. },
  13. {
  14. "type": "string",
  15. "optional": true,
  16. "field": "name"
  17. },
  18. {
  19. "type": "string",
  20. "optional": true,
  21. "name": "io.debezium.time.ZonedTimestamp",
  22. "version": 1,
  23. "field": "ins_ts"
  24. }
  25. ],
  26. "optional": true,
  27. "name": "wyk_sandbox.testschema.wyk_csdn.Value",
  28. "field": "before"
  29. },
  30. {
  31. "type": "struct",
  32. "fields": [
  33. {
  34. "type": "int32",
  35. "optional": true,
  36. "field": "id"
  37. },
  38. {
  39. "type": "string",
  40. "optional": true,
  41. "field": "name"
  42. },
  43. {
  44. "type": "string",
  45. "optional": true,
  46. "name": "io.debezium.time.ZonedTimestamp",
  47. "version": 1,
  48. "field": "ins_ts"
  49. }
  50. ],
  51. "optional": true,
  52. "name": "wyk_sandbox.testschema.wyk_csdn.Value",
  53. "field": "after"
  54. },
  55. {
  56. "type": "struct",
  57. "fields": [
  58. {
  59. "type": "string",
  60. "optional": false,
  61. "field": "version"
  62. },
  63. {
  64. "type": "string",
  65. "optional": false,
  66. "field": "connector"
  67. },
  68. {
  69. "type": "string",
  70. "optional": false,
  71. "field": "name"
  72. },
  73. {
  74. "type": "int64",
  75. "optional": false,
  76. "field": "ts_ms"
  77. },
  78. {
  79. "type": "string",
  80. "optional": true,
  81. "name": "io.debezium.data.Enum",
  82. "version": 1,
  83. "parameters": {
  84. "allowed": "true,last,false"
  85. },
  86. "default": "false",
  87. "field": "snapshot"
  88. },
  89. {
  90. "type": "string",
  91. "optional": false,
  92. "field": "db"
  93. },
  94. {
  95. "type": "string",
  96. "optional": true,
  97. "field": "table"
  98. },
  99. {
  100. "type": "int64",
  101. "optional": false,
  102. "field": "server_id"
  103. },
  104. {
  105. "type": "string",
  106. "optional": true,
  107. "field": "gtid"
  108. },
  109. {
  110. "type": "string",
  111. "optional": false,
  112. "field": "file"
  113. },
  114. {
  115. "type": "int64",
  116. "optional": false,
  117. "field": "pos"
  118. },
  119. {
  120. "type": "int32",
  121. "optional": false,
  122. "field": "row"
  123. },
  124. {
  125. "type": "int64",
  126. "optional": true,
  127. "field": "thread"
  128. },
  129. {
  130. "type": "string",
  131. "optional": true,
  132. "field": "query"
  133. }
  134. ],
  135. "optional": false,
  136. "name": "io.debezium.connector.mysql.Source",
  137. "field": "source"
  138. },
  139. {
  140. "type": "string",
  141. "optional": false,
  142. "field": "op"
  143. },
  144. {
  145. "type": "int64",
  146. "optional": true,
  147. "field": "ts_ms"
  148. },
  149. {
  150. "type": "struct",
  151. "fields": [
  152. {
  153. "type": "string",
  154. "optional": false,
  155. "field": "id"
  156. },
  157. {
  158. "type": "int64",
  159. "optional": false,
  160. "field": "total_order"
  161. },
  162. {
  163. "type": "int64",
  164. "optional": false,
  165. "field": "data_collection_order"
  166. }
  167. ],
  168. "optional": true,
  169. "field": "transaction"
  170. }
  171. ],
  172. "optional": false,
  173. "name": "wyk_sandbox.testschema.wyk_csdn.Envelope"
  174. },
  175. "payload": {
  176. "before": null,
  177. "after": {
  178. "id": 2,
  179. "name": "wyk2",
  180. "ins_ts": "2020-09-25T08:10:22Z"
  181. },
  182. "source": {
  183. "version": "1.2.2.Final",
  184. "connector": "mysql",
  185. "name": "wyk_sandbox",
  186. "ts_ms": 1601021422000,
  187. "snapshot": "false",
  188. "db": "testschema",
  189. "table": "wyk_csdn",
  190. "server_id": 1,
  191. "gtid": null,
  192. "file": "mysql-bin.000016",
  193. "pos": 97798,
  194. "row": 0,
  195. "thread": 162,
  196. "query": "insert into wyk_csdn values(2,'wyk2',current_timestamp())"
  197. },
  198. "op": "c",
  199. "ts_ms": 1601021422679,
  200. "transaction": null
  201. }
  202. }

另外如果使用confluent platform,可以在control center中直接查看管理connectors以及查看topic内的数据:

 

尾巴

优点: 

  1. debezium支持全DDL(create/alter/rename/drop/truncate)和DML(insert/update/delete)的操作日志同步;(Maxwell不支持truncate命令同步)
  2. 支持使用数据的主键或唯一键作为消息体的key(可以根据该唯一键做Topic分区进行负载均衡);
  3. 支持快照模式(snapshot.mode)全量同步;(Canal不支持全量,Maxwell支持bootstrap全量)
  4. 还支持多种数据源(MySQL,Oracle,Postgre,MongoDB,SqlServer,DB2,Cassandra),比Canal,Maxwell功能更强大;
  5. 社区活跃,与kafka connect配合使用如虎添翼。

缺点:

  1. 需要kafka connect依赖;
  2. 消息体内容太多,对消息队列压力较大;(可使用transforms 模块进行压缩和定制)
  3. 每张表对应一个topic,管理起来不够方便(canal,maxwell则可以使用正则来处理); (可使用transforms 模块进行压缩和定制)

希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!


转载:https://blog.csdn.net/wsdc0521/article/details/108799267
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场