剖析下聊天室

由来

环境:PHP7、Swoole、linux

对聊天室有点感兴趣,对于网络协议有一点一知半解,所以决定借助swoole实现个简单的聊天室,来简单剖析下原理,知道原理以后就可以考虑用其他语言或者自己造轮子写个,当然这是后话。

源码我放置github( https://github.com/WalkingSun/SwooleServer ),有兴趣可以借鉴借鉴。

系统设计

即时通讯的网络通信基于长连接,通信方式有TCP、UDP、socket、websocket等,本次实现是websocket,系统建立常驻内存的websocket服务,客户端即浏览器两者建立连接通信。通信过程如下:

image

关于客户端连接websocket服务,本文不做细述,websocket服务的建立借助swoole,需要在服务端的open、recieve、send、close建立回调处理,为了方便我将连接的客户端信息放入swoole_table(一个基于共享内存和锁实现的超高性能,并发数据结构)。

代码仅供参考:

websocket 服务类:

<?php
/**
 * Created by PhpStorm.
 * User: WalkingSun
 * Date: 2018/10/28
 * Time: 15:54
 */

class WsServer
{
    const host = '0.0.0.0';
    const port = '9501';

    public $swoole;

    public $config = ['gcSessionInterval' => 60000];

    public $openCallback;       //open回调

    public $messageCallback;    //message回调

    public $runApp;    //request回调

    public $workStartCallback;       //work回调

    public $finishCallback;     //finish回调

    public $closeCallback;       //close回调

    public $taskCallback;       //task回调

    public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[])
    {
        $host = $host?:self::host;
        $port = $port?:self::port;
        $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType);
        $this->webRoot = $swooleConfig['document_root'];
        if( !empty($this->config) ) $this->config = array_merge($this->config, $config);
        $this->swoole->set($swooleConfig);

        $this->swoole->on('open',[$this,'onOpen']);
        $this->swoole->on('message',[$this,'onMessage']);
        $this->swoole->on('request',[$this,'onRequest']);
        $this->swoole->on('WorkerStart',[$this,'onWorkerStart']);            //增加work进程
        $this->swoole->on('task',[$this,'onTask']);            //增加task任务进程
        $this->swoole->on('finish',[$this,'onFinish']);
        $this->swoole->on('close',[$this,'onClose']);
    }

    public function run(){
        $this->swoole->start();
    }

    /**
     * 当WebSocket客户端与服务器建立连接并完成握手后会回调此函数
     * @param $serv swoole_websocket_server 服务对象
     * @param $request swoole_http_server 服务对象
     */
    public function onOpen( swoole_websocket_server $serv,  $request){
        call_user_func_array( $this->openCallback, [ $serv, $request ] );

        //定时器(异步执行)
//        if($request->fd == 1){
//            swoole_timer_tick(2000,function($timer_id){
//                echo time().PHP_EOL;
//            });
//        }
    }

    /**
     *当服务器收到来自客户端的数据帧时会回调此函数。
     * @param $server  swoole_websocket_server 服务对象
     * @param $frame  swoole_websocket_frame对象,包含了客户端发来的数据帧信息
     * $frame->fd,客户端的socket id,使用$server->push推送数据时需要用到
    $frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断
    $frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档
    $frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送(底层已经实现了自动合并数据帧,现在不用担心接收到的数据帧不完整)
     */
    public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){

        call_user_func_array( $this->messageCallback, [ $serv, $frame ]);

    }

    /**
     * @param $serv  swoole_websocket_server 服务对象
     * @param $fd  连接的文件描述符
     * @param $reactorId  来自那个reactor线程
     * onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接
     * 当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的。
     */
    public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){

        call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]);

    }

    /**
     * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
     * @param $serv  swoole_websocket_server 服务对象
     * @param $task_id int 任务id,由swoole扩展内自动生成,用于区分不同的任务。$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
     * @param $src_worker_id int 来自于哪个worker进程
     * @param $data mixed 任务的内容
     */
    public function onTask(swoole_server $serv,  $task_id,  $src_worker_id,  $data){

        call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]);

//        sleep(10);
//        onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。
//        return "task {$src_worker_id}-{$task_id} success";
    }

    /**
     * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程。
     * @param $serv  swoole_websocket_server 服务对象
     * @param $task_id int  任务id
     * @param $data string  task任务处理的结果内容
     * task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish
    执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程
     */
    public function onFinish(swoole_server $serv,  $task_id,  $data){

        call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]);
