Canal pitfalls: how many have you hit? They are all here - 尹忠政's blog - CSDN Blog

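  1. Change the MySQL character set
# Add the following to the [client] section
default-character-set=utf8
# Add the following to the [mysql] section
default-character-set=utf8
# Add the following to the [mysqld] section
character-set-server=utf8
# Optional: makes every new connection use utf8, so that MySQL runs with utf8 throughout
init_connect='SET NAMES utf8'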
  1. Create the canal user
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
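  1. Install canal
  • If your Elasticsearch (ES) version is not 6.x.x, you need to recompile client-adapter.elasticsearch
1. git clone https://github.com/alibaba/canal.git
2. Open the project in IntelliJ IDEA
3. Modify the code
In the pom file, change the ES-related dependencies to the version matching your ES; mine is 5.6.10.
In com.alibaba.otter.canal.client.adapter.es.ESAdapter, use InetSocketTransportAddress in place of TransportAddress: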
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
                       Integer.parseInt(host.substring(i + 1))));
               //transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
                //   Integer.parseInt(host.substring(i + 1))));
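
The ESAdapter change above splits a "host:port" string with substring and Integer.parseInt. Here is a minimal JDK-only sketch of that parsing step. HostPortParser is a hypothetical helper name, not a canal class, and port 9300 is only an assumed example value. Unlike the adapter code, it builds an unresolved address so that it runs without a DNS lookup.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not part of canal: splits "host:port" in the same
// way the ESAdapter snippet does (substring + Integer.parseInt).
final class HostPortParser {

    static InetSocketAddress parse(String hostPort) {
        int i = hostPort.lastIndexOf(':');
        if (i <= 0 || i == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = hostPort.substring(0, i);
        int port = Integer.parseInt(hostPort.substring(i + 1));
        // createUnresolved skips name resolution, so the sketch works offline.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        // 9300 is an assumed example port, not taken from the article.
        InetSocketAddress address = parse("192.168.33.14:9300");
        System.out.println(address.getHostString() + " port " + address.getPort());
    }
}
```

Validating the separator position up front gives a readable error for a malformed address instead of a StringIndexOutOfBoundsException.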

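Note!!!:
If your MySQL version is older than 5.6, you had better upgrade to 5.6+. To be frank, I could not get it working. MySQL 5.6 added some content to the binlog, so it differs from 5.5, and the official documentation does not describe any way to turn off the 5.6+ parsing protocol. I really could not solve it, so upgrade. That said, although exceptions are reported, canal remains basically usable on 5.5, so it is up to you.

canal runs the query below. MySQL 5.5 does not have this variable, which causes an error when the initial check runs.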
select @@global.binlog_checksum
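
To see in advance whether a server will trip over this query, you could compare its version string against 5.6 before starting canal. The sketch below is only an illustration: BinlogChecksumSupport and isAtLeast56 are hypothetical names, and the version strings are made-up examples of what `select version()` might return.

```java
// Hypothetical helper, not part of canal: decides from a MySQL version string
// whether the server is 5.6 or newer, the range the article recommends.
final class BinlogChecksumSupport {

    // The query quoted in the article.
    static final String CHECKSUM_QUERY = "select @@global.binlog_checksum";

    static boolean isAtLeast56(String version) {
        // "5.5.62-log" splits into ["5", "5", "62", "log"].
        String[] parts = version.split("[.-]");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major > 5 || (major == 5 && minor >= 6);
    }

    public static void main(String[] args) {
        for (String version : new String[] {"5.5.62-log", "5.6.40", "8.0.19"}) {
            System.out.println(version + " -> 5.6 or newer: " + isAtLeast56(version));
        }
    }
}
```

This only inspects the version string and assumes MySQL-style numbering; it does not talk to the database.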

  1. Edit the configuration files

vim canal.properties

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
# enable this setting
canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =

#################################################
#########               destinations            ############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########                    MQ                      #############
##################################################

vim instance.properties

################################################
## mysql serverId , v1.0.26+ will autoGen 
#canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# MySQL address
canal.instance.master.address=127.0.0.1:3306
# the binlog file name may be left unset
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# MySQL username and password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
#canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################


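  1. Start
sh startup.sh
  1. Consumer client

A big pitfall, truly. I tried many versions and still received no messages. The cause was probably that I did not understand canal's internals well enough, but the fact remained that the client could not receive database change messages. Infuriating, right? The official documentation does not point this out either... Fortunately I got it solved.

The getWithoutAck(int) method seems problematic to me. It may be my own mistake, but at least for now it causes trouble: the method is non-blocking, which led to lost messages in my setup, and I personally think this is a bug. The Thread.sleep(1000) workaround found online addresses the CPU cost of non-blocking polling; it works, but I do not recommend it. There was also a fatal problem: I could not get any messages from the server at all, which cost me a whole day. So you are the lucky ones: use the blocking variant instead and simply choose a sensible timeout.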
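
As an analogy for the two polling styles, here is a JDK-only sketch using java.util.concurrent.BlockingQueue. It is not canal's API: PollingStyles is a hypothetical name, and the queue merely stands in for the server. poll() returns at once with null when nothing is queued, like a non-blocking fetch, while poll(timeout, unit) waits up to the timeout, so no busy loop or Thread.sleep is needed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy, not canal code: contrasts non-blocking polling with
// polling that blocks up to a timeout.
final class PollingStyles {

    // Non-blocking: returns immediately, null if the queue is empty.
    static String pollOnce(BlockingQueue<String> queue) {
        return queue.poll();
    }

    // Blocking with timeout: waits until an item arrives or the timeout expires.
    static String pollWithTimeout(BlockingQueue<String> queue, long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200); // the "server" publishes a little later
                queue.put("row change");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println("non-blocking: " + pollOnce(queue));
        System.out.println("with timeout: " + pollWithTimeout(queue, 1, TimeUnit.SECONDS));
        producer.join();
    }
}
```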

package com.yzz.es;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

/**
 * @author yzz
 * @time 2019/3/7 14:25
 * @E-mail yzzstyle@163.com
 * @since 0.0.1
 */
public class TestCanal {
    public static void main(String[] args) throws Exception {
        start();
    }

    public static void start() throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.33.14", 11111), "example", "", "");

        connector.connect();
        connector.subscribe(".*");
        System.out.println("success");
        while (true) {
            Message message = connector.getWithoutAck(100, (long) 1, TimeUnit.SECONDS);
            long batchId = message.getId();

            if (batchId == -1 || message.getEntries().isEmpty()) {
                //System.out.println("sleep");
                continue;
            }
            System.out.println(batchId);
            printEntries(message.getEntries());
            connector.ack(batchId);
        }
    }

    private static void printEntries(List<Entry> entries) throws Exception {
        System.out.println("********************");
        for (Entry entry : entries) {
            // Only ROWDATA entries carry a RowChange payload; transaction
            // begin/end entries are delivered too because
            // canal.instance.filter.transaction.entry = false, so skip them.
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            CanalEntry.Header header = entry.getHeader();

            System.out.println("logfileName: " + header.getLogfileName());
            System.out.println("logfileOffset : " + header.getLogfileOffset());
            System.out.println("executeTime :" + header.getExecuteTime());
            System.out.println("schemaName: " + header.getSchemaName());
            System.out.println("table_name: " + header.getTableName());
            System.out.println("eventType: " + header.getEventType());
            printRowChange(RowChange.parseFrom(entry.getStoreValue()));
        }
        System.out.println("********************");
    }

    private static void printRowChange(RowChange rowChange) {
        System.out.println("rowchange");
        System.out.println(rowChange.getSql());
        List<RowData> rowDatasList = rowChange.getRowDatasList();
        for (RowData rowData :
                rowDatasList) {
            List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<Column> afterColumnsList = rowData.getAfterColumnsList();
            System.out.println("before");
            printColum(beforeColumnsList);
            System.out.println("after");
            printColum(afterColumnsList);
        }
    }

    private static void printColum(List<Column> columns) {
        for (Column column :
                columns) {
            System.out.println(column.getName() + ":" + column.getValue());
        }
    }


    private static void printColumns(List<Column> columns) {
        String line = columns.stream()
                .map(column -> column.getName() + "=" + column.getValue())
                .collect(Collectors.joining(","));
        System.out.println(line);
    }
}
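
The loop in TestCanal acks every batch once it has been printed. One possible refinement, sketched here under assumptions, is to roll the batch back when processing throws, so that the same batch can be fetched again. BatchSource is a hypothetical stand-in interface for the connector (CanalConnector itself offers ack(long) and rollback(long)), and the entries are plain strings to keep the sketch self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not canal code: ack on success, rollback on failure.
final class AckOrRollback {

    /** Hypothetical stand-in for the connector's ack/rollback methods. */
    interface BatchSource {
        void ack(long batchId);
        void rollback(long batchId);
    }

    // Returns true if every entry was handled and the batch was acked.
    static boolean process(BatchSource source, long batchId,
                           List<String> entries, Consumer<String> handler) {
        try {
            for (String entry : entries) {
                handler.accept(entry);
            }
            source.ack(batchId);      // confirm only after all entries succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId); // let the batch be delivered again
            return false;
        }
    }

    public static void main(String[] args) {
        BatchSource printing = new BatchSource() {
            public void ack(long batchId) { System.out.println("ack " + batchId); }
            public void rollback(long batchId) { System.out.println("rollback " + batchId); }
        };
        process(printing, 1L, Arrays.asList("a", "b"), System.out::println);
        process(printing, 2L, Arrays.asList("c"), entry -> {
            throw new IllegalStateException("handler failed");
        });
    }
}
```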

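All right, a few closing words

I set out to build a component for real-time synchronization from MySQL to ES and wanted to use canal for it, but I spent the whole day just debugging canal. Sigh, I still have a lot to learn; keep going. Here's to improving together. I hope my notes save you some time in your actual work, hehe.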


Created at: 2019-09-10 20:26:08