swoole笔记之 主服务器swoole_websocket_server, 监听 tcp端口 ,任务投递, http请求

<?php
error_reporting(E_ALL);
set_time_limit(0);// 设置超时时间为无限,防止超时
date_default_timezone_set('Asia/shanghai');
header('Content-type:text/html;charset=utf-8');

/*存放websocket客户端*/
$table=new swoole_table(256);
$table->column('name',swoole_table::TYPE_STRING,300);
$table->column('id', swoole_table::TYPE_INT);
$table->create();


/*存放tcp客户端*/
$tcp_table=new swoole_table(256);
$tcp_table->column('name',swoole_table::TYPE_STRING,300);
$tcp_table->column('id', swoole_table::TYPE_INT);
$tcp_table->create();

  //主服务
$websocket_server = new swoole_websocket_server("0.0.0.0", 9517,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);

//设置参数
$websocket_server->set(array(
'reactor_num' => 2, //线程数 通过此参数来调节poll线程的数量,以充分利用多核
'worker_num' => 2, //工作进程数
'task_worker_num'=>8, //异步任务工作进程
'backlog' => 128, //listen backlog
'max_request' => 1000, //最大请求书
'log_file'=>'/data/log/swoole.log', //记录swoole服务的错误
'log_level' => 1, //错误级别
// 'dispatch_mode' => 1, //当设置dispatch_mode = 1/3时会自动去掉onConnect/onClose事件回调。
'daemonize' => false, //加入此参数后,执行php server.php将转入后台作为守护进程运行,
'heartbeat_idle_time' => 3600, //允许最大空闲时间
'heartbeat_check_interval' => 3600, //心跳检测间隔
));

//监听端口 9518 协议:tcp 只监听此端口 tcp的链接,发送数据
$tcp_serv = $websocket_server->addListener('127.0.0.1',9518,SWOOLE_SOCK_TCP);
//返回 SwooleServerport 对象,默认集成主服务的协议配置,需要单独调用 SwooleServerPort 的set(),on()方法设置新的协议才会启用新的协议
//
$tcp_serv->set([
"open_eof_check"=>false,
"package_max_length"=>2048,
'heartbeat_check_interval'=>120,
'heartbeat_idle_time'=>1200,
'daemonize' => 1,
]);

//SwooleServerPort::on() 中没有事件 task, finish的回调, 只有connect,receive,close,packet 回调事件

//有新的连接进入时,在worker进程中回调(调用父类swoole_server回调)
$tcp_serv->on('connect', function (swoole_server $server,$fd,$reactorId)use($tcp_table){
echo "tcp监听:新的客户端[$fd]进入,来自线程[$reactorId] ";
$tcp_table->set($fd,['id'=>$fd,'name'=>'匿名']);
});

//只有tcp协议的服务才有此回调, websocket, http不可以
$tcp_serv->on('receive', function (swoole_server $server, $fd, $reactor_id, $data)use($table,$tcp_table){
echo "tcp监听:来自线程[$reactor_id]客户端[$fd]数据:$data ";
$server->task($data);
});


//客户端退出
$tcp_serv->on('Close', function (swoole_server $server, $fd)use($table) {
echo "tcp:connection close: " . $fd." ";
if($table->exist($fd)){
$table->del($fd);
}
});



//******************* WebSocket 服务 *****************************//

$websocket_server->on('receive',function(swoole_server $server, $fd, $reactor_id, $data){
echo "websocket监听:receive接收客户端id:".$fd." 消息:".$data." ";
echo "websocket监听:receive接收线程id:".$reactor_id. " ";
$task_id = $server->task($data);
echo "websocket监听:投放任务,任务进程id:".$task_id." ";
});

//处理异步任务(此回调函数在task进程中执行)
$websocket_server->on('task', function ($serv, $task_id, $from_id, $data) {
echo "websocket监听:任务回调数据:".$data." ";
echo "websocket监听:任务进程id:".$task_id." ";
echo "websocket监听:New AsyncTask[id=$task_id]".PHP_EOL;
//返回任务执行的结果
sleep(10);
$serv->finish($data."websocket监听:--任务完成");
});

