swoole - lulublog

一、前言

1.1、什么是 swoole

swoole是基于C开发的一个php扩展,类似你熟悉的Mysqli、cURL等等。

swoole的作用,其实更多的是解决php在某些方面的缺陷(当然,php是最好的语言),比如即时通讯、异步任务、消息队列等等。

Swoole是PHP语言的高性能网络通信框架,提供了PHP语言的异步多线程服务器,异步TCP/UDP网络客户端,异步MySQL,数据库连接池,AsyncTask,消息队列,毫秒定时器,异步文件读写,异步DNS查询。 Swoole虽然是标准的PHP扩展,实际上与普通的扩展不同。普通的扩展只是提供一个库函数。而swoole扩展在运行后会接管PHP的控制权,进入事件循环。当IO事件发生后,swoole会自动回调指定的PHP函数。

1.2、php的cli模式

PHP 除了可以被 Apache IIS 等服务器调用,还可以通过 cli 模式运行,因为 php 本质上还是 C 语言写的程序。

①、将 php.exe 加入环境变量

②、新建 cli.php

<?php
   echo "hello php cli\n";

③、打开 cmd ,切换到 cli.php 所在目录,输入

php cli.php

④、修改 cli.php

<?php
   echo "hello php cli\n";
   var_dump($\_SERVER\["argc"\]);   //$_SERVER["argc"]  为传递的参数的个数
   var_dump($_SERVER["argv"]);   //S_SERVER["argv"]  为传递的参数的值,以数组表示

⑤、打开 cmd ,切换到 cli.php 所在目录,输入

 php cli.php one two

返回:

hello php cli

int(3)

array(3) {
  [0] =>
  string(8) "test.php"
  [1] =>
  string(3) "one"
  [2] =>
  string(3) "two"
}

1.3、进程和线程

①、进程

对于操作系统而言,进程就是一个任务,比方说你打开了一个记事本,那就启动了一个进程,打开了两个浏览器,就是另外开启了两个进程,再或者说我现在在word内写文章,打开word也会占用一个进程。也就是说,一个进程至少要干一件事情。

对于linux系统而言,如果你想要查看当前系统中运行着哪些进程,可以通过ps命令进行查看。

比如我现在打开一个终端,用vim打开一个文件

vim test.php

打开后这个终端不动,再新打开一个终端,执行ps命令后

ps aux | grep vim
root      8381  0.0  0.4 143844  4724 pts/0    S+   09:30   0:00 vim test.php
root      8876  0.0  0.0 103324   884 pts/2    S+   09:40   0:00 grep vim

可以看到,有两个vim相关的进程在我执行ps的那一霎那还在执行。

②、线程

有些情况下,一个进程会同时做一些事情,比如说word。它可以同时进行打字、拼写检查等操作。注意这里我们说的同时进行。像这样,在一个进程内部,同时运行着多个“子任务”,我们就可以把这些子任务称之为“线程”。即进程是由多个线程组成的,一个进程至少要有一个线程。实际上,线程是操作系统最小的执行单元。

③、多任务的实现

A、试想一下,如果我们要同时执行多个任务怎么办?

根据上文的理解,我们可以:启动多个进程

B、试想一下,如果我们要同时执行多个任务怎么办?根据上文的理解,我们可以

  • 启动多个进程
  • 启动一个进程,并在该进程内启动多个线程
  • 启动多个进程,每个进程内启动多个线程

④、多进程实现

我们举一个实际点的例子:各位熟悉的apache,其实就是一种多进程实现的案例。当父进程监听到有新的请求时,就会fork出新的子进程来对之进行处理。

Linux的fork()函数通过系统调用即可实现创建一个与原进程几乎相同的进程。对于多任务,通常我们会设计Master-Worker模式,即一个Master进程负责分配任务,多个Worker进程负责执行任务。同理,如果是多线程,Master就是主线程,Worker就是子线程。

⑤、多进程与多线程的区别

多进程的优点就是稳定性很高,如果一个进程挂了,不会影响其他子进程,当然,如果主进程挂了那就都玩完(主进程挂点的可能性微乎其微,后面讲进程模型会说到)。而对于多线程,这个恐怕就是致命的缺点了,因为所有线程共享内存,如果某一个线程挂了,那这个进程几乎就崩溃了。

性能方面,不论是进程还是线程,如果启动太多,无疑都会带来CPU的调度问题,因为进程或者线程的切换,本身就非常耗费资源。数量达到一定程度的时候,CPU和内存就消耗殆尽,电脑就死机了。

举一个例子:使用过windows的用户都知道,如果我们打开的软件越多(开启的进程也就越多),电脑就会越卡,甚至装死机没反应。

线程与进程相比,自然是要比进程更轻量一些,而且线程之间是共享内存的,所以不同线程之间的交互就显得容易实现。而对于多进程之间的通信,需要借助消息队列,共享内存等复杂的方式才可以实现。

1.4、IO模型

①、什么是 IO

IO即Input/Output,输入和输出的意思。在计算机的世界里,涉及到数据交换的地方,比如磁盘、网络等,就需要IO接口。

通常,IO是相对的。比如说你打开浏览器,通过网络IO获取我们网站的网页,浏览器首先会往服务器发送请求,这是一个Output操作,随后服务器给浏览器返回信息,这就是一个Input操作。以上都是基于浏览器而言。但是,有些操作就比较特殊。比如程序在运行时,数据被加载在内存中,通过程序往磁盘写数据,对内存而言,这就是单方面的的Output。

②、IO模型

IO模型通常有很多种,我们简单介绍下同步IO和异步IO。

③、同步IO

实际上我们刚刚介绍的浏览器请求服务器的过程正是同步IO的例子。

那我们再比如,假设我们要通过程序往磁盘写大量的数据,如果没有磁盘IO操作,php程序在内存中执行的速度是非常快的,但是磁盘写数据的过程相对而言就是漫长的,CPU就需要等待磁盘IO操作之后才能继续执行其他代码,像上面这两种情况,我们都称之为同步IO。

php本身是单线程的,当php进程被挂起的时候,像上面的读取磁盘数据,往磁盘写数据,在IO操作之前php代码就没办法继续执行了。

因为IO操作阻塞了当前线程,如果某用户也想从磁盘上读取或者写数据,就需要等待。

有些人要反驳了,这不对呀,我经历不是这样的,很多人可以同时访问我的网站,这没问题的。

这个没必要纠结,php本身是单进程单线程的,用户可以同时访问你的网站实际上是web服务器的功劳。这就是我们之前讨论过的,如何解决多任务的问题。

web服务器的进程模型暂时不多讨论,免得懵。

如果不考虑web服务器,是不是当前进程一旦阻塞,其他人访问php都会被阻塞啦?答案是肯定的。要解决这个问题,有回到我们一直强调的多进程或者多线程。

但是,如果为了解决并发问题,系统开启了大量的进程,就像我们之前说的,操作系统在进程或者线程间切换同样会造成CPU大量的开销。有没有更好的解决方案呢?

④、异步IO

答案就就是异步IO。我们再来强调一遍异步IO是要解决什么问题的:同一线程内,执行一些耗时的任务时,其他代码是不能继续执行的,要等待该任务操作完之后才可以。

异步IO是什么样的呢?当程序需要执行一个非常耗时的IO操作的时候,它只发出IO指令,不需要等待IO的结果,然后可以继续执行其他的代码了。当IO返回结果时,再通知CPU去处理,这就是异步IO。

总结:同步IO模型下,主线程只能被挂起等待,但是在异步IO模型中,主线程发起IO指令后,可以继续执行其他指令,没有被挂起,也没有切换线程的操作。由此看来,使用异步IO明显可以提高了系统性能。

1.5、TCP/IP和UDP

①、浏览器访问网站的过程

平时我们打开一个浏览器,然后输入网址后回车,即展现了一个网页的内容。这是一个非常简单的操作。我们来简单的概括下背后的逻辑。

  • 浏览器通过TCP/IP协议建立到服务器的TCP连接
  • 客户端向服务器发送HTTP协议请求包,请求服务器里的资源文档
  • 服务器向客户端发送HTTP协议应答包,如果请求的资源包含有动态语言的内容,那么服务器会调用动态语言的解释引擎负责处理“动态内容”,并将处理得到的数据返回给客户端
  • 客户端与服务器断开,由客户端解释HTML文档,在客户端屏幕上渲染图形结果

表面上看这就是两台电脑之间进行的一种通信。

更确切的说,是两台计算机上两个进程之间的通信。你打开浏览器相当于启动了一个浏览器进程,而服务端事先也启动了某个进程,在某个端口监听,时刻等待客户端的连接。

那么问题来了,为什么客户端可以请求到服务器呢?服务器上跑那么多服务,又是怎么知道客户端想要什么呢?

其实答案很简单,因为有网络。计算机为了联网,就必须遵循通信协议。早期的互联网有很多协议,但是最重要的就非TCP协议和IP协议莫属了。所以,我们把互联网的协议简称为TCP/IP协议。

②、IP协议

想必大家都有过这样的经历,客户端通过telnet连接服务器的时候,往往都需要一个ip地址和一个端口。如果客户端跟服务器之间有数据的交互,其过程大致是这样的:

IP协议负责把你本机的数据发送到服务端,数据被分割成一块一块的。然后通过IP包发送出去。IP包的特点是按块发送,但不保证能到达,也不保证数据块依次到达。

