一、引言
最初是打算写个聊天室分享给大家,后来仔细斟酌了一下,还是讲个web通知吧,两个案例都差不多。
当然,在前面两篇介绍websocket的基础之上,相信你一定会觉得web通知这个功能就是一个小case。所以本文我们把重点放在后面多端口复合协议的使用。
websocket通知的实现方式,基本上跟websocket初识一文中最后介绍的案例差不多,只不过我们当时是循环所有的客户端推送消息,此时我们是一对一推送提醒。
二、需求分析
我们以评论被回复为例,当一条评论被其他某个用户(假设是用户B)回复,即发一条通知给被回复的评论所属人(假设是用户A),告诉A,他的评论被回复了。
三、功能分析
我们不能保证用户B和用户A都处于连接状态,但是通常情况下,用户B至少是连接状态,用户A不一定跟server保持连接;
任一用户都不止对应一个客户端。换言之,用户A和用户B都可能打开了多个tab页,对于一个tab页,就会有一个独立的fd标识,我们这里认为任一用户只有最新的fd有效,或者你可以认为用户所有的tab页的连接都有效;
因为没有用户系统,我们以get传递的参数uid为标识,uid=100视为用户A,uid=101视为用户B;
我们不准备做一个评论系统,我们模拟的tab页包将会包含一个输入内容的文本框、一个输入目标uid的input和一个发送的按钮以满足需求。
四、流程分析
用户A($_GET['uid'] = 100)在某个tab页的输入框输入"回复xxx的内容"字样后,点击发送
用户B($_GET['uid'] = 101)如果处于连接状态,则alert提醒用户B,他的评论被回复了
五、server 端代码
分析了半天,我们看server端代码的实现
class CommentServer
{
private $_serv;
public $key = "^www.lulublog.cn&swoole$";
// 用户id和fd对应的映射,key => value,key是用户的uid,value是用户的fd
public $user2fd = [];
public function __construct()
{
$this->_serv = new swoole_websocket_server("0.0.0.0", 9501);
$this->_serv->set([
"worker_num" => 1,
"heartbeat_check_interval" => 60,
"heartbeat_idle_time" => 125,
]);
$this->_serv->on("open", [$this, "onOpen"]);
$this->_serv->on("message", [$this, "onMessage"]);
$this->_serv->on("close", [$this, "onClose"]);
}
/**
* @param $serv
* @param $request
* @return mixed
*/
public function onOpen($serv, $request)
{
// 连接授权
$accessResult = $this->checkAccess($serv, $request);
if (!$accessResult) {
return false;
}
// 始终把用户最新的fd跟uid映射在一起
if (array_key_exists($request->get["uid"], $this->user2fd)) {
$existFd = $this->user2fd[$request->get["uid"]];
$this->close($existFd, "uid exists.");
$this->user2fd[$request->get["uid"]] = $request->fd;
return false;
} else {
$this->user2fd[$request->get["uid"]] = $request->fd;
}
}
/**
* @param $serv
* @param $frame
* @return mixed
*/
public function onMessage($serv, $frame)
{
// 校验数据的有效性,我们认为数据被`json_decode`处理之后是数组并且数组的`event`项非空才是有效数据
// 非有效数据,关闭该连接
$data = $frame->data;
$data = json_decode($data, true);
if (!$data || !is_array($data) || empty($data["event"])) {
$this->close($frame->fd, "data format invalidate.");
return false;
}
// 根据数据的`event`项,判断要做什么,`event`映射到当前类具体的某一个方法,方法不存在则关闭连接
$method = $data["event"];
if (!method_exists($this, $method)) {
$this->close($frame->fd, "event is not exists.");
return false;
}
$this->$method($frame->fd, $data);
}
public function onClose($serv, $fd)
{
echo "client {$fd} closed.\n";
}
/**
* 校验客户端连接的合法性,无效的连接不允许连接
* @param $serv
* @param $request
* @return mixed
*/
public function checkAccess($serv, $request)
{
// get不存在或者uid和token有一项不存在,关闭当前连接
if (!isset($request->get) || !isset($request->get["uid"]) || !isset($request->get["token"])) {
$this->close($request->fd, "access faild.");
return false;
}
$uid = $request->get["uid"];
$token = $request->get["token"];
// 校验token是否正确,无效关闭连接
if (md5(md5($uid) . $this->key) != $token) {
$this->close($request->fd, "token invalidate.");
return false;
}
return true;
}
/**
* @param $fd
* @param $message
* 关闭$fd的连接,并删除该用户的映射
*/
public function close($fd, $message = "")
{
// 关闭连接
$this->_serv->close($fd);
// 删除映射关系
if ($uid = array_search($fd, $this->user2fd)) {
unset($this->user2fd[$uid]);
}
}
public function alertTip($fd, $data)
{
// 推送目标用户的uid非真或者该uid尚无保存的映射fd,关闭连接
if (empty($data["toUid"]) || !array_key_exists($data["toUid"], $this->user2fd)) {
$this->close($fd);
return false;
}
$this->push($this->user2fd[$data["toUid"]], ["event" => $data["event"], "msg" => "收到一条新的回复."]);
}
/**
* @param $fd
* @param $message
*/
public function push($fd, $message)
{
if (!is_array($message)) {
$message = [$message];
}
$message = json_encode($message);
// push失败,close
if ($this->_serv->push($fd, $message) == false) {
$this->close($fd);
}
}
public function start()
{
$this->_serv->start();
}
}
$server = new CommentServer;
$server->start();
满眼看下来,代码挺长的,没关系,我们整理了一下代码的逻辑
我们给CommentServer类增加了一个属性 $user2fd,这个是key => value结构,用于保存uid和fd的映射关系
onOpen回调做两件事,验证授权和添加新的映射关系
onMessage回调只接收含有event项的数组,event等同于CommentServer类的方法名,我们这里只有一个用于web通知的alertTip方法
此外,我们封装了该类的close方法和push方法,close方法用于server主动关闭连接,删除uid和fd的映射,push方法用于向指定的fd推送消息
六、客户端代码
$key = "^www.lulublog.cn&swoole$";
$uid = isset($_GET["uid"]) ? intval($_GET["uid"]) : 0;
$token = md5(md5($uid) . $key);
var ws = new WebSocket("ws://139.199.201.210:9501?uid=&token=");
ws.onopen = function(event) {
};
ws.onmessage = function(event) {
var data = event.data;
data = eval("("+data+")");
if (data.event == "alertTip") {
alert(data.msg);
}
};
ws.onclose = function(event) {
console.log("Client has closed.\n");
};
function send() {
var obj = document.getElementById("content");
var content = obj.value;
var toUid = document.getElementById("toUid").value;
ws.send("{"event":"alertTip", "toUid": "+toUid+"}");
}
server开启之后,演示的效果我们看下动图
结果中,注意看地址栏,alert弹窗是在哪个tab页弹出的。
七、多端口复合协议:server与server之间的交互
上例中,我们模拟的是评论被回复的简单例子。
回顾过去讲的内容,无论是tcp server,http server还是websocket server,server都是独立的,server与server之间并没有太多的交互。
实际上有没有交互的必要呢?
假设现在有这么一个需求,在刚刚评论的案例中,前文用户的回复不是直接发送给被回复的用户,而是评论在后台被人审核成功的一瞬间,再通知被回复的用户呢?
审核操作改为ajax操作,success回调内再new一个websocket客户端,然后send?可以,但是这显然不是一个很好的操作。
在websocket初识的时候我们说过,要想与websocket server通信,客户端只能是websocket客户端!既然我们刚刚否决了new一个websocket客户端,那是要怎么做呢?
从程序的角度出发,如果我们在php的层面上直接就能通知到websocket服务器,换言之,如果我们能够从php的层面上,直接实现alertTip方法的功能是不是就对了?
前文我们介绍tcp server的时候了解到,首先我们要想让web应用同server进行“互撩”,swoole_client少不了,既然有swoole_client,swoole_server肯定也少不了。但是目前server正在跑websocket,难不成我们在单独跑一个tcp server?对,我们就是要在websocket server的基础之上,想办法再跑一个tcp server。
为了使用多端口复合协议,swoole为server提供了listen方法,可以让当前server监听新的端口。
比如我们可以让刚刚创建的websocket server额外监听9502端口,这个端口主要负责tcp的工作。
$this->_tcp = $this->_serv->listen("127.0.0.1", 9502, SWOOLE_SOCK_TCP);
$this->_tcp->set([
"open_eof_check" => true, //打开EOF检测
"package_eof" => "\r\n", //设置EOF
"open_eof_split" => true, // 自动分包
]);
$this->_tcp->on("Receive", [$this, "onReceive"]);
listen函数返回的是swoole_server_port对象,需要注意的是swoole_server_port的set函数只能设置一些特定的参数,比如socket参数、协议相关等,像worker_num、log_file、max_request等等这些都是不支持的。就tcp服务器而言,swoole_server_port对象也仅仅对onConnect\onReceive\onClose这三个回调支持,其他的一律不可用,详细可翻阅swoole手册查看。
下面我们就以评论审核通知来看看多端口复合协议的玩法。
再来看下我们现在的流程
用户回复某评论 => 评论进入审核状态 ;很明显这个过程我们不需要做什么
管理员审核该评论 => 通知被回复的人;这个时候我们要做的就等同于alertTip函数要做的
server端除了刚刚设置的$this->_tcp一段代码之外,我们单独绑定了onReceive回调,下面看onReceive回调的实现
public function onReceive($serv, $fd, $fromId, $data)
{
try {
$data = json_decode($data, true);
if (!isset($data["event"])) {
throw new \Exception("params error, needs event param.", 1);
}
$method = $data["event"];
// 调起对应的方法
if(!method_exists($this, $method)) {
throw new \Exception("params error, not support method.", 1);
}
$this->$method($fd, $data);
return true;
} catch (\Exception $e) {
$msg = $e->getMessage();
throw new \Exception("{$msg}", 1);
}
}
可以看到,除了进行简单的判断之外,如果tcp客户单携带一个event=alertTip即可
在这之前,websocket客户端的代码我们依然以前面的为例,假设要回复的用户uid=100,我们运行server之后,先让uid=100的客户端连接到server,运行的客户端地址栏添加uid参数等于100即可
下面我们再写一个tcp client,连接9502端口,我们的tcp server在这个端口监听
class Client
{
private $client;
public function __construct ()
{
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
if (!$this->client->connect("127.0.0.1", 9502)) {
$msg = "swoole client connect failed.";
throw new \Exception("Error: {$msg}.");
}
}
/**
* @param $data Array
* send data
*/
public function sendData ($data)
{
$data = $this->togetherDataByEof($data);
$this->client->send($data);
}
/**
* 数据末尾拼接EOF标记
* @param Array $data 要处理的数据
* @return String json_encode($data) . EOF
*/
public function togetherDataByEof($data)
{
if (!is_array($data)) {
return false;
}
return json_encode($data) . "\r\n";
}
}
$client = new Client;
$client->sendData([
"event" => "alertTip",
"toUid" => 100,
]);
现在无论是websocket服务器、tcp 服务器还是websocket客户端 tcp客户端都已经准备就绪了,下面我们浏览器直接访问下tcp client,如果正常的话,websocket客户端所在页面会弹出有新回复的通知。
看动图运行结果