自助搭建mysql-canal-kafka通道 - 一个大龄DBA的修炼之路 - CSDN博客

该canal集成了java,无须繁琐的安装,直接配置几个参数即可实现mysql-canal-kafka

Mysql 环境配置

一、binlog 格式设置

调整 mysql 参数至如下即可。

mysql> show variables like 'binlog_format';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW   |+---------------+-------+1 row in set (0.00 sec)

参考配置(my.cnf):

[mysqld]

character_set_server = utf8

init_connect         = `'SET NAMES utf8'`

server-id            = `1`

log_bin              = mysql-bin

binlog_format        = row

expire_logs_days     = `30`

sql_mode             = NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

修改好上述配置后,最好再运行一次 flush logs 命令

二、super privilege user

线下测试默认数据库是可以用 root 的,有 root 用 root 即可。

Kafka 环境配置

这个安装比较简单,网上都有,掠过,canal需要配置kafka的broker地址和topic

Zookeeper 环境配置

线上使用的 zookeeper 为 3.4.6 版本,部署到某个地方,微调 zoo.cfg,启动即可。

Canal 环境配置

一、部署

下载 canal-lite.xxx (附件过大无法上传,有需求请留言)到目标位置,运行以下命令:

cat canal-lite.x* > canal.tar.gztar -zxvf canal.tar.gz

canal 需要 java 版本 >= 1.7 才能运行。开启和关闭方式分别为 sh bin/startup.sh 和 sh bin/stop.sh

二、配置修改

canal 需要修改的配置在以下两个位置:

conf/canal.properties

conf/example/instance.properties

其中 example 为 destination 名称,如果和其他 canal 混用 zookeeper,请保证 destination 不会冲突。

conf/canal.properties 需要修改的参数

1

2

3

4

5

# canal 本身会绑定到一个端口上,需要保证这个端口没有被占用

canal.port = `3406`

# canal 所依赖的 zookeeper 地址

canal.zkServers = `127.0.0.1:2181`

conf/destination/instance.properties 需要修改的参数

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

# canal 连接至 mysql 时所使用的 slave id,需要保证连接至同一个 mysql 的 id 互相不冲突

canal.instance.mysql.slaveId = `123456`

# canal 连接至 mysql 时所使用的地址/账号/密码

canal.instance.master.address = `127.0.0.1:3306`

canal.instance.dbUsername = root

canal.instance.dbPassword = `123456`

# mapping database to topicstrategy

# format: db1:topic1,db2:topic2,...

canal.instance.topicMapping        = orders:db_test,test:db_test

# partition strategy

canal.instance.partitionByPrimaryEnable = `true`

# partition by a specific column, only effective when partitionByPrimary = False

# format: name of the column

canal.instance.partitionColumn     =

# kafka broker address

canal.brokerAddress                =


Original url: Access
Created at: 2019-09-10 18:53:44
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论