分布式原理之RPC框架

PRC 即远程服务调用,是实现分布式最基本的方式,而这一实现基于的又是java的反射功能所实现的动态代理。

完全可以自己写一个不到200行的rpc服务:

只需要实现四个类:

0.本地客户端

1. 远程服务的本地代理

* 功能:
* 0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用
* 1. 创建Socket客户端,根据指定地址调用远程服务真正实现者
* 2. 同步阻塞获取服务返回应答

2.远程的服务代理

  * 功能:

*  0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行
* 1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果
* 2.将执行结果序列化,通过Socket发送给客户端

3.远程服务的真正实现者

key : 为什么要用反射呢?因为远程请求来的时候,并不知道请求的是哪个类的哪个方法,所以需要动态反射出来类,并得到相应调用的方法。

result:最终的效果就是调用远程的服务就像调用本地的一样,客户端并不用管调用的服务在哪里,框架将一切都封装好了。

 代码:

/**
 * 本地客户端
 */
public class RPCClient {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    RPCExporter.exporter("locolhost", 8080);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        RPCImporter<EchoService> importer = new RPCImporter<>();
        EchoService echo = importer.importer(EchoServiceIpml.class,
                new InetSocketAddress("locolhost", 8080));
        System.out.println(echo.echo("Are you ok? "));
    }
}
/**
 * 远程服务的本地代理
 * 功能:
 *  0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用
 *  1. 创建Socket客户端,根据指定地址调用远程服务真正实现者
 *  2. 同步阻塞获取服务返回应答
 */

public class RPCImporter<T> {
    public T importer(Class<?> serviceClass, InetSocketAddress address) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
                new Class<?>[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectInputStream inputStream = null;
                        ObjectOutputStream outputStream = null;
                        try {
                            socket = new Socket();
                            socket.connect(address);
                            outputStream = new ObjectOutputStream(socket.getOutputStream());
                            outputStream.writeUTF(serviceClass.getName());
                            outputStream.writeUTF(method.getName());
                            outputStream.writeObject(method.getParameterTypes());
                            outputStream.writeObject(args);
                            inputStream = new ObjectInputStream(socket.getInputStream());
                            return inputStream.readShort();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            if (socket != null) socket.close();
                            if (inputStream != null) inputStream.close();
                            if (outputStream != null) outputStream.close();
                        }
                        return null;
                    }
                });
    }
}
/**
 * 远程的服务代理
 * 功能:
 *  0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行
 *  1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果
 *  2.将执行结果序列化,通过Socket发送给客户端
 */

public class RPCExporter {
    private static Executor threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    public static void exporter(String hostName, int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(hostName, port));
        try {
            while (true) {
                threadPool.execute(new ServerTask(serverSocket.accept()));
            }
        } finally {
            serverSocket.close();
        }
    }

    private static class ServerTask implements Runnable {
        private Socket _client;

        public ServerTask(Socket client) {
            _client = client;
        }

        @Override
        public void run() {
            ObjectOutputStream outputStream = null;
            ObjectInputStream inputStream = null;
            try {
                inputStream = new ObjectInputStream(_client.getInputStream());
                String interfaceName = inputStream.readUTF();
                String methodName = inputStream.readUTF();
                Class<?> service = Class.forName(interfaceName);
                Class<?>[] paramsType = (Class<?>[]) inputStream.readObject();
                Method method = service.getMethod(methodName, paramsType);
                Object[] args = (Object[]) inputStream.readObject();
                Object result = method.invoke(service.newInstance(), args);
                outputStream = new ObjectOutputStream(_client.getOutputStream());
                outputStream.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (outputStream != null) {
                    try {
                        outputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
/**
 * 远程的服务真正实现者
 */
public class EchoServiceIpml implements  EchoService {

    @Override
    public String echo(String ping) {
        return ping != null ? ping + "---> I'm ok" : "I'm ok";
    }
}
原文地址:https://www.cnblogs.com/shawshawwan/p/9364998.html