Swoole RPC 的实现

开始今天的文章吧,这篇文章实现了一个简单的 RPC 远程调用,在实现之前需要先了解什么是 RPC,不清楚的可以看下之前发的这篇文章 《我眼中的 RPC》。

下面的演示代码主要使用了 Swoole 的 Task 任务池,通过 OnRequest/OnReceive 获得信息交给 Task 去处理。

举个工作中的例子吧,在电商系统中的两个模块,个人中心模块和订单管理模块,这两个模块是独立部署的,可能不在一个机房,可能不是一个域名,现在个人中心需要通过 用户ID 和 订单类型 获取订单数据。
实现效果
客户端

HTTP 请求

    //代码片段    
    <?php    
    $demo = [    
        'type'  => 'SW',    
        'token' => 'Bb1R3YLipbkTp5p0',    
        'param' => [    
            'class'  => 'Order',    
            'method' => 'get_list',    
            'param' => [    
                'uid'  => 1,    
                'type' => 2,    
            ],    
        ],    
    ];    
    $ch = curl_init();    
    $options = [    
        CURLOPT_URL  => 'http://10.211.55.4:9509/',    
        CURLOPT_POST => 1,    
        CURLOPT_POSTFIELDS => json_encode($demo),    
    ];    
    curl_setopt_array($ch, $options);    
    curl_exec($ch);    
    curl_close($ch);


TCP 请求

    //代码片段    
    $demo = [    
        'type'  => 'SW',    
        'token' => 'Bb1R3YLipbkTp5p0',    
        'param' => [    
            'class'  => 'Order',    
            'method' => 'get_list',    
            'param' => [    
                'uid'  => 1,    
                'type' => 2,    
            ],    
        ],    
    ];    
    $this->client->send(json_encode($demo));


请求方式

    SW 单个请求,等待结果

发出请求后,分配给 Task ,并等待 Task 执行完成后,再返回。

    SN 单个请求,不等待结果

发出请求后,分配给 Task 之后,就直接返回。
发送数据

 

  $demo = [    
        'type'  => 'SW',    
        'token' => 'Bb1R3YLipbkTp5p0',    
        'param' => [    
            'class'  => 'Order',    
            'method' => 'get_list',    
            'param' => [    
                'uid'  => 1,    
                'type' => 2,    
            ],    
        ],    
    ];


    type 同步/异步设置

    token 可进行权限验证

    class 请求的类名

    method 请求的方法名

    uid 参数一

    type 参数二

返回数据



    request_method 请求方式

    request_time 请求开始时间

    response_time 请求结束时间

    code 标识

    msg 标识值

    data 约定数据

    query 请求参数

代码
OnRequest.php

    <?php    
    if (!defined('SERVER_PATH')) exit("No Access");    
    class OnRequest    
    {    
        private static $query;    
        private static $code;    
        private static $msg;    
        private static $data;    
        public static function run($serv, $request, $response)    
        {    
            try {    
                $data = decrypt($request->rawContent());    
                self::$query   = $data;    
                if (empty($data)) {    
                    self::$code = '-1';    
                    self::$msg  = '非法请求';    
                    self::end($request, $response);    
                }    
                //TODO 验证Token    
                switch ($data['type']) {    
                    case 'SW': //单个请求,等待结果    
                        $task = [    
                            'request' => $data,    
                            'server'  => 'http'    
                        ];    
                        $rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) {    
                            self::$code = '1';    
                            self::$msg  = '成功';    
                            self::$data = $rs_data['response'];    
                            self::end($request, $response);    
                        });    
                        if ($rs === false) {    
                            self::$code = '-1';    
                            self::$msg  = '失败';    
                            self::end($request, $response);    
                        }    
                        break;    
                    case 'SN': //单个请求,不等待结果    
                        $task = [    
                            'request' => $data,    
                            'server'  => 'http'    
                        ];    
                        $rs = $serv->task(json_encode($task));    
                        if ($rs === false) {    
                            self::$code = '-1';    
                            self::$msg  = '失败';    
                        } else {    
                            self::$code = '1';    
                            self::$msg  = '成功';    
                        }    
                        self::end($request, $response);    
                        break;    
                    default:    
                        self::$code = '-1';    
                        self::$msg  = '非法请求';    
                        self::end($request, $response);    
                }    
            } catch(Exception $e) {    
            }    
        }    
        private static function end($request = null, $response = null)    
        {    
            $rs['request_method'] = $request->server['request_method'];    
            $rs['request_time']   = $request->server['request_time'];    
            $rs['response_time']  = time();    
            $rs['code']           = self::$code;    
            $rs['msg']            = self::$msg;    
            $rs['data']           = self::$data;    
            $rs['query']          = self::$query;    
            $response->end(json_encode($rs));    
            self::$data = [];    
            return;    
        }    
    }

这里我还准备了一分学习图和资料,如下:

链接:https://pan.baidu.com/s/1v5gm7n0L7TGyejCmQrMh2g 提取码:x2p5

免费分享,但是X度限制严重,如若链接失效点击链接或搜索加群 群号518475424



OnReceive.php

    <?php    
    if (!defined('SERVER_PATH')) exit("No Access");    
    class OnReceive    
    {    
        private static $request_time;    
        private static $query;    
        private static $code;    
        private static $msg;    
        private static $data;    
        public static function run($serv, $fd, $reactor_id, $data)    
        {    
            try {    
                self::$request_time = time();    
                $data = decrypt($data);    
                self::$query = $data;    
                //TODO 验证Token    
                switch ($data['type']) {    
                    case 'SW': //单个请求,等待结果    
                        $task = [    
                            'fd'           => $fd,    
                            'request'      => $data,    
                            'server'       => 'tcp',    
                            'request_time' => self::$request_time,    
                        ];    
                        $rs = $serv->task(json_encode($task));    
                        if ($rs === false) {    
                            self::$code = '-1';    
                            self::$msg  = '失败';    
                            self::handlerTask($serv, $fd);    
                        }    
                        break;    
                    case 'SN': //单个请求,不等待结果    
                        $task = [    
                            'fd'           => $fd,    
                            'request'      => $data,    
                            'server'       => 'tcp',    
                            'request_time' => self::$request_time,    
                        ];    
                        $rs = $serv->task(json_encode($task));    
                        if ($rs === false) {    
                            self::$code = '-1';    
                            self::$msg  = '失败';    
                        } else {    
                            self::$code = '1';    
                            self::$msg  = '成功';    
                        }    
                        self::handlerTask($serv, $fd);    
                        break;    
                    default:    
                        self::$code = '-1';    
                        self::$msg  = '非法请求';    
                        self::handlerTask($serv, $fd);    
                }    
            } catch(Exception $e) {    
            }    
        }    
        private static function handlerTask($serv, $fd)    
        {    
            $rs['request_method'] = 'TCP';    
            $rs['request_time']   = self::$request_time;    
            $rs['response_time']  = time();    
            $rs['code']           = self::$code;    
            $rs['msg']            = self::$msg;    
            $rs['data']           = self::$data;    
            $rs['query']          = self::$query;    
            $serv->send($fd, json_encode($rs));    
        }    
    }
原文地址:https://www.cnblogs.com/it-3327/p/11812745.html