如果是这样进行数据传输,服务器根本不能保证接收到的数据的完整性和顺序性,这样是不是就会有很大的问题呢?

③、TCP协议

于是乎,TCP协议应运而生,它是建立在IP协议之上,专门负责建立可靠连接,并保证数据包顺序到达。TCP协议会通过握手建立连接,然后,对每个IP包编号,确保对方顺序收到,如果出现丢包,则重新发送。

这个时候再说TCP协议是一种面向连接、可靠的、基于IP之上的传出层协议就不难理解了吧。

TCP协议还有一个更重要的特点,它是基于数据流的。

什么意思呢?这就好比客户端和服务端要进行数据交互,中间有一个管子连接着,这个时候交互数据就好比管子中的水,当数据在传输(水在流动)的过程中,服务端是无法知道哪段数据是我们想要的完整数据。怎么解决这一问题呢?这个时候就需要双方约定一个协议来解决了。再往后说就说到应用层协议了,比如http协议,我们姑且不谈。

④、UDP协议

TCP懂了,UDP自然就不难理解了。

相对于TCP,使用UDP协议进行通信的最大区别就是,UDP不需要建立连接,给他一个ip和端口,就可以直接发送数据包了。但是,能不能成功到达就不知道了。虽然UDP传输不可靠,但是速度快。对于一些对数据要求不高的场景,使用UDP通信无疑是不错的选择。

二、swoole

2.1、swoole的安装与升级

windows用户可以使用cygwin环境来学习swoole,但是会有很多问题,下面我们主要介绍下linux环境下swoole的安装。

php版本推荐php5.4+

此外,你应该保证系统中安装了如下软件

gcc-4.4+
make
autoconf

gcc版本不够或者上述软件有一项未安装,下面的操作就没必要往下看了。

注:还需安装以下插件

A、pcre——主要用于swoole_server::connections-

yum install -y pcre pcre-devel

两种安装方式

①、方法一:编译安装

A、下载最新的稳定版,以pecl官网为准,后面针对swoole的学习,我们以1.9.6为准,如果本地已经安装过swoole了,但是版本不一致,可以直接看下面关于swoole升级的讲解。此外,由于swoole还很年轻,各个版本中可能会与我们所讲述的配置有所冲突,建议尽可能的保证你的版本跟我们一致,除非你知道版本本身的差异。

我们切换到 /usr/local/src 目录,你也可以下载到你期望的路径。利用wget下载,提示wget不是命令的请先下载一下这个命令

wget http://pecl.php.net/get/swoole-1.9.6.tgz

B、随后利用tar命令解压,同样tar命令不存在的自行下载

tar zxvf swoole-1.9.6.tgz

C、切换到 swoole-1.9.6 目录

cd swoole-1.9.6

D、找到phpize所在路径,注意要找你要给具体php版本安装的那个phpize,比如我的php源码在 /usr/local/php56/ 目录,phpize路径就是 /usr/local/php56/bin/phpize,在不确保终端下的phpize是哪个版本的php时候,建议指定绝对路径

/usr/local/php/bin/phpize

终端下输入上面的命令后回车即可

E、检查&&编译&&安装

./configure --with-php-config=/usr/local/php/bin/php-config
make 
sudo make install

依次输入上述命令进行操作。

注:如果要支持ssl,需要(当然,要先确保你的系统安装了openssl,php也安装了openssl扩展)

./configure --with-php-config=/usr/local/php/bin/php-config --enable-openssl

②、方法二:PECL安装

方法一的编译安装的过程稍微有一些麻烦,swoole也是pecl的项目,所以,我们还可以通过pecl进行一键安装

pecl install swoole

如果以上步骤一切正常的话,即表示swoole已经成功的安装了。

③、修改 php.ini

成功之后,我们打开php.ini,把swoole.so加入到文件最后

extension=swoole.so

随后通过命令php -m查看swoole是否被正确的安装

php -m | grep swoole

能看到结果即表示安装成功了,当然这是在我们安装过程中一切顺利的情况下进行的。

④、swoole升级

swoole现在还处于发展中,可能我们还没学完,新的版本又要出来了。有同学要说了,过段时间我估计就忘记现在安装的swoole的版本是多少了,这个怎么办?

我们可以通过 php --ri 命令查看swoole版本

php --ri swoole | grep Version

结果

Version => 1.9.6

如果后期发现有新的版本发布了,怎么升级swoole呢?

  • 编译升级:编译升级,只需要从pecl官网下载最新的稳定版,按照我们一开始的编译安装步骤再走一遍就完事了。之前安装的版本不需要过问,这就相当于重新安装一次新版本就好了。友情提醒,尽可能的下载稳定版,非稳定版可能会发生很多意外的事。
  • pecl升级:这个更简单,一条命令搞定

pecl upgrade swoole

结果

swoole

2.2、swoole初识之异步多线程服务器

①、同步和异步

我们在 IO模型 中解释过同步和异步的概念,并非是web开发模式下ajax这种异步的请求。在常见的web开发模式下,我们所碰到的几乎都是同步模式。

为什么这么说?无论是fpm还是httpd,同一时间内一个进程只能处理一个请求,如果当前进程处于繁忙,后面的请求也只能继续等待有新的空闲进程。如果负载稍微上去了些,我们还可以调整fpm和httpd的进程数,即增加worker进程的数量。但是,在服务器资源有限的情况下,随着worker进程数量的递增,系统消耗的资源也会逐步增加,直至over。

swoole是既支持全异步,也支持同步,同步模式我们后面结合fpm再说。

IO模型 中,我们也可以感受到异步很强大。为什么喃?

我们举一个一名老师指导多名学生解题的场景。

同步模式下,当该老师在给某学生A指导题目的时候,嘴里可能一边嘟囔着“这个要这么写...”,话没说完,另一个学生B喊道“老师快来,我这碰到难题了,快过来指导指导”。

“等会,没看见在忙吗?”

然后学生B只能乖乖的等老师给A解答完之后才可以。

异步模式就不同啦,老师在给A指导的同时,B又屁颠屁颠的喊着“老师老师...”,这个时候老师态度上就360大转弯,“来了来了”,顺便跟A说了“你先理解下我刚才说的,等会好了叫我”,然后呢,后面的剧情可能就是这样的

  • A解答完毕跟老师说“谢谢”,B喊老师
  • B先喊老师,A进入B一开始的状态,B解答完毕跟老师说“谢谢”
  • 剧情很多,自己没事想吧

又重温了下什么是同步和异步的概念,禁止混淆。

②、socket编程

socket是什么?

在大部分的书本或者网络文章中,你都能找到一个解释:套接字,是属于应用层和传输层之间的抽象层。真想把发明这词的人拉出来暴打一顿,这也太抽象了。

socket即套接字,是用来与另一个进程进行跨网络通信的文件,说是“文件”,也很好理解哈,因为在linux中一切都可以理解为“文件”。比如客户端可以借助socket与服务器之间建立连接。你也可以把socket理解为一组函数库,它确实也就是一堆函数。

我们知道,常见的网络应用都是基于Client-Server模型的。即一个服务器进程和多个客户端进程组合而成,如果你还理解为是一台电脑对另一台电脑,可以回去把 进程/线程 再看看了。在Client-Server模型中,服务器管理某种资源,并且通过对它管理的资源进行操作来为客户端提供服务。

那Client和Server又如何实现通信呢?这就要利用socket一系列的函数实现了。

基于套接字接口的网络应用的描述,用下面这张图来理解就好。

01.png

大致可以描述为:服务器创建一个socket,绑定ip和端口,在该端口处进行监听,然后通过accept函数阻塞。当有新的客户端连接进来时,server接收客户端数据并处理数据,然后返回给客户端,客户端关闭连接,server关闭该客户端,一次连接交互完成。

③、初识server

server,顾名思义,就是服务器。我们平时接触比较多的无非就是nginx和apache。作为webServer,二者都是通过监听某端口对外提供服务。

下面我们来创建一个简单的server。

A、创建一个server对象

server的创建,只需要绑定要监听的ip和端口,如果ip指定为127.0.0.1,则表示客户端只能位于本机才能连接,其他计算机无法连接。

$serv = new swoole_server("127.0.0.1", 9501);

端口这里指定为9501,可以通过netstat查看下该端口是否被占用。如果该端口被占用,可更改为其他端口,如9502,9503等。

B、配置

swoole的运行模式,默认是多进程模式,这根fpm有点像。怎么体现多进程呢?要开启几个进程才合适呢?

这个就需要我们做一些配置了,但是并非像fpm直接在文件内配置,我们可以在server创建后,通过$serv->set(array())指定配置项。当然,这个配置项也有很多,比如说我们可以指定日志文件记录具体的错误信息等等,你都可以在官网的手册上寻找有哪些配置项,我们也会在贯穿swoole的同时讲解一部分常用的配置项。

这里我们首要说明一下worker进程数的配置。

我们可以指定配置项worker_num等于某个正整数。这个正整数设置多少合适,即我要开多少个worker进程处理我们的业务逻辑才好呢?官方建议我们设置为CPU核数的1-4倍。因为我们开的进程越多,内存的占用也就更多,进程间切换也就需要耗费更多的资源。我们这里设置开启两个worker进程。默认该参数的值等于你机器的CPU核数。

$serv->set([
    "worker_num" => 2,
]);

C、事件驱动

