SpringBoot 2.0 Module Integration: kafka_2.11-2.0.0 (Sync and Async) - lx1309244704's Blog - CSDN Blog

Copyright notice: this is the blogger's original article and may not be reproduced without permission. https://blog.csdn.net/lx1309244704/article/details/87348506

Single-node Kafka environment: JDK 1.8, zookeeper-3.4.12, kafka_2.11-2.0.0. A GitHub demo is provided (link at the end).

1. Installing ZooKeeper

Download: https://www.apache.org/dyn/closer.cgi/zookeeper/

Upload the ZooKeeper package and extract it:

tar -zxf zookeeper-3.4.12.tar.gz

Enter the data directory (create it first if it does not exist):

cd /home/zookeeper-3.4.12/data/

Create a myid file under data and edit it; on each machine, the file holds the number matching that machine's entry in the configuration. For a single-node setup there is only server.1, so the myid file contains just 1.

vi myid

1

Next, edit the configuration. Enter the conf directory:

cd zookeeper-3.4.12/conf

Make a copy of the sample configuration:

cp zoo_sample.cfg zoo.cfg

Edit the configuration:

vi zoo.cfg

Set the following contents:

# tickTime: base time unit in milliseconds
tickTime=2000
# initLimit: ticks a follower may take to connect and sync to the leader
initLimit=10
# syncLimit: ticks a follower may lag behind the leader before being dropped
syncLimit=5
dataDir=/home/zookeeper-3.4.12/data
dataLogDir=/var/log/kafka/zk
clientPort=2181
# server.<myid>=<ip>:<peer port>:<leader election port>
server.1=192.168.234.128:2888:3888

Then start ZooKeeper. Enter the ZooKeeper directory:

cd /home/zookeeper-3.4.12/

Start ZooKeeper:

bin/zkServer.sh start

2. Installing Kafka

Download: http://kafka.apache.org/downloads

Upload the Kafka package, extract it, and rename the directory:

tar -zxf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka_2.11

Edit the environment variables and add Kafka to them:

vi /etc/profile
# kafka
export KAFKA_HOME=/home/kafka_2.11
export PATH=${KAFKA_HOME}/bin:$PATH
source /etc/profile

Start Kafka (run from the /home/kafka_2.11 directory):

bin/kafka-server-start.sh -daemon config/server.properties

Create a new topic with a replication factor of 1 (the app_log and app_log1 topics used later by the demo code can be created the same way, or left to the broker's default auto.create.topics.enable=true):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

View the details of the topic you created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
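Topics can also be created from code using the AdminClient API that ships with kafka-clients 2.0. Below is a sketch equivalent to the kafka-topics.sh command above; the class name CreateTopicDemo is only illustrative.

package com.kafka.admin;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "test", 1 partition, replication factor 1
            admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1)))
                 .all().get(); // block until the broker confirms creation
        }
    }
}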

Run a console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Run a console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
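The console consumer is just a thin wrapper over the Java client API. A roughly equivalent sketch is shown below; the group id demo-group is arbitrary, and --from-beginning corresponds to auto.offset.reset=earliest.

package com.kafka.demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsoleConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        // --from-beginning: start from the earliest offset when no offset is committed
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}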

Stop Kafka:

bin/kafka-server-stop.sh

The code is as follows:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>springboot-lx-master</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>springboot-kafka-service</artifactId>
    <dependencies>
        <dependency>
            <groupId>springboot-kafka-api</groupId>
            <artifactId>springboot-kafka-api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml:

server:
  port: 8007

spring:
  application:
    name: kafkaDemo
  kafka:
    producer:
      # acks controls how sends are acknowledged (the default is 1):
      #   acks=0   the producer does not wait for any response from Kafka
      #   acks=1   the leader writes the record to its local log but does not wait for followers
      #   acks=all the leader waits until all in-sync replicas have the record; the strongest
      #            guarantee, losing data only if every broker holding the record fails
      acks: all
      retries: 0 # number of resends on failure; a value > 0 lets the client retry failed sends
      batch-size: 16384 # records headed for the same partition are batched into one request
      buffer-memory: 33554432 # 32 MB of buffer for records waiting to be sent
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # If startup fails and debug logging shows "Can't resolve address: flink:9092",
      # add an IP mapping on Windows in C:\Windows\System32\drivers\etc\hosts:
      #   192.168.234.128 flink
      bootstrap-servers: 192.168.234.128:9092
    consumer:
      group-id: test
      # earliest: resume from the committed offset; if none exists, consume from the beginning
      # latest:   resume from the committed offset; if none exists, consume only new records
      # none:     resume from committed offsets; throw if any partition has no committed offset
      auto-offset-reset: latest
      enable-auto-commit: true # if true, offsets are committed periodically in the background
      auto-commit-interval: 1000 # auto-commit interval in milliseconds (default 5000)
      max-poll-records: 5 # maximum number of records returned by a single poll
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      bootstrap-servers: 192.168.234.128:9092

logging:
  file: kafkaDemo.log
  level:
#    root: debug # enable debug-level logging globally
    com.kafka: debug
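Spring Boot auto-configures the KafkaTemplate and consumer factory from the spring.kafka.* properties above, so no Java configuration is needed. For readers who want to see how the properties map onto the client, a hand-written equivalent of the producer side would look roughly like this (a sketch only, not part of the demo project):

package com.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Mirrors the spring.kafka.producer.* properties in application.yml
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.234.128:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}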

API:

package com.kafka.api;

public interface HelloProducerService {

    public void sendSyncHello(String helloQueue, String message);

    public void sendAsyncHello(String helloQueue, String message);

}

ProducerService:

package com.kafka.producer.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.kafka.api.HelloProducerService;

@Service
public class HelloProducerServiceImpl implements HelloProducerService {

    private Logger logger = LoggerFactory.getLogger(HelloProducerServiceImpl.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendSyncHello(String helloQueue, String message) {
        logger.debug("sending message");
        // Send to the topic passed in by the caller ("app_log" in the test)
        kafkaTemplate.send(helloQueue, message);
        try {
            // Block the calling thread long enough for the listener to consume,
            // so the test's "send completed" line is printed after consumption
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("consumed successfully " + System.currentTimeMillis());
    }

    @Override
    public void sendAsyncHello(String helloQueue, String message) {
        logger.debug("sending message");
        // send() is asynchronous and returns a future; register a callback instead of blocking
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(helloQueue, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logger.debug("consumed successfully " + System.currentTimeMillis());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("send failed", ex);
            }
        });
    }

}
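One caveat: KafkaTemplate.send() itself is always asynchronous, so the Thread.sleep() above only simulates waiting. A genuinely synchronous send blocks on the returned future with get(). A minimal sketch of such an extra method on the class above (the name sendSyncHelloBlocking is mine, not part of the demo):

    // Blocks until the broker acknowledges the record (per the acks=all setting),
    // turning the asynchronous send() into a synchronous call
    public void sendSyncHelloBlocking(String helloQueue, String message) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(helloQueue, message).get();
            // reached only once the broker has acknowledged the record
            logger.debug("broker acknowledged, offset=" + result.getRecordMetadata().offset());
        } catch (InterruptedException | java.util.concurrent.ExecutionException e) {
            logger.error("synchronous send failed", e);
        }
    }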

ConsumerService:

package com.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HelloConsumerService {

    private Logger logger = LoggerFactory.getLogger(HelloConsumerService.class);

    // Listens on both topics used by the producer above
    @KafkaListener(topics = {"app_log", "app_log1"})
    public void receive(String message) {
        logger.info("------hello: consumer handling message------" + message);
        System.out.println("consumption finished " + System.currentTimeMillis() + "ms");
        logger.debug(message);
    }

}
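The test class below bootstraps a KafkaApplication class that this post never shows. Any standard Spring Boot entry point in the com.kafka package will do; a minimal assumed version:

package com.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point; component scanning picks up the
// producer, consumer, and API beans under the com.kafka package
@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

}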

Test methods:

package com.kafka;

import java.util.UUID;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import com.kafka.api.HelloProducerService;

/**
 * @author Administrator
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EnableTransactionManagement // needed here if MyBatis service implementations carry transaction annotations
@EnableAutoConfiguration
public class KafkaCase {

    @Autowired
    private HelloProducerService helloProducerService;

    @Test
    public void sendSyncTest() {
        for (int i = 0; i < 1; i++) {
            String message = UUID.randomUUID().toString();
            System.out.println("sending message: " + i);
            helloProducerService.sendSyncHello("app_log", message);
            System.out.println("send completed " + System.currentTimeMillis() + "ms");
        }
    }

    @Test
    public void sendAsyncTest() {
        for (int i = 0; i < 1; i++) {
            String message = UUID.randomUUID().toString();
            System.out.println("sending message: " + i);
            // append a suffix to distinguish messages; note that the async
            // callback runs on a background thread after this line prints
            helloProducerService.sendAsyncHello("app_log1", message + "0000" + i);
            System.out.println("send completed " + System.currentTimeMillis() + "ms");
        }
    }
}

Synchronous send: as the run log shows (screenshot not reproduced here), "send completed" is printed only after "consumed successfully".

Asynchronous send: as the run log shows (screenshot not reproduced here), "send completed" is printed first, and consumption finishes afterwards.

GitHub demo download: https://github.com/LX1309244704/SpringBoot-master/tree/master/springboot-kafka

References:

https://blog.csdn.net/wackycrazy/article/details/47810741

http://orchome.com/kafka/index

http://kafka.apache.org/

https://docs.spring.io/spring-kafka/docs/2.0.2.RELEASE/reference/html/_reference.html#kafka-template


