Netty 源码解读(二)-ChannelPipeline、ChannelHandler、ChannelHandlerContext

1. ChannelPipeline、ChannelHandler、ChannelHandlerContext 的关系

1. 每创建一个Socket 就会分配一个全新的ChannelPipeline (简称pipeline)

2. 每一个 ChannelPipeline 内部包含多个 ChannelHandlerContext (简称Context)

3. Context一起组成了一个双向链表,这些Context 用于封装我们调用addLast 时添加的Channelhandler(以下简称Handler)

也就是说ChannelSocket 和 ChannelPipeline 是一对一的关系,而pipeline内部的多个Context 行成了链表,Context 只是对Handler 的封装。

当一个请求进来时会进入socket 对应的pipeline,并经过pipeline 所有的handler。 可以理解为过滤器模式。

2. 设计

1. ChannelPipeline 设计

该接口继承了ChannelInboundInvoker、 ChannelOutboundInvoker、 Iterable 接口。 标识可以调用数据出站的方法和入站的方法, 同时也能遍历内部的链表。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

处理过程如下:

(1)入站事件由入站处理程序自下而上的方向处理。入站处理程序通常处理由地步的IO线程生成入站数据,入站数据通常从SocketChannel#read(ByteBuffer) 获取

(2) 通常一个pipeline 有多个handler。例如一个典型的服务器在每个通常的管道中都会有一下处理程序:

协议解码器-将二进制数据转换为Java 对象;

协议编码器-将java 对象转换为二进制数据

业务逻辑处理程序-执行实际业务逻辑

(3) 业务程序不能将线程阻塞,会影响IO的速度,进而影响整个Netty 程序的性能。如果业务程序很快,可以放在IO线程中,反之就需要异步执行。 或者在添加handler的时候添加一个线程池。

2. ChannelHandler 作用以及设计

(1) 源码

public interface ChannelHandler {

    // 当把channelHandler 添加到pipeline 时被调用
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    // 当从pipeline 移除时调用
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    // 处理发生异常时调用
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

(2) 作用: 作用是处理IO事件或拦截IO事件,并将其转发给下一个handler。 handler 处理事件是分入站和出站的(入站是说读取数据到程序处理的过程,出站是说写出数据到调用内核write方法写出去数据的过程)。两个方向的操作都是不同的,因此,netty 定义了两个子接口继承ChannelHandler。

入站:ChannelInboundHandler

出站: ChannelOutboundHandler

入站出站都可以处理的handler:ChannelDuplexHandler

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {

3. ChannelHandlerContext 作用

ChannelHandlerContext 同时继承了 ChannelInboundInvoker, ChannelOutboundInvoker。ChannelHandlerContext 也 定义了一些自己的方法。这些方法能够获取Context 上下文环境的对象,比如channel、executor、handler、pipeline, 内存分配器,关联的handler 是否被删除等信息。Context 就是包装了handler相关的一切,以方便Contex 可以在pipeline 方便的操作handler。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

如下自身的方法:

3. 创建过程

1. 任何一个SocketChannel 创建的同时都会创建一个pipeline

io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) 源码如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

可以看到链表有两个伪节点(头和尾)。

1》头节点:

io.netty.channel.DefaultChannelPipeline.TailContext (入站处理handler)

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }
。。。
}

2》伪节点:

HeadContext 是一个入站和出站都兼顾的handler

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
。。。
}

  创建是在客户端建立连接时:

 2. 当用户或系统内部调用pipeline的addXX 方法添加handler 时,都会创建一个包装这handler 的Context

 io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

解释:

1. io.netty.channel.DefaultChannelPipeline#checkMultiplicity 检查该实例是否是共享的,如果不是并且已经被别的pipeline 使用了,则抛出异常

2. 调用io.netty.channel.DefaultChannelPipeline#newContext 创建一个DefaultChannelHandlerContext

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

3. 调用io.netty.channel.DefaultChannelPipeline#addLast0 添加到链条尾部

4. 做一些其他处理

4. 调用过程

读的时候从head开始, 写的时候从tail 开始。当一个请求进来时,会先调用pipeline 的相关方法,如果是入站事件由fireXXX开始,表示开始管道的流动,让后面的handler 继续处理。

调用过程可以用下图标识:

1. 入站读取数据追踪

io.netty.channel.DefaultChannelPipeline#fireChannelRead代码如下:

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

读取数据时调用过程如下:

 1》 继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

2》 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

    private void invokeChannelRead(Object msg) {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
            } catch (Throwable var3) {
                this.invokeExceptionCaught(var3);
            }
        } else {
            this.fireChannelRead(msg);
        }

    }

  这时候会调用handler的channelRead 方法。也就是具体的handler 的方法。

3》io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead 方法如下: 相当于没做任何逻辑处理,直接调用下一个处理器处理

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

4》io.netty.channel.AbstractChannelHandlerContext#fireChannelRead

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

io.netty.channel.AbstractChannelHandlerContext#findContextInbound 如下:(可以看到入站是找inbound属性为true的context,然后继续进行调用)

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

5》继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)方法(同1》 一样)

  也就是如果希望pipeline 中的context 继续处理,需要在handler中继续调用 ctx.fireXXX 方法,比如io.netty.handler.logging.LoggingHandler#channelRead

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "READ", msg));
        }
        ctx.fireChannelRead(msg);
    }

2. 出站数据跟踪

1》io.netty.channel.DefaultChannelPipeline#write(java.lang.Object, io.netty.channel.ChannelPromise)

    public final ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }

2》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);

        return promise;
    }

3》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

4》io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

5》io.netty.channel.AbstractChannelHandlerContext#invokeWrite0  这里调用handler的write 方法

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

6》handler 处理完成如果需要继续处理调用ctx.write(msg, promise);    会重新调用到 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
原文地址:https://www.cnblogs.com/qlqwjy/p/15091141.html