swoole另外一个比较吸引人的地方,就是swoole_server是事件驱动的。我们在使用的过程中不需要关注底层怎么样怎么样,只需要对底层相应的动作注册相应的回调,在回调函数中处理我们的业务逻辑即可。

什么意思呢?我举个例子:

你启动了一个server,当客户端连接的时候,你不需要关心它是怎么连接的,你就单纯的注册一个connect函数,做一些连接后的处理即可。再比如server收到了client传递的数据,你用关心复杂的网络是怎么接受到的吗?不用,你只需要注册一个receive回调,处理数据就这么多。

让我们来看看几种常见的事件回调。

// 有新的客户端连接时,worker进程内会触发该回调
$serv->on("Connect", function ($serv, $fd) {
    echo "new client connected." . PHP_EOL;
});

  • 参数$serv是我们一开始创建的swoole_server对象,
  • 参数$fd是唯一标识,用于区分不同的客户端,同时该参数是1-1600万之间可以复用的整数。

我来解释下复用:假设现在客户端1、2、3处于连接中,客户端4要连接的话$fd就是4,但是不巧的是客户端3连接不稳定,断掉了,客户端4连接到server的话,$fd就是3,这样看的话,实际可能远不止1600W。那1600W个连接够用吗?我的妈呀,你丫单个业务先做到160W再考虑这个问题吧...

// server接收到客户端的数据后,worker进程内触发该回调
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    // 收到数据后发送给客户端
    $serv->send($fd, "Server". $data);
});

Receive回调的前两个参数就不说了,刚说完。

上面说到的两个回调,都强调了是在worker进程内触发的。第三个参数$fromId指的是哪一个reactor线程,具体我们会在多进程模型一文中详细分析,先忽略吧。

我们看第四个参数,这个参数就是服务端接受到的数据,注意是字符串或者二进制内容哦,后面我们只谈字符串,不用怕。

注意我们在Receive回调内,调用了$serv的send方法,我们可以使用send方法,向client发起通知。

// 客户端断开连接或者server主动关闭连接时 worker进程内调用
$serv->on("Close", function ($serv, $fd) {
    echo "Client close." . PHP_EOL;
});

注意哦,当客户端与服务端的连接关闭的时候就会调用close回调,有些新手可能习惯性的会在close回调中继续调用$serv->close($fd),人都关闭了才去调用这个方法,你再调用是不是想找事?

到此呢,我们基本上已经搭建到了一个高性能的server。“我什么都没做,这就完啦?好没劲啊”

是的,非常简单,下面我们只需要调用start方法启动server即可。

// 启动server
$serv->start();

如此,便开启了一个server服务。

由于swoole_server只能运行在CLI模式下,所以不要试图通过浏览器进行访问,这样是无效的。不信的可以试试。

我们在命令行下面执行

php server.php

回车。

随后继续回车随便输入点什么都没有效果,感觉当前终端卡住了有木有?

我们平时执行完一个指令,执行完就结束了,但是现在的情况正好相反,当前程序一直处于执行中的状态,并没有退出终端。退出状态一般为当前终端的执行权交给了终端,即可用在终端下进行其他操作。

还记得我们第一步初始化server所填写的ip和端口吗,也就是说server现在正在监听9501端口提供服务。

当前终端暂时不动,我们新开一个终端,看看是不是这样。

netstat -an| grep 9501
tcp 0 0 127.0.0.1:9501 0.0.0.0:* LISTEN

发现本地的9501端口正在被监听对不对?server启动好了能干什么呢?常见的网络编程模式都是client-server的,也就是说我们还需要模拟一个客户端与之交互。

关于客户端,我们可以先通过telnet模拟

01.png

上图中上侧是开启的server窗口,下侧是我们用telnet模拟client的结果。

从结果中可以看出,客户端输入xxx,服务端就会直接返回 Server xxx,这正是我们在Receive回调方法中调用$serv->send方法发送给客户端的数据。而且在server启动的窗口下,也有我们在connect回调打印的信息。

在整个过程中,swoole server提供了类似web服务器的功能,监听端口,做出响应。

此外,swoole还提供了一套对socket客户端的封装,而且啊而且,这个要重点说一下,同步阻塞的swoole_client可以用于php-fpm或者apache环境。

swoole的大部分模块都只能运行在CLI模式下,像我们刚刚在cli下启动的server。但是对于面向web的应用怎么办?所以,swoole_client是我们与服务端交互的一个重要方法,先笔记记下。

下面我们用swoole_client来模拟下客户端。

新建一个Client.php文件。

代码如下:

connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
$client->send("hello server.");
// 从服务端接收数据
$response = $client->recv();
// 输出接受到的数据
echo $response . PHP_EOL;
// 关闭连接
$client->close();

我们看到,客户端无非就是创建一个socket对象,然后指定ip和端口,连接server,随后向server发送了一段数据,而后接收server的数据并输出,最后关闭连接。

看下模拟结果

01.png

注意到无论是server还是client,都是在CLI下执行的。

从模拟的结果中我们也可以清晰的看到client与server交互的整个过程。

但是,相信很多人都会有疑问,尤其是phper,server和客户端都这么玩,完全看不到实际应用啊。先慢慢练习吧,我们这才刚打响与swoole之间的战役。

server的关闭,手动执行Ctrl+C即退出。

2.3、swoole之task初体验

①、task初体验

在上文和IO模型中我们都对同步和异步进行了详细的解释,可能你们都懂了,可能部分人还是没懂,毕竟异步始终是个抽象的概念。

今天我们再来强化下这个概念,说一说Async Task。

AsyncTask,即异步任务。我们可以利用AsyncTask将一个耗时的任务投递到队列中,由进程池异步去执行。

博主你说人话,啥是异步任务?

总有些人吐槽不知道swoole的应用场景是啥,我们就以实际中遇到的问题为例:

  • 情景一:管理员需要给指定的用户发送邮件,当勾选10封甚至更多封的时候,点击发送,浏览器会一直转圈,直到邮件全部发送完毕。
  • 情景二:大家都爱看小说,我们以某小说网站的一个需求为例:要求作者可以把他事先写好的小说直接批量导入到网站(根据某种规则),这个操作起来同样会比较耗时。

从我们理解的角度思考,这其实都是php线程一直被阻塞,客户端才一直在等待服务端的响应。

对用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

我们的目的就是当用户选了10000封邮件或者提交了他含有500章节的内容之后,及时的通知用户邮件正在发送中或者提示用户章节内容正在上传中,对不对?明白我们今天的重点了吗?

对,你没理解错,AsyncTask的目的就是这个。下面我们来介绍下AsyncTask的使用。

②、先创建一个server

$serv = new swoole_server("127.0.0.1", 9501);

③、开启task功能

task功能默认是关闭的,开启task功能需要满足两个条件:

  • 配置task进程的数量
  • 注册task的回调函数onTask和onFinish

配置task进程的数量,即配置task_worker_num这个配置项。比如我们开启一个task进程

$serv->set([
    "task_worker_num" => 1,
]);

④、task怎么使用?

task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。

怎么操作呢?我们可以利用swoole_server->task函数把任务数据投递到task进程池中。

swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来100个甚至更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,这将是我们不期望的结果。

我们写一个例子来解释下上面所说的内容。

$serv->on("Connect", function ($serv, $fd) {
    echo "new client connected." . PHP_EOL;
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    echo "worker received data: {$data}" . PHP_EOL;

    // 投递一个任务到task进程中
    $serv->task($data);

    // 通知客户端server收到数据了
    $serv->send($fd, "This is a message from server.");

    // 为了校验task是否是异步的,这里和task进程内都输出内容,看看谁先输出
    echo "worker continue run."  . PHP_EOL;
});

⑤、注册onTask回调

/**
 * $serv swoole_server
 * $taskId 投递的任务id,因为task进程是由worker进程发起,所以多worker多task下,该值可能会相同
 * $fromId 来自那个worker进程的id
 * $data 要投递的任务数据
 */
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {
    echo "task start. --- from worker id: {$fromId}." . PHP_EOL;
    for ($i=0; $i < 5; $i++) {
        sleep(1);
        echo "task runing. --- {$i}" . PHP_EOL;
    }
    echo "task end." . PHP_EOL;
});

为了模拟判断到底是不是异步的,我们在task的回调中循环一个耗时任务,另一个需要注意的地方,我们在task回调内的结尾并没有return任何内容。

⑥、注册onFinish回调

/**
 * 只有在task进程中调用了finish方法或者return了结果,才会触发finish
 */
$serv->on("Finish", function ($serv, $taskId, $data) {
    echo "finish received data "{$data}"" . PHP_EOL;
});

⑦、最后,调用server的start方法

$serv->start();

整个过程是这样的:我们在worker进程收到数据后,直接调用swoole_server->task函数把数据投递给task进程,随后在swoole_server->task调用后和task进程内都输出内容。

⑧、执行结果

准备就绪之后我们在终端下启动server,执行

php server.php

客户端的测试,我们仍然利用上文在client.php写好的代码进行测试,新开一个终端,执行

php client.php

一起看下测试结果:

服务端

new client connected.
worker received data: hello server.
worker continue run.
task start. --- from worker id: 3.
client closed
task runing. --- 0
task runing. --- 1
task runing. --- 2
task runing. --- 3
task runing. --- 4
task end.

客户端

This is a message from server.

