RPC

一、概述:Remote Procedure Call 远程过程调用,简单的理解是一个节点请求另一个节点提供的服务,即一台客户端向另外一台服务器请求某个方法,调用该方法。

二、设计思路:将服务端待调用的方法以map集合的形式,注册到服务注册中心,服务注册中心通过ServerSocket创建连接对象,设置IP和端口号,客户端再通过Socket创建连接对象和动态代理的方式,即可建立和服务器的连接,并可以调用放置到注册中心中的接口进行传参,再将传完参数的接口执行返回值序列化成对象返回给客户端;


三、服务端:

(1)编写待调用的接口和接口实现类

package org.example.server;

public interface HelloService {
    void say(String s);
}
package org.example.server;

public class HelloServiceImpl implements HelloService{
    @Override
    public void say(String s) {
        System.out.println(s);
    }
}

(2)编写服务注册中心

package org.example.server;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerCenter implements Server{
    private static HashMap<String,Class> serviceRegister = new HashMap();
    private static final int port = 9999;
    //连接池中存在多个连接对象,每个连接对象都可以处理一个客户端请求
    private static ExecutorService executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    @Override
    public void start() throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        //思考?如何实现并发连接
        ServerSocket server = new ServerSocket(); //该构造器可填两个参数,一个是IP地址,一个是端口号,因为是本机测试所以用.bind绑定端口号就可以
        server.bind(new InetSocketAddress(port));
        System.out.println("只有while true 时多线程!!!");
        while (true){
            System.out.println("server start.....");
            Socket socket = server.accept(); //监听客户端的访问,等待连接(当使用while true调用多线程时,我也是很懵逼,这样死循环不会导致死机吗?但是后面了解到server.accept()会在这里等待客户端连接,起到阻塞的作用,只有当某个客户端再次发起连接后才会继续往下执行下一个while)
            executor.execute(new ServiceTask(socket));
        }
    }

    @Override
    public void register(Class Service, Class serviceImpl) {
        serviceRegister.put(Service.getName(),serviceImpl);
    }

    private static class ServiceTask implements  Runnable{
        private Socket socket;
        public ServiceTask(Socket socket){
            this.socket = socket;
        }
        @Override
        public void run() {
            //接收到客户端的连接和请求
            try {
          //将从客户端发送过来的请求序列化成一个对象 ObjectInputStream input
= new ObjectInputStream(socket.getInputStream()); String serviceName = input.readUTF(); String methodName = input.readUTF(); Class[] parameterTypes = (Class[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Class ServiceClass = serviceRegister.get(serviceName); Method method = ServiceClass.getMethod(methodName, parameterTypes); Object result = method.invoke(ServiceClass.newInstance(), arguments); //将方法执行完毕的返回值序列化后发送给客户端 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); output.writeObject(result); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } }
package org.example.server;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

public class ServerStart {
    public static void main(String[] args) throws NoSuchMethodException, IOException, InstantiationException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
       //可能有多个客户端进行连接,采用多线程创建对象方式,在ServerCenter注册中心中采用Executors.newFixedThreadPool对象创建线程池,
    //再创建一个匿名内部类ServiceTask实现Runnable接口创建线程,线程的特点:当run方法一旦执行结束,则该线程结束,所以需要while(true) 来重复创建线程,但.accept()方法会使while(true)阻塞,只有当客户端连接之后,才会继续创建线程,如此巧妙
     new Thread( 
                () -> {
                    Server server = new ServerCenter();
                    server.register(HelloService.class,HelloServiceImpl.class);
                    try {
                        server.start();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    } catch (NoSuchMethodException e) {
                        e.printStackTrace();
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InstantiationException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
        ).start();
    }
}


四、客户端:可以创建多个客户端进行连接,测试并发连接是否成功

package org.example.client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class Client {
    //获取代表服务端接口的动态代理对象(HelloService)
    //serviceInterface:请求接口的名字
    //addr:待请求的服务端Ip和端口号
    @SuppressWarnings("unchecked") //将检查异常压制,不会报黄线
    public static <T> T getRemoteProxyObj(Class serviceInterface, InetSocketAddress addr)
    {
        //a参数:需要代理哪个接口(HelloService接口),就需要将HelloService类加载器传入到第一个参数
        //b参数:需要代理哪个对象,具备哪些方法
        //c参数:传入handler是将该方法重写后的
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
            //参数a:需要代理的对象
            //参数b:需要代理的对象的方法
            //参数c:需要代理对象的方法的参数
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //客户端通过socket向服务端发送请求
                //发送:通过OutPutStream
                //接受:通过InPutStream
                Socket socket = new Socket();
                socket.connect(addr);
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());//发送:序列化流
                //接口名称、方法名称、方法参数类型、方法的参数
                output.writeUTF(serviceInterface.getName());
                output.writeUTF(method.getName());
                output.writeObject(method.getParameterTypes());
                output.writeObject(args);
                //接受服务端处理后的返回值
                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                return  input.readObject();
            }
        });
    }
}
package org.example.client;
import org.example.server.HelloService;
import java.net.InetSocketAddress;

public class ClientStart {
    public static void main(String[] args) throws ClassNotFoundException {
     //参数a:要调用的接口名称;参数b:要连接的服务端ip地址和端口 HelloService service
= Client.getRemoteProxyObj(Class.forName("org.example.server.HelloService"),new InetSocketAddress("127.0.0.1",9999)); service.say("我是客户端1,我连接成功!"); } }

程序运行结果:

原文地址:https://www.cnblogs.com/ibear/p/14622956.html