Netty学习笔记

Netty是什么?

概述——JBOSS提供的一个开源的Java网络应用框架

特点——异步,基于事件驱动。提供了TCP/IP、HTTP协议栈,方便定制开发私有协议栈

本质——NIO框架

应用——开发高性能高可靠的网络IO程序,例如在分布式系统中作为RPC的基础通信组件。另外在大数据领域也被广泛应用,Akka,Flink,Spark等项目都用到了Netty

数据传输的基础——TCP协议

下图可以帮助了解Netty在Java网络编程所处的位置

image-20210315141613432

三种经典的I/O模型

BIO

概述

同步并阻塞,客户端每来一个连接请求,服务器就要启动一个对应的线程进行处理,高并发(同一时间出现大量请求)场景下,服务器资源消耗严重,压力很大。同时,如果连接什么也不做,服务器仍然会让线程维持,造成不必要的线程开销。最重要的是,这种模式下,数据的读取写入必须阻塞在一个线程内等待其完成。

工作机制

1)服务器启动一个ServerSocket监听连接请求

2)客户端启动Socket对服务器发起连接请求,服务端默认情况下为每个客户建立一个线程与之进行通讯

3)客户端发出请求后,咨询服务端是否有线程响应。有响应,客户端线程在请求结束后继续执行;无响应,进入等待,等待超时后连接请求被拒绝。

image-20210315144516566

改进措施:使用线程池,实现并发,但并不能减少线程的使用个数。

应用场景:连接数目比较小且固定,对服务器资源要求比较高。

实战

服务端借助线程池实现,客户端用Telnet模拟

package com.youzikeji.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