从测试结果中,我们看到在swoole_server的task函数之后输出的内容“worker continue run”在task进程开始之前输出。第二个应该引起你注意的是在结果中我们并没有看到在onFinish回调中输出的信息,我们把task回调函数的最后一句echo改为return再试一次。

return "task end." . PHP_EOL;

如果你修改了代码之后,直接去执行client.php,你会发现结果并没有任何变化。

我们在server启动的那个终端下,按Ctrl+C退出,然后再重新启动server

php server.php

发现了什么?有没有看到server终端下面的最后一行显示的信息变了?

finish received data "task end.";

怎么回事,为什么是这样的呢?大白天见鬼啦?为什么要重启下server代码才生效呢?

这个问题跟常驻内存有关,我们准备后面单独增加一个章节说说这个事。

在结果中我们看到了在onFinish回调中打印的信息。为什么这个时候能输出onFinish回调的内容了呢?

这是因为task进程内一旦return或者调用swoole_server->finish方法,就会通知到worker进程该任务已经完成,worker进程会继续触发onFinish回调,进一步对投递的结果进行处理。

这个过程有没有必要呢?讲真话,还真得看自己的业务需求。比如我们以开篇抛出的情境一发送邮件为例,如果我们在task进程内发送完邮件就完事了,不需要关注邮件是否发送成功,反正发不发也无所谓,这个时候就没必要调onFinish回调了。但是如果说我们还需要确认发送的邮件是否成功,没成功还要再继续发,这个时候我们就可以在onFinish回调中继续处理task的结果了。

⑨、总结

  • 没有耗时任务的情况下,worker直接运行,无需开启task
  • 对于耗时的任务,可以在worker内调用task函数,把异步任务投递给task进程进行处理,task进程的数量取决于task_worker_num的配置
  • task进程内可以选择调用finish方法或者return,来通知worker进程此任务已完成,worker进程会在onFinish回调中对task的执行结果进一步处理。如果worker进程不关心任务的结果,finish就不需要了。

2.4、swoole之进程模型

①、引入 Master-Manager-Worker 模式

swoole是事件驱动的。在使用swoole的过程中,我们也体会到,swoole的使用非常简单,仅仅注册相应的回调处理我们的业务逻辑即可。

但是,在继续学习swoole之前,我们有必要再看一看swoole的运行流程和进程模型。

前面两篇文章我们已经对server和task做了简单的介绍,后面再对server的创建以及脚本的执行,如无特殊说明均在CLI下执行,我就不啰嗦了。

$serv = new swoole_server("127.0.0.1", 9501);
$serv->set([
    "worker_num" => 2,
    "task_worker_num" => 1,
]);
$serv->on("Connect", function ($serv, $fd) {
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
});
$serv->on("Close", function ($serv, $fd) {
});
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {
});
$serv->on("Finish", function ($serv, $taskId, $data) {
});

$serv->start();

注意这里我们选择了两个worker进程个一个task进程,那是不是就意味着创建这个server就是开启了3个进程呢?我们来看下

新开一个终端,我们用ps命令看下结果

ps aux | grep server-process
root     21843  xxx... php server-process.php
root     21844  xxx... php server-process.php
root     21846  xxx... php server-process.php
root     21847  xxx... php server-process.php
root     21848  xxx... php server-process.php
root     21854  xxx... grep --color=auto server-process

为了方便阅读,ps的结果中部分不重要数据已经被稍加处理了。

排除最后一个结果(最后一个是我们运行的ps命令)我们发现,竟然有多达5个相似的进程在运行,按照我们理解,不应该是3个吗,怎么多了两个呢?

还记得我们在进程/线程一文中说过的多进程的实现吗?我们说到多进程的实现一般会被设计Master-Worker模式,常见的nginx默认的多进程模式也正是如此,当然swoole默认的也是多进程模型。

相比Master-Worker模式,swoole的进程模型可以用Master-Manager-Worker来形容。即在Master-Worker的基础上又增加了一层Manager进程。这也就解答了我们开头抛出的问题为什么是5个进程而不是3个进程了。(1个Master进程+1个Manager进程+2个Worker进程+1个Task进程)

正所谓“存在即合理”,我们来看一下Master\Manager\Worker三种进程各自存在的原因。

②、介绍 Master\Manager\Worker 三种进程

Master进程是一个多线程程序。注解:按照我们之前的理解,多个线程是运行在单一进程的上下文中的,其实对于单一进程中的每一个线程,都有它自己的上下文,但是由于共同存在于同一进程,所以它们也共享这个进程,包括它的代码、数据等等。

再回来继续说Master进程,Master进程就是我们的主进程,掌管生杀大权,它挂了,那底下的都得玩完。Master进程,包括主线程,多个Reactor线程等。

每一个线程都有自己的用途,比如主线程用于Accept、信号处理等操作,而Reactor线程是处理tcp连接,处理网络IO,收发数据的线程。

A、说明两点:

  • 主线程的Accept操作,socket服务端经常用accept阻塞,上一节介绍socket编程的时候有一张配图,可以看看
  • 信号处理,信号就相当于一条消息,比如我们经常操作的Ctrl+C其实就是给Master进程的主线程发送一个SIGINT的信号,意思就是你可以终止啦,信号有很多种,后面还有介绍

B、Reactor线程

通常,主线程处理完新的连接后,会将这个连接分配给固定的Reactor线程,并且这个Reactor线程会一直负责监听此socket(上文中后面对socket更新为socket即套接字,是用来与另一个进程进行跨网络通信的文件,文件可读可写),换句话就是说当此socket可读时,会读取数据,并将该请求分配给worker进程,这也就解释了我们在swoole初识讲解worker进程内的回调onReceive的第三个参数$fromId的含义;当此socket可写时,会把数据发送给tcp客户端。

用一张图清晰的梳理下

01.png

那swoole为啥不能像Nginx一样,是Master-Worker进程结构的呢?Manager进程是干啥的?

这个我正准备说。

C、Manager进程

我们知道,在Master-Worker模型中,Master只有一个,Worker是由父进程Master进程复制出来的,且Worker进程可以有多个。

注解:在linux中,父进程可以通过调用fork函数创建一个新的子进程,子进程是父进程的一个副本,几乎但不完全相同,二者的最大区别就是都拥有自己独立的进程ID,即PID。

对于多线程的Master进程而言,想要多Worker进程就必须fork操作,但是fork操作是不安全的,所以,在swoole中,有一个专职的Manager进程,Manager进程就专门负责worker/task进程的fork操作和管理。换句话也就是说,对于worker进程的创建、回收等操作全权有“保姆”Manager进程进行管理。

通常,worker进程被误杀或者由于程序的原因会异常退出,Manager进程为了保证服务的稳定性,会重新拉起新的worker进程,意思就是Worker进程你发生意外“死”了,没关系,我自身不“死”,就可以fork千千万万个你。

当然,Master进程和Manager进程我们是不怎么关心的,从前面两篇文章我们了解到,真正实现业务逻辑,是在worker/task进程内完成的。

再来一张图梳理下Manager进程和Worker/Task进程的关系。

01.png

D、区分进程

再回到我们开篇抛出的的5个进程的问题,ps的结果简直一模一样,有没有办法能区分这5个进程哪个是哪个呢?

有同学要说啦,既然各个进程之间存在父子关系,那我们就可以通过linux的pstree命令查看结果。

pstree | grep server-process

 | |   \-+= 02548 manks php server-process.php

 | |     \-+- 02549 manks php server-process.php

 | |       |--- 02550 manks php server-process.php

 | |       |--- 02551 manks php server-process.php

 | |       \--- 02552 manks php server-process.php

 |     \--- 02572 manks grep server-process

注:centos下命令可修改为 pstree -ap | grep server-process

从结果中我们可以看出,进程id等于02548的进程就是Master进程,因为从结构上看就它是“父”嘛,02549是Manager进程,Worker进程和Task进程就是02550、02551和02552了(每个人的电脑上显示的进程id可能不同,但顺序是一致的,依照此模型分析即可)。

我们看到pstree命令也只能得到大致结果,而且在事先不知道的情况下,根本无法区分Worker进程和Task进程。

在swoole中,我们可以在各个进程启动和关闭的回调中去解决上面这个问题。各个进程的启动和关闭?那岂不是又要记住主进程、Manager进程、Worker进程,二三得六,6个回调函数?

是的,不过这6个是最简单也是最好记的,你实际需要了解的可能还要更多。

Master进程:
    启动:onStart
    关闭:onShutdown
Manager进程:
    启动:onManagerStart
    关闭:onManagerStop
Worker进程:
    启动:onWorkerStart
    关闭:onWorkerStop

提醒:task_worker也会触发onWorkerStart回调。

是不是很好记?那我们就在server-process.php中通过上面这几种回调来实现对各个进程名的修改。

$serv->on("start", function ($serv){
    swoole_set_process_name("server-process: master");
});
// 以下回调发生在Manager进程
$serv->on("ManagerStart", function ($serv){
    swoole_set_process_name("server-process: manager");
});
$serv->on("WorkerStart", function ($serv, $workerId){
    if($workerId >= $serv->setting["worker_num"]) {
        swoole_set_process_name("server-process: task");
    } else {
        swoole_set_process_name("server-process: worker");
    }
});

ps aux | grep server-process
root     27546  xxx... server-process: master
root     27547  xxx... server-process: manager
root     27549  xxx... server-process: task worker
root     27550  xxx... server-process: worker
root     27551  xxx... server-process: worker
root     27570  xxx... grep --color=auto simple

