一、引言
最初是打算写个聊天室分享给大家,后来仔细斟酌了一下,还是讲个web通知吧,两个案例都差不多。
当然,在前面两篇介绍websocket的基础之上,相信你一定会觉得web通知这个功能就是一个小case。所以本文我们把重点放在后面多端口复合协议的使用。
websocket通知的实现方式,基本上跟websocket初识一文中最后介绍的案例差不多,只不过我们当时是循环所有的客户端推送消息,此时我们是一对一推送提醒。
二、需求分析
我们以评论被回复为例,当一条评论被其他某个用户(假设是用户B)回复,即发一条通知给被回复的评论所属人(假设是用户A),告诉A,他的评论被回复了。
三、功能分析
我们不能保证用户B和用户A都处于连接状态,但是通常情况下,用户B至少是连接状态,用户A不一定跟server保持连接;
任一用户都不止对应一个客户端。换言之,用户A和用户B都可能打开了多个tab页,对于一个tab页,就会有一个独立的fd标识,我们这里认为任一用户只有最新的fd有效,或者你可以认为用户所有的tab页的连接都有效;
因为没有用户系统,我们以get传递的参数uid为标识,uid=100视为用户A,uid=101视为用户B;
我们不准备做一个评论系统,我们模拟的tab页包将会包含一个输入内容的文本框、一个输入目标uid的input和一个发送的按钮以满足需求。
四、流程分析
用户A($_GET['uid'] = 100)在某个tab页的输入框输入"回复xxx的内容"字样后,点击发送
用户B($_GET['uid'] = 101)如果处于连接状态,则alert提醒用户B,他的评论被回复了
五、server 端代码
分析了半天,我们看server端代码的实现
class CommentServer { private $_serv; public $key = "^www.lulublog.cn&swoole$"; // 用户id和fd对应的映射,key => value,key是用户的uid,value是用户的fd public $user2fd = []; public function __construct() { $this->_serv = new swoole_websocket_server("0.0.0.0", 9501); $this->_serv->set([ "worker_num" => 1, "heartbeat_check_interval" => 60, "heartbeat_idle_time" => 125, ]); $this->_serv->on("open", [$this, "onOpen"]); $this->_serv->on("message", [$this, "onMessage"]); $this->_serv->on("close", [$this, "onClose"]); } /** * @param $serv * @param $request * @return mixed */ public function onOpen($serv, $request) { // 连接授权 $accessResult = $this->checkAccess($serv, $request); if (!$accessResult) { return false; } // 始终把用户最新的fd跟uid映射在一起 if (array_key_exists($request->get["uid"], $this->user2fd)) { $existFd = $this->user2fd[$request->get["uid"]]; $this->close($existFd, "uid exists."); $this->user2fd[$request->get["uid"]] = $request->fd; return false; } else { $this->user2fd[$request->get["uid"]] = $request->fd; } } /** * @param $serv * @param $frame * @return mixed */ public function onMessage($serv, $frame) { // 校验数据的有效性,我们认为数据被`json_decode`处理之后是数组并且数组的`event`项非空才是有效数据 // 非有效数据,关闭该连接 $data = $frame->data; $data = json_decode($data, true); if (!$data || !is_array($data) || empty($data["event"])) { $this->close($frame->fd, "data format invalidate."); return false; } // 根据数据的`event`项,判断要做什么,`event`映射到当前类具体的某一个方法,方法不存在则关闭连接 $method = $data["event"]; if (!method_exists($this, $method)) { $this->close($frame->fd, "event is not exists."); return false; } $this->$method($frame->fd, $data); } public function onClose($serv, $fd) { echo "client {$fd} closed.\n"; } /** * 校验客户端连接的合法性,无效的连接不允许连接 * @param $serv * @param $request * @return mixed */ public function checkAccess($serv, $request) { // get不存在或者uid和token有一项不存在,关闭当前连接 if (!isset($request->get) || !isset($request->get["uid"]) || !isset($request->get["token"])) { $this->close($request->fd, "access faild."); return false; } $uid = $request->get["uid"]; $token = $request->get["token"]; // 校验token是否正确,无效关闭连接 if (md5(md5($uid) . $this->key) != $token) { $this->close($request->fd, "token invalidate."); return false; } return true; } /** * @param $fd * @param $message * 关闭$fd的连接,并删除该用户的映射 */ public function close($fd, $message = "") { // 关闭连接 $this->_serv->close($fd); // 删除映射关系 if ($uid = array_search($fd, $this->user2fd)) { unset($this->user2fd[$uid]); } } public function alertTip($fd, $data) { // 推送目标用户的uid非真或者该uid尚无保存的映射fd,关闭连接 if (empty($data["toUid"]) || !array_key_exists($data["toUid"], $this->user2fd)) { $this->close($fd); return false; } $this->push($this->user2fd[$data["toUid"]], ["event" => $data["event"], "msg" => "收到一条新的回复."]); } /** * @param $fd * @param $message */ public function push($fd, $message) { if (!is_array($message)) { $message = [$message]; } $message = json_encode($message); // push失败,close if ($this->_serv->push($fd, $message) == false) { $this->close($fd); } } public function start() { $this->_serv->start(); } } $server = new CommentServer; $server->start();
满眼看下来,代码挺长的,没关系,我们整理了一下代码的逻辑
我们给CommentServer类增加了一个属性 $user2fd,这个是key => value结构,用于保存uid和fd的映射关系
onOpen回调做两件事,验证授权和添加新的映射关系
onMessage回调只接收含有event项的数组,event等同于CommentServer类的方法名,我们这里只有一个用于web通知的alertTip方法
此外,我们封装了该类的close方法和push方法,close方法用于server主动关闭连接,删除uid和fd的映射,push方法用于向指定的fd推送消息
六、客户端代码
$key = "^www.lulublog.cn&swoole$";
$uid = isset($_GET["uid"]) ? intval($_GET["uid"]) : 0;
$token = md5(md5($uid) . $key);
var ws = new WebSocket("ws://139.199.201.210:9501?uid=&token="); ws.onopen = function(event) { }; ws.onmessage = function(event) { var data = event.data; data = eval("("+data+")"); if (data.event == "alertTip") { alert(data.msg); } }; ws.onclose = function(event) { console.log("Client has closed.\n"); }; function send() { var obj = document.getElementById("content"); var content = obj.value; var toUid = document.getElementById("toUid").value; ws.send("{"event":"alertTip", "toUid": "+toUid+"}"); }
server开启之后,演示的效果我们看下动图
结果中,注意看地址栏,alert弹窗是在哪个tab页弹出的。
七、多端口复合协议:server与server之间的交互
上例中,我们模拟的是评论被回复的简单例子。
回顾过去讲的内容,无论是tcp server,http server还是websocket server,server都是独立的,server与server之间并没有太多的交互。
实际上有没有交互的必要呢?
假设现在有这么一个需求,在刚刚评论的案例中,前文用户的回复不是直接发送给被回复的用户,而是评论在后台被人审核成功的一瞬间,再通知被回复的用户呢?
审核操作改为ajax操作,success回调内再new一个websocket客户端,然后send?可以,但是这显然不是一个很好的操作。
在websocket初识的时候我们说过,要想与websocket server通信,客户端只能是websocket客户端!既然我们刚刚否决了new一个websocket客户端,那是要怎么做呢?
从程序的角度出发,如果我们在php的层面上直接就能通知到websocket服务器,换言之,如果我们能够从php的层面上,直接实现alertTip方法的功能是不是就对了?
前文我们介绍tcp server的时候了解到,首先我们要想让web应用同server进行“互撩”,swoole_client少不了,既然有swoole_client,swoole_server肯定也少不了。但是目前server正在跑websocket,难不成我们在单独跑一个tcp server?对,我们就是要在websocket server的基础之上,想办法再跑一个tcp server。
为了使用多端口复合协议,swoole为server提供了listen方法,可以让当前server监听新的端口。
比如我们可以让刚刚创建的websocket server额外监听9502端口,这个端口主要负责tcp的工作。
$this->_tcp = $this->_serv->listen("127.0.0.1", 9502, SWOOLE_SOCK_TCP); $this->_tcp->set([ "open_eof_check" => true, //打开EOF检测 "package_eof" => "\r\n", //设置EOF "open_eof_split" => true, // 自动分包 ]); $this->_tcp->on("Receive", [$this, "onReceive"]);
listen函数返回的是swoole_server_port对象,需要注意的是swoole_server_port的set函数只能设置一些特定的参数,比如socket参数、协议相关等,像worker_num、log_file、max_request等等这些都是不支持的。就tcp服务器而言,swoole_server_port对象也仅仅对onConnect\onReceive\onClose这三个回调支持,其他的一律不可用,详细可翻阅swoole手册查看。
下面我们就以评论审核通知来看看多端口复合协议的玩法。
再来看下我们现在的流程
用户回复某评论 => 评论进入审核状态 ;很明显这个过程我们不需要做什么
管理员审核该评论 => 通知被回复的人;这个时候我们要做的就等同于alertTip函数要做的
server端除了刚刚设置的$this->_tcp一段代码之外,我们单独绑定了onReceive回调,下面看onReceive回调的实现
public function onReceive($serv, $fd, $fromId, $data) { try { $data = json_decode($data, true); if (!isset($data["event"])) { throw new \Exception("params error, needs event param.", 1); } $method = $data["event"]; // 调起对应的方法 if(!method_exists($this, $method)) { throw new \Exception("params error, not support method.", 1); } $this->$method($fd, $data); return true; } catch (\Exception $e) { $msg = $e->getMessage(); throw new \Exception("{$msg}", 1); } }
可以看到,除了进行简单的判断之外,如果tcp客户单携带一个event=alertTip即可
在这之前,websocket客户端的代码我们依然以前面的为例,假设要回复的用户uid=100,我们运行server之后,先让uid=100的客户端连接到server,运行的客户端地址栏添加uid参数等于100即可
下面我们再写一个tcp client,连接9502端口,我们的tcp server在这个端口监听
class Client { private $client; public function __construct () { $this->client = new Swoole\Client(SWOOLE_SOCK_TCP); if (!$this->client->connect("127.0.0.1", 9502)) { $msg = "swoole client connect failed."; throw new \Exception("Error: {$msg}."); } } /** * @param $data Array * send data */ public function sendData ($data) { $data = $this->togetherDataByEof($data); $this->client->send($data); } /** * 数据末尾拼接EOF标记 * @param Array $data 要处理的数据 * @return String json_encode($data) . EOF */ public function togetherDataByEof($data) { if (!is_array($data)) { return false; } return json_encode($data) . "\r\n"; } } $client = new Client; $client->sendData([ "event" => "alertTip", "toUid" => 100, ]);
现在无论是websocket服务器、tcp 服务器还是websocket客户端 tcp客户端都已经准备就绪了,下面我们浏览器直接访问下tcp client,如果正常的话,websocket客户端所在页面会弹出有新回复的通知。
看动图运行结果