//        echo $data;
//        return $data;
    }

    public function onRequest( $request, $response ){
        call_user_func_array( $this->runApp, [ $request, $response ]);
    }

    public function onWorkerStart( $server,  $worker_id ){
        call_user_func_array( $this->workStartCallback , [$server,  $worker_id]);
    }
}

起服务和回调设置:

   class SwooleController extends BasicController{

       public $host;

       public $port;

       public  $swoole_config=[];

       public static $table;


       public function actionStart(){

           $config = include __DIR__ . '/../config/console.php';

           if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file'];

           if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file'];

           $this->swoole_config = array_merge(
               [
                   'document_root' => $config['swoole']['document_root'],
                   'enable_static_handler'     => true,
   //                'daemonize'=>1,
                   'worker_num'=>4,
                   'max_request'=>2000,
   //            'task_worker_num'=>100,
                   //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                   'open_tcp_keepalive'=>1,
                   'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                   'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                   'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒

                   //swoole实现的心跳机制,只要客户端超过一定时间没发送数据,不管这个连接是不是死链接,都会关闭这个连接
   //            'heartbeat_check_interval' => 10*60,        //每m秒侦测一次心跳
   //            'heartbeat_idle_time' => 30*60,            //一个TCP连接如果在n秒内未向服务器端发送数据,将会被切断
               ],$this->swoole_config
           );

           $this->host = $config['swoole']['host'];
           $this->port = $config['swoole']['port'];
           $swooleServer = new WsServer(  $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config);

           //连接信息保存到swoole_table
           self::$table = new swoole_table(10);
           self::$table->column('username',SwooleTable::TYPE_STRING, 10);
           self::$table->column('avatar',SwooleTable::TYPE_STRING, 255);
           self::$table->column('msg',SwooleTable::TYPE_STRING, 255);
           self::$table->column('fd',SwooleTable::TYPE_INT, 6);
           self::$table->create();

           $swooleServer->openCallback = function( $server , $request ){
               echo "server handshake with fd={$request->fd}
";
           };

           $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){

               //全局变量设置及app.log
               $this->globalParam( $request );
               $_SERVER['SERVER_SWOOLE'] = $swooleServer;

               //记录日志
               $apiData = $_SERVER;
               unset($apiData['SERVER_SWOOLE']);
               Common::addLog( $config['log'] , ($apiData) );

               //解析路由
               $r = $_GET['r'];
               $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index');
               $params = explode('/',$r);
               $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php';

               $result = '';
               if( file_exists( $controller ) ){
                   require_once $controller;
                   $class = new ReflectionClass(ucfirst($params[0]).'Controller');
                   if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){
                       $instance  = $class->newInstanceArgs();
                       $method = $class->getmethod('action'.ucfirst($params[1])); // 获取类中方法
                       ob_start();
                       $method->invoke($instance);    // 执行方法
                       $result = ob_get_contents();
                       ob_clean();
                   }else{
                       $result = 'NOT FOUND!';
                   }
               }else{
                   $result = "$controller not exist!";
               }

               $response->end( $result );
           };

           $swooleServer->workStartCallback = function( $server,  $worker_id ){

           };

           $swooleServer->taskCallback = function( $server , $request ){
               //发送通知或者短信、邮件等

           };

           $swooleServer->finishCallback = function( $serv,  $task_id,  $data ){

   //            return $data;
           };

           $swooleServer->messageCallback = function( $server,  $iframe  ){
               //记录客户端信息
               echo "Client connection fd {$iframe->fd} ".PHP_EOL;

               $data = json_decode( $iframe->data ,1 );

               if( !empty($data['token']) ){
                   if( $data['token']== 'simplechat_open' ){
                       if( !self::$table->exist($iframe->fd) ){
                           $user = array_merge($data,['fd'=>$iframe->fd]);
                           self::$table->set($iframe->fd,$user);

                           //发送连接用户信息
                           foreach (self::$table as $v){
                               if($v['fd']!=$iframe->fd){
                                   $pushData = array_merge($user,['action'=>'connect']);
                                   $server->push($v['fd'],json_encode($pushData));
                               }
                           }
                       }

                   }

                   if( $data['token']=='simplechat' ){
                       //查询所有连接用户,分发消息

                       foreach (self::$table as $v){
                           if($v['fd']!=$iframe->fd){
                               $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send'];
                               $server->push($v['fd'],json_encode($pushData));
                           }
                       }
                   }
               }

               //接受消息,对消息进行解析,发送给组内人其他人

           };

           $swooleServer->closeCallback = function(  $server,  $fd,  $reactorId ){

               if(  self::$table->exist($fd) ){
                   //退出房间处理
                   self::$table->del($fd);
                   foreach (self::$table as $v){
                       $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove'];
                       $server->push($v['fd'],json_encode($pushData));
                   }
               }

               echo  "Client close fd {$fd}".PHP_EOL;
           };

           $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL);
           $swooleServer->run();
       }


       public function actionStop(){
           $r = $this->sendSignal( SIGTERM );
           if( $r ){
               $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL);
           }
       }

       public function actionRestart(){
           $this->sendSignal(SIGTERM);     //向主进程发送SIGTERM实现关闭服务器
           $this->actionStart();
       }

       public function actionReload(){
           $this->sendSignal(SIGUSR1);   //向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有Worker进程
       }

   }