运行结果谁是谁一目了然,简直了!

有同学傻眼了,说在workerStart回调中写的看不明白,worker进程和task进程怎么区分的?

我来解释一下:在onWorkerStart回调中,$workerId表示的是一个值,这个值的范围是0~worker_num,worker_num是我们的对worker进程的配置,其中0~worker_num表示worker进程的标识,包括0但不包括worker_num;worker_num~worker_num+task_worker_num是task进程的标识,包括worker_num不包括worker_num+task_worker_num。

按照高中学的区间的知识可能更好理解,以我们案例的配置,workerId的值的范围就是[0,2],[0,2)表示worker进程,[2,3)就表示task_worker进程。

swoole的进程模型很重要,本节掌握不好,后面的理解可能就会有些问题。

补充:我们在onWorkerStart的回调中,用了serv−>setting去获取配置的 server 信息,在swoole中预留了一些swooleserver的属性,我们可以在回调函数中访问。比如说我们可以用serv->connections属性获取当前server的所有的连接,再比如我们可以通过$serv->master_pid属性获取当前server的 主进程id 等等。

2.5、常驻内存以及如何避免内存泄漏

①、传统 web 开发模式之内存开销

Task初体验一节中我们提到,server中的代码修改之后,要先按Ctrl+C终止server再重新启动下server才会生效,当时我们一言以过之,本节我们主要就来看看这个常驻内存相关的事。

在传统的web开发模式中,我们知道,每一次php请求,都要经过php文件从磁盘上读取、初始化、词法解析、语法解析、编译等过程,而且还要与nginx或者apache通信,如果再涉及数据库的交互,还要再算上数据库的握手、验权、关闭等操作,可见一次请求的背后其实是有相当繁琐的过程,无疑,这个过程也就带来了相当多的开销!当然,所有的这些资源和内存,在一次请求结束之前,都会得到释放。

②、swoole 常驻内存

但是,swoole是常驻内存运行的。这有几点不同,我们分别了解下。

  • 在运行server之后所加载的任何资源,都会一直持续在内存中存在。也就是说假设我们开启了一个server,有100个client要connect,加载一些配置文件、初始化变量等操作,只有在第一个client连接的时候才有这些操作,后面的client连接的时候就省去了重复加载的过程,直接从内存中读取就好了。这样好不好呢?很明显非常好,如此一来还可以提升不小的性能。但是,对开发人员的要求也更高了。因为这些资源常驻内存,并不会像web模式下,在请求结束之后会释放内存和资源。也就是说我们在操作中一旦没有处理好,就会发生内存泄漏,久而久之就可能会发生内存溢出。之前一直对swoole印象不错,没想到都是坑。其实这都不算坑,如果你觉得是坑,权且当做是一种提升自身能力的约束好了。
  • 回到我们的开篇提到的问题上,再啰嗦的解释一遍:server一开始就把我们的代码加载到内存中了,无论后期我们怎么修改本地磁盘上的代码,客户端再次发起请求的时候,永远都是内存中的代码在生效,所以我们只能终止server,释放内存然后再重启server,重新把新的代码加载到内存中,如此,明白否?那有同学要说了,感觉好麻烦,是不是说在swoole中申请的内存啥的都要自己手动unset释放呢?对于局部变量,就没必要操这个心了,swoole会在事件回调函数返回之后释放。但是对于全局变量你就要悠着点了,因为他们在使用完之后并不会被释放。不会被释放?那在php中,这几种全局变量:global声明的变量,static声明的对象属性或者函数内的静态变量和超全局变量谁还敢用?一个不小心服务器直接就玩完的节奏!

③、为什么要用全局变量?

我们想一下为什么要用全局变量?

是不是就是想全局共享?但是,在多进程开发模式下,进程内的全局变量所用的内存那也是保存在子进程内存堆的,也并非共享内存,所以在swoole开发中我们还是尽量避免使用全局变量!

那我要是非用不可呢?就是乐意,就是想用。

④、如何避免内存泄漏?

比如有一个static大数组,用于保存客户端的连接标识。我们就可以在onClose回调内清理变量。

此外,swoole还提供了max_request机制,我们可以配置max_request和task_max_request这两个参数来避免内存溢出。

  • max_request的含义是worker进程的最大任务数,当worker进程处理的任务数超过这个参数时,worker进程会自动退出,如此便达到释放内存和资源的目的。不必担心worker进程退出后,没“人”处理业务逻辑了,因为我们还有Manager进程,Worker进程退出后Manager进程会重新拉起一个新的Worker进程。
  • task_max_request针对task进程,含义同max_request。

光溜溜的说了半天,我们来看下是不是这么玩的。

server的代码简写如下

$serv = new swoole_server("127.0.0.1", 9501);

$serv->set([
    "worker_num" => 1,
    "task_worker_num" => 1,
    "max_request" => 3,
    "task_max_request" => 4,
]);
$serv->on("Connect", function ($serv, $fd) {
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    $serv->task($data);
});
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {

});
$serv->on("Finish", function ($serv, $taskId, $data) {
});
$serv->on("Close", function ($serv, $fd) {
});
$serv->start();

client代码如下

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");

// 向服务端发送数据
$client->send("Just a test.");
$client->close();

为了方便测试,我们开了一个Worker进程,一个Task进程,Worker进程的最大任务设置为3次,Task进程的最大任务设置为4次。

运行server后,在client未请求前我们看下当前的进程结构

01.png

注意进程id等于15644和15645哦,这两个一个是Worker进程,一个是Task进程。Mac下我们就不区分到底谁是谁了。

随后我们让客户端请求3次,再看下结果

01.png

有没有发现原先进程id等于15645的现在变成15680了?请求3次后我们确定是Worker进程自动退出了,并且Manager进程拉起了一个15680的Worker进程。

我们再请求一次,第四次

01.png

发现进程id等于15644的Task进程消失了,有一个新的子进程15704被重新创建了。

看来官方没有骗人,说的都对。

So…原来我在一开始介绍的那么多都是废话?

不全是,因为max_request参数对server有下面几种限制条件。

  • max_request只能用于同步阻塞、无状态的请求响应式服务器程序
  • 纯异步的Server不应当设置max_request
  • 使用Base模式时max_request是无效的,其中Base模式是swoole运行模式的一种,我们主要介绍多进程模式

⑤、总结

  • 常驻内存减少了不小开销,swoole不错
  • 应尽量避免使用全局变量,不用最好,没啥用
  • max_request可以解决php的内存溢出问题,但是主要还是要养成释放内存的习惯,因为max_request也有限制场景

2.6、swoole之守护进程、信号和平滑重启

①、守护进程

之前我们介绍过进程和线程,今天我们再来谈一谈守护进程。

无论是server初识还是task邂逅,不管我们程序写的多么精彩,都没有办法把项目应用到实际业务中,因为我们知道,把运行server的终端关闭之后,server也就不复存在了。

那有没有一种办法说仅且当电脑关机的时候才终止server的运行,不管终端怎么玩,server也能够在后台持续运行呢?

守护进程(daemon)就是一种长期生存的进程,它不受终端的控制,可以在后台运行。其实我们之前也有了解,比如说nginx,fpm等一般都是作为守护进程在后台提供服务。

熟悉linux的同学可能知道,我们可以利用nohup命令让程序在后台跑。swoole官方也为我们提供了配置选项daemonize,默认不启用守护进程,若要开启守护进程,daemonize设置为true即可。

守护进程有优点,必然也存在缺点。我们启用守护进程后,server内所有的标准输出都会被丢弃,这样的话我们也就无法跟踪进程在运行过程中是否异常之类的错误信息了。为方便起见,swoole为我们提供了另一个配置选项log_file,我们可以指定日志路径,这样swoole在运行时就会把所有的标准输出统统记载到该文件内。

②、信号

学习本文之前,我们了解到,swoole是常驻内存的,若想让修改后的代码生效,就必须Ctrl+C,然后再重启server。对于守护进程化的server呢?了解过kill命令的同学要说了,我直接把它干掉,然后终端下再重启,就可以了。

事实上,对于线上繁忙的server,如果你直接把它干掉了,可能某个进程刚好就只处理了一半的数据,对于天天来回倒腾的你来说,面对错乱的数据你不头疼,DBA也想long死你!

这个时候我们就需要考虑如何平滑重启server的问题了。所谓的平滑重启,也叫“热重启”,就是在不影响用户的情况下重启服务,更新内存中已经加载的php程序代码,从而达到对业务逻辑的更新。

swoole为我们提供了平滑重启机制,我们只需要向swoole_server的主进程发送特定的信号,即可完成对server的重启。

我们在进程模型一文中介绍主进程的主线程的时候也提到过主线程的主要任务之一就是处理信号。

那什么是信号呢?

信号是软件中断,每一个信号都有一个名字。通常,信号的名字都以“SIG”开头,比如我们最熟悉的Ctrl+C就是一个名字叫“SIGINT”的信号,意味着“终端中断”。

③、平滑重启

在swoole中,我们可以向主进程发送各种不同的信号,主进程根据接收到的信号类型做出不同的处理。比如下面这几个

  • SIGTERM,一种优雅的终止信号,会待进程执行完当前程序之后中断,而不是直接干掉进程
  • SIGUSR1,将平稳的重启所有的Worker进程
  • SIGUSR2,将平稳的重启所有的Task进程

如果我们要实现重启server,只需要向主进程发送SIGUSR1信号就好了。

