原文网址:Kafka--延迟队列--使用/实现/原理_IT利刃出鞘的博客-CSDN博客
本文介绍Kafka如何使用延迟队列的功能。
Kafka是很长用的消息队列,但Kafka本身是没有延迟队列功能的,RabbitMQ、RocketMQ有延迟队列的功能。本文介绍如何手动给Kafka添加延迟消息的功能。
虽然Kafka内部有时间轮,支持延时操作,例如:延迟生产、延迟拉取以及延迟删除,但这是Kafka自己内部使用的,用户无法将其作为延迟队列来使用。
本内容也是Java后端面试常问的问题 。
kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻被收到的,因此需要想一个办法,让消息延迟发送出去。
方案如下:
延迟消息发出去之后,代码程序就会立刻收到延迟消息,要如何处理才能让延迟消息等待一段时间才发送到真正的topic里面?
有同学会说很简单嘛,在程序收到消息后判断若条件不满足,就调用sleep方法,过一段时间再进行下一个循环拉取消息。但这样是不行的,原因如下:
在轮询kafka拉取消息的时候,kafka会返回由max.poll.records配置指定的一批消息,当程序不能在max.poll.interval.ms配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance,同时你这个消费者就无法再拉取到任何消息了。
举个例子:当你需要一个24小时的延迟消息队列,在代码里面写下了Thread.sleep(1000*60*60*24);,为了不发生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,这个时候你或许会感觉到一丝丝的怪异,我是谁?我在哪?我为什么要写出来这样的代码?
KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。
为了更优雅,我们会启动一个定时器来替换sleep。
完整流程如下图,当消费者发现消息不满足条件时,我们就暂停消费者,并把偏移量seek到上一次消费的位置以便等待下一个周期再次消费这条消息。
import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.JsonNode;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.jupiter.api.BeforeEach;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest; import java.time.Duration;import java.util.*;import java.util.concurrent.ExecutionException; @SpringBootTestpublic class DelayQueueTest { private KafkaConsumer<String, String> consumer; private KafkaProducer<String, String> producer; private volatile Boolean exit = false; private final Object lock = new Object(); private final String servers = ""; @BeforeEach void initConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "d"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000"); consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); } @BeforeEach void initProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); } @Test void testDelayQueue() throws JsonProcessingException, InterruptedException { String topic = "delay-minutes-1"; List<String> topics = Collections.singletonList(topic); consumer.subscribe(topics); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { synchronized (lock) { consumer.resume(consumer.paused()); lock.notify(); } } }, 0, 1000); do { synchronized (lock) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200)); if (consumerRecords.isEmpty()) { lock.wait(); continue; } boolean timed = false; for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { long timestamp = consumerRecord.timestamp(); TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); if (timestamp + 60 * 1000 < System.currentTimeMillis()) { String value = consumerRecord.value(); ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(value); JsonNode jsonNodeTopic = jsonNode.get("topic"); String appTopic = null, appKey = null, appValue = null; if (jsonNodeTopic != null) { appTopic = jsonNodeTopic.asText(); } if (appTopic == null) { continue; } JsonNode jsonNodeKey = jsonNode.get("key"); if (jsonNodeKey != null) { appKey = jsonNode.asText(); } JsonNode jsonNodeValue = jsonNode.get("value"); if (jsonNodeValue != null) { appValue = jsonNodeValue.asText(); } // send to application topic ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue); try { producer.send(producerRecord).get(); // success. commit message OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1); HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>(); metadataHashMap.put(topicPartition, offsetAndMetadata); consumer.commitSync(metadataHashMap); } catch (ExecutionException e) { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } else { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } if (timed) { lock.wait(); } } } while (!exit); }}
这是基于SpringBoot 2.4.4版本和 kafka-client 2.7.0版本编写的一个单元测试,需要修改私有变量servers为kafka broker的地址。
启动程序后,向Topic delay-minutes-1 发送如以下格式的json字符串数据
{ "topic": "target", "key": "key1", "value": "value1"}
同时启动一个消费者监听target这个,在一分钟后,将会收到一条 key="key1", value="value1"的数据。
创建多个topic用于处理不同时间的延迟消息,例如delay-minutes-1 delay-minutes-5 delay-minutes-10 delay-minutes-15以提供指数级别的延迟时间,这样比一个topic要好很多,毕竟在顺序拉取消息的时候,有一条消息不满足条件,后面的将全部进行排队。
原网址: 访问
创建于: 2022-06-29 10:17:54
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论