基于Netty实现的一个简单的RPC

Netty:基于 Java NIO 的网络通信框架,具有高效、简单、快速的特点,在当下互联网高并发场景中得到了很好的应用。现在用 Java 编写的高并发产品(如 dubbo、zookeeper、hadoop、rocketmq)大都采用 Netty 作为底层的通信技术。
RPC:远程过程调用(Remote Procedure Call)。通过 RPC 框架,我们可以像调用本地方法一样调用远程机器上的方法。
1.定义常量类,包含主机地址,端口号。
/**
 * Connection constants for the RPC demo: the host name and the TCP port.
 *
 * @author hsl 2020-07-02 13:26
 */
public final class Constants {
    /** Host name used for the RPC connection. */
    public static final String REMOTE_HOST = "localhost";
    /** TCP port used for the RPC connection. */
    public static final int PORT = 6789;

    /** Not instantiable: this class only holds constants. */
    private Constants() {
    }
}

2.RPC框架,使得我们可以像调用本地方法一样地调用远程机器上的方法,那么我们就定义接口并实现。

1 package com.netty.nettyOne.service.dateTimeService;
2 
/**
 * Service contract that is invoked remotely through the RPC client proxy.
 *
 * @author hsl 2020-07-02 13:29
 */
public interface DateTimeService {
    /**
     * Returns the current date and time as text.
     *
     * @param param request argument supplied by the caller
     * @return the current date and time rendered as a string
     */
    String getNow(String param);
}
 1 package com.netty.nettyOne.service.dateTimeService;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 
 6 /**
 7  * @author hsl 2020-07-02 13:29
 8  */
 9 public class DateTimeServiceImpl implements DateTimeService {
10 
11     @Override
12     public String getNow(String param) {
13         return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date());
14     }
15 }

3.NettyServer

 1 package com.netty.nettyOne.server;
 2 
 3 import com.netty.nettyOne.service.dateTimeService.DateTimeServiceImpl;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelInboundHandlerAdapter;
 6 
 7 /**
 8  * @author hsl 2020-07-02 14:00
 9  * 服务处理类实现
10  */
11 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
12     @Override
13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
14         //System.out.println("Server has received Message: [" + msg + "]");
15         //处理客户端发送来的信息
16         System.out.println("客户端发送的信息:" + msg.toString());
17         //响应
18         String result = new DateTimeServiceImpl().getNow(msg.toString());
19         System.out.println("服务器响应的信息:" + result);
20         ctx.writeAndFlush(result);
21     }
22 
23 //    @Override
24 //    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
25 //
26 //    }
27 
28     @Override
29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
30         ctx.close();    //抛出关闭通信channel
31     }
32 }
 1 package com.netty.nettyOne.server;
 2 
 3 
 4 import io.netty.bootstrap.ServerBootstrap;
 5 import io.netty.channel.ChannelFuture;
 6 import io.netty.channel.ChannelInitializer;
 7 import io.netty.channel.ChannelPipeline;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.string.StringDecoder;
12 import io.netty.handler.codec.string.StringEncoder;
13 
14 /**
15  * @author hsl 2020-07-02 13:39
16  */
17 public class NettyServer {
18     //主机名称 端口号
19     private String host;
20     private int port;
21 
22     public NettyServer(String host, int port) {
23         this.host = host;
24         this.port = port;
25     }
26 
27     /**
28      * 对外公开的方法
29      */
30     public void serverStart() {
31         serverStart0(host, port);
32     }
33 
34     /**
35      * @param host 主机名称
36      * @param port     端口号
37      */
38     private void serverStart0(String host, int port) {
39         //处理 ACCEPT 事件的线程工作组
40         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
41 
42         //处理 READ/WRITER 时间的线程工作组
43         NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
44 
45         ServerBootstrap server = new ServerBootstrap();
46 
47         server.group(bossGroup, workerGroup)
48                 .channel(NioServerSocketChannel.class)
49                 .childHandler(new ChannelInitializer<SocketChannel>() {
50                     @Override
51                     protected void initChannel(SocketChannel socketChannel) throws Exception {
52                         ChannelPipeline pipeline = socketChannel.pipeline();
53                         pipeline.addLast(new StringEncoder())
54                                 .addLast(new StringDecoder())
55                                 .addLast(new NettyServerHandler());  // 此处添加业务处理Handler
56                     }
57                 });
58 
59         try {
60             //启动服务
61             ChannelFuture channelFuture = server.bind(host, port).sync();
62             System.out.println("NettyServer is started ..... ");
63 
64             //关闭服务
65             channelFuture.channel().closeFuture().sync();
66         } catch (InterruptedException e) {
67             e.printStackTrace();
68         } finally {
69             bossGroup.shutdownGracefully();
70             workerGroup.shutdownGracefully();
71         }
72     }
73 }

netty服务器启动类

 1 package com.netty.nettyOne.server;
 2 
 3 import com.netty.nettyOne.util.Constants;
 4 
 5 /**
 6  * @author hsl 2020-07-02 14:29
 7  */
 8 public class NettyServerBootstrap {
 9     public static void main(String[] args) {
10         //初始化服务器
11         NettyServer nettyServer = new NettyServer(Constants.REMOTE_HOST, Constants.PORT);
12         nettyServer.serverStart();
13     }
14 }