平滑重启的原理:当主进程收到SIGUSR1信号时,主进程就会向一个子进程发送安全退出的信号,所谓的安全退出的意思是主进程并不会直接把Worker进程杀死,而是等这个子进程处理完手上的工作之后,再让其光荣的“退休”,最后再拉起新的子进程(重新载入新的PHP程序代码)。然后再向其他子进程发送“退休”命令,就这样一个接一个的重启所有的子进程。

我们注意到,平滑重启实际上就是让旧的子进程逐个退出并重新创建新的进程。为了在平滑重启时不影响到用户,这就要求进程中不要保存用户相关的状态信息,即业务进程最好是无状态的,避免由于进程退出导致信息丢失。

感觉很美好的样子,凡是重启只要简单的向主进程发送信号就完事了呗。

理想很丰满,现实并非如此。

在swoole中,重启只能针对Worker进程启动之后载入的文件才有效!什么意思呢,就是说只有在onWorkerStart回调之后加载的文件,重启才有意义。在Worker进程启动之前就已经加载到内存中的文件,如果想让它重新生效,还是只能乖乖的关闭server再重启。

lsof -i:9501
kill -9 pid

说了这么多,我们写个例子看看到底怎么样向主进程发送SIGUSR1信号以便有效重启Worker进程。

首先我们创建一个Test类,用于处理onReceive回调的数据,为什么要把onReceive回调的业务拿出来单独写,看完例子你就明白了。

<?php

class Test
{
    public function run($data)
    {
        echo $data;
    }
}

在Test::run方法中,我们第一步仅仅是echo输出swoole_server接收到的数据。

当前目录下我们创建一个swoole_server的类NoReload.php

_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", \[$this, "onReceive"]);

        $this->_test = new Test;
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $this->_test->run($data);
    }
}

$noReload = new NoReload;
$noReload->start();

特别提醒:我们在初始化swoole_server的时候的写法是命名空间的写法

new Swoole\Server

该种风格的写法等同于下划线写法 ,swoole对这两种风格的写法都支持

new swoole_server

此外我们看下server的代码逻辑:类定义之前require_once了Test.php,初始化的时候设置了一个Worker进程,注册了NoReload::onReceive方法为swoole_server的onReceive回调,在onReceive回调内接收到的数据传递给了Test::run方法处理。

接下来我们写一个client脚本测试下运行结果

connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
$client -> send("Just a test.\n");
$client->close();

客户端的测试代码也很简单,连接server并向server发一个字符串信息

01.png

正常,没发现问题,server所在终端输出了客户端send的内容。

在server不动的情况下我们修改下Test.php,代码如下

<?php
class Test
{
    public function run($data)
    {
        // echo $data;
        $data = json_decode($data, true);
        if (!is_array($data)) {
            echo "server receive \$data format error.\n";
            return ;
        } 
        var_dump($data);
    }
}

原先echo直接输出,现在我们改了下Test的代码,如果接收到的数据经过json_decode处理后不是数组,就返回一段内容并结束,否则打印receive到的数据

如果这个时候我们不对server进行重启,运行client的结果肯定还是一样的,看下结果

01.png

server端新的代码未生效,如果Test.php新的代码生效了,会在server所在终端输出“server receive $data format error.”,这符合我们的认知。

下面我们通过ps命令查看下左侧server的主进程的pid,然后通过kill命令向该进程发送SIGUSR1信号,看看结果如何

01.png结果发现即使向主进程发送了SIGUSR1信号,但是左侧server终端显示的依然是未生效的php代码,这也是对的,因为我们说过新的程序代码只针对在onWorkerStart回调之后才加载进来的php文件才能生效,我们事例中Test.php是在class定义之前就加载进来了,所以肯定不生效。

我们新建一个Reload.php文件,把server的代码修改如下

<?php

class Reload
{
    private $_serv;
    private $_test;

    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", \[$this, "onReceive"]);
        $this->_serv->on("WorkerStart", \[$this, "onWorkerStart"]);
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
    public function onWorkerStart($serv, $workerId)
    {
        require_once("Test.php");
        $this->_test = new Test;
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $this->_test->run($data);
    }
}

$reload = new Reload;
$reload->start();

仔细观察,我们仅仅移除了在类定义之前引入Test.php以及在__construct中new Test的操作。

而是在__construct方法中增加了onWorkerStart回调,并在该回调内引入Test.php并初始化Test类。

Test.php的代码,我们仍然先后用上面的两处代码为例,运行client看下结果

01.png

图例右侧运行client过程中,给主进程发送SIGUSR1信号之前,记得修改Test.php的代码,然后再运行client脚本测试。

结果我们发现,在给主进程发送SIGUSR1信号之后,Test.php的新代码生效了。这也便实现了热重启的效果。

如此,我们在Test.php中不论如何更新代码,只需要找到主进程的PID,向它发送SIGUSR1信号即可。同理,SIGUSR2信号是只针对Task进程的,后面可以自行测试下。

热重启的效果实现了,现在针对Reload.php的server,让该server进程守护化看看。

__construct中,$serv->set代码修改如下

$this->_serv->set([   
    "worker_num" => 1,          
    "daemonize" => true,          
    "log_file" => __DIR__ . "/server.log",
]);

我们在终端下在运行下Reload.php

php Reload.php

代码好像突然就执行完毕了,现在终端不“卡”着了,终端的执行权又重新交给了终端,我们的server呢?怎么回事?

其实这就是守护进程化的概念,我们开启的swoole_server进程已经在后端跑着了,不信我们ps看下

ps aux | grep Reload
manks 14117   xxx...    1:51下午   0:07.49 php Reload.php
manks 14117   xxx...    1:51下午   0:07.47 php Reload.php
manks 36807   xxx...    1:54下午   0:00.00 grep Reload
manks 14116   xxx...    1:51下午   0:00.01 php Reload.php

发现还真有几个进程在跑着。

不光如此,我们再看下当前目录下是不是有一个server.log的日志文件,我们在swoole_server::set的log_file配置项指定了日志文件就是它,那么在server运行的过程中,所有的标准输出都会输出到这个文件中,此时我们再运行下client.php,然后打开server.log看看是不是终端输出的结果都显示在该文件内了呢?毋庸置疑。

注意!!!!当使用daemonize,需要在启动的回调里面同时改变一下目录,否则会有奇怪的事情发生

public function onWorkerStart($serv, $workerId){
        chdir(__DIR__);
        require_once "Test.php";
        $this->_test = new Test;
    }

2.7、swoole之定时器

①、前言

说起定时器,大家都不陌生。我最早接触定时器的概念,是javascript的setInterval和setTimeout这两个函数,前者会持续执行,后者仅会执行一次。

在后端开发中,一些涉及到定时器相关的需求,比如数据库备份,排行榜数据更新等,通常我们可以借助linux的crontab工具实现。但是对于一些想精确到秒级别或者想暂停定时器的需求,就相对麻烦一些了。

直到swoole的诞生,异步毫秒级的定时器真的是好用到没得说。

相对与javascript的setInterval和setTimeout,swoole也提供了永久性定时器和一次性定时器,我们分别来看下怎么玩。

②、永久性定时器

所谓的永久性定时器,就是在设定好定时器之后,该定时器就会按照一定的时间间隔执行,直到该定时器被删除。

这种类型的定时器,我们可以使用swoole_timer_tick函数创建,该函数接收3个参数,原型如下

int swoole_timer_tick(int $ms, callable $callback, mixed $params);

  • $ms 指时间,单位毫秒
  • $callback 回调函数,定时器创建后会调用该函数
  • $params 传递给回调函数的参数

即创建一个ms毫秒后执行callback的定时器。

来看一个简单的例子:tick.php

<?php
swoole_timer_tick(1000, function () {
    echo "This is a tick.\n";
});

案例中我们创建了一个永久性定时器,每1000毫秒即每秒执行一次回调函数,输出"This is a tick.\n"。

定时器的清除,可以使用swoole_timer_clear函数操作,该函数接收一个参数,定时器的id,函数原型如下

bool swoole_timer_clear(int $timerId)

再来看一个稍微完整的例子:tick-2.php

<?php

$i = 0;

swoole_timer_tick(1000, function ($timeId, $params) use (&$i) {
    $i ++;
    echo "hello, {$params} --- {$i}\n";
    if ($i >= 5) {
        swoole_timer_clear($timeId);
    }
}, 'world');

事例中我们创建了一个定时器,该定时器每秒执行一次,swoole_timer_tick的第二个参数即回调函数,该函数的参数$timeId是创建的定时器的id, params是swooletimertick的第三个参数传递的值,use闭包中我们取了变量i的地址,在回调函数中,我们对i++处理,当i >= 5的时候,用swoole_timer_clear函数清除了定时器。运行下该文件,我们看看结果

php tick.php
hello, world --- 1
hello, world --- 2
hello, world --- 3
hello, world --- 4
hello, world --- 5

需要说明的是,swoole_timer_tick函数是全局性的,通常情况下是可以在任意地方调用。

另外,如果在事件的回调函数内,我们还可以通过swoole_server->tick函数创建永久性定时器,并使用swoole_server->clearTimer函数清除定时器,比如上面的例子我们可以在回调函数onWorkerStart中这样写

<?php