public class BioServer {
    public static void main(String[] args) throws IOException {
        //利用线程池实现BIO的server端
        //创建一个线程池,通过七大参数
        ExecutorService threadPool = new ThreadPoolExecutor(
                5,
                10,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        //1.创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while (true){
            //监听,等待客户端连接
            final Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");

            //2.创建一个线程与之通讯
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    //线程和客户端通信的方法
    public static void handler(Socket socket){
        System.out.println("线程id : " + Thread.currentThread().getId());
        InputStream is = null;
        try {
            //接收数据的缓冲区
            byte[] bytes = new byte[1024];
            //通过socket获取输入流
            is = socket.getInputStream();

            //3.循环读取客户端发送的数据
            while (true){
                System.out.println("线程id : " + Thread.currentThread().getId());
                int read = is.read(bytes);
                if (read != -1){
                    System.out.println(new String(bytes, 0, read));
                } else {
                    //读完break
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            if (is != null){
                try {
                    is.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null){
                try {
                    socket.close();
                    System.out.println("关闭与客户端的连接");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

NIO

概述

同步非阻塞,客户端发送的请求会注册到多路复用器(选择器)上,多路复用器轮询到连接有I/O请求才进行处理。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

特点:支持面向缓冲的,基于通道的 I/O 操作方法。

应用场景:连接数目多且连接时间比较短,适用于聊天服务器,弹幕系统,服务器通信等场景。

image-20210315165226877

三大核心

Channel
  • 每个Channel都会对应一个Buffer

  • Channel是双向的,可以返回底层操作系统的情况

  • Channel的切换是基于事件驱动的

FileChannel + ByteBuffer——将字符串写入文本文件案例

image-20210315195058080

NioFileChannelDemo01.java

package com.youzikeji.nio;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannelDemo01 {
    public static void main(String[] args) throws IOException {
        String str = "hello, caoyusang";

        FileOutputStream os = new FileOutputStream("d:\file1.txt");

        //通过输出流获取对应的Channel,输出流把Channel包裹起来了
        FileChannel channel = os.getChannel();

        //为Channel创建一个对应的缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //str放入缓冲区
        byteBuffer.put(str.getBytes());

        //对缓冲区进行flip,即position置0
        byteBuffer.flip();

        //将缓冲区数据写入到Channel
        channel.write(byteBuffer);

        //输出流关闭
        os.close();

    }
}

FileChannel + ByteBuffer——从文本文件读取文本,打印到控制台

NioFileChannelDemo02.java

package com.youzikeji.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannelDemo02 {
    public static void main(String[] args) throws IOException {
        //创建文件的输入流
        File file = new File("d:\file1.txt");
        FileInputStream is = new FileInputStream(file);

        //通过输入流获取通道
        FileChannel channel = is.getChannel();

        //创建合适大小的缓冲区——根据文件大小
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

        //将Channel中的数据读到bytebuffer
        channel.read(byteBuffer);

        //将字节转成字符串,array方法return hb;
        System.out.println(new String(byteBuffer.array()));

    }
}

FileChannel+一个ByteBuffer实现文件的拷贝(需要读写)

NioFileChannelDemo03.java

package com.youzikeji.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannelDemo03 {
    public static void main(String[] args) throws IOException {
        //获取被拷贝的文件的输入流
        File file = new File("d:\file1.txt");
        FileInputStream is = new FileInputStream(file);

        //获取输入流对应的channel
        FileChannel isChannel = is.getChannel();

        //获取输出流
        FileOutputStream os = new FileOutputStream("d:\file2.txt");

        //获取输出流对应的channel
        FileChannel osChannel = os.getChannel();

        //构建512大小的缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);

        //循环读取,防止读不完
        while (true){
            //清空buffer,复位操作,防止position==limit出现read一直为0的情况
            /*
            public Buffer clear() {
                position = 0;
                limit = capacity;
                mark = -1;
                return this;
            }
            */
            byteBuffer.clear();

            int read = isChannel.read(byteBuffer);
            //读到末尾
            if (read == -1){
                break;
            }
            //读的同时,写,注意要先读写反转
            byteBuffer.flip();
            osChannel.write(byteBuffer);
        }
        //关闭输入流和输出流
        is.close();
        os.close();




    }
}

FileChannel.transferFrom()实现文件的拷贝

NioFileChannelDemo04.java

package com.youzikeji.nio;

import java.io.*;
import java.nio.channels.FileChannel;

public class NioFileChannelDemo04 {
    public static void main(String[] args) throws IOException {
        //获取被拷贝的文件的输入流
        File file = new File("d:\file1.txt");
        FileInputStream is = new FileInputStream(file);

        //获取输入流对应的channel
        FileChannel isChannel = is.getChannel();

        //获取输出流
        FileOutputStream os = new FileOutputStream("d:\file3.txt");

        //获取输出流对应的channel
        FileChannel osChannel = os.getChannel();

        //使用transferFrom(src, begin, end)实现通道内数据的拷贝
        osChannel.transferFrom(isChannel, 0 , isChannel.size());

        //流的关闭
        is.close();
        os.close();
    }
}

Buffer
  • 内存块,可读可写,底层是一个数组
  • 普通buffer可以转换成只读buffer

顶层父类Buffer抽象类的参数

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;	
private int position = 0;	 //下一个要被读或者写的数组元素的索引,每次读写完后都会更新
private int limit;			//缓存区的当前终点,不能对缓冲区超过极限的位置进行读写操作,可修改
private int capacity;		//缓冲区的容量,不可改变

Buffer的一个子类ByteBuffer

真正存放数据的是hb数组

image-20210315172258183

基本的buffer使用

package com.youzikeji.nio;

import java.nio.IntBuffer;

public class BasicBuffer {
    public static void main(String[] args) {
        //创建buffer
        IntBuffer intBuffer = IntBuffer.allocate(5);

        //put向buffer中放数据
        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i * 2);
        }

        //从buffer中取数据
        //先将buffer转换,进行读写切换
        intBuffer.flip();

        while (intBuffer.hasRemaining()){
            System.out.println(intBuffer.get());
        }

    }
}

MappedByteBuffer类——允许文件直接在内存中修改,操作系统不需要再拷贝一次

MappedByteBufferTest.java

package com.youzikeji.nio;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MappedByteBufferTest {
    public static void main(String[] args) throws IOException {
        //创建一个随机访问的文件流,读写模式
        RandomAccessFile randomAccessFile = new RandomAccessFile("d:\file1.txt", "rw");

        //获取通道
        FileChannel channel = randomAccessFile.getChannel();

        /**
         * 参数1: 读写模式
         * 参数2:可以直接修改的起始位置
         * 参数3: 映射到内存的大小,即可以修改的字节数
         */
        MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
        mappedByteBuffer.put(0, (byte) 'H');
        mappedByteBuffer.put(3, (byte) '9');

        randomAccessFile.close();


    }
}
Buffer分散与聚集

分散:将数据写入buffer时,可以采用buffer数组,依次写入

聚集:从buffer读取数据时,可以采用buffer,依次读

Selector
  • 一个Selector对应一个线程,同时可以对应多个不同的Channel
  • 能够检测多个注册的通道上是否有事件发生,有则获取事件并针对其进行处理
  • Selector可以在各个通道上进行切换,即单线程多路复用

NIO vs BIO

1)BIO以流的方式处理数据,NIO以块的方式处理数据,块IO的效率要高很多。

2)BIO是阻塞的即数据的读写必须阻塞在一个线程内完成,NIO是非阻塞的,面向缓冲区。

3)BIO基于字节流和字符流进行操作,NIP基于管道和缓冲区进行操作,数据总是从通道读取到缓冲区,或者从缓冲区写入通道,单个Selector线程负责轮询监听多个管道中的事件。

AIO

异步非阻塞,基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO尚未得到广泛应用。

应用场景:连接数目多且连接时间较长。

Netty概述

NIO存在的一些问题

  • NIO的类库和API繁杂,使用麻烦,需要熟练掌握Select、ServerSocketChannel、SocketChannel、ByteBuffer等
  • NIO编程涉及Reactor模式,必须对多线程和网络编程相当熟悉
  • 开发工作量和难度比较大,客户端面临断连重连、网络闪断、半包读写、失败缓存和网络拥塞等问题
  • JDK NIO的Epoll bug,会导致Selector空轮询,最终导致CPU100%直到JDK1.7还未解决

Netty的优点

  • 对JDK自带的NIO的API进行了封装,解决了上述传统原生NIO网络编程出现的问题
  • 设计优雅、使用方便、安全、社区活跃、高性能、吞吐更高、延迟更低,减少了资源消耗和不必要的内存复制

线程模型

Reactor模式

单Reactor单线程

image-20210502101356806

说明

(1)select是I/O复用的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求

(2)Reactor通过select监控客户端的请求事件,收到事件后通过dispatch进行请求的分发处理

(3)如果是建立连接事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理

(4)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler响应不同类型的请求

(5)最后由Handler完成read —> 业务处理 —> send的完整流程

模式的优缺点

(1)优点:模型简单,没有多线程、进程通信、竞争的问题

(2)缺点:只有一个线程,无法完全发挥多核CPU的性能。因为Reactor和Handler在同一个线程中,即请求监听和请求处理在同一个线程中完成,并发高的情况下,Handler在处理某个业务时,整个线程无法处理其他请求事件。

单Reactor多线程

image-20210502104022933

说明

(1)Reactor通过select监控客户端的请求事件,收到事件后通过dispatch进行请求的分发处理

(2)如果是建立连接事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理

(3)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler响应不同类型的请求,该模式下handler只负责事件响应,而不做任何的业务处理,只做read和send,而具体的业务处理交付给worker线程池的某个线程

(4)worker线程池会分配独立线程完成真正的业务,并把结果返回给handler

模式的优缺点

(1)优点:可以充分利用多核CPU的处理能力

(2)缺点:多线程数据共享和访问较为复杂;reactor还是负责所有事件的监听和响应,即连接的监听和响应仍然是在单线程中运行,高并发场景下容易出现性能瓶颈。

主从Reactor多线程

image-20210502110433664

说明

(1)Reactor主线程MainReactor通过select监控客户端的请求事件,收到事件后通过Acceptor处理连接事件

(2)当Acceptor处理连接事件后,MainReactor将连接分配给下一级的SubReactor

(3)SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理

(4)当有新的事件发生时,SubReactor就会调用对应的handler进行处理

(5)handler通过read读取数据,将业务处理移交worker线程池

(6)worker线程池分配独立的一个线程进行业务处理,返回结果给对应的handler

(7)handler收到响应结果后,通过send将结果返回给client

模式优缺点

(1)优点:父线程和子线程数据交互简单职责明确,父线程只需要接受连接请求,子线程完成后续的I/O及业务处理

(2)编程复杂度较高

Netty线程模式

网络图

img

我自己画的

image-20210503094734623

说明

(1)Netty抽象出两组线程池,Boss Group专门负责接收客户端的连接请求,Worker Group专门负责网络的读写

(2)Boss Group和worker Group的类型都是NioEventLoopGroup,相当于一个事件循环组,组中有多个事件循环,每个事件循环都是一个NioEventLoop

(3)NioEventLoop表示一个不断循环执行的处理任务的线程,NioEventLoop通过Selector监听绑定在其上的socket的网络通讯

(4)每个Boss Group中的NioEventLoop循环执行的过程分为三步:

​ 1)轮询accept事件,及连接请求事件

​ 2)处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioEventLoop中的Selector上

​ 3)处理任务队列中的任务,即runAllTasks

(5)每个Woker Group中的NioEventLoop的执行逻辑是:

​ 1)轮询处理读写(R/W)事件

​ 2)处理I/O事件,在对应的NioSocketChannel中处理

