netty的reactor模型和事件传递

1 reactor网络服务模型

(1) handle

(2)同步事件分离器

(3)dispatcher

(4)事件处理器

2 reactor的两个线程组,其中一个是负责监听客户端连接事件,另一个负责将具体的事件处理接入netty的channelHandle责任链,进行数据入站出站。

3 责任链处理,在netty的服务启动时默认初始化3个channelHandle,HeadHandle和TailHandle是一个空的handle,只作为头尾标记,除了这两个头尾以外ServerBootstrapAcceptorHandle作为数据入站处理器被添加进来。这个ServerBootstrapAcceptoHandle是挂接在服务端的NIOServerSocketChannel的pipeline中的,用来监听客户端的连接。

4 两个线程组进行事件交接的连接点就在ServerBootstrapAcceptor,一旦有新的客户端连接上来以后,就会由ServerBootstrapAcceptor执行channelRead方法,将初始化保存的“数据入站出站责任链”,挂接到新连接上来的channel对象中。

5 这样数据发送端与数据接收端的消息传递就可以经过挂接上的channelHandle来处理了。

6 处理流入数据是从HeadChannelHandle开始的,处理流出数据是从TailChannelHandle开始的,在AbstractChannelHandlerContext类中保存了两个成员变量:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {

    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private final boolean inbound;
    private final boolean outbound;
    private final AbstractChannel channel;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private boolean removed;

在数据做流向动作时进行判断。

7 这里很重要,数据入站处理和数据流出处理(数据流入从head链表头开始处理,而数据流出是从链表尾开始,注册channelhandle链表节点是千万不能弄错顺序):

首先数据入站处理,我们拿解码器举例子,ReplayingDecoder或者ByteToMessageDecoder都可以,只是不同级别的实现。我们发现数据流转是通过ChannelHandleContext的fireChannelRead方法实现的,ChannelHandleContext本身是多个节点的链表实现的,而且上下文中的数据传递依赖于一个List<Object>,每次数据处理完成以后,将list传入ChannelHandleContext的fireChannelRead方法,寻找下一个链表节点进行数据处理:

 8 ctx.channel().writeAndFlush(res);和ctx.writeAndFlush(res);是不同的:

  ctx.channel().writeAndFlush(res);会从tailChannelHandle开始处理。

  ctx.writeAndFlush(res);则是从下一个ChannelHandle开始处理

  具体代码测试和结果如下:

static final AtomicInteger a = new AtomicInteger(0);
    static ChannelInboundHandler channelInboundHandler = new SimpleChannelInboundHandler<String>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("11111收到客户端消息:" + msg);
            String res = msg + "你好";
            ctx.channel().writeAndFlush(res);
        }
    };

    static ChannelOutboundHandler channelOutboundHandler1 = new MessageToMessageEncoder<String>() {
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
            if (a.get() == 0) {
                a.getAndIncrement();
                System.out.println("数据出站1:" + msg);
                ctx.channel().writeAndFlush(msg);//如果使用channel中的writeAndFlush,相当于使用了tailChannelHandle的writeAndFlush要完整走完所有出站ChannelHandle
            } else {
                System.out.println("数据出站1:" + msg);
                ctx.writeAndFlush(msg);
            }
        }
    };
    static ChannelOutboundHandler channelOutboundHandler2 = new MessageToMessageEncoder<String>() {
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
            System.out.println("数据出站2:" + msg);
            ctx.writeAndFlush(msg);
        }
    };

    public static void main(String[] args) {
        EventLoopGroup parent = new NioEventLoopGroup(1);
        EventLoopGroup children = new NioEventLoopGroup(2);

        final EventLoopGroup handle = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(parent, children)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ClientMonitorHandlerAdapter());//数据流入处理1
                        pipeline.addLast(new CustomDataFrameDecoder());//数据流入处理2
                        pipeline.addLast(new CustomDataFrameToStrDecoder());//数据流入处理3
                        pipeline.addLast(handle, channelInboundHandler);//数据流入处理4

                        pipeline.addLast(new CustomDataFrameEncoder());//数据流出处理3
                        pipeline.addLast(handle, channelOutboundHandler2);//数据流出处理2
                        pipeline.addLast(handle, channelOutboundHandler1);//数据流出处理1
                    }
                });
        ChannelFuture channelFuture = null;
        try {
            channelFuture = serverBootstrap.bind(9099).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            channelFuture.channel().close();
            parent.shutdownGracefully();
            children.shutdownGracefully();
        }

 运行的测试结果如下:

原文地址:https://www.cnblogs.com/zzq-include/p/12119611.html