$serv->set([
    'worker_num' => 2,
]);
$serv->on('WorkerStart', function ($serv, $workerId){
    if ($workerId == 0) {
        $i = 0;
        $params = 'world';
        $serv->tick(1000, function ($timeId) use ($serv, &$i, $params) {
            $i ++;
            echo "hello, {$params} --- {$i}\n";
            if ($i >= 5) {
                $serv->clearTimer($timeId);
            }
        });
    }
});

代码总体上就不分析了,只看一点,为什么在onWorkerStart回调内判断了$workerId是否等于0?

注意到我们开启了两个Worker进程,如果不判断,那么就会在两个Worker进程内各注册一个定时器,实际上也就是我们注册了两个相同的定时器,这是没有必要的。

注:swoole_server->tick等价于swoole_timer_tick,swoole_server->clearTimer等价于swoole_timer_clear。

③、一次性定时器

一次性定时器执行完一次之后,便会自动销毁。这种场景往往是当xxx几秒之后再执行。

同样也有两个函数供我们使用,全局的swoole_timer_after和回调内可调用的swoole_server->after。

前者的参数等同于swoole_timer_tick,只有一点不同,该函数所支持的最大毫秒数是86400000。

同样我们看两个简单的demo:tick-after.php

<?php
swoole_timer_after(3000, function () {
    echo "only once.\n";
});

回调内执行,我们这回举一个在onReceive内为例:tick-server.php

<?php
$serv = new swoole_server('127.0.0.1', 9501);
$serv->set([
    'worker_num' => 2,
]);
$serv->on('WorkerStart', function ($serv, $workerId){
    if ($workerId == 0) {
        $i = 0;
        $params = 'world';
        $serv->tick(1000, function ($timeId) use ($serv, &$i, $params) {
            $i ++;
            echo "hello, {$params} --- {$i}\n";
            if ($i >= 5) {
                $serv->clearTimer($timeId);
            }
        });
    }
});
$serv->on('Connect', function ($serv, $fd) {
});
$serv->on('Receive', function ($serv, $fd, $fromId, $data) {
    $serv->after(3000, function () {
        echo "only once.\n";
    });
});
$serv->on('Close', function ($serv, $fd) {
});
$serv->start();

以后再也不用crontab了,这随随便便简简单单的就实现了定时器的功能,so easy~

2.8、swoole之粘包问题

①、前言

什么是粘包问题,为什么我们要讲这个看起来比较奇怪的问题呢?

不着急解释,我们先看一个例子

创建一个server,server端代码如下:tcp-buffer-server.php

<?php

class TcpBufferServer
{
    private $_serv;

    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", \[$this, "onReceive"]);
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        echo "Server received data: {$data}" . PHP_EOL;
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}

$reload = new TcpBufferServer;
$reload->start();

server的代码很简单,仅仅是在收到客户端代码后,标准输出一句话而已,client的代码需要注意了,我们写了一个for循环,连续向server send三条信息,代码如下:tcp-buffer-client.php

<?php

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");

// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $client->send("Just a test.\n");
}
$client->close();

在未运行测试的情况下,我们期望server所在终端输出的结果应该是这样的

Server received data: Just a test.
Server received data: Just a test.
Server received data: Just a test.

注意哦,我们期望的结果是server被回调了3次,才有上述期望的结果值

实际运行的结果呢,是否与我们所期望的一致?我们看下

01.png

上图左边是server输出的信息。

我们看到,左侧显示的结果是server一次性输出的结果,按理论来说,client发起了3次请求,server应该跟我们期望的结果一致,会执行3次呀,这怎么回事呢?

这个问题,便是我们今天要说的粘包问题。

为了说清楚这个问题,我们先来看下client/server之间数据传递的过程

  • 客户端->发送数据
  • 服务端->接收数据

通常我们直觉性的认为,客户端直接向网络中传输数据,对端从网络中读取数据,但是这是不正确的。

socket有缓冲区buffer的概念,每个TCP socket在内核中都有一个发送缓冲区和一个接收缓冲区。客户端send操作仅仅是把数据拷贝到buffer中,也就是说send完成了,数据并不代表已经发送到服务端了,之后才由TCP协议从buffer中发送到服务端。此时服务端的接收缓冲区被TCP缓存网络上来的数据,而后server才从buffer中读取数据。

所以,在onReceive中我们拿到的数据并没有办法保证数据包的完整性,swoole_server可能会同时收到多个请求包,也可能只收到一个请求包的一部分数据。

这就是一个大问题呀,如此TCP协议不行呀,这货虽然能保证我们能正确的接收到数据但是数据不对呀,这麻烦不容小觑。

既然是个问题,那我们自然也就有解决问题的方法,不然我下面说啥呢,对吧。

swoole给我们提供了两种解决方案

②、方法一:EOF结束协议

EOF,end of file,意思是我们在每一个数据包的结尾加一个eof标记,表示这就是一个完整的数据包,但是如果你的数据本身含有EOF标记,那就会造成收到的数据包不完整,所以开启EOF支持后,应避免数据中含有EOF标记。

在swoole_server中,我们可以配置open_eof_check为true,打开EOF检测,配置package_eof来指定EOF标记。

swoole_server收到一个数据包时,会检测数据包的结尾是否是我们设置的EOF标记,如果不是就会一直拼接数据包,直到超出buffer或者超时才会终止,一旦认定是一个完整的数据包,就会投递给Worker进程,这时候我们才可以在回调内处理数据。

这样server就能保证接收到一个完整的数据包了?不能保证,这样只能保证server能收到一个或者多个完整的数据包。

为啥是多个呢?

我们说了开启EOF检测,即open_eof_check设置为true,server只会检测数据包的末尾是否有EOF标记,如果向我们开篇的案例连发3个EOF的数据,server可能还是会一次性收到,这样我们只能在回调内对数据包进行拆分处理。

我们拿开篇的案例为例

server开启eof检测并指定eof标记是\r\n,代码如下:server-eof-check.php

<?php
class ServerEofCheck
{
    private $_serv;
    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "open_eof_check" => true, //打开EOF检测
            "package_eof" => "\r\n", //设置EOF
        ]);
        $this->_serv->on("Connect", array($this, "onConnect"));
        $this->_serv->on("Close", array($this, "onClose"));
        $this->_serv->on("Receive", \[$this, "onReceive"]);
    }
    public function onConnect($serv, $fd, $fromId)
    {
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        echo "Server received data: {$data}" . PHP_EOL;
    }
    public function onClose($serv, $fd, $fromId)
    {
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new ServerEofCheck;
$reload->start();

客户端设置发送的数据末尾是\r\n符号,代码如下:server-eof-client.php

<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $client->send("Just a test.\r\n");
}
$client->close();

按照我们刚才的分析,server的效果可能会一次性收到多个完整的包,我们运行看看结果

01.png

因此我们还需要在onReceive回调内对收到的数据进行拆分处理

public function onReceive($serv, $fd, $fromId, $data)
{
    $datas = explode("\\r\\n", $data);
    foreach ($datas as $data)
    {
        if(!$data)
            continue;

        echo "Server received data: {$data}" . PHP_EOL;
    }
}

此时我们再看下运行结果

01.png

自行分包的效果便实现了,考虑到自行分包稍微麻烦,swoole提供了open_eof_split配置参数,启用该参数后,server会从左到右对数据进行逐字节对比,查找数据中的EOF标记进行分包,效果跟我们刚刚自行拆包是一样的,性能较差。

在案例的基础上我们看看open_eof_split配置

$this->_serv->set([
    "worker_num" => 1,
    "open_eof_check" => true, //打开EOF检测
    "package_eof" => "\r\n", //设置EOF
    "open_eof_split" => true,
]);

onReceive的回调,我们不需要自行拆包

public function onReceive($serv, $fd, $fromId, $data)
{
     echo "Server received data: {$data}" . PHP_EOL;
}

lient的测试代码使用\r\n(同server端package_eof标记一致),我们看下运行效果

01.png

EOF标记解决粘包就说这么多,下面我们再看看另一种解决方案

③、方法二:固定包头+包体协议

下面我们要说的,对于部分同学可能有点难度,对于不理解的,建议多看多操作多问多查,不躲避不畏惧,这样才能有所提高。

固定包头是一种非常通用的协议,它的含义就是在你要发送的数据包的前面,添加一段信息,这段信息了包含了你要发送的数据包的长度,长度一般是2个或者4个字节的整数。

在这种协议下,我们的数据包的组成就是包头+包体。其中包头就是包体长度的二进制形式。比如我们本来想向服务端发送一段数据 "Just a test." 共12个字符,现在我们要发送的数据就应该是这样的

pack("N", strlen("Just a test.")) . "Just a test."

其中php的pack函数是把数据打包成二进制字符串。

为什么这样就能保证Worker进程收到的是一个完整的数据包呢?我来解释一下:

当server收到一个数据包(可能是多个完整的数据包)之后,会先解出包头指定的数据长度,然后按照这个长度取出后面的数据,如果一次性收到多个数据包,依次循环,如此就能保证Worker进程可以一次性收到一个完整的数据包。

估计好多人都看蒙了,这都是神马玩意?我们以案例来分析

server代码

<?php
class ServerPack
{
    private $_serv;
    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "open_length_check"     => true,      // 开启协议解析
            "package_length_type"   => "N",     // 长度字段的类型
            "package_length_offset" => 0,       //第几个字节是包长度的值
            "package_body_offset"   => 4,       //第几个字节开始计算长度
            "package_max_length"    => 81920,  //协议最大长度
        ]);
        $this->_serv->on("Receive", \[$this, "onReceive"]);
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $info = unpack("N", $data);
        $len = $info[1];
        $body = substr($data, - $len);
        echo "server received data: {$body}\n";
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new ServerPack;
$reload->start();

