canal HA installation based on ZooKeeper (not very useful; feel free to skip)

My ZooKeeper cluster is already installed.
My MySQL is already installed.
Two machines:
10.86.43.154
10.93.0.192

cd /home/q/
sudo mkdir canal
cd canal/
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.0.22/canal.deployer-1.0.22.tar.gz
sudo mkdir canal-deployer
sudo tar zxvf canal.deployer-1.0.22.tar.gz -C ./canal-deployer

MySQL needs the binlog enabled in replication mode:

sudo vim /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-bin=mysql-bin # adding this line is enough
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

As everyone knows, changes to my.cnf only take effect after restarting MySQL, but sometimes MySQL is running in production and you are not allowed to restart it. What can you do then? (Mine has already been changed, so the first check below shows ROW; on a box that is still on STATEMENT you can switch it at runtime like this.)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
mysql> show variables like 'binlog_format';
+---------------+-----------+
| Variable_name | Value     |
+---------------+-----------+
| binlog_format | STATEMENT |
+---------------+-----------+
1 row in set (0.00 sec)

mysql> set binlog_format='ROW';
Query OK, 0 rows affected (0.00 sec)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
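Note that a plain SET only changes binlog_format for the current session; other connections keep logging in whatever format they started with. To switch the format for new connections without a restart you can also set the global value (this requires the SUPER privilege, and it still does not survive a restart, so keep the my.cnf change above):

mysql> SET GLOBAL binlog_format = 'ROW';             -- new sessions will log in ROW format
mysql> SHOW GLOBAL VARIABLES LIKE 'binlog_format';   -- verify the global value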

However, running:

mysql> set log_bin='mysql_bin';
ERROR 1238 (HY000): Variable 'log_bin' is a read only variable

So enabling the MySQL binlog really does require a restart; even patching it with gdb does not help.

MySQL has the following kinds of logs:
error log: -log-err
query log: -log
slow query log: -log-slow-queries
update log: -log-update
binary log: -log-bin
mysql> show variables like  '%log%';
+-----------------------------------------+---------------------------------+
| Variable_name                           | Value                           |
+-----------------------------------------+---------------------------------+
| back_log                                | 50                              |
| binlog_cache_size                       | 32768                           |
| binlog_direct_non_transactional_updates | OFF                             |
| binlog_format                           | ROW                             |
| expire_logs_days                        | 0                               |
| general_log                             | OFF                             |
| general_log_file                        | /var/run/mysqld/mysqld.log      |
| innodb_flush_log_at_trx_commit          | 1                               |
| innodb_locks_unsafe_for_binlog          | OFF                             |
| innodb_log_buffer_size                  | 1048576                         |
| innodb_log_file_size                    | 5242880                         |
| innodb_log_files_in_group               | 2                               |
| innodb_log_group_home_dir               | ./                              |
| innodb_mirrored_log_groups              | 1                               |
| log                                     | OFF                             |
| log_bin                                 | ON                              |
| log_bin_trust_function_creators         | OFF                             |
| log_bin_trust_routine_creators          | OFF                             |
| log_error                               | /var/log/mysqld.log             |
| log_output                              | FILE                            |
| log_queries_not_using_indexes           | OFF                             |
| log_slave_updates                       | OFF                             |
| log_slow_queries                        | OFF                             |
| log_warnings                            | 1                               |
| max_binlog_cache_size                   | 18446744073709547520            |
| max_binlog_size                         | 1073741824                      |
| max_relay_log_size                      | 0                               |
| relay_log                               |                                 |
| relay_log_index                         |                                 |
| relay_log_info_file                     | relay-log.info                  |
| relay_log_purge                         | ON                              |
| relay_log_space_limit                   | 0                               |
| slow_query_log                          | OFF                             |
| slow_query_log_file                     | /var/run/mysqld/mysqld-slow.log |
| sql_log_bin                             | ON                              |
| sql_log_off                             | OFF                             |
| sql_log_update                          | ON                              |
| sync_binlog                             | 0                               |
+-----------------------------------------+---------------------------------+
38 rows in set (0.00 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
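With log_bin ON you can also read the current binlog file name and offset. If canal.instance.master.journal.name / position are left empty, as in the instance.properties below, canal starts from the current position on first run; if you want it to start from a fixed point, the values can be taken from here:

mysql> SHOW MASTER STATUS;   -- the File and Position columns are the current binlog file and offset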

Canal configuration

cd canal-deployer/conf
sudo vim canal.properties

#################################################
#########               common argument         ############# 
#################################################
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=10.86.43.164:4180,10.86.43.154:4180,10.93.0.192:4180
# flush data to zk
canal.zookeeper.flush.period = 1000
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

#################################################
#########               destinations            ############# 
#################################################
canal.destinations= example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

Modify instance.properties:

cd example
sudo vim instance.properties

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1235 # must be different on the two machines

# position info
canal.instance.master.address = 10.93.0.192:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = <your username>
canal.instance.dbPassword = <your password>
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =

#################################################
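The account configured in canal.instance.dbUsername / dbPassword must be allowed to read the binlog as a replication client on the master. A minimal sketch, assuming a dedicated user named canal; the user name and password here are placeholders, not values taken from this setup:

mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;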

Do the same on both machines.
Start canal and check the logs:

cd ../../bin
sudo ./startup.sh
cd  /home/q/canal/canal-deployer/logs

You can check canal's own log and the log of each instance:
the canal log (logs/canal/canal.log) shows whether canal itself started correctly;
the instance log (e.g. logs/example/example.log) shows whether that instance connected to the MySQL master correctly.

Client code
Create a new Maven project.
Add the dependencies to the pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>canal.sample</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>  
        <groupId>com.alibaba.otter</groupId>  
        <artifactId>canal.client</artifactId>  
        <version>1.0.12</version>  
    </dependency>
    <dependency>
        <groupId>org.jetbrains</groupId>
        <artifactId>annotations</artifactId>
        <version>13.0</version>
    </dependency>
  </dependencies>
</project>

Create the class:

package com.alibaba.otter.canal.sample;


import java.net.InetSocketAddress;
import java.util.List;

import org.jetbrains.annotations.NotNull;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
public class App 
{
     public static void main(String args[]) {  
            // create the connection
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.93.0.192",  
                    11111), "example", "", "");  
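            // Note: since this is a ZooKeeper-based HA setup, the client can instead use
            // CanalConnectors.newClusterConnector("10.86.43.164:4180,10.86.43.154:4180,10.93.0.192:4180",
            //         "example", "", "") so that it follows whichever canal server currently runs the instance.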
            int batchSize = 1000;  
            int emptyCount = 0;  
            try {  
                connector.connect();  
                connector.subscribe(".*\\..*");  
                connector.rollback();  
                int totalEmptyCount = 1200;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    System.out.println(message);
                    long batchId = message.getId();  
                    int size = message.getEntries().size();  
                    if (batchId == -1 || size == 0) {  
                        emptyCount++;  
                        System.out.println("empty count : " + emptyCount);  
                        try {  
                            Thread.sleep(1000);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    } else {  
                        emptyCount = 0;  
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  
                        printEntry(message.getEntries());  
                    }  
      
                    connector.ack(batchId); // acknowledge the batch
                    // connector.rollback(batchId); // on processing failure, roll back
                }  
      
                System.out.println("empty too many times, exit");  
            } finally {  
                connector.disconnect();  
            }  
        }  
      
        private static void printEntry(@NotNull List<Entry> entrys) {  
            for (Entry entry : entrys) {  
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                    continue;  
                }  
      
                RowChange rowChange = null;
                try {  
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {  
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                            e);  
                }  
      
                EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                        eventType));  
      
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {  
                        printColumn(rowData.getBeforeColumnsList());  
                    } else if (eventType == EventType.INSERT) {  
                        printColumn(rowData.getAfterColumnsList());  
                    } else {  
                        System.out.println("-------> before");  
                        printColumn(rowData.getBeforeColumnsList());  
                        System.out.println("-------> after");  
                        printColumn(rowData.getAfterColumnsList());  
                    }  
                }  
            }  
        }  
      
        private static void printColumn(@NotNull List<Column> columns) {  
            for (Column column : columns) {  
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
            }  
        }  
}
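To verify the whole chain, run the client and then change some data in the canal_test database on the master; the client should print the row images it receives. A minimal sketch; the table t_user is only an example, not part of this setup:

mysql> USE canal_test;
mysql> CREATE TABLE t_user (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(64));
mysql> INSERT INTO t_user (name) VALUES ('canal');          -- the client prints the after columns
mysql> UPDATE t_user SET name = 'canal-ha' WHERE id = 1;    -- the client prints before and after
mysql> DELETE FROM t_user WHERE id = 1;                     -- the client prints the before columns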


