swoole 第16章 websocket通知案例以及多端口复合协议的使用

2018-09-19

一、引言

最初是打算写个聊天室分享给大家,后来仔细斟酌了一下,还是讲个web通知吧,两个案例都差不多。

当然,在前面两篇介绍websocket的基础之上,相信你一定会觉得web通知这个功能就是一个小case。所以本文我们把重点放在后面多端口复合协议的使用。

websocket通知的实现方式,基本上跟websocket初识一文中最后介绍的案例差不多,只不过我们当时是循环所有的客户端推送消息,此时我们是一对一推送提醒。

二、需求分析

我们以评论被回复为例,当一条评论被其他某个用户(假设是用户B)回复,即发一条通知给被回复的评论所属人(假设是用户A),告诉A,他的评论被回复了。

三、功能分析

  • 我们不能保证用户B和用户A都处于连接状态,但是通常情况下,用户B至少是连接状态,用户A不一定跟server保持连接;

  • 任一用户都不止对应一个客户端。换言之,用户A和用户B都可能打开了多个tab页,对于一个tab页,就会有一个独立的fd标识,我们这里认为任一用户只有最新的fd有效,或者你可以认为用户所有的tab页的连接都有效;

  • 因为没有用户系统,我们以get传递的参数uid为标识,uid=100视为用户A,uid=101视为用户B;

  • 我们不准备做一个评论系统,我们模拟的tab页包将会包含一个输入内容的文本框、一个输入目标uid的input和一个发送的按钮以满足需求。

四、流程分析

  • 用户A($_GET['uid'] = 100)在某个tab页的输入框输入"回复xxx的内容"字样后,点击发送

  • 用户B($_GET['uid'] = 101)如果处于连接状态,则alert提醒用户B,他的评论被回复了

五、server 端代码

分析了半天,我们看server端代码的实现

