netty 详解(一)架构设计、异步模型、任务队列、入门案例

录:

1、netty 是什么
2、netty 架构设计
    2.1、线程模型
    2.2、传统阻塞 I/O 服务模型
    2.3、Reactor 模式
    2.4、单 Reactor 单线程
    2.5、单 Reactor 多线程
    2.6、主从 Reactor 多线程
    2.7、Netty工作原理架构图
3、Netty 编程之 helloworld
4、自定义 ChannelInboundHandlerAdapter  收发消息
5、任务队列 taskQueue 和 scheduledTaskQueue
6、Netty 异步模型
7、Netty 入门案例--HTTP 服务

1、netty 是什么    <--返回目录

Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。

下面是我总结的使用 Netty 不使用 JDK 原生 NIO 的原因

  • 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂,一不小心 bug 横飞
  • Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty 可以直接从 NIO 模型变身为 IO 模型
  • Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
  • Netty 解决了 JDK 的很多包括空轮询在内的 bug
  • Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
  • 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
  • Netty 社区活跃,遇到问题随时邮件列表或者 issue
  • Netty 已经历各大 rpc 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

2、netty 架构设计    <--返回目录

  不同的线程模式,对程序的性能有很大影响,为了搞清 netty 线程模式,我们来系统分析下各个线程模式,最后看看 netty 线程模型有什么优越性。

2.1、线程模型    <--返回目录

  目前存在的线程模型有:

  • 传统阻塞 I/O 服务模型
  • Reactor 模式(反应器模式、分发者模式 Dispatcher、通知者模式 Notifier)

  根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

  • 单 Reactor 单线程
  • 单 Reactor 多线程
  • 主从 Reactor 多线程

  netty 线程模式:netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模式有多个 Reactor

2.2、传统阻塞 I/O 服务模型    <--返回目录

  模型特点:

  • 采用阻塞 IO 模式 获取输入的数据
  • 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回

问题分析:

  • 当并发数很大,就会创建大量的线程,占用很大系统资源
  • 连接创建后,如果当前线程暂时没有数据可读,该线程回阻塞在 read 操作,造成线程资源的浪费

2.3、Reactor 模式    <--返回目录

  针对传统阻塞 IO 服务模型的 2 个缺点,解决方案:

  • 基于 IO 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
  • 基于线程池复用线程资源:不必为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

  IO 复用结合线程池,就是 Reactor 模式基本设计思想。

2.4、单 Reactor 单线程    <--返回目录

  方案说明:

  • select 是前面 IO 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求
  • Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
  • 如果是建立连接请求事件,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 handler 来响应
  • handler 会完成 read -> 业务处理 -> send 的完整业务流程

  服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接、读写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。

 

2.5、单 Reactor 多线程    <--返回目录

  方案说明:

  • Reactor 对象通过 select 监控客户端请求事件,收到事件后,通过 dispatch 进行分发
  • 如果建立连接请求,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理完成连接后的各种事件
  • 如果不是连接请求,则由 reactor 分发调用连接对象对应的 handler 来处理
  • handler 之负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
  • worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
  • handler 收到响应后,通过 send 将结果返回给 cliet

  缺点:reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。

2.6、主从 Reactor 多线程    <--返回目录

  主 Reactor 负责连接事件;子 Reactor 负责监听读写事件

2.7、Netty工作原理架构图    <--返回目录

  1)Netty 抽象出两组线程池,BossGroup 专门负责接受客户端的连接,WorkerGroup 专门负责网络的读写

  2)BossGroup 和 WorkGroup 类型都是 NioEventLoopGroup

  3)NioEventLoopGroup 相当于一个事件循环组,这个组含有多个事件循环,每个事件循环时 NioEventLoop

  4)NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯

  5)NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop

  6)每个 Boss NioEventLoop 循环执行的步骤

  • 轮询 accept 事件
  • 处理 accept 事件,与 client 建立连接,生成 NiobiumSocketChannel,并将其注册到 worker NioEveltLoop 上的 Selector
  • 处理任务队列的任务,即 runAllTasks

  7) 每个 Worker NioEventLoop 循环执行的步骤

  • 轮询 read/write 事件
  • 处理 read/write 事件,在对应 NioSocketChannel 处理
  • 处理任务队列的任务,即 runAllTasks

3、Netty 编程之 helloworld    <--返回目录

  pom 引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>
View Code

  NettyServer

package com.oy;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServer {

    public static void main(String[] args) throws Exception {

        // 1.创建 BossGroup 和 workerGroup
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        // 2.创建服务器端的启动对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.链式编程,配置参数
        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持获得连接状态
                .childHandler(new ChannelInitializer<NioSocketChannel>() { // 给 WorkerGroup 的 EventLoop 对应的管道设置处理器
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });
                    }
                });

        // 4.绑定端口,运行服务器
        ChannelFuture future = serverBootstrap.bind(8000).sync();
        System.out.println("server started and listen " + 8000);

        // 5.对关闭通道进行监听
        future.channel().closeFuture().sync();
    }

}

  NettyClient

package com.oy;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                });

        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        while (true) {
            channel.writeAndFlush(new Date() + ": hello world11111111!");
            Thread.sleep(2000);
        }
    }
}

4、自定义 ChannelInboundHandlerAdapter  收发消息    <--返回目录

  NettyServer