起了服务,客户端就可以连接通信了。

心跳检测

起了半天后服务常会断掉,查看监听端口进程状态,服务器输入:

$ netstat -anp |grep 9501

发现大量CLOSE_WAIT状态,常用状态有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主动关闭,CLOSE_WAIT 表示被动关闭。

TIME_WAIT和CLOSE_WAIT两种状态如果一直被保持,意味着对应数目的通道就一直被占用,且“占着茅坑不使劲”,一旦句柄数达到上限,新的请求就无法处理。而且因为swoole是master-worker模式,
基本上http、tcp通信都是在worker进程,CLOSE_WAIT一直在,子进程将一直无法释放,随着时间的推移CLOSE_WAIT状态的进程越来越多,阻碍新的连接进来,websocket服务不可用。

主动关闭 和 被动关闭
TCP关闭 四次挥手过程如下:

image

挥手流程:

1、 客户端是调用函数close(),这时,客户端会发送一个FIN给服务器。

2、 服务器收到FIN,关闭套接字读通道,并将自己状态设置为CLOSE_WAIT(表示被动关闭),
并返回一个ACK给客户端。

3、 客户端收到ACK,关闭套接字写通道
接下来,服务器会调用close():

1、 服务器close(),发送一个FIN到客户端。

2、 客户端收到FIN,关闭读通道,并将自己状态设置成TIME_WAIT,发送一个ACK给服务器。

3、 服务器收到ACK,关闭写通道,并将自己状态设置为CLOSE。

4、 客户端等待两个最大数据传输时间,然后将自己状态设置成CLOSED。

由此我们看到CLOSE-WAIT 状态,TIME-WAIT 状态 产生的过程,产生的原因是复杂的,比如说网络通信中断、用户手机网络切换wifi网络、网络通信丢包等,故此tcp挥手过程会出现中断,继而
产生这些关闭状态。

为了解决这些占用连接数的异常连接,需要检测连接是否是活动的,对于死连接我们需要释放关闭它。

TIME_WAIT 主动关闭

主动关闭的一方在发送最后一个ACK包后,无论对方是否收到都会进入状态,等待2MSL(Maximum Segment Lifetime数据包的最大生命周期,是一个数据包能在互联网上生存的最长时间,若超过这个时间则该数据包将会消失在网络中)
的时间,才会释放网络资源。

TIME_WAIT状态的存在主要有两个原因:

1)可靠地实现TCP全双工连接的终止。在关TCP闭连接时,最后的ACK包是由主动关闭方发出的,如果这个ACK包丢失,则被动关闭方将重发FIN包,因此主动方必须维护状态信息,以允许它重发这个
ACK包。如果不维持这个状态信息,那么主动方将回到CLOSED状态,并对被动方重发的FIN包响应RST包,而被动关闭方将此包解释成一个错误。因而,要实现TCP全双工连接的正常终止,必须能够处
理四次握手协议中任意一个包丢失的情况,主动关闭方必须维持状态信息进入TIME_WAIT状态。