class CommentServer
{
    private $_serv;
    public $key = "^www.lulublog.cn&swoole$";
    // 用户id和fd对应的映射,key => value,key是用户的uid,value是用户的fd
    public $user2fd = [];
    public function __construct()
    {
        $this->_serv = new swoole_websocket_server("0.0.0.0", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "heartbeat_check_interval" => 60,
            "heartbeat_idle_time" => 125,
        ]);
        $this->_serv->on("open", [$this, "onOpen"]);
        $this->_serv->on("message", [$this, "onMessage"]);
        $this->_serv->on("close", [$this, "onClose"]);
    }
    /**
     * @param $serv
     * @param $request
     * @return mixed
     */
    public function onOpen($serv, $request)
    {
        // 连接授权
        $accessResult = $this->checkAccess($serv, $request);
        if (!$accessResult) {
            return false;
        }
        // 始终把用户最新的fd跟uid映射在一起
        if (array_key_exists($request->get["uid"], $this->user2fd)) {
            $existFd = $this->user2fd[$request->get["uid"]];
            $this->close($existFd, "uid exists.");
            $this->user2fd[$request->get["uid"]] = $request->fd;
            return false;
        } else {
            $this->user2fd[$request->get["uid"]] = $request->fd;
        }
    }
    /**
     * @param $serv
     * @param $frame
     * @return mixed
     */
    public function onMessage($serv, $frame)
    {
        // 校验数据的有效性,我们认为数据被`json_decode`处理之后是数组并且数组的`event`项非空才是有效数据
        // 非有效数据,关闭该连接
        $data = $frame->data;
        $data = json_decode($data, true);
        if (!$data || !is_array($data) || empty($data["event"])) {
            $this->close($frame->fd, "data format invalidate.");
            return false;
        }
        // 根据数据的`event`项,判断要做什么,`event`映射到当前类具体的某一个方法,方法不存在则关闭连接
        $method = $data["event"];
        if (!method_exists($this, $method)) {
            $this->close($frame->fd, "event is not exists.");
            return false;
        }
        $this->$method($frame->fd, $data);
    }
    public function onClose($serv, $fd)
    {
        echo "client {$fd} closed.\n";
    }
    /**
     * 校验客户端连接的合法性,无效的连接不允许连接
     * @param $serv
     * @param $request
     * @return mixed
     */
    public function checkAccess($serv, $request)
    {
        // get不存在或者uid和token有一项不存在,关闭当前连接
        if (!isset($request->get) || !isset($request->get["uid"]) || !isset($request->get["token"])) {
            $this->close($request->fd, "access faild.");
            return false;
        }
        $uid = $request->get["uid"];
        $token = $request->get["token"];
        // 校验token是否正确,无效关闭连接
        if (md5(md5($uid) . $this->key) != $token) {
            $this->close($request->fd, "token invalidate.");
            return false;
        }
        return true;
    }
    /**
     * @param $fd
     * @param $message
     * 关闭$fd的连接,并删除该用户的映射
     */
    public function close($fd, $message = "")
    {
        // 关闭连接
        $this->_serv->close($fd);
        // 删除映射关系
        if ($uid = array_search($fd, $this->user2fd)) {
            unset($this->user2fd[$uid]);
        }
    }
    public function alertTip($fd, $data)
    {
        // 推送目标用户的uid非真或者该uid尚无保存的映射fd,关闭连接
        if (empty($data["toUid"]) || !array_key_exists($data["toUid"], $this->user2fd)) {
            $this->close($fd);
            return false;
        }
        $this->push($this->user2fd[$data["toUid"]], ["event" => $data["event"], "msg" => "收到一条新的回复."]);
    }
    /**
     * @param $fd
     * @param $message
     */
    public function push($fd, $message)
    {
        if (!is_array($message)) {
            $message = [$message];
        }
        $message = json_encode($message);
        // push失败,close
        if ($this->_serv->push($fd, $message) == false) {
            $this->close($fd);
        }
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$server = new CommentServer;
$server->start();

满眼看下来,代码挺长的,没关系,我们整理了一下代码的逻辑

  • 我们给CommentServer类增加了一个属性 $user2fd,这个是key => value结构,用于保存uid和fd的映射关系

  • onOpen回调做两件事,验证授权和添加新的映射关系

  • onMessage回调只接收含有event项的数组,event等同于CommentServer类的方法名,我们这里只有一个用于web通知的alertTip方法

  • 此外,我们封装了该类的close方法和push方法,close方法用于server主动关闭连接,删除uid和fd的映射,push方法用于向指定的fd推送消息

六、客户端代码

$key = "^www.lulublog.cn&swoole$";
$uid = isset($_GET["uid"]) ? intval($_GET["uid"]) : 0;
$token = md5(md5($uid) . $key);

https://file.lulublog.cn/images/3/2022/08/zBbMBHcgHMHmcGSg5TTSlmB5bsM5BZ.png

var ws = new WebSocket("ws://139.199.201.210:9501?uid=&token=");
    ws.onopen = function(event) {
    };
    ws.onmessage = function(event) {
        var data = event.data;
        data = eval("("+data+")");
        if (data.event == "alertTip") {
            alert(data.msg);
        }
    };
    ws.onclose = function(event) {
        console.log("Client has closed.\n");
    };
    function send() {
        var obj = document.getElementById("content");
        var content = obj.value;
        var toUid = document.getElementById("toUid").value;
        ws.send("{"event":"alertTip", "toUid": "+toUid+"}");
    }

server开启之后,演示的效果我们看下动图

https://file.lulublog.cn/images/3/2022/08/VhttH4NttSUdwrUVfPws6dUpfzSZSm.gif

结果中,注意看地址栏,alert弹窗是在哪个tab页弹出的。

七、多端口复合协议:server与server之间的交互

上例中,我们模拟的是评论被回复的简单例子。

回顾过去讲的内容,无论是tcp server,http server还是websocket server,server都是独立的,server与server之间并没有太多的交互。

实际上有没有交互的必要呢?

假设现在有这么一个需求,在刚刚评论的案例中,前文用户的回复不是直接发送给被回复的用户,而是评论在后台被人审核成功的一瞬间,再通知被回复的用户呢?

审核操作改为ajax操作,success回调内再new一个websocket客户端,然后send?可以,但是这显然不是一个很好的操作。

在websocket初识的时候我们说过,要想与websocket server通信,客户端只能是websocket客户端!既然我们刚刚否决了new一个websocket客户端,那是要怎么做呢?

从程序的角度出发,如果我们在php的层面上直接就能通知到websocket服务器,换言之,如果我们能够从php的层面上,直接实现alertTip方法的功能是不是就对了?

前文我们介绍tcp server的时候了解到,首先我们要想让web应用同server进行“互撩”,swoole_client少不了,既然有swoole_client,swoole_server肯定也少不了。但是目前server正在跑websocket,难不成我们在单独跑一个tcp server?对,我们就是要在websocket server的基础之上,想办法再跑一个tcp server。

为了使用多端口复合协议,swoole为server提供了listen方法,可以让当前server监听新的端口。

比如我们可以让刚刚创建的websocket server额外监听9502端口,这个端口主要负责tcp的工作。

$this->_tcp = $this->_serv->listen("127.0.0.1", 9502, SWOOLE_SOCK_TCP);
$this->_tcp->set([
    "open_eof_check" => true, //打开EOF检测
    "package_eof" => "\r\n", //设置EOF
    "open_eof_split" => true, // 自动分包
]);
$this->_tcp->on("Receive", [$this, "onReceive"]);

listen函数返回的是swoole_server_port对象,需要注意的是swoole_server_port的set函数只能设置一些特定的参数,比如socket参数、协议相关等,像worker_num、log_file、max_request等等这些都是不支持的。就tcp服务器而言,swoole_server_port对象也仅仅对onConnect\onReceive\onClose这三个回调支持,其他的一律不可用,详细可翻阅swoole手册查看。

下面我们就以评论审核通知来看看多端口复合协议的玩法。

再来看下我们现在的流程

  • 用户回复某评论 => 评论进入审核状态 ;很明显这个过程我们不需要做什么

  • 管理员审核该评论 => 通知被回复的人;这个时候我们要做的就等同于alertTip函数要做的

server端除了刚刚设置的$this->_tcp一段代码之外,我们单独绑定了onReceive回调,下面看onReceive回调的实现

public function onReceive($serv, $fd, $fromId, $data)
{
    try {
        $data = json_decode($data, true);
        if (!isset($data["event"])) {
            throw new \Exception("params error, needs event param.", 1);
        }
        
        $method = $data["event"];

        // 调起对应的方法
        if(!method_exists($this, $method)) {
            throw new \Exception("params error, not support method.", 1);
        }
        $this->$method($fd, $data);

        return true;

    } catch (\Exception $e) {
        $msg = $e->getMessage();
        throw new \Exception("{$msg}", 1);
    }
}

可以看到,除了进行简单的判断之外,如果tcp客户单携带一个event=alertTip即可

在这之前,websocket客户端的代码我们依然以前面的为例,假设要回复的用户uid=100,我们运行server之后,先让uid=100的客户端连接到server,运行的客户端地址栏添加uid参数等于100即可

下面我们再写一个tcp client,连接9502端口,我们的tcp server在这个端口监听

class Client
{
    private $client;

    public function __construct ()
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP);

        if (!$this->client->connect("127.0.0.1", 9502)) {
            $msg = "swoole client connect failed.";
            throw new \Exception("Error: {$msg}.");
        }
    }

    /**
     * @param $data Array
     * send data
     */
    public function sendData ($data)
    {
        $data = $this->togetherDataByEof($data);
        $this->client->send($data);
    }

    /**
     * 数据末尾拼接EOF标记
     * @param Array $data 要处理的数据
     * @return String json_encode($data) . EOF
     */
    public function togetherDataByEof($data)
    {
        if (!is_array($data)) {
            return false;
        }

        return json_encode($data) . "\r\n";
    }
}

$client = new Client;
$client->sendData([
    "event" => "alertTip",
    "toUid" => 100,
]);

现在无论是websocket服务器、tcp 服务器还是websocket客户端 tcp客户端都已经准备就绪了,下面我们浏览器直接访问下tcp client,如果正常的话,websocket客户端所在页面会弹出有新回复的通知。

看动图运行结果

https://file.lulublog.cn/images/3/2022/08/p3pQ93PnpPaF72iZj7IoViCjbF2F3F.gif

八、完整代码

完整代码

阅读 4711