[林进雨]修正后台启动命令 ---- kafka安装及Kafka-PHP扩展的使用 - imarno - 博客园 文章比较旧 但能用 整理了启动命令 superdesk

话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

一. 首先确认下jdk有没有安装

使用命令

\[root@localhost ~\]# java -version
java version "1.8.0_73" Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)

如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

然后打开/etc/profile文件

\[root@localhost ~\]# vim /etc/profile

把下面这段代码写到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
export CLASSPATH=.:$JAVA\_HOME/lib/tools.jar:$JAVA\_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH

最后

\[root@localhost ~\]# source /etc/profile

这时jdk就生效了,可以使用 java -version验证下。

二. 接下来安装Kafka

1. 下载Kafka

http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz

2. 下载完解压到你喜欢的目录

我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 运行默认的Kafka

启动Zookeeper server

[root@localhost kafka_2.9.1-0.8.2.2\]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

亲测

sh bin/zookeeper-server-start.sh config/zookeeper.properties &

nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &

启动Kafka server

[root@localhost kafka_2.9.1-0.8.2.2\]# sh bin/kafka-server-start.sh config/server.properties &

亲测

sh bin/kafka-server-start.sh config/server.properties &

nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

运行生产者producer

[root@localhost kafka_2.9.1-0.8.2.2\]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

[linjinyu@localhost kafka_2.11-2.0.0]$ sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

运行消费者consumer

[root@localhost kafka_2.9.1-0.8.2.2\]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

[linjinyu@localhost kafka_2.11-2.0.0]$ sh bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning

这样,在producer那边输入内容,consumer马上就能接收到。

4. 当有跨机的producer或consumer连接时

需要配置config/server.properties的host.name,要不然跨机的连不上。

三. Kafka-PHP扩展

使用了一圈,就https://github.com/nmred/kafka-php可以用

我是使用composer安装的,以下是示例:

producer.php

<?php 

require 'vendor/autoload.php';

while (1) {
    $part    = mt_rand(0, 1);
    $produce = \\Kafka\\Produce::getInstance('kafka0:2181', 3000); // get available partitions
    $partitions = $produce->getAvailablePartitions('topic_name'); 
    var_dump($partitions); // send message
    $produce->setRequireAck(-1); 
    $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s')); 
    sleep(3);
}

consumer.php

<?php

require 'vendor/autoload.php';
$consumer = \\Kafka\\Consumer::getInstance('kafka0:2181'); 
$group = 'topic_name'; 
$consumer->setGroup($group); 
$consumer->setFromOffset(true); 
$consumer->setTopic('topic_name', 0); 
$consumer->setMaxBytes(102400); 
$result = $consumer->fetch(); 
print_r($result); 
foreach ($result as $topicName =\> $partition) {
    foreach ($partition as $partId =\> $messageSet) {
        var_dump($partition->getHighOffset());
        foreach ($messageSet as $message) {
            var_dump((string)$message);
        }
        var_dump($partition->getMessageOffset());
    }
}

Original url: Access
Created at: 2018-11-12 17:17:33
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共2条评论
DeepMind

Rain酱

kafka port 9092

2018-12-03 19:58:09 回复

DeepMind

Rain酱

zookeeper port 2181

2018-12-03 19:57:20 回复