MySQL Binlog解析工具Maxwell 1.17.1 的安装和使用 - 轻风博客

目录:

[显示)]

目前MySQL Binlog解析工具主要有阿里的canal、maxwell和mysql_streamer,三个工具对照如下:

其中阿里开源的canal(https://github.com/alibaba/canal)当前稳定版本为v1.1.0,据官网介绍,这次版本整体性能提升了150%,是一个里程碑式的重大版本。canal基于java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大。

maxwell(https://github.com/zendesk/maxwell)由zendesk开源,官网为http://maxwells-daemon.io/,也是由java开发,体量没有canal那么大,解析出来的结果为json,可以方便的发送到kafka、rabbitmq、redis等下游应用,进行处理。

mysql_streamer则是由python开发的binlog解析工具,使用相对较少。

Maxwell功能

1、支持SELECT * FROM table 的方式进行全量数据初始化

2、支持在主库发生failover后,自动恢复binlog位置(GTID)

3、可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区

4、工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event

maxwell相对于canal的优势是使用简单,因为使用canal需要自己编写客户端来消费canal解析到的数据,而maxwell则不同,它直接输出数据变更的json串,不需要再编写客户端。maxwell可以看作是canal服务端+canal客户端

Maxwell安装

部署java和maven环境

文件版本:
jdk-8u161-linux-x64.tar.gz
apache-maven-3.5.4-bin.tar.gz 官网 下载:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz

mkdir -p /opt/java_maven_env
mv jdk-8u161-linux-x64.tar.gz apache-maven-3.5.4-bin.tar.gz /opt/java_maven_env
#解压
cd /opt/java_maven_env
tar zxf jdk-8u161-linux-x64.tar.gz
tar zxf apache-maven-3.5.4-bin.tar.gz
#添加环境变量
vi /etc/profile  在末尾添加以下几行
JAVA_HOME=/opt/java_maven_env/jdk1.8.0_161
JRE_HOME=/opt/java_maven_env/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA\_HOME/lib:$JRE_HOME/lib
MAVEN_HOME=/opt/java_maven_env/apache-maven-3.5.4
PATH=$PATH:$JAVA_HOME/bin:$JRE\_HOME/bin:$MAVEN_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH MAVEN_HOME PATH
#使环境变量生效 (注,去掉maven部分,即jdk环境)
source /etc/profile
#测试
java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
mvn -v
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /opt/java_maven_env/apache-maven-3.5.4
Java version: 1.8.0_161, vendor: Oracle Corporation, runtime: /opt/java_maven_env/jdk1.8.0_161/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-514.el7.x86_64", arch: "amd64", family: "unix"

编译maxwell 1.17.1

项目地址:https://github.com/zendesk/maxwell
下载地址:https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz  #已编译好的版本
源码:https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz

这里演示源码的编译,当然,也可以直接下载使用编译好的文件maxwell-1.17.1.tar.gz

下载

wget https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz

解压并进入目录

tar zxf v1.17.1.tar.gz
cd maxwell-1.17.1/

编译并打包文件(实际调用的是maven)

make package  #这个过程在我的机器上执行大约用时30min

如果运行测试,需要有gcc环境 make test

最终,会在maxwell-1.17.1/target目录下生成目标文件maxwell-1.17.1.tar.gz (48M)

Maxwell的使用

前置条件

1、MySQL需要开启binlog功能,binlog格式需为row,要设置一个唯一的server_id,binlog_row_image应该为FULL(默认就是FULL)

vi /etc/my.cnf  #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row

binlog_row_image=FULL

GTID模式

vi /etc/my.cnf  #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row

binlog_row_image=FULL

gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true

2、Maxwell需要建一个数据库,用于存放状态信息,默认库名为maxwell,该库在maxwell启动时会自动创建,但是需要为其设置相应权限;maxwell运行时需要获取复制状态信息,以及在information_schema中获取数据库及字段信息,所以还需要授予相应权限

CREATE USER 'maxwell'@'%' identified by 'XXXXXX';
GRANT ALL on maxwell.* to 'maxwell'@'%' ;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on . to 'maxwell'@'%';
FLUSH PRIVILEGES;

3、java环境,maxwell运行需要java

简单测试

现在,就可以运行maxwell做测试了

tar zxf maxwell-1.17.1.tar.gz && cd maxwell-1.17.1
chown -R root:root *

producer=stdout 直接输入到终端窗口

bin/maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
Using kafka version: 1.0.0
14:40:23,761 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
14:40:24,120 INFO SchemaStoreSchema - Creating maxwell database
14:40:24,240 INFO Maxwell - Maxwell v1.17.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000011:2539732], lastHeartbeat=0]
14:40:24,363 INFO AbstractSchemaStore - Maxwell is capturing initial schema
14:40:24,801 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000011:2539732
14:40:24,965 INFO BinaryLogClient - Connected to 192.168.31.77:3306 at mysql-binlog.000011/2539732 (sid:6379, cid:4896)
14:40:24,965 INFO BinlogConnectorLifecycleListener - Binlog connected.
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}