2)确保迷路重复数据包在网络中消失,防止上一次连接中的包迷路后重新出现,影响新连接。TCP数据包可能由于路由器异常而迷路,在迷路期间,数据包发送方可能因超时而重发这个包,迷路的
数据包在路由器恢复后也会被送到目的地,这个迷路的数据包就称为Lost Duplicate。在关闭一个TCP连接后,如果马上使用相同的IP地址和端口建立新的TCP连接,那么有可能出现前一个连接的迷
路重复数据包在前一个连接关闭后再次出现,影响新建立的连接。为了避免这一情况,TCP协议不允许使用处于TIME_WAIT状态的连接的IP和端口启动一个新连接,只有经过2MSL的时间,确保上一次
连接中所有的迷路重复数据包都已消失在网络中,才能安全地建立新连接。

如果Server主动关闭连接,同样会有大量的连接在关闭后处于TIME_WAIT状态,等待2MSL的时间后才能释放网络资源。对于并发连接,出现大量等待连接,新的连接进不来,会降低系统性能。

time_wait问题可以通过调整内核参数和适当的设置web服务器的keep-Alive值来解决。因为time_wait是自己可控的,要么就是对方连接的异常,要么就是自己没有快速的回收资源,总之不是由于自己程序错误引起的。

解决方式:

  • 试图让Client主动关闭连接,由于每个Client的并发量都比较低,因而不会产生性能瓶颈

  • 优化Server的系统TCP参数,使其网络资源的最大值、消耗速度和恢复速度达到平衡;

    修改/etc/sysctl.conf

  net.ipv4.tcp_tw_recycle = 1       #启用TIME-WAIT状态sockets的快速回收

  net.ipv4.tcp_tw_reuse = 1         #允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭

  #缓存每个连接最新的时间戳,后续请求中如果时间戳小于缓存的时间戳,即视为无效,相应的数据包会被丢弃,启用这种行为取决于tcp_timestamps和tcp_tw_recycle
  net.ipv4.tcp_timestamps = 1

CLOSE_WAIT 被动关闭

对方发送一个FIN后,程序自己这边没有进一步发送ACK以确认。换句话说就是在对方关闭连接后,程序里没有检测到,或者程序里本身就已经忘了这个时候需要关闭连接,于是这个资源就一直被程序占用着。

解决办法:

  • 释放关闭掉异常的连接;
  • 修复程序的bug,重新发布;

Keep-Alive

TCP中有一个Keep-Alive的机制可以检测死连接,LINUX内核包含对keepalive的支持,其中使用了三个参数:tcp_keepalive_time(开启keepalive的闲置时长)tcp_keepalive_intvl(keepalive探测包的发送
间隔)和tcp_keepalive_probes(如果对方不予应答,探测包的发送次数);如此服务端会隔断时间发送个探测包给客户端,可以是多次,如果在超出设置闲置时长,内核会关闭这个连接。

客户端主动发心跳

通过程序设置最大连接时长,如果客户端在这段时间内没有发送过数据,则关闭释放这个连接。

解决关闭问题

TIME_WAIT 倒是没有出现过, CLOSE_WAIT状态总会出现。

就看看文档,swoole有这些设置,当前使用的是TCP的keep-alive检测,只需改配置即可:

   ...
    //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                'open_tcp_keepalive'=>1,
                'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒
   ...

我设置的周期比较短,方便测试。

设置了这些看似稳定了,却还是会出现CLOSE_WAIT,后来查了日志,发生错误中断了,大概意思,代码中出现exit、die,显然常驻内存的swoole不支持这些,会立马中断程序。所以改些这些代码,
刚开始借助YII2.0写的,框架源码的问题,所以swoole这块服务需要单独出来,嗯。。。所以索性直接自己撸个。现在看来,服务跑起来稳定多了,一直没挂呢。

贴下临时地址:http://47.99.189.105:91/

系统监控及优化

系统很简单,但是作为研究,应该更透彻点。

我们的系统如何监控?如果说系统崩溃怎么办?能支撑多大并发?高并发下如何保持系统稳定。。。 一个高性能的即时通讯是如何架构的?

额,留待以后再研究下补充。

参考资料:

https://juejin.im/post/5c3b21e4e51d455231347349

原文地址:https://www.cnblogs.com/followyou/p/10360252.html