​ 3)处理任务队列的任务

(6)每个Worker Group的NioEventLoop处理具体的业务时,会使用管道Pipeline,可以通过Pipeline获取对应的Channel的处理器ChannelHandler,从而进行真正的业务处理

Netty实战

Netty实现简单的TCP通信

NettyTcpServer.java

package com.youzikeji.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class NettyTcpServer {
    public static void main(String[] args) {

        //创建BossGroup(只处理连接请求)和WorkerGroup(处理真正的业务)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            //进行参数设置
            bootstrap.group(bossGroup, workerGroup)     //设置两个线程组
                    .channel(NioServerSocketChannel.class)      //使用NioServerSocketChannel作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128)      //标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
                    .childOption(ChannelOption.SO_KEEPALIVE, true)      //启用心跳保活机制
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyTcpServerHandler());
                        }
                    });
            System.out.println("服务器准备好了");

            //绑定端口并同步
            ChannelFuture cf = bootstrap.bind(7777).sync();

            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

NettyTcpServerHandler.java

package com.youzikeji.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

//HandlerAdapter是Netty提供的适配器,规范
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {

    /**
     *
     * @param ctx : 上下文对象,含有pipeline, Channel等
     * @param msg : 客户端传来的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx: " + ctx);

        //打印msg看看,先将msg转化成netty提供的ByteBuf
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端的地址:" + ctx.channel().remoteAddress());

    }

    //数据读取完毕

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
    }

    //异常处理

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

NettyTcpClient.java

package com.youzikeji.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyTcpClient {
    public static void main(String[] args) throws InterruptedException {
        //创建group
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //创建启动类
            Bootstrap bootstrap = new Bootstrap();
            //参数设置
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyTcpClientHandler());
                        }
                    });
            System.out.println("客户端就绪");
            //连接服务器并同步
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();

            //关闭通道监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

}

NettyTcpClientHandler.java

package com.youzikeji.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {

    //客户端就绪就会触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client ctx: " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", CharsetUtil.UTF_8));

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务其消息: " + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端

image-20210318102229436

客户端

image-20210318102238524

Netty实现基本的RPC框架

RPC概述

RPC原理图

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。比如两个不同的服务 A、B 部署在两台不同的机器上,那么服务 A 如果想要调用服务 B 中的某个方法该怎么办呢?使用 HTTP请求当然可以,但是可能会比较慢而且一些优化做的并不好。 RPC 的出现就是为了让计算机调用远程服务就像调用本地服务一样快。

RPC的执行流程

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub找到服务地址,并将消息发送到服务端;
  4. server stub收到消息后进行解码
  5. server stub根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给server stub;
  7. server stub将返回结果打包成消息并发送至消费方;
  8. client stub接收到消息,并进行解码
  9. 服务消费方得到最终结果

框架设计

image-20210318104129844

具体实现

项目结构

image-20210503112605108

代码剖析

服务接口HelloService.java —— 定义了服务提供者提供服务的规范

package com.youzikeji.rpc.service;

public interface HelloService {

    String hello(String msg);
}

服务实现类HelloServiceImpl.java —— 服务的具体实现(业务,返回服务调用结果)

package com.youzikeji.rpc.provider;

import com.youzikeji.rpc.service.HelloService;

public class HelloServiceImpl implements HelloService {
    public String hello(String msg) {
        System.out.println("收到客户端消息:" + msg);
        if (msg != null) {
            return "客户端您好,已收到您的消息[" + msg + "]";
        } else {
            return "客户端您好,已收到您的消息";
        }
    }
}

服务端

  • ServerBootstrap.java —— 服务端启动类

    package com.youzikeji.rpc.provider;
    
    import com.youzikeji.rpc.server.NettyServer;
    
    //ServerBootStrap会启动一个服务提供者,即NettyServer
    public class ServerBootstrap {
        public static void main(String[] args) {
            NettyServer.startServer("127.0.0.1", 7000);
        }
    }
    
  • NettyServerHandler.java —— 具体的业务处理器

    package com.youzikeji.rpc.server;
    
    import com.youzikeji.rpc.provider.HelloServiceImpl;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    	
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //获取客户端发送的消息,并根据约定好的的协议调用服务,这里简单地规定消息必须以某个字符串作为开头
            if (msg.toString().startsWith("HelloService#hello#")){
                String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
        }
    }
    
  • NettyServer.java —— Netty构建服务端

    package com.youzikeji.rpc.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class NettyServer {
    
        //对外暴露方法
        public static void startServer(String hostname, int port){
            startServer0(hostname, port);
        }
        /**
         * 启动服务
         * @param hostname 主机名
         * @param port 端口号
         */
        private static void startServer0(String hostname, int port) {
            //Boss线程池和Worker线程池
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //创建启动类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new NettyServerHandler());     //业务处理器
                            }
                        });
                //绑定并同步监听主机端口,然后做异步处理
                ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
                System.out.println("服务提供方开始提供服务...");
                channelFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