可以看到maxwell在启动时,会先创建maxwell数据库,然后获取当前binlog位点,初始化表结构,然后就开始解析binlog日志了,最后一条即解析的json串

如果使用的是docker镜像,则命令类似
docker pull zendesk/maxwell
docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell --password=XXXXXX --host=192.168.31.77 --producer=stdout

 输出json串说明
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}
{"database":"test","table":"mytest","type":"update","ts":1535179973,"xid":206703,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"cc"},"old":{"as":"bb"}}
字段说明

data,最新的数据,修改后的数据

old,旧数据,修改前的数据

type,操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)

xid,事务id

commit,同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务

server_id,thread_id,

运行程序时添加参数--output_ddl,可以捕捉到ddl语句

datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出

maxwell支持多种编码,但仅输出utf8编码
maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

参数说明

完整参数如下:

参数

值类型

描述

默认值

全局参数

config

字符串

指定config.properties配置文件路径

当前目录$PWD

log_level

debug,info,warn,error

日志级别

info

daemon

maxwell作为守护进程运行

env_config_prefix

字符串

作为配置项对待的环境变量前缀,如FOO_

**mysql参数
**

host

字符串

mysql host

localhost

user

字符串

mysql username

password

字符串

mysql password

(no password)

port

INT

mysql port

3306

jdbc_options

字符串

mysql jdbc 连接串

DEFAULT_JDBC*

ssl

SSL_OPT

SSL启动

DISABLED

schema_database

字符串

存储表结构和binlog位点的数据库名称

maxwell

client_id

字符串

maxwell实例唯一定义文本

maxwell

replica_server_id

LONG

maxwell实例唯一定义编号,对应于MySQL server_id

6379

master_recovery

布尔值

启用实验性master恢复代码

false

gtid_mode

布尔值

启动基于GTID的复制

false

recapture_schema

布尔值

重新获得最新表结构. 配置文件中不适用

false

**replication参数,用于master-slave的架构,在slave上不方便创建maxwell库,从slave获取binlog
**

replication_host

字符串

主机

schema-store host

replication_password

字符串

密码

(none)

replication_port

INT

端口

3306

replication_user

字符串

用户

replication_ssl

SSL_OPT*

启用SSL

DISABLED

**schema参数,仅用于复制代理的情形,获取表结构
**

schema_host

字符串

主机

schema-store host

schema_password

字符串

密码

(none)

schema_port

INT

端口

3306

schema_user

字符串

用户

schema_ssl

SSL_OPT*

启用SSL

DISABLED

**producer参数
**

producer

PRODUCER_TYPE*

消费者类型

stdout

custom_producer.factory

CLASS_NAME

自定义消费者的工厂类

producer_ack_timeout

异步消费认为消息丢失的超时时间,毫秒ms

producer_partition_by

PARTITION_BY*

输入到kafka/kinesis的分区函数

database

producer_partition_columns

字符串

若按列分区,以逗号分隔的列名称

producer_partition_by_fallback

PARTITION_BY_F*

按列分区时需要,当列不存在是使用

ignore_producer_error

布尔值

为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志

true

**producer=file
**

output_file

字符串

输出的文件路径

javascript

字符串

指定javascript过滤器文件

**producer=kafka
**

kafka.bootstrap.servers

字符串

kafka brokers, 格式 HOST:PORT[,HOST:PORT]

kafka_topic

字符串

kafka topic

maxwell

kafka_version

KAFKA_VERSION*

kafka版本. 配置文件config.properties中不适用

0.11.0.1

kafka_partition_hash

default,murmur3

kafka分区hash函数

default

kafka_key_format

array,hash

maxwell输出kafka keys的格式

hash

ddl_kafka_topic

字符串

若output_ddl为true, kafka topic

kafka_topic

**producer=kinesis
**

kinesis_stream

字符串

kinesis stream name

producer=sqs

sqs_queue_uri

字符串

SQS Queue URI

producer=pubsub

pubsub_topic

字符串

Google Cloud pub-sub topic

pubsub_platform_id

字符串

Google Cloud platform id associated with topic

ddl_pubsub_topic

字符串

