我的zookeeper 集群已经安装
我的mysql已经安装
两台机器
10.86.43.154
10.93.0.192
cd /home/q/
sudo mkdir canal
cd canal/
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.0.22/canal.deployer-1.0.22.tar.gz
sudo mkdir canal-deployer
sudo tar zxvf canal.deployer-1.0.22.tar.gz -C ./canal-deployer
需要 mysql支持复制模式
sudo vim /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1#配置mysql replaction需要定义,不能和canal的slaveId重复
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
地球人都知道,更新mysql配置my.cnf需要重启mysql才能生效,但是有些时候mysql在线上,不一定允许你重启,这时候应该怎么办呢?(我的已经修改过)
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
mysql> show variables like 'binlog_format';
+---------------+-----------+
| Variable_name | Value |
+---------------+-----------+
| binlog_format | STATEMENT |
+---------------+-----------+
1 row in set (0.00 sec)
mysql> set binlog_format='ROW';
Query OK, 0 rows affected (0.00 sec)
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
但是 执行
mysql> set log_bin='mysql_bin';
ERROR 1238 (HY000): Variable 'log_bin' is a read only variable
开启mysql 日志是必须重启mysql了,gdb修改也不管用
mysql有以下几种日志:
错误日志: -log-err
查询日志: -log
慢查询日志: -log-slow-queries
更新日志: -log-update
二进制日志: -log-bin
mysql> show variables like '%log%';
+-----------------------------------------+---------------------------------+
| Variable_name | Value |
+-----------------------------------------+---------------------------------+
| back_log | 50 |
| binlog_cache_size | 32768 |
| binlog_direct_non_transactional_updates | OFF |
| binlog_format | ROW |
| expire_logs_days | 0 |
| general_log | OFF |
| general_log_file | /var/run/mysqld/mysqld.log |
| innodb_flush_log_at_trx_commit | 1 |
| innodb_locks_unsafe_for_binlog | OFF |
| innodb_log_buffer_size | 1048576 |
| innodb_log_file_size | 5242880 |
| innodb_log_files_in_group | 2 |
| innodb_log_group_home_dir | ./ |
| innodb_mirrored_log_groups | 1 |
| log | OFF |
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_bin_trust_routine_creators | OFF |
| log_error | /var/log/mysqld.log |
| log_output | FILE |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | OFF |
| log_warnings | 1 |
| max_binlog_cache_size | 18446744073709547520 |
| max_binlog_size | 1073741824 |
| max_relay_log_size | 0 |
| relay_log | |
| relay_log_index | |
| relay_log_info_file | relay-log.info |
| relay_log_purge | ON |
| relay_log_space_limit | 0 |
| slow_query_log | OFF |
| slow_query_log_file | /var/run/mysqld/mysqld-slow.log |
| sql_log_bin | ON |
| sql_log_off | OFF |
| sql_log_update | ON |
| sync_binlog | 0 |
+-----------------------------------------+---------------------------------+
38 rows in set (0.00 sec)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)
canal配置
sudo vim canal.properties
#################################################
######### common argument #############
#################################################
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=10.86.43.164:4180,10.86.43.154:4180,10.93.0.192:4180
# flush data to zk
canal.zookeeper.flush.period = 1000
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
#################################################
######### destinations #############
#################################################
canal.destinations= example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
修改 instance.properties
cd example
sudo vim instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1235 #两台不一样
# position info
canal.instance.master.address = 10.93.0.192:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = 用户名
canal.instance.dbPassword = 密码
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
#################################################
两台做同样的操作
启动及 查看日志
cd ../../bin
sudo ./startup.sh
cd /home/q/canal/canal-deployer/logs
可以查看 canal的日志 和对应的instance日志
canal是看canal是否启动正常
instance是查看对应的实例是否和mysql的master连接正常
客户端代码
新建maven项目
pom加入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>canal.sample</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>13.0</version>
</dependency>
</dependencies>
</project>
创建类
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.jetbrains.annotations.NotNull;
public class App
{
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.93.0.192",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
System.out.println(message);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(@NotNull List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(@NotNull List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
Original url: Access
Created at: 2019-09-11 14:24:16
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论