//处理异步任务的结果(此回调函数在worker进程中执行)
$websocket_server->on('finish', function ($serv, $task_id, $data) {
echo "websocket监听: 任务完成回调数据:".$data." ";
echo "websocket监听: 任务完成回调任务进程id:".$task_id." ";

});

$websocket_server->on('connect',function(swoole_server $server, $fd,$reactorId){
echo "websocket监听: websocket客户端".$fd."连接 ";
});
$websocket_server->on('open', function (swoole_websocket_server $server, $req)use($table) {
echo "websocket监听: connection open: " . $req->fd . " ";
$table->set($req->fd,['id'=>$req->fd,'name'=>'匿名']);
});

//接收http请求,把请求数据通过socket推送到客户端
$websocket_server->on('request',function($request,$response)use($table,$websocket_server){
echo "http请求: ";
echo "http连接id:".$response->fd." ";
echo "table数量:".$table->count()." ";
//可通过http向服务器投递任务
$websocket_server->task("http_task");
$data = $request->get;
if($data){
$json_data = json_encode($data,JSON_UNESCAPED_UNICODE);
foreach($table as $k=>$v){
$websocket_server->push($v['id'], $json_data);
}
}
$fd_str=[];
foreach ($websocket_server->connections as $id){
echo "连接id: ".$id." ";
$fd_str[]=$id;
}

$fd_str1 = join('|',$fd_str);

$response->header("Content-Type", "text/html; charset=utf-8");
$response->end(date("Y-m-d H:i:s 连接数:".count($websocket_server->connections)."==连接字符串: ".$fd_str1 ));
});


$websocket_server->on('Message', function (swoole_websocket_server $server, $frame)use($table,$tcp_table,$websocket_server) {
echo " 客户端".$frame->fd."发来消息message: " . $frame->data . " ";
$data=$frame->data;
$data=json_decode($data,true);
$count = count($websocket_server->connections);
$fd_str="";
foreach($websocket_server->connections as $fd){
$fd_str.=$fd." | ";
}
$str="<br /> 连接数:".$count." 连接id:".$fd_str;


if($data['type']=='login'){
//可通过长链接向服务器投递任务
$server->task('login_task');
$da=$table->get($frame->fd);
$da['name']=$data['content'];
$table->set($frame->fd,$da);
$data['content']="<span style='color:#abc'>".'欢迎'.$data['content']."</span>".$str;
}elseif($data['type']=='user'){
$websocket_server->push($frame->fd,'连接关闭');
$websocket_server->close($frame->fd);
$da=$table->get($frame->fd);
$data['content']="<span style='color:#1fd'>".$da['name']."</span>".' : '.$data['content'].$str;
}
//向websocket客户端推送数据
foreach($table as $k=>$v){
$server->push($v['id'], nl2br($data['content']));
}
//向tcp客户端推送数据
foreach($tcp_table as $k=>$v){
$server->send($v['id'], nl2br($data['content']));
}
});

//客户端退出
$websocket_server->on('Close', function (swoole_websocket_server $server, $fd)use($table) {
echo "connection close: " . $fd." ";
if($table->exist($fd)){
$table->del($fd);
}
});


$websocket_server->on("Start",function (){
echo date('Y-m-d H:i:s ').": websocket服务器启动 ";
});



$websocket_server->start();


此上代码可以支持websocket长链接, http请求, tcp链接。 通过http可以向websocket客户端发送信息, 可以在onRequest()事件回调中投递任务, websocket长链接也可以向服务投递任务。 tcp链接也可向服务器投递任务
注意避免死循环请求,例如:http请求投递了一个任务,任务中如果有向服务的http请求,应该规避该http请求不会投递任务,http请求--》投递任务---》http请求---》投递任务,这样的死循环。


原文地址:https://www.cnblogs.com/66w66/p/12943542.html