Google Cloud pub-sub topic to send DDL events to

producer=rabbitmq

rabbitmq_user

字符串

用户名

guest

rabbitmq_pass

字符串

密码

guest

rabbitmq_host

字符串

主机

rabbitmq_port

INT

端口

rabbitmq_virtual_host

字符串

虚拟主机

rabbitmq_exchange

字符串

交换器名称

rabbitmq_exchange_type

字符串

交换器类型

rabbitmq_exchange_durable

布尔值

交换器是否持久化

false

rabbitmq_exchange_autodelete

布尔值

为true时,当所有队列都完成时,删除交换器

false

rabbitmq_routing_key_template

字符串

路由键模板,可用 %db%%table% 作为变量

%db%.%table%.

rabbitmq_message_persistent

布尔值

启用消息持久化

false

rabbitmq_declare_exchange

布尔值

需要声明交换器

true

**producer=redis
**

redis_host

字符串

主机

localhost

redis_port

INT

端口

6379

redis_auth

字符串

密码

redis_database

INT

数据库[0-15]

0

redis_pub_channel

字符串

Pub/Sub模式的chanal名

maxwell

redis_list_key

字符串

LPUSH模式的列表名

maxwell

redis_type

pubsub,lpush

模式选择

pubsub

formatting格式化

output_binlog_position

布尔值

包含binlog位点

false

output_gtid_position

布尔值

包含gtid位点

false

output_commit_info

布尔值

包含commit和xid

true

output_xoffset

布尔值

包含虚拟的行偏移

false

output_nulls

布尔值

包含NULL值

true

output_server_id

布尔值

包含server_id

false

output_thread_id

布尔值

包含thread_id

false

output_row_query

布尔值

包含INSERT/UPDATE/DELETE语句. Mysql需要开启binlog_rows_query_log_events

false

output_ddl

布尔值

包含DDL (table-alter, table-create, etc)事件

false

filtering过滤器

filter

字符串

过滤规则,如 exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val

encryption加密

encrypt

none,data,all

加密模式: none不加密,data仅加密data字段,all加密整个消息

none

secret_key

字符串

encryption key

null

**monitoring / metrics指标
**

metrics_prefix

字符串

指标前缀

MaxwellMetrics

metrics_type

slf4j,jmx,http,datadog

指标类型

metrics_jvm

布尔值

启用jvm指标: memory usage, GC stats等

false

metrics_slf4j_interval

SECONDS

slf4j的频率指标,秒

60

http_port

INT

http方式的端口

8080

http_path_prefix

字符串

http方式的路径前缀

/

http_bind_address

字符串

http绑定的地址

all addresses

http_diagnostic

布尔值

启用http诊断

false

http_diagnostic_timeout

MILLISECONDS

http诊断响应超时

10000

metrics_datadog_type

udp,http

datadog类型

udp

metrics_datadog_tags

字符串

datadog tags如 tag1:value1,tag2:value2

metrics_datadog_interval

INT

datadog推的频率,秒

60

metrics_datadog_apikey

字符串

datadog api key仅当metrics_datadog_type = http

metrics_datadog_host

字符串

目标主机,仅当metrics_datadog_type = udp

localhost

metrics_datadog_port

INT

端口,仅当metrics_datadog_type = udp

8125

其它

bootstrapper

async,sync,none

bootstrapper类型

async

init_position

FILE:POS[:HEARTBEAT]

从指定位点启动,配置文件中不适用

replay

布尔值

启用只读重放模式,不存储binlog位点或结构变更,配置文件中不适用

SSL_OPT*:[ DISABLED | PREFERRED | REQUIRED | VERIFY_CA | VERIFY_IDENTITY ]
PRODUCER_TYPE*:[ stdout | file | kafka | kinesis | pubsub | sqs | rabbitmq | redis ]
DEFAULT_JDBC*:zeroDateTimeBehavior=convertToNull&connectTimeout=5000
PARTITION_BY*:[ database | table | primary_key | column ]
PARTITION_BY_F*:[ database | table | primary_key ]
KAFKA_VERSION*:[ 0.8.2.2 | 0.9.0.1 | 0.10.0.1 | 0.10.2.1 | 0.11.0.1 ]

配置优先级
命令行 -> 环境变量 -> 配置文件 -> 默认值

Maxwell的角色区分
--host,指定表结构来源的主机,即建maxwell库的主机
--replication_host,指定binlog来源的主机,用于master-slave结构中,不能在slave中建maxwell库的情况
--schema_host,指定表结构的主机,仅用于复制代理的情况
注,bootstrapping不适用于host和replication分离的情形

