在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka
作为中间件。
其中有朋友咨询在大量消息的情况下 Kakfa
是如何保证消息的高效及一致性呢?
正好以这个问题结合 Kakfa
的源码讨论下如何正确、高效的发送消息。
内容较多,对源码感兴趣的朋友请系好安全带(源码基于 v0.10.0.0
版本分析)。同时最好是有一定的 Kafka 使用经验,知晓基本的用法。
在分析之前先看一个简单的消息发送是怎么样的。
以下代码基于 SpringBoot 构建。
首先创建一个 org.apache.kafka.clients.producer.Producer
的 bean。
主要关注 bootstrap.servers
,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094
。
其余几个参数暂时不做讨论,后文会有详细介绍。
接着注入这个 bean 即可调用它的发送函数发送消息。
这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。
但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步
的方式。
那么我想知道消息到底发送成功没有该怎么办呢?
其实 Producer
的 API
已经帮我们考虑到了,发送之后只需要调用它的 get()
方法即可同步获取发送结果。
发送结果:
这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。
为此我们应当采取异步的方式发送,其实 send()
方法默认则是异步的,只要不手动调用 get()
方法。
但这样就没法获知发送结果。
所以查看 send()
的 API 可以发现还有一个参数。
Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);
Callback
是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。
执行之后的结果:
同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程
,这样也能证明是异步回调的。
同时回调的时候会传递两个参数:
RecordMetadata
和上文一致的消息发送成功后的元数据。Exception
消息发送过程中的异常信息。但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。
所以正确的写法应当是:
至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。
现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。
首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka
并不是简单的把消息通过网络发送到了 broker
中,在 Java 内部还是经过了许多优化和设计。
为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。
从上至下依次是:
kafka-producer-network-thread
IO 线程。接下来详解每个步骤。
调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer
。比较麻烦的是初始化 Sender
线程进行缓冲区消费。
初始化 IO 线程处:
可以看到 Sender 线程有需要成员变量,比如:
acks,retries,requestTimeout
等,这些参数会在后文分析。
在调用 send()
函数后其实第一步就是序列化,毕竟我们的消息需要通过网络才能发送到 Kafka。
其中的 valueSerializer.serialize(record.topic(), record.value());
是一个接口,我们需要在初始化时候指定序列化实现类。
我们也可以自己实现序列化,只需要实现 org.apache.kafka.common.serialization.Serializer
接口即可。
接下来就是路由分区,通常我们使用的 Topic
为了实现扩展性以及高性能都会创建多个分区。
如果是一个分区好说,所有消息都往里面写入即可。
但多个分区就不可避免需要知道写入哪个分区。
通常有三种方式。
可以在构建 ProducerRecord
为每条消息指定分区。
这样在路由时会判断是否有指定,有就直接使用该分区。
这种一般在特殊场景下会使用。
如果没有指定分区,则会调用 partitioner.partition
接口执行自定义分区策略。
而我们也只需要自定义一个类实现 org.apache.kafka.clients.producer.Partitioner
接口,同时在创建 KafkaProducer
实例时配置 partitioner.class
参数。
通常需要自定义分区一般是在想尽量的保证消息的顺序性。
或者是写入某些特有的分区,由特别的消费者来进行处理等。
最后一种则是默认的路由策略,如果我们啥都没做就会执行该策略。
该策略也会使得消息分配的比较均匀。
来看看它的实现:
简单的来说分为以下几步:
其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。
在 send()
方法拿到分区后会调用一个 append()
函数:
该函数中会调用一个 getOrCreateDeque()
写入到一个内部缓存中 batches
。
在最开始初始化的 IO 线程其实是一个守护线程,它会一直消费这些数据。
通过图中的几个函数会获取到之前写入的数据。这块内容可以不必深究,但其中有个 completeBatch
方法却非常关键。
调用该方法时候肯定已经是消息发送完毕了,所以会调用 batch.done()
来完成之前我们在 send()
方法中定义的回调接口。
从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。
发送流程讲完了再来看看 Producer
中比较重要的几个参数。
acks
是一个影响消息吞吐量的一个关键参数。
主要有 [all、-1, 0, 1]
这几个选项,默认为 1。
由于 Kafka
不是采取的主备模式,而是采用类似于 Zookeeper 的主备模式。
前提是Topic
配置副本数量replica > 1
。
当 acks = all/-1
时:
意味着会确保所有的 follower 副本都完成数据的写入才会返回。
这样可以保证消息不会丢失!
但同时性能和吞吐量却是最低的。
当 acks = 0
时:
producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能却是最好的!
当 acks = 1
时:
这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。
一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。
这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大可以提高吞吐量。
但也不能极端,调太大会浪费内存。小了也发挥不了作用,也是一个典型的时间和空间的权衡。
上图是几个使用的体现。
retries
该参数主要是来做重试使用,当发生一些网络抖动都会造成重试。
这个参数也就是限制重试次数。
但也有一些其他问题。
消息重复
。这种只能是消费者进行幂等处理。如果消息量真的非常大,同时又需要尽快的将消息发送到 Kafka
。一个 producer
始终会收到缓存大小等影响。
那是否可以创建多个 producer
来进行发送呢?
producer
,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List
中,保存时做好同步处理防止并发问题。这样在大量、频繁的消息发送场景中可以提高发送效率减轻单个 producer
的压力。
最后则是 Producer
的关闭,Producer 在使用过程中消耗了不少资源(线程、内存、网络等)因此需要显式的关闭从而回收这些资源。
默认的 close()
方法和带有超时时间的方法都是在一定的时间后强制关闭。
但在过期之前都会处理完剩余的任务。
所以使用哪一个得视情况而定。
本文内容较多,从实例和源码的角度分析了 Kafka 生产者。
希望看完的朋友能有收获,同时也欢迎留言讨论。
不出意外下期会讨论 Kafka 消费者。
如果对你有帮助还请分享让更多的人看到。
欢迎关注公众号一起交流:
原网址: 访问
创建于: 2018-10-13 16:01:59
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
java windows火焰图_mob64ca12ec8020的技术博客_51CTO博客 - 在windows下不可行,不知道作者是怎样搞的 监听SpringBoot 服务启动成功事件并打印信息_监听springboot启动完毕-CSDN博客 SpringBoot中就绪探针和存活探针_management.endpoint.health.probes.enabled-CSDN博客 u2u转换板 - 嘉立创EDA开源硬件平台 Spring Boot 项目的轻量级 HTTP 客户端 retrofit 框架,快来试试它!_Java精选-CSDN博客 手把手教你打造一套最牛的知识笔记管理系统! - 知乎 - 想法有重合-理论可参考 安宇雨 闲鱼 机械键盘 客制化 开贴记录 文本 linux 使用find命令查找包含某字符串的文件_beijihukk的博客-CSDN博客_find 查找字符串 ---- mac 也适用 安宇雨 打字音 记录集合 B站 bilibili 自行搭建 开坑 真正的客制化 安宇雨 黑苹果开坑 查找工具包maven pom 引用地 工具网站 Dantelis 介绍的玩轴入坑攻略 --- 关于轴的一些说法 --- 非官方 ---- 心得而已 --- 长期开坑更新 [本人问题][新开坑位]关于自动化测试的工具与平台应用 机械键盘 开团 网站记录 -- 能做一个收集的程序就好了 不过现在没时间 -- 信息大多是在群里发的 - 你要让垃圾佬 都去一个地方看难度也是很大的 精神支柱 [超级前台]sprinbboot maven superdesk-app 记录 [信息有用] [环境准备] [基本完成] [sebp/elk] 给已创建的Docker容器增加新的端口映射 - qq_30599553的博客 - CSDN博客 [正在研究] Elasticsearch, Logstash, Kibana (ELK) Docker image documentation elasticsearch centos 安装记录 及 启动手记 正式服务器 39 elasticsearch 问题合集 不断更新 6.1.1 | 6.5.1 两个版本 博客程序 - 测试 - bug记录 等等问题 laravel的启动过程解析 - lpfuture - 博客园 OAuth2 Server PHP 用 Laravel 搭建带 OAuth2 验证的 RESTful 服务 | Laravel China 社区 - 高品质的 Laravel 和 PHP 开发者社区 利用Laravel 搭建oauth2 API接口 附 Unauthenticated 解决办法 - 煮茶的博客 - SegmentFault 思否 使用 OAuth2-Server-php 搭建 OAuth2 Server - 午时的海 - 博客园 基于PHP构建OAuth 2.0 服务端 认证平台 - Endv - 博客园 Laravel 的 Artisan 命令行工具 Laravel 的文件系统和云存储功能集成 浅谈Chromium中的设计模式--终--Observer模式 浅谈Chromium中的设计模式--二--pre/post和Delegate模式 浅谈Chromium中的设计模式--一--Chromium中模块分层和进程模型 DeepMind 4 Hacking Yourself README.md update 20211011
Laravel China 简书 知乎 博客园 CSDN博客 开源中国 Go Further Ryan是菜鸟 | LNMP技术栈笔记 云栖社区-阿里云 Netflix技术博客 Techie Delight Linkedin技术博客 Dropbox技术博客 Facebook技术博客 淘宝中间件团队 美团技术博客 360技术博客 古巷博客 - 一个专注于分享的不正常博客 软件测试知识传播 - 测试窝 有赞技术团队 阮一峰 语雀 静觅丨崔庆才的个人博客 软件测试从业者综合能力提升 - isTester IBM Java 开发 使用开放 Java 生态系统开发现代应用程序 pengdai 一个强大的博主 HTML5资源教程 | 分享HTML5开发资源和开发教程 蘑菇博客 - 专注于技术分享的博客平台 个人博客-leapMie 流星007 CSDN博客 - 舍其小伙伴 稀土掘金 Go 技术论坛 | Golang / Go 语言中国知识社区
最新评论