一个高在线(可以超过1024)多线程的socket echo server(pthreads 和 libevent扩展)

研究了3周吧,本来打算用pthreads+event扩展的,结果event扩展太原始了,太多函数了,实在不知道怎么在外部随时发送数据给客户端,所以改用libevent,

改用libevent之后花了2个小时就运行起来了。

当然并不敢说稳定,而且有个地方用了一个“适应”bug的地方,避免bug

两个扩展都从pecl.php.net下载就可以了,

安装,不想写了,16:25了还没吃早饭 午饭

上代码,欢迎讨论:

我的QQ群:
PHPer&Webgame&移动开发,群号:95303036

<?php
class statWorker extends Worker {
	public function __construct() {
		
	}
	public function run(){
		while(!$this->isShutdown()){
				sleep(3);
		}
	}
}
class sendAllWorker extends Worker {
	public function __construct(&$_logicWorker) {
		$this->logicWorker = $_logicWorker;
	}
	public function run(){
		
	}
}
class data extends Stackable{
	public function __construct() {
		echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);
	}
	public function run(){
		echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);
	}
}
class logicWorker extends Worker{
	public $data;
	public function __construct(&$_data) {
		echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);
		$this->data = $_data;
	}
	public function run(){
		$data = $this->data;
		$count=0;
		while(1){
			if($arr =(array) $this->shift()){
				echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);
				if(!$arr) continue;
				if(isset($arr[0])){
					echo $arr[0].' from '.$arr[1].chr(10);
					//$data[] = array('You say:'.$arr[0], $arr[1]%2==0?0:$arr[1]);
					$data[] = array('You say:'.$arr[0], 0);
				}

				if((++$count)%1000==0){
					printf("Work Mermory used %.3fMB RAM, time: %3f===> %d 
",  
					memory_get_peak_usage(true)/1048576, (microtime(true) - $stime), $count);
					$stime = microtime(true);
				}
			}else usleep(100000);
		}
	}
}
class listenerWorkerRunner extends Worker {
	public function __construct(&$_listener){
		$this->listener = $_listener;
	}
	
	public function run(){
		$this->listener->dispatch();
	}
}
class senderWorker extends Worker {
	public function __construct(&$_data, &$clients) {
		$this->data = $_data;
		$this->clients = $clients;
	}
	public function run(){
		echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);
		$data = $this->data;
		$clients = $this->clients;
		while(1)
		{
			$arr= $data->shift();
			if($arr){
				$id = $arr[1];
				$msg = $arr[0];
				echo '===================================senderWorker GOT: '.trim($msg).' from '.$id.chr(10);
				//var_dump($clients);
				if(is_array($id))
					foreach($id as $i)
						fwrite($clients[$i], $msg);
				else if($id===0){
					//必须这样,否则会报错,$clients的最后一个成员会变成 resource(2) of type (unknown) 
					$_clients = (array) $clients;
					var_dump($_clients);
					foreach($_clients as $i=>$c)
						fwrite($clients[$i], $msg);
				}else
					if($clients[$id]) fwrite($clients[$id], $msg);
			}else{
				//echo 'senderWorker IS RUNNING...'.chr(10);
				usleep(100000);
			}
		}
	}
}

class epoll{
    private static $socket;
    public static $connections;
    private static $buffers;
	private static $worker;
	private static $clients;
    function epoll($port, &$worker, &$clients){
		self::$clients = $clients;
		self::$worker = $worker;
        if($port<1024) die("Port must be a number which bigger than 1024
");
        self::$socket = stream_socket_server ('tcp://0.0.0.0:'.$port, $errno, $errstr);
        stream_set_blocking(self::$socket, 0);
        $base = event_base_new();
        $event = event_new();
        event_set($event, self::$socket , EV_READ | EV_PERSIST, 'epoll::ev_accept', $base);
        event_base_set($event, $base);
        event_add($event);
        event_base_loop($base);

        self::$connections = array();
        self::$buffers = array();
    }
    public static function ev_accept($socket, $flag, $base) {
        static $id = 0;

        $connection = stream_socket_accept($socket);
        stream_set_blocking($connection, 0);

        $id += 1;

        $buffer = event_buffer_new($connection, 'epoll::ev_read', NULL, 'epoll::ev_error', $id);
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 60);//超时自动断开时间
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);

        // we need to save both buffer and connection outside
        self::$connections[$id] = $connection;
		self::$clients[$id] = $connection;
        self::$buffers[$id] = $buffer;
		
        echo 'In-> $id='.$id.',$connection='.$connection."
";
    }
    public static function ev_error($buffer, $error, $id) {
        event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);
        event_buffer_free(self::$buffers[$id]);
        echo 'Ot-> $id='.$id."
";
        fclose(self::$connections[$id]);
        unset(self::$buffers[$id], self::$connections[$id], self::$clients[$id]);
    }
    public static function ev_read($buffer, $id) {
        static $ct=0;
        while ($read = event_buffer_read($buffer, 256))
        {
            $ct+=strlen($read);
            if(strpos($read,'ct')!==false) echo 'Ct=>'.count(self::$connections).'
';
			self::$worker[] = array($read, $id);
        }
    }
	//--------------------------------------------
	//will not work!!
	public static function sendMsg($msg, $id){
		if(is_array($id))
			foreach($id as $i)
				fwrite(self::$clients[$i], $msg);
		else if($id===0)
			foreach($id as $i)
				fwrite(self::$clients[$i], $msg);
		else
			fwrite(self::$clients[$id], $msg);
	}
}

//暂未使用,计划用于统计在线之类的
$_statWorker = new statWorker();
$_statWorker->start();

/*
$_logicWorker 处理数据后通过$data传给$_senderWorker下发
因为worker stackable 似乎都不能共享变量(他们接收到的参数——通过worker[]=...,stackable=[],worker->stack几个方式收到的数据都是复制的,并且资源复制不成功,是null,通过public方法设定的参数同样是复制的,只有construct构造函数接收的参数才可以以引用的)
我了解到的是要给worker共享数据,只能通过给其构造函数传递一个stackable(传worker)类型的变量
*/
$data = new data();

//保存连接的客户端,epoll::sendMsg不会正常工作,在里面获取不到self::$clients,也只能通过向构造函数传递stackable变量的方法实现;是为什么我不知道,可能我的用法不对。
$clients = new data();

/*
处理接收上来的数据,比如查看背包,收取邮件,或者战斗 ,行走,但里面不要进行任何io操作——读写数据库甚至是memcache都不可以
logicWorker只做计算型工作,并且是主要的工作线程
那么你可能要问,用户的数据从哪里来?如何保存?
——答案是用别的线程,至少还需要2个线程:登陆线程,从数据库读数据;存档线程,把数据存放到数据。
这样工作按线程分开了,能够有一定并发数,

是多少,不知道,我目前的java程序就是这个方式,单服3000+在线,负载不到1,cpu 20%左右。
php可能要差一点儿

当然这个pthreads应该可以用很不稳定来形容,很容易coredump……,不过怎么都算是个进步吧,因为真的实现了多线程。
*/
$_logicWorker = new logicWorker($data);
/*
负责下发数据,其实,可以由$_logicWorker来执行,我这里主要是想测试数据共享,
*/
$_senderWorker = new senderWorker($data, $clients);

//启动线程和监听
$_logicWorker->start();
$_senderWorker->start();
new epoll(9808, $_logicWorker, $clients);

echo 'Running ... ----------> 
';


 

我的QQ群:
PHPer&Webgame&移动开发,群号:95303036
加群除了提问之外,请记得帮助别人,谢谢。

原文地址:https://www.cnblogs.com/lein317/p/5067584.html