package com.oy;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class NettyServer {

    private int port;

    public static void main(String[] args) {
        new NettyServer(8080).start();
    }

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() {
        /**
         * 创建两个EventLoopGroup,即两个线程池,boss线程池用于接收客户端的连接,一个线程监听一个端口,一般只会监听一个端口所以只需一个线程
         * work池用于处理网络连接数据读写或者后续的业务处理(可指定另外的线程处理业务,work完成数据读写)
         */
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            /**
             * 实例化一个服务端启动类,
             * group()指定线程组
             * channel()指定用于接收客户端连接的类,对应java.nio.ServerSocketChannel
             * childHandler()设置编码解码及处理连接的类
             */
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast("decoder", new StringDecoder())
                                    //.addLast("encoder", new StringEncoder())
                                    .addLast(new NettyServerHandler());
                        }
                    });

            // 绑定端口
            ChannelFuture future = server.bind().sync();
            System.out.println("server started and listen " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }

        /**
         * 读取客户端发送的数据
         * ChannelHandlerContext ctx: 上下文对象,含有管道 pipeline,通道 channel,连接地址
         * Object msg: 客户端发送的数据
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead...");

            // 读取客户端发送的数据
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));

            //System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
            // 返回消息
            //ctx.write("server write, 收到消息" + msg);
            //ctx.flush();
        }

        /**
         * 数据读取完毕
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
        }

        /**
         * 处理异常,关闭通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
}

  NettyClient

package com.oy;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class NettyClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT= 8080;

    public static void main(String[] args){
        new NettyClient().start(HOST, PORT);
    }

    public void start(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast("decoder", new StringDecoder())
                                    //.addLast("encoder", new StringEncoder())
                                    .addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture future = client.connect(host, port).sync();
            //future.channel().writeAndFlush("Hello Netty Server ,I am a netty client");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * 通道就绪触发该方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldClientHandler Active");
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", CharsetUtil.UTF_8));
        }

        /**
         * 当通道有读取事件时触发该方法
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("收到服务器响应: " + buf.toString(CharsetUtil.UTF_8));
        }
    }
}

5、任务队列 taskQueue 和 scheduledTaskQueue    <--返回目录

  任务队列中的 Task 有三种典型使用场景

  • 用户程序自定义的普通任务
  • 用户自定义定时任务
  • 非当前 Reactor 线程调用 Channel 的各种方法。例如在推送系统的业务线程里,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中被异步消费
public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("HelloWorldServerHandler active");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead...");

        // 读取客户端发送的数据
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));

        // 模拟业务处理耗时
        //Thread.sleep(5 * 1000);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端1
", CharsetUtil.UTF_8));

        // 用户自定义的任务,任务添加到 taskQueue 中
        ctx.channel().eventLoop().execute(new Runnable() {

            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端1
", CharsetUtil.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        });

        // 用户自定义定义任务, 任务添加到 scheduledTaskQueue 中
        ctx.channel().eventLoop().schedule(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端 shedule
", CharsetUtil.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5 , TimeUnit.SECONDS);
    }

    /**
     * 数据读取完毕
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端2", CharsetUtil.UTF_8));
    }

    /**
     * 处理异常,关闭通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

6、Netty 异步模型    <--返回目录

  异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

  Netty 中的 IO 操作时异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。

  调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。

  Netty 的异步模型是建立在 Future 和 callback 之上的,callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适,那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 取监控方法 fun 的处理过程(即 Future-Listener 机制)。

    // 绑定端口
    final ChannelFuture future = server.bind(8080).sync();

    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (future.isDone()) {
                System.out.println("监听端口 8080 成功");
            } else {
                System.out.println("监听端口 8080 失败");
            }
        }
    });

7、Netty 入门案例--HTTP 服务    <--返回目录

 

   HttpServer

package com.oy.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class HttpServer {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    //.localAddress(new InetSocketAddress(port))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new MyChannelInitializer());

            // 绑定端口
            ChannelFuture future = server.bind(8080).sync();
            System.out.println("server started and listen " + 8080);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
View Code

  MyChannelInitializer

package com.oy.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {

        /* 向管道加入处理器 */
        ChannelPipeline pipeline = socketChannel.pipeline();
        // HttpServerCodec: netty 提供的处理 http 的编-解码器
        pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
        // 添加自定义的处理器
        pipeline.addLast("MyHttpServerHandler", new MyHttpServerHandler());

    }

}
View Code

  MyHttpServerHandler

package com.oy.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println("===================================");
        System.out.println("msg 类型: " + msg.getClass().getName());
        System.out.println("客户端地址:" + ctx.channel().remoteAddress());

        // 判断 msg 是否是 http request 请求
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            URI uri = new URI(request.uri());
            System.out.println("请求 uri: " + uri.getPath());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求 favicon.icon,不做响应");
                return;
            }

            // 回复信息给浏览器
            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);
            // 构造一个 http 的响应,即 http response
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            ctx.writeAndFlush(response);
        }
    }

}
View Code

参考:

  1)netty 官网:https://netty.io/

  2)《跟闪电侠学Netty》开篇:Netty是什么?

  3)掘金小册:Netty 入门与实战:仿写微信 IM 即时通讯系统

  4)Netty整体架构

  5)Netty工作原理架构图

原文地址:https://www.cnblogs.com/xy-ouyang/p/12820107.html