4.NettyClient

 1 package com.netty.nettyOne.client;
 2 
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 import java.util.concurrent.Callable;
 7 
 8 /**
 9  * @author hsl 2020-07-02 14:48
10  */
11 public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
12     //上下文对象
13     /**
14      * {@link #channelActive}
15      */
16     private ChannelHandlerContext context;
17     //存放服务器返回结果
18     private String result;
19     //存放请求参数
20     private String requestParam;
21 
22     public void setRequestParam(String requestParam) {
23         this.requestParam = requestParam;
24     }
25 
26     @Override
27     public synchronized Object call() throws Exception {
28         System.out.println("NettyClientHandler call() Invoacated ... ");
29         //发送请求
30         context.writeAndFlush(requestParam);
31         //发送完了请求之后,等待....
32         wait();
33         //被唤醒之后,继续处理(返回结果即可)
34         return result;
35     }
36 
37     /**
38      * @param ctx
39      * @throws Exception
40      * 当 该 NettyClientHandler 被初始化的时候,就会调用 该 channelActive 方法一次
41      */
42     @Override
43     public void channelActive(ChannelHandlerContext ctx) throws Exception {
44         System.out.println(" channelActive 被调用  ");
45         // 向服务端发送请求。
46         this.context = ctx;
47     }
48 
49     /**
50      * @param ctx
51      * @param msg
52      * @throws Exception
53      * channelRead 用来处理服务端返回来的数据。
54      */
55     @Override
56     public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
57         result = msg.toString();
58         System.out.println("Client received result: " + result);
59 
60         //唤醒线程
61         notify();
62     }
63 
64     @Override
65     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
66         ctx.close();
67     }
68 }
package com.netty.nettyOne.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Netty-based RPC client. It connects on construction and hands out dynamic
 * proxies whose method calls are forwarded to the server as plain strings.
 *
 * <p>Every proxy created by one client shares a single
 * {@link NettyClientHandler}; the request text is stored on that handler
 * before the call is submitted, so concurrent invocations through the same
 * client can overwrite each other's request.
 *
 * @author hsl 2020-07-02 14:43
 */
public class NettyClient {
    private final String host;
    private final int port;

    /**
     * Connects to the given server.
     *
     * @param host server host name
     * @param port server TCP port
     */
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        // Connect right away.
        clientStart();
    }

    /**
     * Thread pool on which {@link NettyClientHandler#call()} runs, i.e. where
     * the request is sent and the reply is awaited.
     */
    private static final int cpu_cores = Runtime.getRuntime().availableProcessors();
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(cpu_cores);

    private NettyClientHandler clientHandler;

    /**
     * Starts the client using the configured host and port.
     */
    private void clientStart() {
        clientStart0(host, port);
    }

    /**
     * Builds the client pipeline and connects, waiting for the connect
     * attempt to finish.
     *
     * @param host server host name
     * @param port server TCP port
     */
    private void clientStart0(String host, int port) {
        // Event loop group for this client.
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        // Client bootstrap.
        Bootstrap client = new Bootstrap();
        clientHandler = new NettyClientHandler();

        // Configure the bootstrap.
        client.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringEncoder())
                                .addLast(new StringDecoder())
                                .addLast(clientHandler);
                    }
                });

        try {
            // Connect and wait for completion.
            client.connect(host, port).sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("NettyClient interrupted while connecting: " + e);
        }
    }

    /**
     * Creates a proxy for the given service interface. Calling a service
     * method sends {@code args[0].toString()} to the server and returns the
     * reply text; {@code equals}, {@code hashCode} and {@code toString} are
     * answered locally.
     *
     * @param serviceClass the service interface to proxy
     * @return a proxy instance implementing {@code serviceClass}
     */
    public Object getProxy(final Class<?> serviceClass) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // equals/hashCode/toString are also dispatched here; they must
                        // not reach the network path (args is null for the no-arg ones).
                        if (method.getDeclaringClass() == Object.class) {
                            return invokeObjectMethod(proxy, method, args, serviceClass);
                        }
                        if (args == null || args.length == 0 || args[0] == null) {
                            throw new IllegalArgumentException(
                                    "RPC method " + method.getName() + " requires a non-null first argument");
                        }
                        String requestParam = args[0].toString();
                        System.out.println("客户端发送的请求参数是:" + requestParam);
                        clientHandler.setRequestParam(requestParam);
                        return threadPool.submit(clientHandler).get();
                    }
                });
    }

    /** Answers the {@link Object} methods of a proxy locally, using identity semantics. */
    private static Object invokeObjectMethod(Object proxy, Method method, Object[] args,
                                             Class<?> serviceClass) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        // Only toString remains among the Object methods routed to a proxy handler.
        return serviceClass.getName() + "@proxy" + Integer.toHexString(System.identityHashCode(proxy));
    }
}
 1 package com.netty.nettyOne.client;
 2 
 3 import com.netty.nettyOne.service.dateTimeService.DateTimeService;
 4 import com.netty.nettyOne.util.Constants;
 5 
 6 /**
 7  * @author hsl 2020-07-02 15:30
 8  */
 9 public class NettyClientBootstrap {
10     public static void main(String[] args) {
11         //启动客户端
12         NettyClient client = new NettyClient(Constants.REMOTE_HOST, Constants.PORT);
13 
14         //获取一个服务端 服务对象的 代理对象
15         DateTimeService dateTimeService = (DateTimeService) client.getProxy(DateTimeService.class);
16         //调用服务
17         String result = dateTimeService.getNow("Netty牛逼");
18         // 输出结果
19         System.out.println("Client Received Result From Server: [" + result + "]");
20     }
21 }

6.实现结果

服务器打印

客户端打印

原文地址:https://www.cnblogs.com/ualong/p/13259416.html