版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lx1309244704/article/details/87348506
kafka单机环境:jdk1.8,zookeeper-3.4.12,kafka_2.11-2.0.0;提供GitHubDemo
1.zookeeper安装
下载地址:https://www.apache.org/dyn/closer.cgi/zookeeper/
上传zookeeper安装包,解压安装包
tar -zxf zookeeper-3.4.12.tar.gz
进入data文件下
cd /home/zookeeper-3.4.12/data/
在data下创建 myid 文件, 编辑 myid 文件,并在对应的 IP 的机器上输入对应的编号。如在 zookeeper 上,如果只在单点上进行安装配置, 那么只有一个 server.1, myid文件内容就是 1。
vi myid
1
编辑配置文件
cd zookeeper-3.4.12/conf
复制一份配置文件
cp zoo_sample.cfg zoo.cfg
编辑配置
vi zoo.cfg
编辑内容如下
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/home/zookeeper-3.4.12/data dataLogDir=/var/log/kafka/zk clientPort=2181 server.1=192.168.234.128:2888:3888
然后启动zookeeper,进入zookeeper文件下
cd /home/zookeeper-3.4.12/
启动zookeeper
bin/zkServer.sh start
2.安装kafka
下载地址:http://kafka.apache.org/downloads
上传kafka包,然后解压,重命名
tar –zxf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka_2.11
编辑环境变量,添加kafka环境变量
vi /etc/profile
#kafkaexport KAFKA_HOME=/home/kafka_2.11export PATH=${KAFKA_HOME}/bin:$PATH
source /etc/profile
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
创建一个复制因子为1的新主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的topic信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
运行生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
停止
bin/kafka-server-stop.sh
代码如下:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>springboot-lx-master</groupId> <artifactId>springboot-kafka</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>springboot-kafka-service</artifactId> <dependencies> <dependency> <artifactId>springboot-kafka-api</artifactId> <groupId>springboot-kafka-api</groupId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
application.yml:
server: port: 8007 spring: application: name: kafkaDemo kafka: producer: acks: all #acks:消息的确认机制,默认值是0, acks=0:如果设置为0,生产者不会等待kafka的响应。 acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。 retries: 0 #发送失败重试次数,配置为大于0的值的话,客户端会在消息发送失败时重新发送。 batch-size: 16384 #当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。 buffer-memory: 33554432 #即32MB的批处理缓冲区 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: 192.168.234.128:9092 #如果kafka启动错误,打开debug级别日志,出现Can't resolve address: flink:9092 的错误,需要在 windows下修改IP映射即可, C:\Windows\System32\drivers\etc\hosts, 192.168.234.128 flink。 consumer: group-id: test auto-offset-reset: latest #(1)earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费;(2)latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 ;(3)none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 enable-auto-commit: true #如果为true,消费者的偏移量将在后台定期提交。 auto-commit-interval: 1000 #消费者偏移自动提交给Kafka的频率 (以毫秒为单位),默认值为5000 max-poll-records: 5 #一次拉起的条数 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer bootstrap-servers: 192.168.234.128:9092 logging: file: kafkaDemo.log level:# root: debug #开启dubug级别 com.kafka: debug
api:
package com.kafka.api; public interface HelloProducerService { public void sendSyncHello(String helloQueue,String message); public void sendAsyncHello(String helloQueue,String message); }
producerService:
package com.kafka.producer.impl; import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback; import com.kafka.api.HelloProducerService; @Servicepublic class HelloProducerServiceImpl implements HelloProducerService{ private Logger logger = LoggerFactory.getLogger(HelloProducerServiceImpl.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Override public void sendSyncHello(String helloQueue, String message) { logger.debug("发送信息"); kafkaTemplate.send("app_log", message); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("消费成功"+System.currentTimeMillis()); } @Override public void sendAsyncHello(String helloQueue, String message) { logger.debug("发送信息"); ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("app_log1", message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("消费成功"+System.currentTimeMillis()); } @Override public void onFailure(Throwable ex) { logger.debug("消费失败"); ex.getStackTrace(); } }); } }
ConsumerService:
package com.kafka.consumer; import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component; @Componentpublic class HelloConsumerService { private Logger logger = LoggerFactory.getLogger(HelloConsumerService.class); @KafkaListener(topics = {"app_log","app_log1"}) public void receive(String message){ logger.info("------hello:消费者处理消息------"+message); System.out.println("消费完成"+System.currentTimeMillis()+"ms"); logger.debug(message); } }
测试方法:
package com.kafka; import java.util.UUID; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.EnableAutoConfiguration;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import org.springframework.transaction.annotation.EnableTransactionManagement; import com.kafka.api.HelloProducerService; /** * @author Administrator * */@RunWith(SpringRunner.class) @SpringBootTest(classes = KafkaApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)@EnableTransactionManagement //如果mybatis中service实现类中加入事务注解,需要此处添加该注解@EnableAutoConfigurationpublic class KafkaCase { @Autowired private HelloProducerService helloProducerService; @Test public void sendSyncTest() { for (int i = 0; i < 1; i++) { String message = UUID.randomUUID().toString(); System.out.println("发送消息:"+i); helloProducerService.sendSyncHello("app_log", message); System.out.println("发送完成"+System.currentTimeMillis()+"ms"); } } @Test public void sendAsyncTest() { for (int i = 0; i < 1; i++) { String message = UUID.randomUUID().toString(); System.out.println("发送消息:"+i); helloProducerService.sendAsyncHello("app_log1", message+0000+i); System.out.println("发送完成"+System.currentTimeMillis()+"ms"); } }}
同步发送 ,如下图我们可以看出,发送完成的时间是在消费成功之后
异步发送,如下图,我们可以看出,发送完成后,才消费完成
GitHubDemo下载地址:https://github.com/LX1309244704/SpringBoot-master/tree/master/springboot-kafka
参考以下文档:
https://blog.csdn.net/wackycrazy/article/details/47810741
http://orchome.com/kafka/index
https://docs.spring.io/spring-kafka/docs/2.0.2.RELEASE/reference/html/_reference.html#kafka-template
Original url: Access
Created at: 2019-03-25 19:16:47
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论