CDC series:
CDC Series (1): Deploying and Using a Canal Cluster (with Web UI)
CDC Series (2): Using Maxwell v1.27.1 to Monitor MySQL Operation Logs and Sync Them to Kafka in Real Time
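Preface
In the previous two posts we introduced CDC tools and used Canal and Maxwell to monitor MySQL operation logs and write them to Kafka. This post covers another CDC tool: Debezium. Debezium is not limited to MySQL; the latest release supports MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Cassandra and more. It runs on top of the Kafka Connect service, and through Kafka Connect the database change logs Debezium captures can easily be delivered to all kinds of target systems.
This post shows how to use Kafka Connect to start the Debezium MySQL connector, monitor the operation log of a MySQL database, and write the changes into Kafka.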
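Preparation
As mentioned above, Debezium depends on Kafka Connect, which was contributed by the Confluent team and has shipped with Kafka since version 0.9, so Kafka 0.9 or later must be installed first. If you don't have it yet, you can refer to my earlier article.
Since we will monitor the MySQL binlog, we also need a MySQL user with permission to read the binlog. This was covered in the previous two posts, so it is not repeated here.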
Configuring the Connector
1. Download: get the latest 1.2 release of the Debezium MySQL connector from the official site: https://debezium.io/releases/1.2/
2. Configure the Kafka Connect service
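   1. Create a directory under the Kafka directory (here the Kafka directory is /opt/app/kafka_2.11-2.4.1)
mkdir /opt/app/kafka_2.11-2.4.1/lib_conn
   2. Upload the downloaded Debezium plugin to the server and extract it into the directory you just created
tar -zxvf debezium-connector-mysql-1.2.5.Final-plugin.tar.gz -C /opt/app/kafka_2.11-2.4.1/lib_conn
   3. Edit the Kafka Connect worker configuration file
vim /opt/app/kafka_2.11-2.4.1/config/connect-standalone.properties
#----------------------
# Change only the following parameters and leave everything else as it is
bootstrap.servers=wykd:9092
# Point this at the directory where the Debezium plugin was just extracted
plugin.path=/opt/app/kafka_2.11-2.4.1/lib_conn
# These two parameters set the Connect message format (json or avro); consumers must read with the matching format. This post uses json
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter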
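Kafka Connect only discovers connectors located under the directories listed in plugin.path (a comma-separated list), so a mismatch between the extraction directory and this setting is an easy mistake. Below is a minimal Python sketch, assuming the simple key=value layout shown above; `read_properties` and `check_plugin_path` are hypothetical helper names, not part of Kafka. It reports, for each plugin.path entry, whether that directory exists and contains at least one .jar file somewhere below it.

```python
from pathlib import Path


def read_properties(text):
    """Parse simple key=value lines; '#' and '!' lines are comments.

    Simplified on purpose: no line continuations or escapes, and values are stripped.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_plugin_path(worker_properties_text):
    """Hypothetical helper: map each plugin.path entry to True if it is a directory holding .jar files."""
    props = read_properties(worker_properties_text)
    report = {}
    # plugin.path accepts a comma-separated list of directories
    for entry in props.get("plugin.path", "").split(","):
        entry = entry.strip()
        if not entry:
            continue
        path = Path(entry)
        # rglob walks subdirectories, e.g. lib_conn/debezium-connector-mysql/*.jar
        report[entry] = path.is_dir() and any(path.rglob("*.jar"))
    return report
```

For example, `check_plugin_path(Path("/opt/app/kafka_2.11-2.4.1/config/connect-standalone.properties").read_text())` should map the lib_conn directory to True once the tarball has been extracted there.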
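3. Configure the Debezium MySQL connector
Create the configuration file for the Debezium MySQL connector plugin. The core parameters are listed below; for the others, see the official manual: mysql-connector-configuration-properties_debezium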
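# Create the connector configuration file
vim /opt/app/kafka_2.11-2.4.1/config/connect-debezium-mysql-wyk-csdn.properties
#----------------------
# Connector name, must be unique
name=wyk_csdn_mysql_debezium_connector
# Connector plugin class
connector.class=io.debezium.connector.mysql.MySqlConnector
# Database host
database.hostname=wykd
# Database port
database.port=3306
# Database user; needs privileges to replicate the binlog
database.user=root
# Database password
database.password=123456
# Server id; must not collide with any other server id replicating from the same MySQL server
database.server.id=9999
# Logical server name: used as the prefix of every topic and as the name of the DDL (schema change) topic that consumers read
database.server.name=wyk_sandbox
# Database whitelist: only databases listed here are monitored; cannot be combined with database.blacklist
database.whitelist=testschema
# Kafka brokers for the database history topic
database.history.kafka.bootstrap.servers=wykd:9092
# Topic that stores the full history of schema changes (for the connector's own use)
database.history.kafka.topic=wyk_csdn_debezium_mysql
# Capture schema changes and write them to a topic with the same name as database.server.name
include.schema.changes=true
# Also record the executed SQL; requires binlog_rows_query_log_events=ON on the MySQL side
include.query=true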
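Notes:
No parameter line in the configuration file may have spaces before or after it (trailing spaces in particular become part of the value);
To record the executed SQL in each message, you need both:
- MySQL: set global binlog_rows_query_log_events=ON;
- Debezium MySQL connector configuration file: include.query=true
Parameters that deserve special attention: whitelist, blacklist, include.query, include.schema.changes, snapshot.mode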
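These rules are easy to break silently, so a small lint pass before starting the connector can help. The following is a minimal Python sketch under the assumption that the file only uses simple key=value lines; `lint_connector_properties` is a hypothetical helper, not a Debezium tool. It flags lines with leading or trailing whitespace, reminds you of the MySQL setting that include.query=true depends on, and catches whitelist and blacklist being set together.

```python
def lint_connector_properties(text):
    """Return warnings for a simple key=value connector properties file (hypothetical helper)."""
    warnings = []
    settings = {}
    for lineno, raw in enumerate(text.splitlines(), start=1):
        stripped = raw.strip()
        if not stripped or stripped.startswith(("#", "!")):
            continue  # blank line or comment
        if raw != stripped:
            warnings.append(f"line {lineno}: leading or trailing whitespace")
        key, sep, value = stripped.partition("=")
        if not sep:
            warnings.append(f"line {lineno}: no '=' found")
            continue
        settings[key.strip()] = value.strip()
    # include.query only works if MySQL writes the original SQL into the binlog
    if settings.get("include.query") == "true":
        warnings.append("include.query=true: MySQL also needs binlog_rows_query_log_events=ON")
    if "database.whitelist" in settings and "database.blacklist" in settings:
        warnings.append("database.whitelist and database.blacklist must not be used together")
    return warnings
```

The check is deliberately conservative: it mirrors the rule stated above rather than the exact java.util.Properties parsing rules.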
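Registering and Starting the Connector
If Kafka Connect is not running yet, use the following command to start it and register the connector with it:
cd $KAFKA_HOME
bin/connect-standalone.sh config/connect-standalone.properties config/connect-debezium-mysql-wyk-csdn.properties
Once it starts successfully, the Kafka Connect REST interface starts with it, and we can view or manage connectors through its RESTful API: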
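- GET /connectors: returns the list of active connectors
- POST /connectors: creates a new connector; the request body is a JSON object containing a string field name and an object field config (the connector's configuration parameters)
- GET /connectors/{name}: gets information about the given connector
- GET /connectors/{name}/config: gets the configuration parameters of the given connector
- PUT /connectors/{name}/config: updates the configuration parameters of the given connector
- GET /connectors/{name}/status: gets the connector's current status, including whether it is running, failed, paused, etc.
- GET /connectors/{name}/tasks: gets the list of tasks currently running for the connector
- GET /connectors/{name}/tasks/{taskid}/status: gets the current status of a task, including whether it is running, failed, paused, etc.
- PUT /connectors/{name}/pause: pauses the connector and its tasks, stopping message processing until the connector is resumed
- PUT /connectors/{name}/resume: resumes a paused connector (does nothing if the connector is not paused)
- POST /connectors/{name}/restart: restarts the connector (typically after it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart: restarts a single task (typically one that has failed)
- DELETE /connectors/{name}: deletes the connector, stopping all of its tasks and deleting its configuration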
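The endpoints above are plain HTTP plus JSON, so they can be scripted without extra dependencies. Here is a minimal Python sketch using only the standard library; the helper names `connect_request`, `connector_path` and `send` are hypothetical, and the base URL is whatever host and port your Connect worker listens on (8083 in this post). Building a request does not contact the server; only `send` does.

```python
import json
import urllib.request
from urllib.parse import quote


def connect_request(base_url, method, path, body=None):
    """Build (but do not send) a urllib Request for the Kafka Connect REST API."""
    data = None
    headers = {"Accept": "application/json"}
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(base_url.rstrip("/") + path, data=data, headers=headers, method=method)


def connector_path(name, suffix=""):
    """Path for /connectors/{name}[suffix], URL-quoting the connector name."""
    return f"/connectors/{quote(name, safe='')}{suffix}"


def send(request):
    """Send the request and decode the JSON response; an empty body (e.g. pause/resume) gives None."""
    with urllib.request.urlopen(request, timeout=10) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else None
```

For instance, `send(connect_request("http://10.1.174.10:8083", "GET", "/connectors"))` lists the active connectors, and `send(connect_request(base, "PUT", connector_path("wyk_csdn_mysql_debezium_connector", "/pause")))` pauses one.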
To view the currently active connectors in a browser, open ip:8083/connectors
Example: if we now want to register a second Debezium MySQL connector, we cannot start it with the command above; it fails with Failed to bind to 0.0.0.0/0.0.0.0:8083, Caused by: java.net.BindException: Address already in use. Port 8083 belongs to the Kafka Connect service that is already running, so the new connector must be registered through the REST API instead:
# The connector configuration here must be in JSON format
curl -s http://10.1.174.10:8083/connectors -X POST -H "Content-Type: application/json" --data \
'{"name": "wyk_csdn_mysql_debezium_connector2", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "10.1.174.10", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "9988", "database.server.name": "wyk_sandbox", "database.whitelist": "canal_manager", "database.history.kafka.bootstrap.servers": "10.1.174.10:9092", "database.history.kafka.topic": "wyk_csdn_debezium_mysql2", "include.schema.changes": "true", "include.query": "true"}}'
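Because the REST API expects JSON while the first connector was written as a .properties file, the body of POST /connectors can be generated mechanically from such a file. A minimal Python sketch, assuming simple key=value lines; `properties_to_connector_json` is a hypothetical helper name. As in the curl command above, every config value is sent as a string, and the name key moves to the top-level name field.

```python
import json


def properties_to_connector_json(text):
    """Convert a simple connector .properties file into the JSON body for POST /connectors."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep:
            config[key.strip()] = value.strip()
    # The REST API takes the connector name as a separate top-level field
    name = config.pop("name")
    return json.dumps({"name": name, "config": config})
```

The resulting string can be passed to curl via --data, for example after redirecting it into a file and using --data @file.json.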
Example: managing Kafka connectors through the REST API:
# List all active connectors
curl -s http://10.1.174.10:8083/connectors -X GET

# Register a new connector
curl -s http://10.1.174.10:8083/connectors -X POST -H "Content-Type: application/json" --data \
'{"name": "wyk_csdn_mysql_debezium_connector2", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "10.1.174.10", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "9988", "database.server.name": "wyk_sandbox", "database.whitelist": "canal_manager", "database.history.kafka.bootstrap.servers": "10.1.174.10:9092", "database.history.kafka.topic": "wyk_csdn_debezium_mysql2", "include.schema.changes": "true", "include.query": "true"}}'

# Delete the given connector
curl -s http://10.1.174.10:8083/connectors/wyk_csdn_mysql_debezium_connector2 -X DELETE
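Verifying CDC
At this point the Debezium MySQL connector is monitoring the specified MySQL instance and writing the data into Kafka. Unlike Maxwell and Canal, Debezium gives each table its own topic, and that topic contains only the table's inserts, updates and deletes (insert/delete/update). DDL statements such as create/drop/truncate/rename/alter all go into a single topic named after the value of database.server.name in the configuration file.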
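Topic naming: within the scope monitored by one connector, each table maps to one topic, and the DDL operations of all databases and tables in that scope are recorded in a single topic:
- DDL topic: serverName
- DML topic: serverName.databaseName.tableName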
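The naming rule can be captured in a few lines, which is handy when a consumer has to subscribe to the right topics or work out which table a record came from. A minimal Python sketch, assuming database and table names contain no dots and only characters valid in Kafka topic names; the function names are hypothetical.

```python
def ddl_topic(server_name):
    """Schema-change (DDL) topic: named after database.server.name."""
    return server_name


def dml_topic(server_name, database, table):
    """Per-table change topic: serverName.databaseName.tableName."""
    return f"{server_name}.{database}.{table}"


def table_from_topic(topic, server_name):
    """Split a per-table topic back into (database, table); None for the DDL topic or unrelated topics."""
    prefix = server_name + "."
    if not topic.startswith(prefix):
        return None
    database, sep, table = topic[len(prefix):].partition(".")
    return (database, table) if sep and table else None
```

With the configuration in this post, the test table's changes land in `dml_topic("wyk_sandbox", "testschema", "wyk_csdn")`, i.e. wyk_sandbox.testschema.wyk_csdn.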
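By default, to improve performance and guarantee data consistency, the connector does not replay every operation from the binlog on its first start. Instead it takes a global read lock to obtain a consistent starting point, writes a snapshot of the tables' current contents directly to the corresponding topics, and only after the snapshot completes does it continue with the operations in the binlog. This is usually much faster than replaying history and gives a clean switch from the initial full load to real-time capture. The snapshot behavior is controlled by the snapshot.mode parameter in the connector configuration, which accepts the following values:
- initial: the default. Takes a data snapshot the first time the connector starts and then continues from the recorded binlog position. However, if the connector is stopped for a long time and the recorded offset has been purged from the binlog by the time it restarts, it fails to start (keeping the full binlog on MySQL is recommended);
- when_needed: takes a snapshot whenever one is necessary. Similar to initial, except that on a failure such as a missing binlog position it starts a new snapshot instead of failing;
- never: never takes a snapshot. On first start it reads the binlog from the beginning, so it only works if the binlog still contains the database's full history;
- schema_only: at first start it captures only the current schema (no existing rows), then streams changes from the current binlog position onward, so data that existed before the connector started is not synced;
- schema_only_recovery: captures only the schema, not data, and can only be used on an existing connector (for example to rebuild a lost or corrupted database history topic). Instead of starting from the current position, it resumes from the position the existing connector last recorded;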
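The modes differ mainly in what happens at startup, depending on whether the connector already has stored offsets and whether that binlog position still exists on the server. The decision table below is a simplified Python model of the descriptions above, not Debezium code; the function, constants and return strings are hypothetical, and the real connector has more edge cases.

```python
MODES = {"initial", "when_needed", "never", "schema_only", "schema_only_recovery"}
RESUME = "resume binlog from stored offset"
FULL_SNAPSHOT = "snapshot schema and data, then stream binlog"


def _resume(binlog_position_available):
    """Resuming needs the stored binlog position to still exist on the MySQL server."""
    if not binlog_position_available:
        raise RuntimeError("stored binlog position has been purged; connector cannot start")
    return RESUME


def startup_action(mode, has_offsets, binlog_position_available=True):
    """Simplified, hypothetical model of what each snapshot.mode does at connector startup."""
    if mode not in MODES:
        raise ValueError(f"unknown snapshot.mode: {mode}")
    if mode == "schema_only_recovery":
        if not has_offsets:
            raise ValueError("schema_only_recovery only applies to an existing connector")
        return "re-capture schema only, then " + _resume(binlog_position_available)
    if mode == "when_needed" and not (has_offsets and binlog_position_available):
        return FULL_SNAPSHOT  # re-snapshot instead of failing
    if has_offsets:
        return _resume(binlog_position_available)
    # First start: no stored offsets yet
    if mode == "initial":
        return FULL_SNAPSHOT
    if mode == "never":
        return "no snapshot; stream binlog from the beginning"
    return "snapshot schema only, then stream binlog from the current position"  # schema_only
```

Calling `startup_action("initial", has_offsets=True, binlog_position_available=False)` raises, which corresponds to the failure case described for initial above.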
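Message bodies: there are two kinds, one for DDL and one for DML.
DDL: schema_change_topic_structure
DML: change_event_values. By default the table's primary key or unique key is used as the message key; a custom key can also be configured.
In DML events, op is c for insert, u for update and d for delete.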
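On the consumer side, the op code decides which row image matters: after for inserts and updates, before for deletes. Debezium also uses r for rows read during a snapshot. A minimal Python sketch; `describe_change` is a hypothetical helper that takes the already-decoded payload object.

```python
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "READ (snapshot)"}


def describe_change(payload):
    """Return (operation, row) for a Debezium DML payload: the 'after' image, or 'before' for deletes."""
    op = payload["op"]
    operation = OPERATIONS.get(op)
    if operation is None:
        raise ValueError(f"unexpected op: {op!r}")
    row = payload["before"] if op == "d" else payload["after"]
    return operation, row
```

Note that with default settings Debezium follows each delete with a tombstone record whose value is null, so a consumer should skip null values before calling a function like this.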
Example:
DDL:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"type": "string", "optional": false, "field": "version"},
          {"type": "string", "optional": false, "field": "connector"},
          {"type": "string", "optional": false, "field": "name"},
          {"type": "int64", "optional": false, "field": "ts_ms"},
          {"type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": {"allowed": "true,last,false"}, "default": "false", "field": "snapshot"},
          {"type": "string", "optional": false, "field": "db"},
          {"type": "string", "optional": true, "field": "table"},
          {"type": "int64", "optional": false, "field": "server_id"},
          {"type": "string", "optional": true, "field": "gtid"},
          {"type": "string", "optional": false, "field": "file"},
          {"type": "int64", "optional": false, "field": "pos"},
          {"type": "int32", "optional": false, "field": "row"},
          {"type": "int64", "optional": true, "field": "thread"},
          {"type": "string", "optional": true, "field": "query"}
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {"type": "string", "optional": false, "field": "databaseName"},
      {"type": "string", "optional": false, "field": "ddl"}
    ],
    "optional": false,
    "name": "io.debezium.connector.mysql.SchemaChangeValue"
  },
  "payload": {
    "source": {
      "version": "1.2.2.Final",
      "connector": "mysql",
      "name": "wyk_sandbox",
      "ts_ms": 1601016725000,
      "snapshot": "false",
      "db": "testschema",
      "table": "wyk_csdn",
      "server_id": 1,
      "gtid": null,
      "file": "mysql-bin.000016",
      "pos": 93212,
      "row": 0,
      "thread": null,
      "query": null
    },
    "databaseName": "testschema",
    "ddl": "create table wyk_csdn(id int,name varchar(20),ins_ts timestamp)"
  }
}
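A consumer of the DDL topic usually needs only the database, the table and the statement. A minimal Python sketch, assuming the message value is the JSON text shown above; `ddl_summary` is a hypothetical helper. It also accepts a value without the schema/payload envelope, which is what the JsonConverter produces when schemas are disabled.

```python
import json


def ddl_summary(message_value):
    """Extract (database, table, ddl) from a schema-change message value (JSON text)."""
    event = json.loads(message_value)
    # With JsonConverter schemas enabled, the data sits under "payload"; otherwise it is the top level
    payload = event.get("payload", event)
    source = payload.get("source", {})
    return payload["databaseName"], source.get("table"), payload["ddl"]
```

For the example message above this returns ("testschema", "wyk_csdn", "create table wyk_csdn(id int,name varchar(20),ins_ts timestamp)").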
DML:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"type": "int32", "optional": true, "field": "id"},
          {"type": "string", "optional": true, "field": "name"},
          {"type": "string", "optional": true, "name": "io.debezium.time.ZonedTimestamp", "version": 1, "field": "ins_ts"}
        ],
        "optional": true,
        "name": "wyk_sandbox.testschema.wyk_csdn.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {"type": "int32", "optional": true, "field": "id"},
          {"type": "string", "optional": true, "field": "name"},
          {"type": "string", "optional": true, "name": "io.debezium.time.ZonedTimestamp", "version": 1, "field": "ins_ts"}
        ],
        "optional": true,
        "name": "wyk_sandbox.testschema.wyk_csdn.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {"type": "string", "optional": false, "field": "version"},
          {"type": "string", "optional": false, "field": "connector"},
          {"type": "string", "optional": false, "field": "name"},
          {"type": "int64", "optional": false, "field": "ts_ms"},
          {"type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": {"allowed": "true,last,false"}, "default": "false", "field": "snapshot"},
          {"type": "string", "optional": false, "field": "db"},
          {"type": "string", "optional": true, "field": "table"},
          {"type": "int64", "optional": false, "field": "server_id"},
          {"type": "string", "optional": true, "field": "gtid"},
          {"type": "string", "optional": false, "field": "file"},
          {"type": "int64", "optional": false, "field": "pos"},
          {"type": "int32", "optional": false, "field": "row"},
          {"type": "int64", "optional": true, "field": "thread"},
          {"type": "string", "optional": true, "field": "query"}
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {"type": "string", "optional": false, "field": "op"},
      {"type": "int64", "optional": true, "field": "ts_ms"},
      {
        "type": "struct",
        "fields": [
          {"type": "string", "optional": false, "field": "id"},
          {"type": "int64", "optional": false, "field": "total_order"},
          {"type": "int64", "optional": false, "field": "data_collection_order"}
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "wyk_sandbox.testschema.wyk_csdn.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "name": "wyk2",
      "ins_ts": "2020-09-25T08:10:22Z"
    },
    "source": {
      "version": "1.2.2.Final",
      "connector": "mysql",
      "name": "wyk_sandbox",
      "ts_ms": 1601021422000,
      "snapshot": "false",
      "db": "testschema",
      "table": "wyk_csdn",
      "server_id": 1,
      "gtid": null,
      "file": "mysql-bin.000016",
      "pos": 97798,
      "row": 0,
      "thread": 162,
      "query": "insert into wyk_csdn values(2,'wyk2',current_timestamp())"
    },
    "op": "c",
    "ts_ms": 1601021422679,
    "transaction": null
  }
}
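Most of this message is schema description; the row itself is a small part of the payload. As a rough illustration of how much a flattened record saves, the Python sketch below keeps only the row image plus the op code and compares compact JSON sizes. The helpers and the "__op" field name are hypothetical; the idea is loosely similar to Debezium's ExtractNewRecordState transform, but this is not its implementation. Separately, setting value.converter.schemas.enable=false on the JsonConverter drops the schema part of every message.

```python
import json


def flatten_change(event):
    """Keep only the row image and op from a Debezium change event (schema + payload envelope)."""
    payload = event.get("payload", event)
    # Deletes carry the old row in "before"; everything else carries the new row in "after"
    row = payload["before"] if payload["op"] == "d" else payload["after"]
    flat = dict(row or {})
    flat["__op"] = payload["op"]  # illustrative field name
    return flat


def compact_size(obj):
    """Size in bytes of the compact UTF-8 JSON encoding."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))
```

Applied to the insert event above, `flatten_change` yields {"id": 2, "name": "wyk2", "ins_ts": "2020-09-25T08:10:22Z", "__op": "c"}, a small fraction of the full message.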
If you use Confluent Platform, you can also view and manage connectors and browse the data in topics directly in Control Center.
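Wrap-up
Pros:
- Debezium captures the full set of DDL (create/alter/rename/drop/truncate) and DML (insert/update/delete) operation logs; (Maxwell cannot capture truncate)
- It can use the row's primary key or unique key as the message key (so topics can be partitioned by that key for load balancing);
- It supports full initial loads through snapshot mode (snapshot.mode); (Canal has no full load, Maxwell offers bootstrap for full loads)
- It supports many data sources (MySQL, Oracle, PostgreSQL, MongoDB, SQL Server, Db2, Cassandra), making it more capable than Canal and Maxwell;
- The community is active, and combined with Kafka Connect it is even more powerful.
Cons:
- It depends on Kafka Connect;
- Messages carry a lot of content, which puts more pressure on the message queue; (the transforms module can slim down and customize messages)
- Each table gets its own topic, which is less convenient to manage (Canal and Maxwell can use regular expressions); (the transforms module can also reroute and merge topics)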
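For the topic-per-table drawback, Kafka Connect ships a RegexRouter transform (org.apache.kafka.connect.transforms.RegexRouter, configured with regex and replacement) that renames topics when the whole topic name matches a regular expression. The Python sketch below only imitates that matching rule so a routing pattern can be previewed offline; `preview_regex_route`, the example pattern and the target topic name are assumptions, and only numbered $n group references are handled.

```python
import re

# Hypothetical connector settings this preview corresponds to (.properties syntax, backslashes doubled):
#   transforms=route
#   transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
#   transforms.route.regex=wyk_sandbox\\.testschema\\.(.*)
#   transforms.route.replacement=testschema_all


def preview_regex_route(topic, regex, replacement):
    """Imitate RegexRouter: rename the topic only if the whole name matches `regex`."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics keep their name
    # Translate Java-style numbered references ($1) into Python's \g<1> syntax
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(python_replacement)
```

Routing every table of testschema into one topic means consumers must then use source.table in each message to tell the tables apart.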
Reposted from: https://blog.csdn.net/wsdc0521/article/details/108799267