swoole结合kafka实现高性能消息队列 | Bing的天涯路

接触swoole有一年了,一年前上singwa老师的课,用swoole结合redis实现了一个消息队列,但那个时候是使用TP来做的,TP对swoole的支持并不是特别的友好,在一年后的现在,使用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的消息队列服务,主要用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。

首先连接kafka,这里的kaka我使用的百度云提供的kafka服务,自己部署太麻烦而且难以维护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,其实最想使用的是微博的那个不使用扩展来连接kafka的库,但是一直没有解决使用ssl文件连接的问题,因此就是用了rdkafka扩展,首先按照样例中的说明安装librdkafka

1

2

3

sh setup-librdkafka.sh

pecl install rdkafka

echo "extension=rdkafka.so" >> /etc/php.ini //根据实际位置

这样就安装好了librdkafka和php扩展,要注意的是版本号必须要新一些的,否则使用ssl的会报没有该设置项的异常,排查这个异常花了一晚上的时间。

接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会使用到consumer,因为producer的数据是来自天工的设备数据

kafka.php – 连接kafka的基类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

namespace AppLibKafka;

use RdKafkaConf;

use RdKafkaKafkaConsumer;

use SwooleException;

class Kafka

{

private $topic = '';

private $config = [

'broker' => 'xxxxxxxxx:9092',

'security_protocol' => 'ssl',

'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem',

'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key',

'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem',

'group_id' => 'kafka-feifei-swoole-consumer'

];

public function __construct($topic)

{

if(!extension_loaded(rdkafka)){

throw new Exception('rdkafka.so扩展必须开启');

}

if(!isset($topic) || empty($topic)){

throw new Exception('kafak实例化必须设置topic');

}

$this->topic = $topic;

}

public function subscribe(){

$conf = new RdKafkaConf();

$conf->set('metadata.broker.list', $this->config['broker']);

$conf->set('group.id', $this->config['group_id'].rand(0,10));

$conf->set('security.protocol', $this->config['security_protocol']);

$conf->set('ssl.certificate.location', $this->config['client_pem']);

$conf->set('ssl.key.location', $this->config['client_key']);

$conf->set('ssl.ca.location', $this->config['ca_pem']);

$consumer = new \RdKafka\KafkaConsumer($conf);

$consumer->subscribe(\[$this->topic]);

return $consumer;

}

}

这里需要特别注意的是PHPstorm的代码检查器好像找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。这里只实现了消费者,要使用消费者需要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事件

1

2

3

4

5

6

7

8

9

public static function mainServerCreate(EventRegister $register)

{

// TODO: Implement mainServerCreate() method.

//    注册Kafka消费事件, 开三个进程来处理

$allNum = 3;

for($i = 0; $i < $allNum; $i++){

ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());

}

}

这里new的Consumer就是处理消费的进程

Consumer.php

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<?php

/**

* Created by bingxiong.

* Date: 4/8/19

* Time: 10:22 PM

* Description:

*/

namespace AppLibKafka;

use EasySwooleComponentProcessAbstractProcess;

class Consumer extends AbstractProcess

{

private $isRun = false;

public function run($arg)

{

// 在这里处理kafka连接

// TODO: Implement run() method.

$this->addTick(500,function (){

if(!$this->isRun){

$this->isRun = true;

// 连接kafka并订阅TOPIC

$kafka = new Kafka('xxxxxxxxxxxx');//topic

$consumer = $kafka->subscribe();

while(true){

try{

$message = $consumer->consume(120*1000);

if($message){

switch ($message->err) {

case RD_KAFKA_RESP_ERR_NO_ERROR:

echo 'process name is'.$this->getProcessName().'\n';

echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\n";

break;

case RD_KAFKA_RESP_ERR__PARTITION_EOF:

echo "No more messages; will wait for more\n";

break;

case RD_KAFKA_RESP_ERR__TIMED_OUT:

echo "Timed out\n";

break;

default:

throw new Exception($message->errstr(), $message->err);

break;

}

}else{

break;

}

}catch (Throwable $throwable){

break;

}

}

$this->isRun = false;

}

var_dump($this->getProcessName().' task run check');

});

}

public function onShutDown()

{

// TODO: Implement onShutDown() method.

}

public function onReceive(string $str, ...$args)

{

// TODO: Implement onReceive() method.

}

}

这里使用了一个异步任务addTick,如果长期没有消息的话也会每500秒去检查一下有没有新的消息。这里还是用了一个死循环,在这里死循环中持续处理消息过来之后的逻辑

现在已经使用swoole+kafka拿到设备的数据了,接下来就是使用异步任务或者异步redis之类的去执行相应的业务逻辑了。


Original url: Access
Created at: 2019-05-22 16:37:16
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论