【Netty】Netty实现简单RPC

Netty实现RPC

RPC(Remote Procedure Call)远程过程调用,一种计算机通信协议

  • 即:一台计算机的程序调用另一台计算机的子程序,并且不需要对这个交互,进行额外的编程;

RPC机制

RPC调用者要调用远程API,首先调用RPCProxy代理,再通过RCPInvoker调用者,打开RCPConnector连接;

这里的RPCChannel就是Netty的通信方式(SocketChannel);

RPC的协议,我们进行自定义,可以通过字符串来定义一个协议头,代表,要调用的目标方法服务;

常用RPC框架

阿里Dubbo、Apache的thrift、Spring的Spring Cloud、google的gRPC

实现流程

  1. 创建接口,定义抽象方法,用于约定消费者和提供者;
  2. 创建提供者,需要监听消费者请求,并按照约定返回数据;
  3. 创建消费者,该类需要透明的调用自己不存在的方法,使用Netty来发送请求;

公用接口

RPC的两端,都要有此接口;

客户端目的就是要调用服务端的实现;所以客户端不实现此接口;

server端实现此接口;

public interface HelloService {
    String hello(String msg);
}

RPC Server

  1. 在server端,实现接口,以供远程调用
// 多次调用,同一个实例,会创建多个实例,而非同一个!
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String msg) {
        if (null != msg) {
            return "RPC returns message :" + msg;
        } else {
            return "message is null";
        }
    }
}
  1. Server端通过Netty实现
public class NettyServer {
    //netty server的启动和初始化
    public static void startServer(String hostName, int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
            ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
            System.out.println("-------- Netty server starts --------");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Server端自定义Handler

    主要是读取客户端的消息;

    通过消息,获取规定好的协议头,来判断,具体调用哪个方法;

    这里的协议头为:HelloService#hello#,后面跟上方法参数;

    拿到协议头后,截取出方法参数,调用对应方法,拿到返回结果;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // read:读取客户端消息,并调用服务
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client msg : " + msg);
        /**
         * 规定RPC调用协议:
         * 要想调用某个服务,在这里自定义协议头:必须以字符串"HelloService#hello#"开头(接口#方法)
         */
        if (msg.toString().startsWith("HelloService#hello#")) {
            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    // exception
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  1. 最后是Server端的启动类:启动Netty Server
public class RPCServerBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer("localhost", 8899);
    }
}

RPC Client

  • 客户端是通过一个线程调用RPC的;也就是call方法,发送RPC请求给Server;

  • 请求发送之后,消息并不是立刻返回的,手动将线程阻塞;

  • Server端将RPC调用结果,返回给Client,是首先进入Handler的channelRead方法中;

    在此方法中,拿到msg数据,并执行notify,唤醒等待的线程;

  • 阻塞的线程,被唤醒,并拿到数据,返回RPC调用结果;

执行流程:

1. call线程发送RPC请求,以及参数,发送之后,阻塞等待被唤醒;
2. Server端根据参数,确定要调用的方法,以及参数,执行方法,返回结果
3. client的Handler的channelRead方法,读取到返回的结果;
4. 唤醒call线程,拿到RPC结果,返回;
  1. 客户端这边,我们先完成自定义的ClientHandler

    注意:call方法与channelRead必须一起同步,否则无法唤醒线程;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    // ctx上下文,需要在call方法中使用ctx
    private ChannelHandlerContext context;
    // RPC返回结果
    private String result;
    // client调用RPC传入参数:即方法参数
    private String para;
    void setPara(String para) {
        this.para = para;
    }
    /**
     * call方法发送请求数据,并阻塞,等待被唤醒;
     * 发送RPC请求---> wait等待唤醒--->  channelRead notify唤醒 --> 获取RPC结果
     */
    @Override
    public Object call() throws Exception {
        lock.lock();
        try {
            System.out.println("------ NettyClientHandler.call is called ---------");
            // 发送RPC请求参数
            context.writeAndFlush(para);
            // 等待结果
            condition.await();
            // 唤醒之后,返回结果
            System.out.println("----- NettyClientHandler.call is notified ------");
            return result;
        } finally {
            lock.unlock();
        }
    }
    // 与server创建连接时,调用 channelActive
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("--------- channelActive is called ---------");
        context = ctx;
    }
    // 收到server数据后,必须与call方法同步!
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lock.lock();
        try {
            // RPC返回结果
            result = msg.toString();
            // 唤醒等待的线程 call方法
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  1. Netty客户端的初始化代码

    这里通过提交线程池的方法启动RPC请求的发送;

    通过代理的方式,调用并返回结果;

public class NettyClient {
    // 创建一个线程池 线程数 = CPU处理器数量
    private static int nThread = Runtime.getRuntime().availableProcessors();
    private static ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(nThread, nThread, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    private static NettyClientHandler clientHandler;
    /**
     * 使用代理模式
     * 反射获取一个代理对象
     */
    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {
                    System.out.println("------------ Proxy ------------");
                    if (clientHandler == null) {
                        // 初始化
                        initClient();
                    }
                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    clientHandler.setPara(providerName + args[0]);
					// 提交线程到线程池,返回结果
                    return threadPoolExecutor.submit(clientHandler).get();
                });
    }
    // 初始化客户端
    public static void initClient() {
        // 必须提前实例化!
        clientHandler = new NettyClientHandler();
        try {
            NioEventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(clientHandler);
                        }
                    });
            bootstrap.connect("localhost", 8899).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 最后是客户端启动类
public class RPCClientBootstrap {
    // 相当于协议头,代表要调用哪个方法
    public static final String PROVIDER_NAME = "HelloService#hello#";

    public static void main(String[] args) {
        // 创建client
        NettyClient client = new NettyClient();
        // 获取proxy代理
        HelloService helloServiceProxy = (HelloService) client.getBean(HelloService.class, PROVIDER_NAME);
        // 代理对象调用PRC获取结果
        String res = helloServiceProxy.hello("RPC Request");
        System.out.println("RPC result: " + res);
    }
}
原文地址:https://www.cnblogs.com/mussessein/p/12627453.html