A Lightweight RPC Implementation Based on Netty (with Source Code)

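Preface

  RPC (remote procedure call) is a request-response protocol. An RPC is initiated by the client, which sends a request message to a known remote server to execute a specified procedure with the supplied parameters. The remote server sends a response to the client, and the application continues its process. While the server is processing the call, the client is blocked (it waits until the server has finished processing before resuming execution), unless the client sends an asynchronous request to the server, such as an XMLHttpRequest. Implementations differ in many ways, large and subtle, which has produced a variety of different (and incompatible) RPC protocols.

  Technology choices:

  1. Protostuff: a serialization framework based on Protobuf that works directly on POJOs, with no .proto files to write.
  2. Netty: a network programming framework built on NIO that hides the NIO details and is more convenient to use.
  3. Spring Boot: a collection of Spring components with an embedded server and auto-configuration.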

1. Define the request and response POJOs

    

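import java.util.HashMap;

public class RpcRequest {
    public RpcRequest() {
    }

    /** Request id, echoed back in the response */
    private Long id;
    /**
     * RPC service name (matched against the value of @RpcService on the server)
     */
    private String className;
    /**
     * Method name
     */
    private String methodName;
    /**
     * Arguments, keyed by parameter type
     */
    private HashMap<Class<?>, Object> arguments;

    // getters and setters ...
}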

  

public class RpcResponse {
    public RpcResponse() {
    }

    private Long id;
    private Integer code;
    private Object result;
    private String failMsg;

    // getters and setters ...
}
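
One consequence of keying `RpcRequest.arguments` by parameter type is worth seeing concretely: a `HashMap<Class<?>, Object>` holds at most one value per type and does not guarantee insertion order, so a call such as `concat(String, String)` cannot be described with it. The sketch below shows the collapse and one possible alternative that keeps types and values in call order. `TypeKeyedArguments` and `ArgumentList` are hypothetical names used for illustration only; they are not part of lrpc.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical alternative (not part of lrpc): parameter types and values kept in call order. */
final class ArgumentList {
    private final Class<?>[] types;
    private final Object[] values;

    ArgumentList(Class<?>[] types, Object[] values) {
        if (types.length != values.length) {
            throw new IllegalArgumentException("types and values must have the same length");
        }
        this.types = types.clone();
        this.values = values.clone();
    }

    Class<?>[] types() { return types.clone(); }

    Object[] values() { return values.clone(); }

    int size() { return types.length; }
}

/** Builds a type-keyed map the same way the article's client code fills RpcRequest.arguments. */
final class TypeKeyedArguments {
    static Map<Class<?>, Object> of(Object... args) {
        Map<Class<?>, Object> arguments = new HashMap<>();
        for (Object arg : args) {
            // A second argument of the same type overwrites the first one
            arguments.put(arg.getClass(), arg);
        }
        return arguments;
    }
}
```

With two `String` arguments the map ends up with a single entry, while `ArgumentList` keeps both. For single-argument methods like the `say(String)` example later in this article, the map form is sufficient.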

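2. The server decodes requests and encodes responses; conversely, the client encodes requests and decodes responses. Two classes are therefore needed, an encoder and a decoder, and each side configures them with the POJO type it handles.

  The encoder only encodes objects that are instances of genericClass. SerializationUtil is a utility class that wraps the Protostuff serialization library.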

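import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

@ChannelHandler.Sharable
public class RpcEncode extends MessageToByteEncoder<Object> {
    // RpcRequest on the client side, RpcResponse on the server side
    private Class<?> genericClass;

    public RpcEncode(Class<?> clazz) {
        this.genericClass = clazz;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
                          Object o, ByteBuf byteBuf) throws Exception {
        if (genericClass.isInstance(o)) {
            byte[] data = SerializationUtil.serialize(o);
            // Frame layout: 4-byte length prefix followed by the serialized bytes
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}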

  Likewise, the decoder:

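import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class RpcDecode extends ByteToMessageDecoder {
    private Class<?> genericClass;

    public RpcDecode(Class<?> clazz) {
        this.genericClass = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) throws Exception {
        // The length prefix is one int (4 bytes); wait until all of it has arrived
        if (in.readableBytes() < 4) {
            return;
        }
        // Mark before reading the length so that a reset also restores the prefix
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length means a corrupt frame; drop the connection
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Body not complete yet: rewind and wait for more bytes
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}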
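
The framing used by the encoder and decoder (a 4-byte length followed by the payload) can be tried without Netty. The following is a minimal sketch using `java.nio.ByteBuffer` in place of Netty's `ByteBuf`; `LengthPrefixedFrames` and its method names are hypothetical and not part of lrpc. It illustrates the point that matters on the decoding side: when a frame is incomplete, the read position has to go back to the start of the length prefix.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-alone illustration of length-prefixed framing. */
final class LengthPrefixedFrames {
    private static final int HEADER_BYTES = 4;

    /** Encodes a payload as [4-byte big-endian length][payload]. */
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Tries to read one frame from a buffer that is in read mode.
     * Returns null, with the position unchanged, if the frame is not complete yet.
     */
    static byte[] tryDecode(ByteBuffer buf) {
        if (buf.remaining() < HEADER_BYTES) {
            return null;
        }
        buf.mark(); // remember where the length prefix starts
        int length = buf.getInt();
        if (length < 0) {
            throw new IllegalStateException("negative frame length: " + length);
        }
        if (buf.remaining() < length) {
            buf.reset(); // rewind so the prefix is read again next time
            return null;
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }
}
```

Calling `tryDecode` on a buffer holding only the first five bytes of a six-byte frame returns null and leaves the position at 0; calling it again once the last byte has arrived returns the payload.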

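3. After decoding, the server uses a handler to process the client's request. The handler holds a map whose values are the beans annotated with @RpcService and whose keys are the annotation's value. The className of the RpcRequest is matched against the map's keys to find the bean, and the method named by methodName is then invoked on it via reflection with the given arguments.

  @RpcService marks the service classes to publish. Its value is the className that clients request. The annotation is itself annotated with @Component, so Spring registers the annotated classes as beans.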

    

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    String value();

    String description() default "";
}
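
To make the role of the annotation's value concrete without Spring, here is a small sketch that reads a runtime annotation through plain reflection and uses its value as the registry key. `ExposedAs`, `PlainHelloService` and `AnnotationRegistry` are hypothetical stand-ins invented for this example (the real annotation is `@RpcService`, and in lrpc the lookup goes through Spring's application context).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for @RpcService, without the Spring @Component meta-annotation. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface ExposedAs {
    String value();
}

@ExposedAs("HelloService")
class PlainHelloService {
    public String say(String msg) {
        return "Hello " + msg;
    }
}

final class AnnotationRegistry {
    /** Registers every object whose class carries @ExposedAs, keyed by the annotation value. */
    static Map<String, Object> register(Object... beans) {
        Map<String, Object> registry = new HashMap<>();
        for (Object bean : beans) {
            ExposedAs exposedAs = bean.getClass().getAnnotation(ExposedAs.class);
            // RUNTIME retention is what makes the annotation visible here
            if (exposedAs != null && !exposedAs.value().isEmpty()) {
                registry.put(exposedAs.value(), bean);
            }
        }
        return registry;
    }
}
```

Objects whose class lacks the annotation are simply skipped, so the registry only ever contains published services.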

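  Classes marked with @RpcService are registered as bean instances. LrpcHandlerAutoConfiguration looks up the beans carrying this annotation and passes them to the handler, which executes the methods requested by the client.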

  

// @Configuration is itself meta-annotated with @Component
@Configuration
public class LrpcHandlerAutoConfiguration implements ApplicationContextAware {
    private ApplicationContext context;
    @Value("${lrpc.server}")
    public String port;

    @Bean
    public RpcHandler rpcHandler() {
        Map<String, Object> annotatedBeans = context.getBeansWithAnnotation(RpcService.class);
        // Build a new map: adding and removing keys while iterating the
        // original keySet would throw ConcurrentModificationException
        Map<String, Object> rpcBeans = new HashMap<>(annotatedBeans.size());
        for (Map.Entry<String, Object> entry : annotatedBeans.entrySet()) {
            String beanName = entry.getKey();
            Object obj = entry.getValue();
            // The value of @RpcService is passed on to @Component as the bean name
            RpcService annotation = obj.getClass().getDeclaredAnnotation(RpcService.class);
            // Fall back to the bean name when the annotation carries no usable value
            boolean useBeanName = annotation == null || StringUtils.isBlank(annotation.value());
            rpcBeans.put(useBeanName ? beanName : annotation.value(), obj);
        }
        return new RpcHandler(rpcBeans);
    }
//..........................

The RpcHandler constructor receives a reference to rpcBeans. When a client's RpcRequest arrives, the matching bean is looked up in this map.

@ChannelHandler.Sharable
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
    private static final Logger logger = Logger.getLogger(RpcHandler.class.getName());
    private Map<String, Object> rpcBeans;

    public RpcHandler(Map<String, Object> rpcBeans) {
        this.rpcBeans = rpcBeans;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        RpcResponse rpcResponse = handle(msg);
        // One request per connection: close the channel once the response is flushed
        ctx.channel().writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private RpcResponse handle(RpcRequest msg) throws InvocationTargetException {
        RpcResponse rpcResponse = new RpcResponse();
        // Echo the id first so that error responses carry it too
        rpcResponse.setId(msg.getId());
        Object obj = rpcBeans.get(msg.getClassName());
        // TODO: good enough for now
        if (Objects.isNull(obj)) {
            rpcResponse.setResult(null);
            rpcResponse.setCode(404);
            rpcResponse.setFailMsg("service not found: " + msg.getClassName());
            logger.warning("Requested service not found, msg: " + msg.toString());
            return rpcResponse;
        }
        // Resolve the request and invoke the matching RPC method
        Class<?> clazz = obj.getClass();
        String methodName = msg.getMethodName();
        HashMap<Class<?>, Object> arguments = msg.getArguments();
        FastClass fastClass = FastClass.create(clazz);
        // keySet() and values() iterate in the same order, so types and values stay paired
        FastMethod method = fastClass.getMethod(methodName,
                arguments.keySet().toArray(new Class[arguments.size()]));
        Object result = method.invoke(obj, arguments.values().toArray());
        rpcResponse.setResult(result);
        rpcResponse.setCode(200);
        return rpcResponse;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        logger.warning(cause.toString());
    }
}
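
The lookup-then-invoke step can also be sketched with the JDK's own `java.lang.reflect` instead of cglib's `FastClass`, which makes it easy to run in isolation. This is a swapped-in technique for illustration, not the handler's actual code. `ReflectiveDispatcher` and `Outcome` are hypothetical names; the codes 200 and 404 come from the handler above, while 500 for a failing method is an assumption added here.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/** Hypothetical stand-alone version of the dispatch step, using java.lang.reflect. */
final class ReflectiveDispatcher {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int FAILED = 500; // assumption: not a code used in the article

    /** Minimal result holder mirroring the code/result/failMsg fields of RpcResponse. */
    static final class Outcome {
        final int code;
        final Object result;
        final String failMsg;

        Outcome(int code, Object result, String failMsg) {
            this.code = code;
            this.result = result;
            this.failMsg = failMsg;
        }
    }

    private final Map<String, Object> beans;

    ReflectiveDispatcher(Map<String, Object> beans) {
        this.beans = beans;
    }

    Outcome dispatch(String className, String methodName, Class<?>[] types, Object[] args) {
        Object target = beans.get(className);
        if (target == null) {
            return new Outcome(NOT_FOUND, null, "no service named " + className);
        }
        try {
            // Public methods only, matched by name and exact parameter types
            Method method = target.getClass().getMethod(methodName, types);
            return new Outcome(OK, method.invoke(target, args), null);
        } catch (NoSuchMethodException e) {
            return new Outcome(NOT_FOUND, null, "no method named " + methodName);
        } catch (InvocationTargetException e) {
            // The service method itself threw; report its cause instead of dropping the call
            return new Outcome(FAILED, null, String.valueOf(e.getCause()));
        } catch (IllegalAccessException e) {
            return new Outcome(FAILED, null, e.toString());
        }
    }
}
```

Turning a thrown exception into a response with a failure code, rather than letting it reach `exceptionCaught`, means the caller receives an answer instead of only seeing the connection close.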

4. Start LrpcServer. LrpcChannelInit is initialized in LrpcHandlerAutoConfiguration, where the lrpc.server property is also injected into the port field.

@Component
public class LrpcServerImpl implements LrpcServer, ApplicationListener<ApplicationReadyEvent> {
    @Autowired
    LrpcChannelInit lrpcChannelInit;

    @Override
    public void connect() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(lrpcChannelInit)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(lrpcChannelInit.getPort())).sync();
            future.channel().closeFuture().sync();
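        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();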
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        connect();
    }
}

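5. The client handler class LrpClientHandler holds a lock object. Netty always delivers data asynchronously, so the asynchronous response is turned into a synchronous one with Object's wait() and notifyAll() methods. A new LrpClientHandler is created for every call, so separate calls share no state and do not contend with one another. Within a single call, however, the event-loop thread and the calling thread still hand the response over through this lock, so the waiting side must check that the response is still missing before it calls wait(); otherwise a response that arrives early goes unnoticed.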

  

public class LrpClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    private final Object lock = new Object();
    private volatile RpcResponse rpcResponse = null;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        rpcResponse = msg;
        synchronized (lock) {
            lock.notifyAll();
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public Object getLock() {
        return lock;
    }

    public RpcResponse getRpcResponse() {
        return rpcResponse;
    }

    public void setRpcResponse(RpcResponse rpcResponse) {
        this.rpcResponse = rpcResponse;
    }

}
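
The wait()/notifyAll() handoff described in this step can be isolated into a small holder class. The sketch below is a hypothetical `ResponseSlot`, not part of lrpc; it shows the guarded-wait pattern: the value is written and read under the same lock, the waiter loops on the condition, and a timeout (an addition assumed here) keeps the caller from blocking forever if no response ever comes.

```java
/** Hypothetical one-shot holder that turns an asynchronous callback into a blocking read. */
final class ResponseSlot<T> {
    private final Object lock = new Object();
    private T value; // guarded by lock

    /** Called from the I/O thread when the response arrives. */
    void complete(T response) {
        synchronized (lock) {
            value = response;
            lock.notifyAll();
        }
    }

    /**
     * Called from the requesting thread. Returns the response, or null if the
     * timeout elapses or the thread is interrupted first.
     */
    T await(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            // Loop on the condition: covers early completion and spurious wakeups
            while (value == null) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return null;
                }
                try {
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return value;
        }
    }
}
```

Because the condition is checked before every wait, it does not matter whether `complete` runs before or after `await` is entered.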

6. LrpcClientChannelInit holds a reference to an LrpClientHandler, which is created when the initializer itself is constructed.

public class LrpcClientChannelInit extends ChannelInitializer {
    private LrpClientHandler lrpClientHandler;

    public LrpcClientChannelInit() {
        lrpClientHandler = new LrpClientHandler();
    }

    @Override
    protected void initChannel(Channel ch) {
        // Encode outgoing requests and decode incoming responses
        ch.pipeline().addLast(new RpcEncode(RpcRequest.class))
                .addLast(new RpcDecode(RpcResponse.class))
                .addLast(new LoggingHandler(LogLevel.INFO))
                .addLast(lrpClientHandler);
    }
    public synchronized void initHandler(LrpClientHandler lrpClientHandler){
        this.lrpClientHandler = lrpClientHandler;
    }
    public LrpClientHandler getLrpClientHandler() {
        return lrpClientHandler;
    }

    public void setLrpClientHandler(LrpClientHandler lrpClientHandler) {
        this.lrpClientHandler = lrpClientHandler;
    }
}

7. LrpcExecutorImpl holds the host and port of the remote service. execute(RpcRequest r) connects to it and sends the request.

public class LrpcExecutorImpl implements LrpcExecutor {
    private String host;
    private Integer port;

    public LrpcExecutorImpl(String host, Integer port) {
        this.host = host;
        this.port = port;
    }


    @Override
    public RpcResponse execute(RpcRequest rpcRequest) {
        LrpcClientChannelInit lrpcClientChannelInit = new LrpcClientChannelInit();
        Bootstrap b = new Bootstrap();
        EventLoopGroup group = null;
        ChannelFuture future = null;
        try {
            group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(lrpcClientChannelInit)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                    .option(ChannelOption.SO_KEEPALIVE, true);
            future = b.connect(new InetSocketAddress(host, port)).sync();
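            // TODO: once connected, send the request right away, then block until notified
            future.channel().writeAndFlush(rpcRequest).sync();
            Object lock = lrpcClientChannelInit.getLrpClientHandler().getLock();
            synchronized (lock) {
                // Wait in a loop on the condition: a response that arrived before
                // this point, or a spurious wakeup, must not leave the caller stuck
                while (lrpcClientChannelInit.getLrpClientHandler().getRpcResponse() == null) {
                    lock.wait();
                }
            }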
            RpcResponse rpcResponse = lrpcClientChannelInit.getLrpClientHandler().getRpcResponse();
            if (null != rpcResponse) {
                future.channel().closeFuture().sync();
            }
            return rpcResponse;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lrpcClientChannelInit.getLrpClientHandler().setRpcResponse(null);
            if (null != group) {
                group.shutdownGracefully();
            }
        }
        return null;
    }
//get and set ...
}
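
Both RpcRequest and RpcResponse carry an id, which execute() does not need as long as every call uses its own connection and handler. If connections were ever reused, the id is what would let a response find its caller. The following is one possible shape for that bookkeeping, offered as a sketch only: `PendingCalls` is a hypothetical class, and lrpc as shown in this article does not work this way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry that matches responses to waiting callers by request id. */
final class PendingCalls<R> {
    private final ConcurrentMap<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Called before the request is written; the caller later blocks on the returned future. */
    CompletableFuture<R> register(long id) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(id, future) != null) {
            throw new IllegalStateException("duplicate request id " + id);
        }
        return future;
    }

    /** Called when a response arrives; returns false if nobody is waiting for this id. */
    boolean complete(long id, R response) {
        CompletableFuture<R> future = pending.remove(id);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a response. */
    int inFlight() {
        return pending.size();
    }
}
```

A caller would register the id, send the request, and then call `get` with a timeout on the returned future; the inbound handler would call `complete` with the id taken from the response.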

8. Usage example

     Publishing a service on the server side

@RpcService("HelloService")
public class HelloServiceImpl implements HelloService {
    @Override
    public String say(String msg) {
        return "Hello World!" + msg;
    }
}

    In application.properties, set the server port property:

#

lrpc.server=8888

    On the client, configure the service address and the LrpcExecutor

   

lrpc.hello.host=xxxxxxxxxxxxxxxxxxx
lrpc.hello.port=8888
lrpc.hello.desc=hello rpc call

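  Configure the executor that calls the service, LrpcExecutor. It is kept as a bean in the Spring container and can be obtained through dependency injection.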

@Configuration
@Component
public class RpcConfiguration {

    @Bean("rpc.hello")
    @ConfigurationProperties(prefix = "lrpc.hello")
    public RpcServerProperties rpcClientCallProperties() {
        return new RpcServerProperties();
    }

    @Bean("helloRpcExecutor")
    LrpcExecutor lrpcExecutor(@Qualifier(value = "rpc.hello") RpcServerProperties rpcServerProperties) {
        return invoke(rpcServerProperties);
    }

    private LrpcExecutor invoke(RpcServerProperties config) {
        return new LrpcExecutorImpl(config.getHost(), config.getPort());
    }
}
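
The article does not show RpcServerProperties. Judging only from the property keys (host, port, desc) and from the getHost()/getPort() calls above, it is presumably a plain holder along the following lines. This is a guess at its shape, not the author's class. The `fromProperties` helper is an extra added here so the sketch runs without Spring; in the real project the binding is done by @ConfigurationProperties.

```java
import java.util.Properties;

/** Assumed shape of the properties holder; field names follow the lrpc.hello.* keys. */
class RpcServerProperties {
    private String host;
    private Integer port;
    private String desc;

    public String getHost() { return host; }

    public void setHost(String host) { this.host = host; }

    public Integer getPort() { return port; }

    public void setPort(Integer port) { this.port = port; }

    public String getDesc() { return desc; }

    public void setDesc(String desc) { this.desc = desc; }

    /** Hypothetical helper: reads prefix.host, prefix.port and prefix.desc by hand. */
    static RpcServerProperties fromProperties(Properties props, String prefix) {
        RpcServerProperties config = new RpcServerProperties();
        config.setHost(props.getProperty(prefix + ".host"));
        String port = props.getProperty(prefix + ".port");
        config.setPort(port == null ? null : Integer.valueOf(port.trim()));
        config.setDesc(props.getProperty(prefix + ".desc"));
        return config;
    }
}
```

Missing keys are left as null, so a caller can tell an unset port apart from a configured one.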

  Call the service. methodName is the name of a method on the class registered under className:

    @Autowired
    @Qualifier(value = "helloRpcExecutor")
    private LrpcExecutor helloRpcExecutor;

    @GetMapping("/say")
    public String invoke(String msg) {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName("HelloService");
        rpcRequest.setMethodName("say");
        rpcRequest.setId(111L);
        HashMap<Class<?>, Object> arguments = new HashMap<>(8);
        arguments.put(String.class, "good");
        rpcRequest.setArguments(arguments);
        RpcResponse execute = helloRpcExecutor.execute(rpcRequest);
        System.out.println(execute.toString());
        return execute.toString();
    }

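Finally, everything above is a personal practice demo. When I have time I will gradually improve the unfinished parts; the eventual goal is a project along the lines of Dubbo. Suggestions and questions are welcome (morty630@foxmail.com). The complete code is on GitHub: https://github.com/To-echo/lrpc-all    (your likes keep me going)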

Related technical documentation

Objenesis reflection library: http://objenesis.org/details.html

Protobuf protocol: https://developers.google.com/protocol-buffers/

Protostuff serialization library: https://github.com/protostuff/protostuff

Introduction to RPC: https://en.wikipedia.org/wiki/Remote_procedure_call

Netty website: https://netty.io/

Original article: https://www.cnblogs.com/coding400/p/9882789.html