分布式开发

基于Socket搭建简易的Rpc交互

api组件

  • 提供交互接口以及请求的参数对象
// 交互接口
public interface IService {
    String getToken(String userId);
    String receiveMsg();
}

// 交互数据封装
public class RpcRequestMsg implements Serializable {
    private String className;
    private String methodName;
    private Object[] args;

    public RpcRequestMsg() {
    }

    public RpcRequestMsg(String className, String methodName, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.args = args;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

server组件

①建立Socket端口监听 ②Socket连接后,Socket读事件获取请求信息 ③解析请求信息并执行对应方法 ④Socket写事件返回相应
⑤Spring切入托管:在Bean初始化后置处理时扫描RemoteServer注解对象,封装成BeanMethod存放到容器中(用于步骤3反射调用)
⑥Spring切入托管:监听ContextApplication对象的Refreshed事件,当容器初始化完成后,触发①操作

Socket

Socket监听逻辑 - RpcProxyServer

public class RpcProxyServer {
    // 采用线程池处理连接请求
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    public void publisher(int port){
        // 注册端口监听
        try(ServerSocket serverSocket = new ServerSocket(port)){
            while (true){
                // 当连接被建立后,获取新线程进行处理
                Socket socket = serverSocket.accept();
                executorService.execute(new ProcessorHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Socket处理逻辑 - ProcessorHandler

// 操作Socket完成读写处理
public class ProcessorHandler implements Runnable{

    private Socket socket = null;

    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try(InputStream is = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(is);

            OutputStream os = socket.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
        ){
            // 请求信息获取
            RpcRequestMsg rpcRequestMsg = (RpcRequestMsg)ois.readObject();

            // 完成请求信息的采集后,委托Mediator执行对应方法并获取返回值
            Object rs = Mediator.getInstance().process(rpcRequestMsg);

            // 结果响应
            oos.writeObject(rs);
            oos.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

MethodInvoke

public class Mediator {
    // 维护了一份Key-Method对象的映射,Key=全类名.方法名,Method中包含bean和Method对象
    public static Map<String,BeanMethod> route = new ConcurrentHashMap<>();

    private volatile static Mediator instance = null;

    private Mediator(){};

    // 双重检查锁 - 懒汉式单例
    public static Mediator getInstance(){
        if(instance == null){
            synchronized (Mediator.class){
                if(instance == null){
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }

    // 具体的处理逻辑
    public Object process(RpcRequestMsg rpcRequestMsg){
        // 根据MethodId 获取到 BeanMethod对象
        String key = rpcRequestMsg.getClassName() + "." + rpcRequestMsg.getMethodName();
        BeanMethod beanMethod = route.get(key);
        if(beanMethod == null) 
            return null;
        
        // 反射调用
        Object obj = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(obj,rpcRequestMsg.getArgs());
        } catch (IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}

@Data
public class BeanMethod {
    private Object bean;
    private Method method;

    public BeanMethod(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }
}

Spring切入

// 声明自定义注解,用于Spring切入点中获取特定bean对象
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RemoteServer {
}

@RemoteServer
public class ServiceImpl implements IService {
    @Override
    public String getToken(String userId) {
        System.out.println("ServiceImpl - getToken():" + userId);
        return userId;
    }

    @Override
    public String receiveMsg() {
        Date date = new Date();
        System.out.println("ServiceImpl - receiveMsg():" + date.toString());
        return date.toString();
    }
}

Spring初始化后置处理器,扫描RemoteServer注解,并封装成BeanMethod注册到Mediator.route容器

@Component
public class RpcMethodRegisterBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean.getClass().isAnnotationPresent(RemoteServer.class)){
            Method[] methods = bean.getClass().getDeclaredMethods();
            for(Method method : methods){
                String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod(bean,method);
                Mediator.route.put(key,beanMethod);
            }
        }
        return bean;
    }
}

Spring事件监听器,当容器refreshed完成后,开启RpcProxyServer服务监听

@Component
public class SocketServerInitialEvent implements ApplicationListener<ContextRefreshedEvent>{
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RpcProxyServer rpcProxyServer = new RpcProxyServer();
        rpcProxyServer.publisher(8888);
    }
}

client组件

api接口 -> 动态代理对象 -> invokeHandler(){socket操作}

通过@Bean标签在Configuration类中将iService的代理对象注册为Bean

@Configuration
public class MyConfiguration {
    @Bean
    public IService iService(){
        return RpcProxyClient.clientProxy(IService.class,"localhost",8888);
    }

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyConfiguration.class);
        IService service = context.getBean(IService.class);
        System.out.println(service.receiveMsg());
    }
}

public class RpcProxyClient {
    public static <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port){
        return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),new Class[]{interfaceCls},new RemoteInvocationHandler(host,port));
    }
}

// 动态代理,委托执行器

public class RemoteInvocationHandler implements InvocationHandler {
    private String host = null;
    private int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ClientSocketHandler clientSocketHandler = new ClientSocketHandler(host,port);

        // 构造请求信息
        RpcRequestMsg rpcRequestMsg = new RpcRequestMsg(proxy.getClass().getInterfaces()[0].getName(),method.getName(),args);
        return clientSocketHandler.handler(rpcRequestMsg);
    }
}

// 实际的client socket业务逻辑

public class ClientSocketHandler {
    private String host;
    private int port;

    public ClientSocketHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    private Socket newSocket() throws IOException {
        return new Socket(host,port);
    }

    public Object handler(RpcRequestMsg rpcRequestMsg) throws IOException {
        Socket socket = newSocket();
        try (OutputStream os = socket.getOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(os);

             InputStream is = socket.getInputStream();
             ObjectInputStream ois = new ObjectInputStream(is);
        ){
            oos.writeObject(rpcRequestMsg);
            oos.flush();

            String msg = (String) ois.readObject();
            System.out.println("客户端 - 读入:" + msg);

            while (socket.isInputShutdown() && socket.isOutputShutdown()) {
                socket.close();
            }
            return msg;
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}

欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

-- 星河有灿灿,愿与之辉

原文地址:https://www.cnblogs.com/kiqi/p/14429029.html