客户端

  • ClientBootstrap.java —— 客户端启动类,通过构建远程服务代理对象实现服务调用

    package com.youzikeji.rpc.customer;
    
    import com.youzikeji.rpc.client.NettyClient;
    import com.youzikeji.rpc.service.HelloService;
    
    public class ClientBootstrap {
    
        //定义协议头
        public static final String head = "HelloService#hello#";
    
        public static void main(String[] args) {
            //创建消费者
            NettyClient cus = new NettyClient();
    
            //创建代理对象
            HelloService service = (HelloService) cus.getBean(HelloService.class, head);
    
            //通过代理对象调用服务提供者的方法
            String res = service.hello("您好, RPC");
    
            System.out.println("调用的结果:" + res);
    
        }
    }
    
  • NettyClientHandler.java —— 将调用服务的参数等信息发送给服务器,等待服务代理对象返回调用结果

    package com.youzikeji.rpc.client;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.concurrent.Callable;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext context;
        private String result;  //返回的结果
        private String param;   //客户端调用方法时传入的参数
    
        /**
         * 被代理对象调用,发送数据给服务器,等待被唤醒,然后返回结果
         * @return 返回结果
         * @throws Exception 异常
         */
        @Override
        public synchronized Object call() throws Exception {
            context.writeAndFlush(param);
            //等待channelRead获取服务器返回的结果后,唤醒
            wait();
            return result;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        /**
         * 与服务器连接创建后就被调用
         * @param ctx
         * @throws Exception 异常
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            context = ctx;  //其他方法中会使用到ctx
        }
    
        /**
         * 收到服务器的数据后,调用方法
         * @param ctx
         * @param msg
         * @throws Exception 异常
         */
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            //唤醒等待的线程
            notify();
        }
    
        void setParam(String param){
            this.param = param;
        }
    }
    
  • NettyClient.java

    package com.youzikeji.rpc.client;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.lang.reflect.Proxy;
    import java.util.Objects;
    import java.util.concurrent.*;
    
    public class NettyClient {
        //创建线程池
        public static ExecutorService executor = new ThreadPoolExecutor(
                3,
                Runtime.getRuntime().availableProcessors(),
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    
        private static NettyClientHandler client;
    
        //代理模式,获取代理对象
        public Object getBean(final Class<?> serviceClass, final String head) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[]{serviceClass}, ((proxy, method, args) -> {
                        if (client == null) {
                            initClient();
                        }
                        //设置要发给服务端的信息
                        client.setParam(head + args[0]);
                        return executor.submit(client).get();
                    }));
        }
    
        //初始化客户端
        public static void initClient() {
            client = new NettyClientHandler();
    
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(
                            new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    ChannelPipeline pipeline = socketChannel.pipeline();
                                    pipeline.addLast(new StringDecoder());
                                    pipeline.addLast(new StringEncoder());
                                    pipeline.addLast(client);
                                }
                            }
                    );
            try {
                bootstrap.connect("127.0.0.1", 7000).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
    }
    

结果

先后运行ServerBootstrap.java和ClientBootstrap.java

服务端结果如下图:

image-20210503150355519

客户端结果如下:

image-20210503150404678

原文地址:https://www.cnblogs.com/caoyusang/p/14906973.html