RabbitMQ 实现远程过程调用RPC

RPC调用的顺序
1. 在客户端初始化的时候,也就是SimpleRpcClient类初始化的时候,它会随机的创建一个callback队列,用于存放服务的返回值,这个队列是exclusive的。连接断开就没有了。
2. 客户端在发送Request的时候,会加上两个参数:ReplyTo和CorrelationId,前者用于告诉服务返回值放在哪个队列里面或路由,后者用于配对每次的Request。这两个属性都放在客户端发送消息的附带的IBasicProperties字典中。
3. 把消息放入服务的监控队列里,消息里面自然有调用方法的参数。
4. 服务在所监控的队列中收到数据后,进行运算,并把返回值放入到客户端指定的callback队列中去。
5. 客户端在发送完Request后,便去自己创建的callback队列监听,如果获得到数据,则查看里面的CorrelationId,如果和调用Request一致,则返回结果。

Server 端:

 class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "192.168.254.40",
                UserName = "admin",
                Password = "admin",
            };

            //第一步:创建connection 
            var connection = factory.CreateConnection();

            //第二步:创建一个channel
            var channel = connection.CreateModel();

            channel.QueueDeclare("rpc_queue", true, false, false, null);

            Subscription subscription = new Subscription(channel, "rpc_queue");

            MySimpleRpcServer server = new MySimpleRpcServer(subscription);

            Console.WriteLine("server 端启动完毕!!!");

            server.MainLoop();

            Console.Read();
        }
    }

    public class MySimpleRpcServer : SimpleRpcServer
    {
        public MySimpleRpcServer(Subscription subscription) : base(subscription)
        {

        }

        public override byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            return base.HandleCall(isRedelivered, requestProperties, body, out replyProperties);
        }

        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;

            var msg = string.Format("当前文字长度为:{0}", Encoding.UTF8.GetString(body).Length);

            return Encoding.UTF8.GetBytes(msg);
            //return base.HandleSimpleCall(isRedelivered, requestProperties, body, out replyProperties);
        }

        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            base.ProcessRequest(evt);
        }
    }

Client 端:

static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "192.168.254.40",
                UserName = "admin",
                Password = "admin",
            };

            //第一步:创建connection
            var connection = factory.CreateConnection();

            //第二步:创建一个channel
            var channel = connection.CreateModel();

            SimpleRpcClient client = new SimpleRpcClient(channel, string.Empty, ExchangeType.Direct, "rpc_queue");


            var bytes = client.Call(Encoding.UTF8.GetBytes("hello world!!!!"));

            var result = Encoding.UTF8.GetString(bytes);
        }
原文地址:https://www.cnblogs.com/yxlblogs/p/10243868.html