#在[client]段增加下面代码
default-character-set=utf8
#在[mysql]段增加下面的代码
character-set-server=utf8
#在[mysqld]段增加下面的代码
character-set-server=utf8
init_connect='SET NAMES utf8' (设定连接mysql数据库时使用utf8编码,以让mysql数据库为utf8运行,也可以不设置)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1. git clone https://github.com/alibaba/canal.git
2. 用idea打开
3. 修改代码
pom文件es相关的依赖变为es的对于版本,我的是5.6.10
com.alibaba.otter.canal.client.adapter.es.ESAdapter
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
Integer.parseInt(host.substring(i + 1))));
//transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
// Integer.parseInt(host.substring(i + 1))));
注意!!!:
mysql版本不是5.6+的同学,还是升级至5.6+吧,摊牌了,实在是搞不定。由于mysql5.6在binlog中加了些内容,导致和5.5有些差异,但是在官方文档上并没有关闭5.6+的解析协议,md,实在搞不定了,还是升级吧。但是,虽然会报异常,但是还是基本可用,看你们。
canal会进行这个操作。5.5没有这个,导致开始校验的时候报错。
select @@global.binlog_checksum
vim canal.propertie
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#开启一下
canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
vim instance.properties
################################################
## mysql serverId , v1.0.26+ will autoGen
#canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
#mysql的地址
canal.instance.master.address=127.0.0.1:3306
# binlog的名称可以不设置
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# mysql用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
#canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
sh startup.sh
大坑,真是坑,换了好多版本,就是收不到消息,应该是对canal内部详细原理不了解导致的,倒是客户端就是无法收到数据库变更的消息,气人不?官方稳定也没指出。。。还好解决了
getWithoutAck(int)该方法感觉是有问题,可能是自己的问题,但是至少目前是有问题的,该方法是非阻塞的,导致消息出现丢失问题,个人觉得应该是bug,还有网上的Thread.sleep(1000)来解决非阻塞的cpu性能问题,虽然可以,但是不可取,还有一个致命的问题,就是导致无法获取到服务端的消息,导致我一天都在搞这个事情,所以,大家是幸运的,使用阻塞的方法更好,只要合理控制超时时间即可
package com.yzz.es;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* @author yzz
* @time 2019/3/7 14:25
* @E-mail yzzstyle@163.com
* @since 0.0.1
*/
public class TestCanal {
public static void main(String[] args) throws Exception {
start();
}
public static void start() throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.33.14", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*");
System.out.println("success");
while (true) {
Message message = connector.getWithoutAck(100, (long) 1, TimeUnit.SECONDS);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
//System.out.println("sleep");
continue;
}
System.out.println(batchId);
printEntries(message.getEntries());
connector.ack(batchId);
}
}
private static void printEntries(List<Entry> entries) throws Exception {
System.out.println("********************");
for (Entry entry :
entries) {
CanalEntry.Header header = entry.getHeader();
System.out.println("logfileName: " + header.getLogfileName());
System.out.println("logfileOffset : " + header.getLogfileOffset());
System.out.println("executeTime :" + header.getExecuteTime());
System.out.println("schemaName: " + header.getSchemaName());
System.out.println("table_name: " + header.getTableName());
System.out.println("eventType: " + header.getEventType());
printRowChange(RowChange.parseFrom(entry.getStoreValue()));
}
System.out.println("********************");
}
private static void printRowChange(RowChange rowChange) {
System.out.println("rowchange");
System.out.println(rowChange.getSql());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData :
rowDatasList) {
List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<Column> afterColumnsList = rowData.getAfterColumnsList();
System.out.println("before");
printColum(beforeColumnsList);
System.out.println("after");
printColum(afterColumnsList);
}
}
private static void printColum(List<Column> columns) {
for (Column column :
columns) {
System.out.println(column.getName() + ":" + column.getValue());
}
}
private static void printColumns(List<Column> columns) {
String line = columns.stream()
.map(column -> column.getName() + "=" + column.getValue())
.collect(Collectors.joining(","));
System.out.println(line);
}
}
准备是做一个mysql到es实时同步的组件的,想用canal,但是一天都在调试这个canal,哎,太菜了,加油。愿共同进步,希望我的笔记能让你在实际工作中节省点时间,嘻嘻。
Original url: Access
Created at: 2019-09-10 20:26:08
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论