易用的 canal java 客户端 canal-client - 个人文章 - SegmentFault 思否 - 信息有用 - 不过可能是老版本的canal

易用的canaljava 客户端

canal 自身提供了简单的客户端,数据格式较为复杂,处理消费数据也不太方便,为了方便给业务使用,提供一种直接能获取实体对象的方式来进行消费才更方便。
先说一下实现的思路,首先canal 客户端的消息对象有两种,message 和 flatMessage,分别是普通的消息(protobuf格式)和消息队列的扁平消息(json格式),现在将这两种消息转化为我们直接使用的 model 对象,根据消息中的数据库表名称找到对应的的实体对象,那么如何根据数据库表名找到实体对象呢?
第一种方式,如果我们的实体对象都使用JPA 的 @Table注解来标识表和实体的对应关系,可以使用该注解来找到实体对象和表名的关系
第二种方式,可以使用自定义注解的来标注实体和表名的关系,为解耦各个表的处理,我们使用策略模式来封装各个表的增删改操作

canal 主要客户端类

ClientIdentity

canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

Canal 客户端类型

canal 客户端可以主要分以下几种类型

单一ip 直连模式

这种方式下,可以启动多个客户端,连接同一个canal 服务端,多个客户端只有一个client 工作,其他的可以作为冷备,当一个client的挂了,其他的客户端会有一个进入工作模式
缺点:连接同一个服务端,如果服务端挂了将导致不可用

多ip 模式

这种方式下,客户端连接多个canal服务端,一个客户端随机选择一个canal server 消费,当这个server 挂了,会选择另外一个进行消费
缺点:不支持订阅消费

zookeeper 模式

使用zookeeper来server,client 的状态,当两个canal server 连接zookeeper 后,
优先连接的节点作为 活跃节点,client从活跃节点消费,当server挂了以后,从另外一个节点消费
缺点:不支持订阅消费

消息 队列模式

canal 支持消息直接发送到消息队列,从消息队列消费,目前支持的有kafka 和rocketMq,这种方式支持订阅消费

canal 客户端实现

EntryHandler 实体消息处理器

首先定义一个策略接口,定义增加,更新,删除功能,使用java 8声明方法为default,让客户端选择实现其中的方法,提高灵活性,客户端实现EntryHandler接口后,会返回基于handler中的泛型的实例对象,在对应的方法中实现自定义逻辑

public interface EntryHandler<T> {

    default void insert(T t) {

    }


    default void update(T before, T after) {

    }


    default void delete(T t) {

    }
}

定义一个canalClient 的抽象类,封装canal 的链接开启关闭操作,启动一个线程不断去消费canal 数据,依赖一个 messageHandler 封装消息处理的逻辑

public abstract class AbstractCanalClient implements CanalClient {



    @Override
    public void start() {
        log.info("start canal client");
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        flag = true;
        workThread.start();
    }

    @Override
    public void stop() {
        log.info("stop canal client");
        flag = false;
        if (null != workThread) {
            workThread.interrupt();
        }

    }

    @Override
    public void process() {
        if (flag) {
            try {
                connector.connect();
                connector.subscribe(filter);
                while (flag) {
                    Message message = connector.getWithoutAck(batchSize, timeout, unit);
                    log.info("获取消息 {}", message);
                    long batchId = message.getId();
                    if (message.getId() != -1 && message.getEntries().size() != 0) {
                        messageHandler.handleMessage(message);
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                log.error("canal client 异常", e);
            } finally {
                connector.disconnect();
            }
        }
    }

}

基于该抽象类,分别提供各种客户端的实现

  1. SimpleCanalClient
  2. ClusterCanalClient
  3. ZookeeperCanalClient
  4. KafkaCanalClient

消息处理器 messageHandler

消息处理器 messageHandler 封装了消息处理逻辑,其中定义了一个消息处理方法

public interface MessageHandler<T> {

     void handleMessage(T t);

}

消息处理器可能要适配4种情况,分别是消费message,flatMessage和两种消息的同步与异步消费
消息处理的工作主要有两个

  1. 获取增删改的行数据,交给行处理器继续处理
  2. 在上下文对象中保存其他的数据,例如库名,表名,binlog 时间戳等等数据

首先我们封装一个抽象的 message 消息处理器,实现MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler<Message> {


    @Override
    public void handleMessage(Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                try {
                    EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());
                    if(entryHandler!=null){
                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {
                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {
                   CanalContext.removeModel();
                }

            }
        }
    }
}

分别定义两个实现类,同步与异步实现类,继承AbstractMessageHandler抽象类

public class SyncMessageHandlerImpl extends AbstractMessageHandler {


    public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {
        super(entryHandlers, rowDataHandler);
    }

    @Override
    public void handleMessage(Message message) {
        super.handleMessage(message);
    }
}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {


    private ExecutorService executor;


    public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) {
        super(entryHandlers, rowDataHandler);
        this.executor = executor;
    }

    @Override
    public void handleMessage(Message message) {
        executor.execute(() -> super.handleMessage(message));
    }
}

RowDataHandler 行消息处理器

消息处理器依赖的行消息处理器主要是将原始的column list 转为 实体对象,并将相应的增删改消息交给相应的hangler对象方法,行消息处理器分别需要处理两种对象,一个是 message的行数据 和 flatMessage 的行数据

public interface RowDataHandler<T> {


    void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;
}

两个行处理器的实现为

public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {



    private IModelFactory<List<CanalEntry.Column>> modelFactory;




    public RowDataHandlerImpl(IModelFactory modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> {



    private IModelFactory<Map<String,String>> modelFactory;


    public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Object before = modelFactory.newInstance(entryHandler, list.get(1));
                    Object after = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}

IModelFactory bean实例创建工厂

行消息处理的依赖的工厂 主要是是通过反射创建与表名称对应的bean实例

public interface IModelFactory<T> {


    Object newInstance(EntryHandler entryHandler, T t) throws Exception;


    default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception {
        return null;
    }
}

CanalContext canal 消息上下文

目前主要用于保存bean实例以外的其他数据,使用threadLocal实现

代码已在github开源canal-client


Original url: Access
Created at: 2019-09-11 10:58:55
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论