客户端的代码

<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $data = "Just a test.";
    $data = pack("N", strlen($data)) . $data;
    $client->send($data);
}
$client->close();

运行的结果

01.png

结果没错,是我们期望的结果。

我们来分析下这是为什么

  • 首先,在server端我们配置了open_length_check,该参数表明我们要开启固定包头协议解析
  • package_length_type配置,表明包头长度的类型,这个类型跟客户端使用pack打包包头的类型一致,一般设置为N或者n,N表示4个字节,n表示2个字节
  • 我们看下客户端的代码 pack('N', strlen($data)) . $data,这句话就是包头+包体的意思,包头是pack函数打包的二进制数据,内容便是真实数据的长度strlen(data)。在内存中,整数一般占用4个字节,所以我们看到,在这段数据中0-4字节表示的是包头,剩余的就是真实的数据。但是server不知道呀,怎么告诉server这一事实呢?看配置package_length_offset和package_body_offset,前者就是告诉server,从第几个字节开始是长度,后者就是从第几个字节开始计算长度。
  • 既然如此,我们就可以在onReceive回调对数据解包,然后从包头中取出包体长度,再从接收到的数据中截取真正的包体。

$info = unpack("N", $data);
$len = $info[1];
$body = substr($data, - $len);
echo "server received data: {$body}\n";

这便是swoole对于粘包问题的解决,你学会了吗?

2.9、异步发送邮件案例

①、引言

在介绍task的时候我们提到过邮件发送,不过当时是作为引子,主要讲的是swoole task方面的知识点,今天我们来实战性的完善一下“发送邮件”的案例。

先来总结一下我们在写代码的过程中应该注意哪些问题

  • 开启数量适中的Worker进程和Task进程
  • 守护进程化
  • 配置运行时日志
  • 平滑重启
  • 避免内存泄漏
  • 避免粘包问题

除此之外,跟swoole打交道,我们还应该注意下面这些

  • 为了避免Worker阻塞,避免使用sleep等睡眠函数
  • 不要使用die或者exit函数,即使在你调试的时候
  • 保持良好的代码风格,try/catch捕获异常
  • 如果Worker进程无法预料会发生异常退出,虽然Manager进程会重新拉起新的Worker进程,但是我们可以通过register_shutdown_function方法在进程退出前“善后”

那下面我们开始吧。

②、swiftmailer

首先发送邮件,我们借助第三方类库 swiftmailer。有些框架可能集成了swiftmailer,比如yii2,本来准备在yii2的基础之上来讲,考虑部分人可能对这个框架不熟悉,我们这里直接根据swiftmailer代码操作,框架中一样可以使用,无任何影响。

我们执行下面的命令,把swiftmailer下载到本地,下载好之后swiftmailer会被下载到一个叫vendor文件夹的目录里面

composer require "swiftmailer/swiftmailer"

然后我们封装一个简单的邮件类Mailer.php,同vendor目录同级,用于发送邮件,该类后期可自行完善,比如增加批量发送邮件或者增加发送模版邮件等操作。

<?php
require_once __DIR__ . "/vendor/autoload.php";
class Mailer
{
    public $transport;
    public $mailer;
    /**
     * 发送邮件类 参数 $data 需要三个必填项 包括 邮件主题$data\["subject"\]、接收邮件的人$data\["to"\]和邮件内容 $data\["content"\]
     * @param Array $data
     * @return bool $result 发送成功 or 失败
     */
    public function send($data)
    {
        $this->transport = (new Swift_SmtpTransport("smtp.qq.com", 587))
            ->setEncryption("tls")
            ->setUsername("452936616@qq.com")
            ->setPassword("xxxxxx");
        $this->mailer = new Swift_Mailer($this->transport);
        $message = (new Swift_Message($data["subject"]))
            ->setFrom(array("452936616@qq.com" => "lulublog"))
            ->setTo(array($data["to"]))
            ->setBody($data["content"]);
            
        $result = $this->mailer->send($message);
        // 释放
        $this->destroy();
        return $result;
    }
    public function destroy()
    {
        $this->transport = null;
        $this->mailer = null;
    }
}

在这段代码中,你需要修改的地方包括 Host、Post、Encryption、Username、Password和From。

Mailer类简单的封装好之后,我们写几行代码测试下你的邮件类是否可以正确的使用

ini_set("date.timezone","Asia/Shanghai");
require_once __DIR__ . "/Mailer.php";
$data = [
    "to" => "452936616@qq.com",
    "subject" => "just a test",
    "content" => "This is just a test.",
];
$mailer = new Mailer;
$mailer->send($data);

to是要发送给谁,subject邮件标题,content邮件内容。

如果不可以正常发送,请检查swiftmailer相关类正确引入并且保证Mailer类的配置可用。

③、TaskServer、TaskRun、TaskClient

邮件类准备好之后,我们正式开始写swoole server,主要代码如下:

<?php
class TaskServer
{
    private $_serv;
    private $_run;
    /**
    * init
    */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 2,
            "daemonize" => false,
            "log_file" => __DIR__ . "/server.log",
            "task_worker_num" => 2,
            "max_request" => 5000,
            "task_max_request" => 5000,
            "open_eof_check" => true, //打开EOF检测
            "package_eof" => "\r\n", //设置EOF
            "open_eof_split" => true, // 自动分包
        ]);
        $this->_serv->on("Connect", \[$this, "onConnect"]);
        $this->_serv->on("Receive", \[$this, "onReceive"]);
        $this->_serv->on("WorkerStart", \[$this, "onWorkerStart"]);
        $this->_serv->on("Task", \[$this, "onTask"]);
        $this->_serv->on("Finish", \[$this, "onFinish"]);
        $this->_serv->on("Close", \[$this, "onClose"]);
    }
    public function onConnect($serv, $fd, $fromId)
    {
    }
    public function onWorkerStart($serv, $workerId)
    {
        require_once __DIR__ . "/TaskRun.php";
        $this->_run = new TaskRun;
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $data = $this->unpack($data);
        $this->_run->receive($serv, $fd, $fromId, $data);
        // 投递一个任务到task进程中
        if (!empty($data["event"])) {
            $serv->task(array_merge($data , ["fd" => $fd]));
        }
    }
    public function onTask($serv, $taskId, $fromId, $data)
    {
        $this->_run->task($serv, $taskId, $fromId, $data);
    }
    public function onFinish($serv, $taskId, $data)
    {
        $this->_run->finish($serv, $taskId, $data);
    }
    public function onClose($serv, $fd, $fromId)
    {
    }
    /**
    * 对数据包单独处理,数据包经过`json_decode`处理之后,只能是数组
    * @param $data
    * @return bool|mixed
    */
    public function unpack($data)
    {
        $data = str_replace("\\r\\n", "", $data);
        if (!$data) {
            return false;
        }
        $data = json_decode($data, true);
        if (!$data || !is_array($data)) {
            return false;
        }
        return $data;
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new TaskServer;
$reload->start();

有的人一看那么多代码就头疼,实际上这也就几行代码,仔细看的同学会发现,这个类就是server的基本配置和一些回调,不涉及任何业务逻辑。简单分析下

  • 配置项,对照开篇提到的注意项,挨个比较吧
  • 在onWorkerStart回调内,我们引入了实际处理业务逻辑的类TaskRun.php,为什么这么说呢?因为我们在onReceive\onTask\onFinish回调内均把数据交给了TaskRun对象去处理了
  • 我们约定,每个数据包都必须带有EOF标记\r\n,在server端为了更好的处理数据,onReceive回调内我们把数据包丢给了unpack方法处理,该方法的目的就是把数据包的EOF标记去掉,还原真实的数据包。我们还约定,server收到的数据包经过unpack处理之后只能是数组,非数组在unpack中就被直接处理掉了。
  • onReceive回调内,我们看到,只有数据包含有event项才会被投递给Task进程,这样做的原因是Task进程可能要处理各种任务,增加event项是为了表明投递过来的任务是要做什么的。

为什么要单独的把业务逻辑分开再另起一个文件处理呢?有疑问的可以回去再看看平滑重启一文,我们看TaskRun的实现

<?php
require_once ("./TaskClient.php");
require_once ("./Mailer.php");
class TaskRun
{
    public function receive($serv, $fd, $fromId, $data)
    {
    }
    public function task($serv, $taskId, $fromId, $data)
    {
        try {
            switch ($data["event"]) {
                case TaskClient::EVENT_TYPE_SEND_MAIL:
                    $mailer = new Mailer;
                    $result = $mailer->send($data);
                    break;
                default:
                    break;
            }
            return $result;
        } catch (\Exception $e) {
            throw new \Exception("task exception :" . $e->getMessage());
        }
    }
    public function finish($serv, $taskId, $data)
    {
        return true;
    }
}

目前,我们主要就一个业务,“发送邮件”,所以TaskRun类的实现现在看来非常简单。

因为发邮件是一件比较耗时的任务,所以我们这里完善的是task回调。我们根据投递给Task进程的数据类型,判断投递过来的数据是要做什么。比如我们这里有一项event,等于TaskClient::EV

跳转到

收起


Original url: Access
Created at: 2019-04-11 12:07:41
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论