Netty学习笔记(四)——实现dubbo的rpc

1、rpc基本介绍

RPC ( Remote Procedure Call) -远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调
用另一台计算机的子程序,两个或多个应用程序分布不同的服务器上,而他们之间的调用就像调用本地方法一样

常见的 RPC框架有:比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpex、Apache的thrift,Spring的Spring Cloud。

2、rpc流程

1)服务消费方(client)以本地调用方式调用服务

2)clientstub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

3)clientstub将消息进行编码并发送到服务端

4) server stub收到消息后进行解码

5)serverstub根据解码结果调用本地的服务

6)本地服务执行并将结果返回给server stub

7) server stub将返回导入结果进行编码并发送至消费方

8)clientstub接收到消息并进行解码

9)服 务消费方(client)得到结果

3、用Netty 实现一个简单的RPC框架

需求:消费者远程调用提供者的服务,提供者返回一个字符串,消费者接受提供者返回的数据
1》创建一个公共接口

2》创建一个提供者

3》创建一个消费者

示意图:

公共接口:

//公共接口,提供者和消费者都需要
public
interface HelloService { public String hello(String msg); }

provider:服务端

实现类:

public class HelloImp implements HelloService {
    //消费方调用此方法时,返回值
    @Override
    public String hello(String msg) {
        System.out.println("收到客户端信息:"+msg);
       if(msg != null){
           return "收到客户端的信息"+msg+",我是服务端";
       }else{
           return "收到客户端的信息,我是服务端";
       }
    }
}

启动类:

public class MyServerBootstrap {
    
    public static void main(String[] args) {
      //启动服务提供者
new MyNettyServer().startServer("127.0.0.1",7000); } }
public class MyNettyServer {

    public void startServer(String host , int port){
startServer0(host,port); }     //初始化并启动服务端
public void startServer0(String host, int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new MyServerRpcHandler()); } }); try { ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync(); channelFuture.addListener((sc)->{ if(sc.isSuccess()){ System.out.println("服务器启动成功"); }else{ System.out.println("服务器启动失败"); } }); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }

业务处理handler:

public class MyServerRpcHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("收到的客户端信息:"+msg);
        String message =   msg.toString().substring(msg.toString().lastIndexOf("#")+1);
        String result = new HelloImp().hello(message);
        ctx.writeAndFlush(result);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }
}

consumer:客户端

启动类:

public class MyClientBootStrap {
    private static final String providerName = "helloWorld#";

    public static void main(String[] args) {
      //创建消费者对象 MyNettyClient consumer
= new MyNettyClient();
      //创建代理对象 HelloService helloService
= (HelloService)consumer.getBean(HelloService.class, providerName);
      //通过代理对象调用方法 String res
= helloService.hello("hello 你好 服务端"); System.out.println("从客户端返回的结果["+res+"]"); } }
public class MyNettyClient {

    private static MyClientRpcHandler clientRpcHandler;
    //创建自定义的线程池 ExecutorService executorService
= new ThreadPoolExecutor( 2, 5, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.CallerRunsPolicy());   //使用代理模式,创建一个代理对象 public Object getBean(final Class<?> serviceClass , final String providerName){ System.out.println("getBean。。。。被调用"); return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy, method, args) -> { if(clientRpcHandler == null){ initClient(); } System.out.println("clientRpcHandler..."+clientRpcHandler); clientRpcHandler.setPara(providerName+args[0]); return executorService.submit(clientRpcHandler).get(); }); }; public void initClient(){ clientRpcHandler = new MyClientRpcHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientRpcHandler); } }); try { bootstrap.connect("127.0.0.1", 7000).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }

业务handler:

public class MyClientRpcHandler extends ChannelInboundHandlerAdapter implements Callable{

    private ChannelHandlerContext context;//上下文对象
    private String result;//返回的结果
    private String para;//客户端传给服务端的信息

    //与服务端连接后第一个被调用 @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive被调用。。。"); context = ctx; System.out.println("context"+context); }
  //读取服务器的数据 @Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("MyClientRpcHandler。。。。channelRead被调用"); result = msg.toString(); notify();//唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.getMessage()); ctx.close(); } @Override public synchronized Object call() throws Exception { System.out.println("call被调用。。。"); context.writeAndFlush(para);//发送数据到服务端 wait();//等待channelRead获取到服务端返回的结果数据后,唤醒线程 return result;//服务端返回的数据 } public void setPara(String para){ System.out.println("setPara。。。被调用"); this.para = para; } }
原文地址:https://www.cnblogs.com/tdyang/p/11981911.html