Maxwell多实例
若需要运行多个Maxwell,需要为每个实例配置不同的client_id,以存储不同的binlog位点;同时,还需要配置replica_server_id,指定一个不同的server_id,如
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell--replica_server_id=6379 #默认
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345

Maxwell过滤

仅匹配foodb数据库的tbl表和所有table_数字的表

--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'

排除所有库所有表,仅匹配db1数据库

--filter = 'exclude: ., include: db1.*'

排除含db.tbl.col列值为reject的所有更新

--filter = 'exclude: db.tbl.col = reject'

排除任何包含col_a列的更新

--filter = 'exclude: ..col_a = *'

完全排除bad_db数据库,若要恢复,必须删除maxwell库

--filter = 'blacklist: bad_db.*'

Maxwell-bootstrap功能
使用maxwell-bootstrap时,可用的参数有
--log_level,日志级别
--user,用户名
--password,密码
--host,主机
--port,端口
--database,包含需要bootstrap的表的数据库名
--table,需要bootstrap的表名
--where,限制条件
--client_id,maxwell实例名称
或者,也可以直接在maxwell.bootstrap表中手动添加触发同步
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
示例:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
注意--bootstrapper=sync时,在处理bootstrap时,会阻塞正常的binlog解析;--bootstrapper=async时,不会阻塞
json中的type
bootstrap-start -> bootstrap-insert -> bootstrap-complete  #其中,start和complete的data字段为空,不携带数据
在进行bootstrap过程中,若maxwell崩溃,重启时,bootstrap会完全重新开始,不管之前进行到多少,若不希望,可以设置complete字段值为1(完成),或者删除该行

Maxwell-bootstrap实战
maxwell-boot工具的作用其实就是在maxwell库的bootstrap表中添加一条记录,只需要添加database_name,table_name,client_id三个字段,也可以直接使用SQL语句在表中插入,maxwell-boot工具只是一个比较方便的方式

主执行程序:
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
启动一个maxwell-bootstrap
./maxwell-bootstrap --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --database ecard --table t_persons --log_level info  --client_id maxwell
11:33:08,746 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Getting connection (user/password): jdbc:mysql://192.168.31.77:3306/maxwell
Sun Aug 26 11:33:08 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
11:33:09,124 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Created a new connection
11:33:09,125 INFO MaxwellBootstrapUtility - counting rows
11:33:09,135 INFO MaxwellBootstrapUtility - inserting bootstrap start row
11:33:09,645 INFO MaxwellBootstrapUtility - done.
bootstrap导出的数据会通过maxwell(即第一个程序进行输出)

Maxwell监控/指标
所有指标目前都是针对kafka的
当启动HTTP节点后,会有以下路径
/metrics,返回json格式的指标数据
/healthcheck,健康检查,如果大于0说明在过去15min内有错误
/ping,返回pong
导出JMX信息,需要预先设置JAVA_OPTS

常用配置

config config.properties, 配置文件
log_level info,日志级别
daemon, 守护进程
schema_database maxwell, 数据库
client_id maxwell, maxwell实例名称
replica_server_id 6379, 类似于server_id,多实例
filter, 过滤器
output_binlog_position true, 记录binlog位点
output_ddl true, 记录ddl变更

1、输出到stdout,json包含binlog位点,包含ddl语句

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=stdout

位点:"position":"mysql-binlog.000011:4170905"

2、输出到redis,使用队列的方式

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell --daemon

3、输出到rabbitmq

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=rabbitmq --rabbitmq_host=192.168.31.78 --rabbitmq_user=maxwell --rabbitmq_pass=maxwell --rabbitmq_virtual_host=maxwell_vhost --rabbitmq_exchange=maxwell --rabbitmq_exchange_type=topic --rabbitmq_routing_key_template=%db%.%table% --rabbitmq_declare_exchange=true

4、使用config.properties配置文件

cat config.properties
user=maxwell
password=XXXXXX
host=192.168.31.77
output_binlog_position=true
output_ddl=true
producer=rabbitmq
rabbitmq_host=192.168.31.78
rabbitmq_user=maxwell
rabbitmq_pass=maxwell
rabbitmq_virtual_host=maxwell_vhost
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_declare_exchange=true
运行程序
./maxwell --config=config.properties --daemon

参考:

1、自建Binlog订阅服务--Maxwell http://seanlook.com/2018/01/13/maxwell-binlog/

转载请注明:轻风博客 » MySQL Binlog解析工具Maxwell 1.17.1 的安装和使用

喜欢 (2)or分享 (0)


Original url: Access
Created at: 2019-09-10 21:31:08
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论