目录:
[显示)]
目前MySQL Binlog解析工具主要有阿里的canal、maxwell和mysql_streamer,三个工具对照如下:
其中阿里开源的canal(https://github.com/alibaba/canal)当前稳定版本为v1.1.0,据官网介绍,这次版本整体性能提升了150%,是一个里程碑式的重大版本。canal基于java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大。
maxwell(https://github.com/zendesk/maxwell)由zendesk开源,官网为http://maxwells-daemon.io/,也是由java开发,体量没有canal那么大,解析出来的结果为json,可以方便的发送到kafka、rabbitmq、redis等下游应用,进行处理。
mysql_streamer则是由python开发的binlog解析工具,使用相对较少。
1、支持SELECT * FROM table 的方式进行全量数据初始化
2、支持在主库发生failover后,自动恢复binlog位置(GTID)
3、可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区
4、工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event
maxwell相对于canal的优势是使用简单,因为使用canal需要自己编写客户端来消费canal解析到的数据,而maxwell则不同,它直接输出数据变更的json串,不需要再编写客户端。maxwell可以看作是canal服务端+canal客户端
文件版本:
jdk-8u161-linux-x64.tar.gz
apache-maven-3.5.4-bin.tar.gz 官网 下载:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
mkdir -p /opt/java_maven_env
mv jdk-8u161-linux-x64.tar.gz apache-maven-3.5.4-bin.tar.gz /opt/java_maven_env
#解压
cd /opt/java_maven_env
tar zxf jdk-8u161-linux-x64.tar.gz
tar zxf apache-maven-3.5.4-bin.tar.gz
#添加环境变量
vi /etc/profile 在末尾添加以下几行
JAVA_HOME=/opt/java_maven_env/jdk1.8.0_161
JRE_HOME=/opt/java_maven_env/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA\_HOME/lib:$JRE_HOME/lib
MAVEN_HOME=/opt/java_maven_env/apache-maven-3.5.4
PATH=$PATH:$JAVA_HOME/bin:$JRE\_HOME/bin:$MAVEN_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH MAVEN_HOME PATH
#使环境变量生效 (注,去掉maven部分,即jdk环境)
source /etc/profile
#测试
java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
mvn -v
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /opt/java_maven_env/apache-maven-3.5.4
Java version: 1.8.0_161, vendor: Oracle Corporation, runtime: /opt/java_maven_env/jdk1.8.0_161/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-514.el7.x86_64", arch: "amd64", family: "unix"
项目地址:https://github.com/zendesk/maxwell
下载地址:https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz #已编译好的版本
源码:https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz
这里演示源码的编译,当然,也可以直接下载使用编译好的文件maxwell-1.17.1.tar.gz
wget https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz
tar zxf v1.17.1.tar.gz
cd maxwell-1.17.1/
make package #这个过程在我的机器上执行大约用时30min
1、MySQL需要开启binlog功能,binlog格式需为row,要设置一个唯一的server_id,binlog_row_image应该为FULL(默认就是FULL)
vi /etc/my.cnf #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
GTID模式
vi /etc/my.cnf #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
2、Maxwell需要建一个数据库,用于存放状态信息,默认库名为maxwell,该库在maxwell启动时会自动创建,但是需要为其设置相应权限;maxwell运行时需要获取复制状态信息,以及在information_schema中获取数据库及字段信息,所以还需要授予相应权限
CREATE USER 'maxwell'@'%' identified by 'XXXXXX';
GRANT ALL on maxwell.* to 'maxwell'@'%' ;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on . to 'maxwell'@'%';
FLUSH PRIVILEGES;
3、java环境,maxwell运行需要java
现在,就可以运行maxwell做测试了
tar zxf maxwell-1.17.1.tar.gz && cd maxwell-1.17.1
chown -R root:root *
bin/maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
Using kafka version: 1.0.0
14:40:23,761 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
14:40:24,120 INFO SchemaStoreSchema - Creating maxwell database
14:40:24,240 INFO Maxwell - Maxwell v1.17.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000011:2539732], lastHeartbeat=0]
14:40:24,363 INFO AbstractSchemaStore - Maxwell is capturing initial schema
14:40:24,801 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000011:2539732
14:40:24,965 INFO BinaryLogClient - Connected to 192.168.31.77:3306 at mysql-binlog.000011/2539732 (sid:6379, cid:4896)
14:40:24,965 INFO BinlogConnectorLifecycleListener - Binlog connected.
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}
如果使用的是docker镜像,则命令类似
docker pull zendesk/maxwell
docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell --password=XXXXXX --host=192.168.31.77 --producer=stdout
输出json串说明
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}
{"database":"test","table":"mytest","type":"update","ts":1535179973,"xid":206703,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"cc"},"old":{"as":"bb"}}
字段说明
data,最新的数据,修改后的数据
old,旧数据,修改前的数据
type,操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
xid,事务id
commit,同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
server_id,thread_id,
运行程序时添加参数--output_ddl,可以捕捉到ddl语句
datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
maxwell支持多种编码,但仅输出utf8编码
maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
完整参数如下:
参数
值类型
描述
默认值
全局参数
config
字符串
指定config.properties配置文件路径
当前目录$PWD
log_level
debug,info,warn,error
日志级别
info
daemon
maxwell作为守护进程运行
env_config_prefix
字符串
作为配置项对待的环境变量前缀,如FOO_
**mysql参数
**
host
字符串
mysql host
localhost
user
字符串
mysql username
password
字符串
mysql password
(no password)
port
INT
mysql port
3306
jdbc_options
字符串
mysql jdbc 连接串
DEFAULT_JDBC*
ssl
SSL_OPT
SSL启动
DISABLED
schema_database
字符串
存储表结构和binlog位点的数据库名称
maxwell
client_id
字符串
maxwell实例唯一定义文本
maxwell
replica_server_id
LONG
maxwell实例唯一定义编号,对应于MySQL server_id
6379
master_recovery
布尔值
启用实验性master恢复代码
false
gtid_mode
布尔值
启动基于GTID的复制
false
recapture_schema
布尔值
重新获得最新表结构. 配置文件中不适用
false
**replication参数,用于master-slave的架构,在slave上不方便创建maxwell库,从slave获取binlog
**
replication_host
字符串
主机
schema-store host
replication_password
字符串
密码
(none)
replication_port
INT
端口
3306
replication_user
字符串
用户
replication_ssl
SSL_OPT*
启用SSL
DISABLED
**schema参数,仅用于复制代理的情形,获取表结构
**
schema_host
字符串
主机
schema-store host
schema_password
字符串
密码
(none)
schema_port
INT
端口
3306
schema_user
字符串
用户
schema_ssl
SSL_OPT*
启用SSL
DISABLED
**producer参数
**
producer
PRODUCER_TYPE*
消费者类型
stdout
custom_producer.factory
CLASS_NAME
自定义消费者的工厂类
producer_ack_timeout
异步消费认为消息丢失的超时时间,毫秒ms
producer_partition_by
PARTITION_BY*
输入到kafka/kinesis的分区函数
database
producer_partition_columns
字符串
若按列分区,以逗号分隔的列名称
producer_partition_by_fallback
PARTITION_BY_F*
按列分区时需要,当列不存在是使用
ignore_producer_error
布尔值
为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志
true
**producer=file
**
output_file
字符串
输出的文件路径
javascript
字符串
指定javascript过滤器文件
**producer=kafka
**
kafka.bootstrap.servers
字符串
kafka brokers, 格式 HOST:PORT[,HOST:PORT]
kafka_topic
字符串
kafka topic
maxwell
kafka_version
KAFKA_VERSION*
kafka版本. 配置文件config.properties中不适用
0.11.0.1
kafka_partition_hash
default,murmur3
kafka分区hash函数
default
kafka_key_format
array,hash
maxwell输出kafka keys的格式
hash
ddl_kafka_topic
字符串
若output_ddl为true, kafka topic
kafka_topic
**producer=kinesis
**
kinesis_stream
字符串
kinesis stream name
producer=sqs
sqs_queue_uri
字符串
SQS Queue URI
producer=pubsub
pubsub_topic
字符串
Google Cloud pub-sub topic
pubsub_platform_id
字符串
Google Cloud platform id associated with topic
ddl_pubsub_topic
字符串
Google Cloud pub-sub topic to send DDL events to
producer=rabbitmq
rabbitmq_user
字符串
用户名
guest
rabbitmq_pass
字符串
密码
guest
rabbitmq_host
字符串
主机
rabbitmq_port
INT
端口
rabbitmq_virtual_host
字符串
虚拟主机
rabbitmq_exchange
字符串
交换器名称
rabbitmq_exchange_type
字符串
交换器类型
rabbitmq_exchange_durable
布尔值
交换器是否持久化
false
rabbitmq_exchange_autodelete
布尔值
为true时,当所有队列都完成时,删除交换器
false
rabbitmq_routing_key_template
字符串
路由键模板,可用 %db%
和 %table%
作为变量
%db%.%table%
.
rabbitmq_message_persistent
布尔值
启用消息持久化
false
rabbitmq_declare_exchange
布尔值
需要声明交换器
true
**producer=redis
**
redis_host
字符串
主机
localhost
redis_port
INT
端口
6379
redis_auth
字符串
密码
redis_database
INT
数据库[0-15]
0
redis_pub_channel
字符串
Pub/Sub模式的chanal名
maxwell
redis_list_key
字符串
LPUSH模式的列表名
maxwell
redis_type
pubsub,lpush
模式选择
pubsub
formatting格式化
output_binlog_position
布尔值
包含binlog位点
false
output_gtid_position
布尔值
包含gtid位点
false
output_commit_info
布尔值
包含commit和xid
true
output_xoffset
布尔值
包含虚拟的行偏移
false
output_nulls
布尔值
包含NULL值
true
output_server_id
布尔值
包含server_id
false
output_thread_id
布尔值
包含thread_id
false
output_row_query
布尔值
包含INSERT/UPDATE/DELETE语句. Mysql需要开启binlog_rows_query_log_events
false
output_ddl
布尔值
包含DDL (table-alter, table-create, etc)事件
false
filtering过滤器
filter
字符串
过滤规则,如 exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val
encryption加密
encrypt
none,data,all
加密模式: none不加密,data仅加密data字段,all加密整个消息
none
secret_key
字符串
encryption key
null
**monitoring / metrics指标
**
metrics_prefix
字符串
指标前缀
MaxwellMetrics
metrics_type
slf4j,jmx,http,datadog
指标类型
metrics_jvm
布尔值
启用jvm指标: memory usage, GC stats等
false
metrics_slf4j_interval
SECONDS
slf4j的频率指标,秒
60
http_port
INT
http方式的端口
8080
http_path_prefix
字符串
http方式的路径前缀
/
http_bind_address
字符串
http绑定的地址
all addresses
http_diagnostic
布尔值
启用http诊断
false
http_diagnostic_timeout
MILLISECONDS
http诊断响应超时
10000
metrics_datadog_type
udp,http
datadog类型
udp
metrics_datadog_tags
字符串
datadog tags如 tag1:value1,tag2:value2
metrics_datadog_interval
INT
datadog推的频率,秒
60
metrics_datadog_apikey
字符串
datadog api key仅当metrics_datadog_type = http
metrics_datadog_host
字符串
目标主机,仅当metrics_datadog_type = udp
localhost
metrics_datadog_port
INT
端口,仅当metrics_datadog_type = udp
8125
其它
bootstrapper
async,sync,none
bootstrapper类型
async
init_position
FILE:POS[:HEARTBEAT]
从指定位点启动,配置文件中不适用
replay
布尔值
启用只读重放模式,不存储binlog位点或结构变更,配置文件中不适用
SSL_OPT*:[ DISABLED | PREFERRED | REQUIRED | VERIFY_CA | VERIFY_IDENTITY ]
PRODUCER_TYPE*:[ stdout | file | kafka | kinesis | pubsub | sqs | rabbitmq | redis ]
DEFAULT_JDBC*:zeroDateTimeBehavior=convertToNull&connectTimeout=5000
PARTITION_BY*:[ database | table | primary_key | column ]
PARTITION_BY_F*:[ database | table | primary_key ]
KAFKA_VERSION*:[ 0.8.2.2 | 0.9.0.1 | 0.10.0.1 | 0.10.2.1 | 0.11.0.1 ]
配置优先级:
命令行 -> 环境变量 -> 配置文件 -> 默认值
Maxwell的角色区分:
--host,指定表结构来源的主机,即建maxwell库的主机
--replication_host,指定binlog来源的主机,用于master-slave结构中,不能在slave中建maxwell库的情况
--schema_host,指定表结构的主机,仅用于复制代理的情况
注,bootstrapping不适用于host和replication分离的情形
Maxwell多实例:
若需要运行多个Maxwell,需要为每个实例配置不同的client_id,以存储不同的binlog位点;同时,还需要配置replica_server_id,指定一个不同的server_id,如
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell--replica_server_id=6379 #默认
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345
Maxwell过滤:
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
--filter = 'exclude: ., include: db1.*'
--filter = 'exclude: db.tbl.col = reject'
--filter = 'exclude: ..col_a = *'
--filter = 'blacklist: bad_db.*'
Maxwell-bootstrap功能:
使用maxwell-bootstrap时,可用的参数有
--log_level,日志级别
--user,用户名
--password,密码
--host,主机
--port,端口
--database,包含需要bootstrap的表的数据库名
--table,需要bootstrap的表名
--where,限制条件
--client_id,maxwell实例名称
或者,也可以直接在maxwell.bootstrap表中手动添加触发同步
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
示例:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
注意--bootstrapper=sync时,在处理bootstrap时,会阻塞正常的binlog解析;--bootstrapper=async时,不会阻塞
json中的type
bootstrap-start -> bootstrap-insert -> bootstrap-complete #其中,start和complete的data字段为空,不携带数据
在进行bootstrap过程中,若maxwell崩溃,重启时,bootstrap会完全重新开始,不管之前进行到多少,若不希望,可以设置complete字段值为1(完成),或者删除该行
Maxwell-bootstrap实战:
maxwell-boot工具的作用其实就是在maxwell库的bootstrap表中添加一条记录,只需要添加database_name,table_name,client_id三个字段,也可以直接使用SQL语句在表中插入,maxwell-boot工具只是一个比较方便的方式
主执行程序:
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
启动一个maxwell-bootstrap
./maxwell-bootstrap --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --database ecard --table t_persons --log_level info --client_id maxwell
11:33:08,746 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Getting connection (user/password): jdbc:mysql://192.168.31.77:3306/maxwell
Sun Aug 26 11:33:08 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
11:33:09,124 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Created a new connection
11:33:09,125 INFO MaxwellBootstrapUtility - counting rows
11:33:09,135 INFO MaxwellBootstrapUtility - inserting bootstrap start row
11:33:09,645 INFO MaxwellBootstrapUtility - done.
bootstrap导出的数据会通过maxwell(即第一个程序进行输出)
Maxwell监控/指标:
所有指标目前都是针对kafka的
当启动HTTP节点后,会有以下路径
/metrics,返回json格式的指标数据
/healthcheck,健康检查,如果大于0说明在过去15min内有错误
/ping,返回pong
导出JMX信息,需要预先设置JAVA_OPTS
config config.properties, 配置文件
log_level info,日志级别
daemon, 守护进程
schema_database maxwell, 数据库
client_id maxwell, maxwell实例名称
replica_server_id 6379, 类似于server_id,多实例
filter, 过滤器
output_binlog_position true, 记录binlog位点
output_ddl true, 记录ddl变更
1、输出到stdout,json包含binlog位点,包含ddl语句
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=stdout
位点:"position":"mysql-binlog.000011:4170905"
2、输出到redis,使用队列的方式
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell --daemon
3、输出到rabbitmq
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=rabbitmq --rabbitmq_host=192.168.31.78 --rabbitmq_user=maxwell --rabbitmq_pass=maxwell --rabbitmq_virtual_host=maxwell_vhost --rabbitmq_exchange=maxwell --rabbitmq_exchange_type=topic --rabbitmq_routing_key_template=%db%.%table% --rabbitmq_declare_exchange=true
4、使用config.properties配置文件
cat config.properties
user=maxwell
password=XXXXXX
host=192.168.31.77
output_binlog_position=true
output_ddl=true
producer=rabbitmq
rabbitmq_host=192.168.31.78
rabbitmq_user=maxwell
rabbitmq_pass=maxwell
rabbitmq_virtual_host=maxwell_vhost
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_declare_exchange=true
运行程序
./maxwell --config=config.properties --daemon
参考:
1、自建Binlog订阅服务--Maxwell http://seanlook.com/2018/01/13/maxwell-binlog/
转载请注明:轻风博客 » MySQL Binlog解析工具Maxwell 1.17.1 的安装和使用
喜欢 (2)or分享 (0)
Original url: Access
Created at: 2019-09-10 21:31:08
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论