先提两大问题:
- 为什么要学Netty?
Spring5 底层用Netty
Spring Boot 不需要netty,它内部实现了web容器
Zookeeper 也是用的Netty
Dubbo 分布式服务框架 多协议支持(RPC) Netty
有可能成为Java架构师的你的筑基
- Netty能帮我们解决什么问题?
框架:简化开发一系列解决方案的集合,封装IO操作的框架
复杂业务场景中,没有说用一个单独IO API
IO + 多线程来解决问题 Netty是用来封装IO操作
本篇学习目标:
- 掌握Java中BIO、NIO、AIO之间的区别及应用场景。
- 透彻理解阻塞(Block)与非阻塞(Non-Block)区别。
- 透彻理解同步(Synchronization)和异步(Asynchronous)的区别。
本篇内容定位:
- 适合具有网络通信开发经验的人群。
- 适合具有1-3年Java Web开发经验的人群。
input和output:
input和output是相对于内存而言的,向内存中写入input,从内存中读出output
阻塞要等待机器内存数据完全准备好,才能读向程序内存,非阻塞有动态轮询机制。
阻塞和非阻塞是一种读取数据的策略,非阻塞实现了多路复用
数据先进入缓冲区,然后轮询完成
同步和异步:
在处理数据的时候,在同一时间点,能做多个处理:异步,在同一时间只能做一个处理:同步。
阻塞(Block)和非阻塞(Non-Block)
阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他事情,否则一直等待在那里。
非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。
IO(BIO)、NIO与AIO对比:
IO(BIO) Block IO 同步阻塞IO
NIO Non-Block IO 同步非阻塞IO (可以用线程池实现异步)
AIO(NIO2) Async IO 异步非阻塞IO (事件驱动,回调)
改进目的:提升IO操作的性能、IO框架出现
代码案例
BIO服务端:
/** * @ClassName BIOServer * @Author 周聪 * @Date 2021/2/1 21:50 * @Version 1.0 * @Description BIO服务端 */ public class BIOServer { /** * 服务的网络IO模型的封装对象 */ ServerSocket serverSocket; /** * 服务器 * * @param port */ public BIOServer(int port) { try { // Tomcat 默认端口8080 // 只要是Java写的底层都是ServerSocket serverSocket = new ServerSocket(port); System.out.println("BIO服务已启动,监听端口是:" + port); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { new BIOServer(8080).listen(); } /** * 开始监听,并处理逻辑 * * @throws IOException */ public void listen() throws IOException { // 循环监听 while (true) { // 等待客户端连接,阻塞方法,只有客户端把数据发过来的时候才会动,否则一直等,程序不会进行下去 // Socket数据发送者在服务端的引用 Socket client = serverSocket.accept(); System.out.println(client.getPort()); // 对方发数据给我了,读Input InputStream is = client.getInputStream(); // JVM内存 // 网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内存中 byte[] buff = new byte[1024]; int len = is.read(buff); if (len > 0) { String msg = new String(buff, 0, len); System.out.println("收到" + msg); } } } }
客户端(NIO和BIO都可用)
/** * @ClassName BIOClient * @Author 周聪 * @Date 2021/2/1 21:43 * @Version 1.0 * @Description BIO和NIO的客户端 */ public class BIOClient { // FileOutputStream、FileInputStream 这里不拿磁盘操作案例,大家都很熟悉 public static void main(String[] args) throws IOException { // 要和谁进行通信,服务器IP、服务器的端口 // 一台机器的端口号是有限的 Socket client = new Socket("localhost", 8080); // 输出 不管是客户端还是服务端,都有可能write和read OutputStream os = client.getOutputStream(); // 生成一个随机的ID String name = UUID.randomUUID().toString(); System.out.println("客户端发送数据:" + name); os.write(name.getBytes()); os.close(); client.close(); } }
NIO服务端
/** * @ClassName NIOServerDemo * @Author 周聪 * @Date 2021/2/1 22:27 * @Version 1.0 * @Description NIO服务端 NIO的操作过于繁琐,于是才有了Netty * Netty就是对这一系列非常繁琐的操作进行了封装. */ public class NIOServerDemo { private int port = 8080; // 准备两个东西 /** * 轮询器 Selector 大堂经理 */ private Selector selector; /** * 缓冲区 Buffer 等候区 */ private ByteBuffer buffer = ByteBuffer.allocate(1024); public NIOServerDemo(int port) { // 初始化大堂经理,开门营业 try { this.port = port; ServerSocketChannel server = ServerSocketChannel.open(); // 我得告诉地址,接客 IP/Port server.bind(new InetSocketAddress(this.port)); // BIO 升级版本 NIO ,为了兼容BIO ,NIO模型默认是采用阻塞式 server.configureBlocking(false); // 大堂经理准备就绪,接客 selector = Selector.open(); // 在门口端牌子,正在营业 server.register(selector, SelectionKey.OP_ACCEPT); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new NIOServerDemo(8080).listen(); } public void listen() { System.out.println("listen on " + this.port + "."); // 轮询主线程 try { while (true) { System.out.println("listen on " + this.port + "."); // 大堂经理再叫号 selector.select(); // 每次都拿到所以的号子 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 不断地迭代,就叫轮询 Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态 while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 每一个key代表一种状态 // 每一个号对应一个业务,这里体现为 数据就绪,数据可读,数据可写等待... process(key); } } } catch (IOException e) { e.printStackTrace(); } } /** * 具体办业务的方法,坐班柜员 * 每一次轮询就是调用一次process方法,而每次调用只能干一件事 * 在同一时间点,只能干一件事 * * @param key */ private void process(SelectionKey key) { try { // 针对每一种状态给一个反应 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 这个方法体现非阻塞,不管你数据有没有准备好 // 你给我一个状态和反馈 SocketChannel channel = server.accept(); channel.configureBlocking(false); // 当数据准备就绪的时候,将状态改为可读 key = channel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // key.channel 从多路复用器中拿客户端的引用 SocketChannel channel = (SocketChannel) key.channel(); int len = channel.read(buffer); if (len > 0) { buffer.flip(); String content = new String(buffer.array(), 0, len); channel.register(selector, SelectionKey.OP_WRITE); // 在key上携带一个附件,一会再写出去 key.attach(content); System.out.println("读取内容:" + content); } } else if (key.isWritable()) { SocketChannel channel = (SocketChannel) key.channel(); String content = (String) key.attachment(); channel.write(ByteBuffer.wrap(("输出:" + content).getBytes())); channel.close(); } } catch (IOException e) { e.printStackTrace(); } } }
AIO服务端
/** * @ClassName AIOServer * @Author 周聪 * @Date 2021/2/1 23:21 * @Version 1.0 * @Description AIO服务端 */ public class AIOServer { private final int port; public AIOServer(int port) { this.port = port; listen(); } public static void main(String[] args) { int port = 8000; new AIOServer(port); } private void listen() { try { ExecutorService executorService = Executors.newCachedThreadPool(); AsynchronousChannelGroup threadPool = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); // 开门营业 // 工作线程,用来侦听回调,事件响应的时候需要回调 final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadPool); server.bind(new InetSocketAddress(port)); System.out.println("服务已启动,监听端口: " + port); // 准备接受数据 server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { final ByteBuffer byteBuffer = ByteBuffer.allocate(1024); /** * 回调有两个状态:成功 * 实现completed方法来回调 由操作系统来触发 * @param result * @param attachment */ @Override public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println("IO操作成功,开始获取数据"); try { byteBuffer.clear(); result.read(byteBuffer).get(); byteBuffer.flip(); result.write(byteBuffer); byteBuffer.flip(); } catch (Exception e) { e.printStackTrace(); } finally { try { result.close(); server.accept(null, this); } catch (Exception e) { e.printStackTrace(); } } System.out.println("操作完成"); } /** * 回调有两个状态:失败 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { System.out.println("IO操作失败: " + exc.getStackTrace()); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } }
AIO客户端
/** * @ClassName AIOClient * @Author 周聪 * @Date 2021/2/1 23:39 * @Version 1.0 * @Description AIO客户端 */ public class AIOClient { private final AsynchronousSocketChannel clientChannel; public AIOClient() throws Exception { clientChannel = AsynchronousSocketChannel.open(); } public static void main(String[] args) throws Exception { new AIOClient().connect("localhost", 8000); } public void connect(String host, int port) throws Exception { clientChannel.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() { /** * 回调成功状态的方法 * @param result * @param attachment */ @Override public void completed(Void result, Object attachment) { try { clientChannel.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get(); System.out.println("已发送至服务器"); } catch (Exception e) { e.printStackTrace(); } } /** * 回调失败状态的方法 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); final ByteBuffer byteBuffer = ByteBuffer.allocate(1024); clientChannel.read(byteBuffer, null, new CompletionHandler<Integer, Object>() { /** * 回调成功状态的方法 * @param result * @param attachment */ @Override public void completed(Integer result, Object attachment) { System.out.println("IO操作完成:" + result); System.out.println("获取反馈结果:" + new String(byteBuffer.array())); } /** * 回调失败状态的方法 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } }
总结(应用场景)
JDK1.4以前都是BIO,之后NIO出现,性能得到大幅提升,Netty默认用的NIO
JDK1.7 NIO === > 出现NIO2(AIO) 操作系统的性能,决定的IO的性能(存在兼容问题),目前不是主流
所有的IO实现异步都很容易,加入线程就可以,线程过多会有问题(CPU会爆),Netty引入了反应堆(线程池+调度)的概念。而AIO提供异步不需要。
BIO/NIO/AIO 他们的底层都是TCP/IP协议
欢迎批评指正。 附:源码地址