This section looks at how to write a Kafka Consumer in Java.
First, create the configuration the Consumer needs. The five most basic settings are:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // other valid values: earliest, none
Next, pass this configuration to the constructor to instantiate the Consumer:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
Then subscribe to a Topic with the Consumer's subscribe(Collection<String> topics) method:
consumer.subscribe(Arrays.asList("first_topic"));
Finally, fetch the Messages from the Topic and write them to the log:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Key: " + record.key() + ", Value: " + record.value());
        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
    }
}
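The argument to the Consumer's poll(Duration timeout) method is the maximum time the call blocks waiting for data when none is available; it is not a fixed polling interval. Also recall from the Consumer Poll Options subsection of the earlier Consumer chapter that four configuration options control how a Consumer fetches Messages. All of them can be set in Properties.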
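As a rough illustration of setting such options in Properties, the sketch below uses only java.util.Properties and the plain string keys. Which four options the earlier subsection meant is an assumption here: the sketch picks fetch.min.bytes, max.poll.records, max.partition.fetch.bytes and fetch.max.bytes, with the values documented as client defaults. The class and method names are hypothetical.

```java
import java.util.Properties;

// Sketch only: the four keys are an assumption about which poll-related options
// the earlier section referred to. In real code the ConsumerConfig constants
// (FETCH_MIN_BYTES_CONFIG, MAX_POLL_RECORDS_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG,
// FETCH_MAX_BYTES_CONFIG) resolve to these same string keys.
class PollOptionsSketch {

    // Returns a copy of the given configuration with the four fetch options added.
    static Properties withPollOptions(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("fetch.min.bytes", "1");                 // minimum bytes per fetch response
        props.setProperty("max.poll.records", "500");              // maximum records returned by one poll()
        props.setProperty("max.partition.fetch.bytes", "1048576"); // maximum bytes per partition per fetch
        props.setProperty("fetch.max.bytes", "52428800");          // maximum bytes per fetch response
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "IP:Port");
        base.setProperty("group.id", "consumer_group_1");

        Properties props = withPollOptions(base);
        // Print the resulting configuration in key order.
        props.stringPropertyNames().stream().sorted()
             .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as the configuration shown earlier.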
After starting the Java Consumer, the console shows the following:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-0 to offset 23.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-1 to offset 24.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-2 to offset 21.
In the output above, the line Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2] shows that this Consumer will fetch Messages from all Partitions of the Topic first_topic.
Now start a second Consumer in the same group as the first and look at its output:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-2]
The newly started Consumer prints Setting newly assigned partitions [first_topic-2], which shows that it will fetch Messages from only one Partition of first_topic.
Now go back to the console of the first Consumer:
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1]
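The new output on the first Consumer's console is the key part. The line Attempt to heartbeat failed since group is rebalancing shows that the group is rebalancing: Kafka automatically reassigns the Topic's Partitions among the Consumers in the Consumer Group. The line Setting newly assigned partitions [first_topic-0, first_topic-1] shows that the first Consumer no longer fetches Messages from Partition first_topic-2. This confirms the concept covered in the Consumer Group subsection of the Consumer chapter.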
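To make the redistribution concrete, here is a simplified, pure-Java model of a range-style split of one Topic's Partitions across the members of a group. It is a sketch under stated assumptions, not Kafka's actual assignor code: the class name, the method name, and the member names consumer-a and consumer-b are hypothetical, and in a real cluster which member receives which range depends on the member IDs the broker hands out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a range-style partition split (not Kafka's own implementation).
class RangeAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges, one per member.
    // Members are sorted by name; the first (partitionCount % members) members get one extra.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("at least one member is required");
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        int base = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size();
        int next = 0;

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // One member: it owns every partition.
        System.out.println(assign(3, List.of("consumer-a")));
        // prints {consumer-a=[0, 1, 2]}

        // A second member joins: the partitions are split again.
        System.out.println(assign(3, List.of("consumer-a", "consumer-b")));
        // prints {consumer-a=[0, 1], consumer-b=[2]}
    }
}
```

With three Partitions and two members, the model yields the same split seen in the logs: one Consumer keeps first_topic-0 and first_topic-1, and the other receives first_topic-2.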
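What if we have a temporary Consumer that should not join any Consumer Group, and that needs to read a specific Partition of a Topic starting from a specific Message Offset? Fortunately, Kafka provides an API for this.
First, when building the configuration, there is no need to specify a Consumer Group: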
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // other valid values: latest, none
Then instantiate a TopicPartition, specifying the Topic and the Partition number, and assign it to the Consumer with the assign(Collection<TopicPartition> partitions) method:
TopicPartition topicPartition = new TopicPartition("first_topic", 0);
consumer.assign(Arrays.asList(topicPartition));
Then specify the Message Offset:
long offset = 21L;
consumer.seek(topicPartition, offset);
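Running this Consumer produces the output below. Note that the requested offset 21 is out of range for the Partition, so the Consumer falls back to the auto.offset.reset policy and resets to offset 22: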
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Fetch offset 21 is out of range for partition first_topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Resetting offset for partition first_topic-0 to offset 22.
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Key: null, Value: hello world!
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Partition: 0, Offset: 22
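The reset seen in this log can be modelled in a few lines of plain Java. This is a simplified sketch of the decision, not the client's actual code: the class and method names are hypothetical, and the log bounds used in main (start offset 22, end offset 23) are assumptions inferred from the output above.

```java
// Simplified model of how an out-of-range fetch offset is resolved
// according to auto.offset.reset (not the Kafka client's own implementation).
class OffsetResetSketch {

    // logStart is the first offset still available in the partition;
    // logEnd is the offset the next written message would receive.
    static long resolve(long requested, long logStart, long logEnd, String policy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // in range: the requested offset is used as-is
        }
        switch (policy) {
            case "earliest":
                return logStart; // jump to the oldest available message
            case "latest":
                return logEnd;   // skip to the end and wait for new messages
            case "none":
                // The real client raises an exception in this case instead of resetting.
                throw new IllegalStateException("offset " + requested + " is out of range");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // Assumed bounds: offsets below 22 are no longer available, next write goes to 23.
        System.out.println(resolve(21L, 22L, 23L, "earliest")); // prints 22
        System.out.println(resolve(22L, 22L, 23L, "earliest")); // prints 22
        System.out.println(resolve(21L, 22L, 23L, "latest"));   // prints 23
    }
}
```

Under these assumed bounds, seeking to offset 21 with the earliest policy lands on offset 22, which matches the first Message the Consumer logs.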
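If we list the groups with the Consumer Group CLI, no group named after this Consumer appears, which is consistent with the empty groupId in the log lines above. consumer_group_1 comes from the earlier example, and KMOffsetCache-iZ2ze2booskait1cxxyrljZ appears to be the group that the Kafka Manager tool uses for its offset cache, not one created by this Consumer: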
root@iZ2ze2booskait1cxxyrljZ:~# kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
consumer_group_1
KMOffsetCache-iZ2ze2booskait1cxxyrljZ
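This section walked through writing a Java Consumer with the API Kafka provides. The previous section and this one covered how to use the Kafka Java Client (Producer and Consumer). Compared with the Kafka CLI, the Java Client is likely to be used more often in real development, and hopefully this helps readers who work in Java.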
Created at: 2019-12-26 11:03:27