【分布式】RPC初探

事先声明:本文代码参考自Dubbo作者的博客

RPC(Remote Procedure Call)远程过程调用,是分布式系统当中必不可少的一个玩意。比如说在单机系统当中,我想调用某个方法,直接调就可以了对吧,但是当环境变成多机分布式系统时,A机器上想调用B机器上的某个方法时,就需要用到RPC了。RPC的原理在知乎这个问答上有很清楚的解释。

简单点来说,就是客户端利用了socket把希望调用的方法的信息(方法名、方法需要的参数等)传给服务器端,服务器端把这些信息反序列化之后利用反射去调用指定的方法,并把返回值再通过socket返回给客户端。下面是代码示例,关键部分我写了自己理解的注释。

代码主要用到了socket通信和JDK的动态代理,这两部分我在之前的博客中也都有涉及。

RPCServer.java

public class RPCServer {
    private static final int PORT = 8000;
    /**
     * 暴露服务
     *
     * @param service 服务的对象实例
     * */
    public static void open(final Object service) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException();
        }
        System.out.println("Service is opening for " + service.getClass().getName() + " at port: " + PORT);
        //开启ServerSocket监听8000端口
        final ServerSocket server = new ServerSocket(PORT);
        for (;;) {
            try {
                //接收到一个客户端请求
                final Socket client = server.accept();
                //开一个线程处理
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                            try {
                                ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                                try {
                                    String methodName = input.readUTF();
                                    System.out.println(">>>>methodName: " + methodName);
                                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                    Object[] arguments = (Object[]) input.readObject();
                                    System.out.println(">>>>arguments: " + arguments.toString());
                                    ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                                    try {
                                        //利用反射获取到方法对象
                                        Method method = service.getClass().getMethod(methodName, parameterTypes);
                                        //调用方法并获取返回值
                                        Object result = method.invoke(service, arguments);
                                        //把返回值写入socket,返回给客户端
                                        out.writeObject(result);
                                        //                                out.flush();
                                    } catch (Throwable t) {
                                        out.writeObject(t);
                                    } finally {
                                        out.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 指定在远程主机上的服务
     *
     * @param <T> 接口泛型
     * @param interfaceClass 接口
     * @param host 远程主机IP
    * */

    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host) {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("invalid interface");
        }
        if (host == null || "".equals(host)) {
            throw new IllegalArgumentException("invalid host");
        }
        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + PORT);
        //动态代理返回对象实例,并且利用泛型转成服务类型
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = new Socket(host, PORT);
                        try {
                            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                //发送方法名
                                out.writeUTF(method.getName());
                                //发生方法参数列表
                                out.writeObject(method.getParameterTypes());
                                //发生方法需要的参数
                                out.writeObject(args);
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    //获取返回值
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable) result;
                                    }
                                    return result;
                                }finally {
                                    input.close();
                                }
                            }finally {
                                out.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
    }
}

接口 HelloService.java

public interface HelloService {
    public String show(String name);
}

接口实现 HelloServiceImpl.java

public class HelloServiceImpl implements HelloService {
    @Override
    public String show(String name) {
        System.out.println(name);
        return "name: " + name;
    }
}

测试:

服务端测试代码 ServerTest.java

public class ServerTest {
    public static void main(String[] args) throws Exception {
        HelloService helloService = new HelloServiceImpl();
		//开启RPC服务,并且绑定一个对象实例,指定服务器上的服务类型
        RPCServer.open(helloService);
    }
}

客户端测试代码 ClientTest.java

public class ClientTest {
    public static void main(String[] args) {
        try {
			//调用指定IP的远程主机上的指定服务
            HelloService service = RPCServer.refer(HelloService.class, "127.0.0.1");
            System.out.println(service.show("hello"));
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

结果如下:

服务端:

客户端:

思考

关于这段示例代码,有哪些改进的地方呢?首先我想到的是把TCP通信模型改成NIO通信,不要用BIO这种低并发的模型;其次是传输的信息可以用其他方式进行压缩或者叫序列化,减少传输的大小从而降低服务器压力和提高传输速度;还有就是这段代码使用的动态代理是JDK自带的方法,有个很大的缺点是必须要接口,之前的文章也提到了,可以采用CGlib来改善一下。目前能想到的就这三点了,找时间我再来完善一下。

同时也可以去看看Dubbo源码。

原文地址:https://www.cnblogs.com/puyangsky/p/6220948.html