JavaIO演进之路

先提两大问题:

  • 为什么要学Netty?

Spring5 底层用Netty

Spring Boot 不需要netty,它内部实现了web容器

Zookeeper 也是用的Netty

Dubbo 分布式服务框架 多协议支持(RPC) Netty

有可能成为Java架构师的你的筑基

  • Netty能帮我们解决什么问题?

框架:简化开发一系列解决方案的集合,封装IO操作的框架

复杂业务场景中,没有说用一个单独IO API

IO + 多线程来解决问题 Netty是用来封装IO操作

本篇学习目标:

  • 掌握Java中BIO、NIO、AIO之间的区别及应用场景。
  • 透彻理解阻塞(Block)与非阻塞(Non-Block)区别。
  • 透彻理解同步(Synchronization)和异步(Asynchronous)的区别。

本篇内容定位:

  •  适合具有网络通信开发经验的人群。
  • 适合具有1-3年Java Web开发经验的人群。

input和output:

input和output是相对于内存而言的,向内存中写入input,从内存中读出output

 阻塞要等待机器内存数据完全准备好,才能读向程序内存,非阻塞有动态轮询机制。

 阻塞和非阻塞是一种读取数据的策略,非阻塞实现了多路复用

 数据先进入缓冲区,然后轮询完成

同步和异步:

在处理数据的时候,在同一时间点,能做多个处理:异步,在同一时间只能做一个处理:同步。

阻塞(Block)和非阻塞(Non-Block)

阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他事情,否则一直等待在那里。

非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。

 阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。

IO(BIO)、NIO与AIO对比:

 IO(BIO) Block IO  同步阻塞IO

NIO  Non-Block IO 同步非阻塞IO (可以用线程池实现异步)

AIO(NIO2) Async IO 异步非阻塞IO (事件驱动,回调)

 改进目的:提升IO操作的性能、IO框架出现

代码案例

BIO服务端:

/**
 * @ClassName BIOServer
 * @Author 周聪
 * @Date 2021/2/1 21:50
 * @Version 1.0
 * @Description BIO服务端
 */
public class BIOServer {

    /**
     * 服务的网络IO模型的封装对象
     */
    ServerSocket serverSocket;

