[有意思的代码] kafka2x-elasticsearch: kafka-elasticsearch数据同步工具,适用于kafka 2x版本

Bboss is a good elasticsearch Java rest client. It operates and accesses elasticsearch in a way similar to mybatis.

[](#bboss-environmental-requirements)BBoss Environmental requirements

JDK requirement: JDK 1.7+

Elasticsearch version requirements: 1.x,2.X,5.X,6.X,+

Spring boot: 1.x,2.x,+

[](#kafka2x-elasticsearch-kafka2x-database%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A5%E5%B7%A5%E5%85%B7demo)kafka2x-Elasticsearch kafka2x-Database数据同步工具demo

适用于新版本kafka client包 ,使用本demo所带的应用程序运行容器环境,可以快速编写,打包发布可运行的数据导入工具

支持的kafka_2.12-0.10.2.0系列版本、 kafka_2.12-2.3.0 系列版本

支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,+

支持海量PB级数据同步导入功能

使用参考文档

[](#%E5%AF%BC%E5%85%A5maven%E5%9D%90%E6%A0%87)导入maven坐标

<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-rest-kafka2x</artifactId>
<version>6.2.8</version>
<scope>compile</scope>
</dependency>

根据kafka服务端版本导入和调整kafka client版本及版本号:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-tools</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>

[](#%E6%9E%84%E5%BB%BA%E9%83%A8%E7%BD%B2)构建部署

[](#%E5%87%86%E5%A4%87%E5%B7%A5%E4%BD%9C)准备工作

需要通过gradle构建发布版本,gradle安装配置参考文档:

https://esdoc.bbossgroups.com/#/bboss-build

[](#%E4%B8%8B%E8%BD%BD%E6%BA%90%E7%A0%81%E5%B7%A5%E7%A8%8B-%E5%9F%BA%E4%BA%8Egradle)下载源码工程-基于gradle

https://github.com/bbossgroups/kafka2x-elasticsearch

从上面的地址下载源码工程,然后导入idea或者eclipse,根据自己的需求,修改导入程序逻辑

[](#kafka%E5%88%B0elasticsearch%E5%90%8C%E6%AD%A5%E5%8A%9F%E8%83%BD%E8%B0%83%E6%B5%8B)kafka到Elasticsearch同步功能调测

org.frameworkset.elasticsearch.imp.Kafka2ESdemo

如果需要测试和调试导入功能,运行Kafka2ESdemo的main方法即可:

public class Kafka2ESdemo {
public static void main(String[] args){
Kafka2ESdemo dbdemo = new Kafka2ESdemo();
boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值
dbdemo.scheduleTimestampImportData(dropIndice);
}
.....
}

[](#kafka%E5%88%B0database%E5%90%8C%E6%AD%A5%E5%8A%9F%E8%83%BD%E8%B0%83%E6%B5%8B)kafka到Database同步功能调测

以mysql为例,也支持其他数据库,支持增删改数据的同步 org.frameworkset.elasticsearch.imp.Kafka2DBdemo

如果需要测试和调试导入功能,运行Kafka2DBdemo的main方法即可:

public class Kafka2ESdemo {
public static void main(String[] args){
Kafka2DBdemo dbdemo = new Kafka2DBdemo();
boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值
dbdemo.scheduleTimestampImportData(dropIndice);
}
.....
}

修改es配置-kafka2x-elasticsearch\src\main\resources\application.properties

修改完毕配置后,就可以进行功能调试了。

测试调试通过后,就可以构建发布可运行的版本了:进入命令行模式,在源码工程根目录kafka2x-elasticsearch 下运行以下gradle指令打包发布版本

release.bat

[](#%E8%BF%90%E8%A1%8C%E4%BD%9C%E4%B8%9A)运行作业

gradle构建成功后,在build/distributions目录下会生成可以运行的zip包,解压运行导入程序

linux:

chmod +x restart.sh

./restart.sh

windows: restart.bat

[](#%E4%BD%9C%E4%B8%9Ajvm%E9%85%8D%E7%BD%AE)作业jvm配置

修改jvm.options,设置内存大小和其他jvm参数

-Xms1g

-Xmx1g

[](#%E4%BD%9C%E4%B8%9A%E5%8F%82%E6%95%B0%E9%85%8D%E7%BD%AE)作业参数配置

在使用kafka2x-elasticsearch 时,为了避免调试过程中不断打包发布数据同步工具,可以将部分控制参数配置到启动配置文件resources/application.properties中,然后在代码中通过以下方法获取配置的参数:

工具主程序

mainclass=org.frameworkset.elasticsearch.imp.Kafka2ESdemo

mainclass=org.frameworkset.elasticsearch.imp.Kafka2DBdemo

# 参数配置
# 在代码中获取方法:CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值false
dropIndice=false

在代码中获取参数dropIndice方法:

boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值false

另外可以在resources/application.properties配置控制作业执行的一些参数,例如工作线程数,等待队列数,批处理size等等:

queueSize=50
workThreads=10
batchSize=20

在作业执行方法中获取并使用上述参数:

int batchSize = CommonLauncher.getIntProperty("batchSize",10);//同时指定了默认值
int queueSize = CommonLauncher.getIntProperty("queueSize",50);//同时指定了默认值
int workThreads = CommonLauncher.getIntProperty("workThreads",10);//同时指定了默认值
importBuilder.setBatchSize(batchSize);
importBuilder.setQueue(queueSize);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(workThreads);//设置批量导入线程池工作线程数量

bin/kafka-console-consumer.sh --zookeeper 10.19.85.65:2185 --topic mysqlbinlog

[](#elasticsearch%E6%8A%80%E6%9C%AF%E4%BA%A4%E6%B5%81%E7%BE%A4166471282)elasticsearch技术交流群:166471282

[](#elasticsearch%E5%BE%AE%E4%BF%A1%E5%85%AC%E4%BC%97%E5%8F%B7bbossgroup)elasticsearch微信公众号:bbossgroup

GitHub Logo


原网址: 访问
创建于: 2021-03-03 13:29:31
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论