Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
Kafka的特点:
Kafka的架构:
kafka
Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。
Kafka基本概念:
Kafka消息发送的流程:
Kafka-Message
下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):
1.从zookeeper源码src/c/src安装zookeeper c client
cd zookeeper-3.4.8/src/c
./configure
make && make install
2.编译php libzookper扩展
git clone https://github.com/Timandes/libzookeeper.git
cd libzookeeper
phpize
./configure --with-libzookeeper=/usr/local/bin/cli_mt
make && make install
3.编译php zookeeper扩展
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make && make install
4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展
extension=libzookeeper.so
extension=zookeeper.so
PHP处理Kafka消息:
1.启动zookeeper和kafka
kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh --daemon
kafka_2.11-0.10.0.0/config/zookeeper.properties
kafka_2.11-0.10.0.0/bin/kafka-server-start.sh kafka_2.11-0.10.0.0/config/server.properties
2.创建由2个partition组成的、名为testtopic的topic
kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic
3.composer安装nmred/kafka-php
composer require "nmred/kafka-php"
4.producer.php代码
<?php
require_once('./vendor/autoload.php');
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
$produce->setRequireAck(-1);
$topicName = 'testtopic';
//获取到topic下可用的partitions
$partitions = $produce->getAvailablePartitions($topicName);
$partitionCount = count($partitions);
$count = 1;
while(true){
$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));
//发送消息到不同的partition
$partitionId = $count%$partitionCount;
$produce->setMessages('testtopic', $partitionId, array($message));
$result = $produce->send();
var_dump($result);
$count++;
echo "producer sleeping\n";
sleep(1);
}
5.consumer.php代码
<?php
require_once('./vendor/autoload.php');
//获取需要处理的partitionId
$partitionId = isset($argv[1]) ? intval($argv[1]) : 0;
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('test-consumer-group');
$consumer->setPartition('testtopic', $partitionId);
$consumer->setFromOffset(true);
$consumer->setMaxBytes(102400);
while(true){
$topic = $consumer->fetch();
foreach ($topic as $topicName => $partition) {
foreach ($partition as $partId => $messageSet) {
foreach ($messageSet as $message) {
var_dump($message);
}
}
}
echo "consumer sleeping\n";
sleep(1);
}
6.运行php代码
在3个终端界面分别运行
php producer.php
php consumer.php 0
php consumer.php 1
7.结果
两个consumer脚本依次收到producer发送的消息
php-kafka-consumer-output
转自:https://aiddroid.com/kafka-introduction-and-php-kafka-usage/
作者:daos
链接:https://www.jianshu.com/p/b9d06f33f060
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
Original url: Access
Created at: 2018-11-12 17:21:14
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
java windows火焰图_mob64ca12ec8020的技术博客_51CTO博客 - 在windows下不可行,不知道作者是怎样搞的 监听SpringBoot 服务启动成功事件并打印信息_监听springboot启动完毕-CSDN博客 SpringBoot中就绪探针和存活探针_management.endpoint.health.probes.enabled-CSDN博客 u2u转换板 - 嘉立创EDA开源硬件平台 Spring Boot 项目的轻量级 HTTP 客户端 retrofit 框架,快来试试它!_Java精选-CSDN博客 手把手教你打造一套最牛的知识笔记管理系统! - 知乎 - 想法有重合-理论可参考 安宇雨 闲鱼 机械键盘 客制化 开贴记录 文本 linux 使用find命令查找包含某字符串的文件_beijihukk的博客-CSDN博客_find 查找字符串 ---- mac 也适用 安宇雨 打字音 记录集合 B站 bilibili 自行搭建 开坑 真正的客制化 安宇雨 黑苹果开坑 查找工具包maven pom 引用地 工具网站 Dantelis 介绍的玩轴入坑攻略 --- 关于轴的一些说法 --- 非官方 ---- 心得而已 --- 长期开坑更新 [本人问题][新开坑位]关于自动化测试的工具与平台应用 机械键盘 开团 网站记录 -- 能做一个收集的程序就好了 不过现在没时间 -- 信息大多是在群里发的 - 你要让垃圾佬 都去一个地方看难度也是很大的 精神支柱 [超级前台]sprinbboot maven superdesk-app 记录 [信息有用] [环境准备] [基本完成] [sebp/elk] 给已创建的Docker容器增加新的端口映射 - qq_30599553的博客 - CSDN博客 [正在研究] Elasticsearch, Logstash, Kibana (ELK) Docker image documentation elasticsearch centos 安装记录 及 启动手记 正式服务器 39 elasticsearch 问题合集 不断更新 6.1.1 | 6.5.1 两个版本 博客程序 - 测试 - bug记录 等等问题 laravel的启动过程解析 - lpfuture - 博客园 OAuth2 Server PHP 用 Laravel 搭建带 OAuth2 验证的 RESTful 服务 | Laravel China 社区 - 高品质的 Laravel 和 PHP 开发者社区 利用Laravel 搭建oauth2 API接口 附 Unauthenticated 解决办法 - 煮茶的博客 - SegmentFault 思否 使用 OAuth2-Server-php 搭建 OAuth2 Server - 午时的海 - 博客园 基于PHP构建OAuth 2.0 服务端 认证平台 - Endv - 博客园 Laravel 的 Artisan 命令行工具 Laravel 的文件系统和云存储功能集成 浅谈Chromium中的设计模式--终--Observer模式 浅谈Chromium中的设计模式--二--pre/post和Delegate模式 浅谈Chromium中的设计模式--一--Chromium中模块分层和进程模型 DeepMind 4 Hacking Yourself README.md update 20211011
Laravel China 简书 知乎 博客园 CSDN博客 开源中国 Go Further Ryan是菜鸟 | LNMP技术栈笔记 云栖社区-阿里云 Netflix技术博客 Techie Delight Linkedin技术博客 Dropbox技术博客 Facebook技术博客 淘宝中间件团队 美团技术博客 360技术博客 古巷博客 - 一个专注于分享的不正常博客 软件测试知识传播 - 测试窝 有赞技术团队 阮一峰 语雀 静觅丨崔庆才的个人博客 软件测试从业者综合能力提升 - isTester IBM Java 开发 使用开放 Java 生态系统开发现代应用程序 pengdai 一个强大的博主 HTML5资源教程 | 分享HTML5开发资源和开发教程 蘑菇博客 - 专注于技术分享的博客平台 个人博客-leapMie 流星007 CSDN博客 - 舍其小伙伴 稀土掘金 Go 技术论坛 | Golang / Go 语言中国知识社区
最新评论