    /**
     * 服务器
     *
     * @param port
     */
    public BIOServer(int port) {
        try {
//            Tomcat 默认端口8080
//            只要是Java写的底层都是ServerSocket
            serverSocket = new ServerSocket(port);
            System.out.println("BIO服务已启动,监听端口是:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        new BIOServer(8080).listen();
    }

    /**
     * 开始监听,并处理逻辑
     *
     * @throws IOException
     */
    public void listen() throws IOException {
//        循环监听
        while (true) {
//            等待客户端连接,阻塞方法,只有客户端把数据发过来的时候才会动,否则一直等,程序不会进行下去
//            Socket数据发送者在服务端的引用
            Socket client = serverSocket.accept();
            System.out.println(client.getPort());
//            对方发数据给我了,读Input
            InputStream is = client.getInputStream();
//            JVM内存
//            网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内存中
            byte[] buff = new byte[1024];
            int len = is.read(buff);
            if (len > 0) {
                String msg = new String(buff, 0, len);
                System.out.println("收到" + msg);
            }

        }
    }
}

客户端(NIO和BIO都可用)

/**
 * @ClassName BIOClient
 * @Author 周聪
 * @Date 2021/2/1 21:43
 * @Version 1.0
 * @Description BIO和NIO的客户端
 */
public class BIOClient {
//    FileOutputStream、FileInputStream 这里不拿磁盘操作案例,大家都很熟悉

    public static void main(String[] args) throws IOException {
//        要和谁进行通信,服务器IP、服务器的端口
//      一台机器的端口号是有限的
        Socket client = new Socket("localhost", 8080);
//        输出 不管是客户端还是服务端,都有可能write和read
        OutputStream os = client.getOutputStream();
//        生成一个随机的ID
        String name = UUID.randomUUID().toString();
        System.out.println("客户端发送数据:" + name);
        os.write(name.getBytes());
        os.close();
        client.close();
    }
}

NIO服务端

/**
 * @ClassName NIOServerDemo
 * @Author 周聪
 * @Date 2021/2/1 22:27
 * @Version 1.0
 * @Description NIO服务端 NIO的操作过于繁琐,于是才有了Netty
 * Netty就是对这一系列非常繁琐的操作进行了封装.
 */
public class NIOServerDemo {

    private int port = 8080;
    //    准备两个东西
    /**
     * 轮询器 Selector 大堂经理
     */
    private Selector selector;

    /**
     * 缓冲区 Buffer  等候区
     */
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    public NIOServerDemo(int port) {
//        初始化大堂经理,开门营业
        try {
            this.port = port;
            ServerSocketChannel server = ServerSocketChannel.open();
//            我得告诉地址,接客 IP/Port
            server.bind(new InetSocketAddress(this.port));
//            BIO 升级版本 NIO ,为了兼容BIO ,NIO模型默认是采用阻塞式
            server.configureBlocking(false);
//            大堂经理准备就绪,接客
            selector = Selector.open();
//            在门口端牌子,正在营业
            server.register(selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new NIOServerDemo(8080).listen();
    }

    public void listen() {
        System.out.println("listen on " + this.port + ".");
//        轮询主线程
        try {
            while (true) {
                System.out.println("listen on " + this.port + ".");
//                大堂经理再叫号
                selector.select();
//                每次都拿到所以的号子
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
//                不断地迭代,就叫轮询
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
//                同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
//                    每一个key代表一种状态
//                    每一个号对应一个业务,这里体现为 数据就绪,数据可读,数据可写等待...
                    process(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 具体办业务的方法,坐班柜员
     * 每一次轮询就是调用一次process方法,而每次调用只能干一件事
     * 在同一时间点,只能干一件事
     *
     * @param key
     */
    private void process(SelectionKey key) {

        try {
//        针对每一种状态给一个反应
            if (key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
//                这个方法体现非阻塞,不管你数据有没有准备好
//                你给我一个状态和反馈
                SocketChannel channel = server.accept();
                channel.configureBlocking(false);
//                当数据准备就绪的时候,将状态改为可读
                key = channel.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) {
//                key.channel 从多路复用器中拿客户端的引用
                SocketChannel channel = (SocketChannel) key.channel();
                int len = channel.read(buffer);
                if (len > 0) {
                    buffer.flip();
                    String content = new String(buffer.array(), 0, len);
                    channel.register(selector, SelectionKey.OP_WRITE);
//                    在key上携带一个附件,一会再写出去
                    key.attach(content);
                    System.out.println("读取内容:" + content);
                }
            } else if (key.isWritable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                String content = (String) key.attachment();
                channel.write(ByteBuffer.wrap(("输出:" + content).getBytes()));
                channel.close();
            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

AIO服务端

/**
 * @ClassName AIOServer
 * @Author 周聪
 * @Date 2021/2/1 23:21
 * @Version 1.0
 * @Description AIO服务端
 */
public class AIOServer {

    private final int port;

    public AIOServer(int port) {
        this.port = port;
        listen();
    }

    public static void main(String[] args) {
        int port = 8000;
        new AIOServer(port);
    }

    private void listen() {
        try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            AsynchronousChannelGroup threadPool = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//            开门营业
//            工作线程,用来侦听回调,事件响应的时候需要回调
            final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadPool);
            server.bind(new InetSocketAddress(port));
            System.out.println("服务已启动,监听端口: " + port);

//            准备接受数据
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

                final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

                /**
                 * 回调有两个状态:成功
                 * 实现completed方法来回调 由操作系统来触发
                 * @param result
                 * @param attachment
                 */
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    System.out.println("IO操作成功,开始获取数据");
                    try {
                        byteBuffer.clear();
                        result.read(byteBuffer).get();
                        byteBuffer.flip();
                        result.write(byteBuffer);
                        byteBuffer.flip();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            result.close();
                            server.accept(null, this);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("操作完成");
                }

                /**
                 * 回调有两个状态:失败
                 * @param exc
                 * @param attachment
                 */
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("IO操作失败: " + exc.getStackTrace());
                }

            });

            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

AIO客户端

/**
 * @ClassName AIOClient
 * @Author 周聪
 * @Date 2021/2/1 23:39
 * @Version 1.0
 * @Description AIO客户端
 */
public class AIOClient {

    private final AsynchronousSocketChannel clientChannel;

    public AIOClient() throws Exception {
        clientChannel = AsynchronousSocketChannel.open();
    }

    public static void main(String[] args) throws Exception {
        new AIOClient().connect("localhost", 8000);
    }

    public void connect(String host, int port) throws Exception {
        clientChannel.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            /**
             * 回调成功状态的方法
             * @param result
             * @param attachment
             */
            @Override
            public void completed(Void result, Object attachment) {
                try {
                    clientChannel.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                    System.out.println("已发送至服务器");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /**
             * 回调失败状态的方法
             * @param exc
             * @param attachment
             */
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        clientChannel.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
            /**
             * 回调成功状态的方法
             * @param result
             * @param attachment
             */
            @Override
            public void completed(Integer result, Object attachment) {
                System.out.println("IO操作完成:" + result);
                System.out.println("获取反馈结果:" + new String(byteBuffer.array()));
            }

            /**
             * 回调失败状态的方法
             * @param exc
             * @param attachment
             */
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结(应用场景)

JDK1.4以前都是BIO,之后NIO出现,性能得到大幅提升,Netty默认用的NIO

JDK1.7   NIO  === > 出现NIO2(AIO) 操作系统的性能,决定的IO的性能(存在兼容问题),目前不是主流

所有的IO实现异步都很容易,加入线程就可以,线程过多会有问题(CPU会爆),Netty引入了反应堆(线程池+调度)的概念。而AIO提供异步不需要。

BIO/NIO/AIO  他们的底层都是TCP/IP协议

欢迎批评指正。 附:源码地址

原文地址:https://www.cnblogs.com/itzhoucong/p/14359887.html