七、pipeLine概述

pipeline的初始化

  • pipeline在创建Channel的时候被创建
    在前面几节可以看到在服务端和客户端创建Channel的时候都会调用AbstractChannel构造方法创建Pipeline:
  protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

进入newChannelPipeline方法,最终会调用DefaultChannelPipeline()方法:

  protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在该方法中创建了tail和head,并指明两者关系为双向链表结构。
并将Channel和Pipeline绑定。

  • Pipeline节点数据结构:ChannelHandlerContext
    ChannelHandlerContext继承自三个接口AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker。
    AttributeMap中为一个Map结构,存放了一些属性。
    ChannelInboundInvoker和ChannelOutboundInvoker定义了一些事件如传播读事件、绑定事件、异常事件等。

  • Pipeline中的两大哨兵:head和tail

    在初始化ChannelPipeline时,我们看到在其中创建了HeadContext和TailContext。
    TailContext实现了ChannelInboundHandler,说明是一个inbound处理器,再看一下构造函数:inbound标识为true

 TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME,inbound: true, outbound: false);
            setAddComplete();
        }
        //本身是一个节点,包含的处理器也是自身
        public ChannelHandler handler() {
            return this;
        }

HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler:但在构造函数中inbound标识为false,outbound标识为true

        //unsafe主要是读写等相关的操作
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            //设置为Channel的unsafe
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        @Override
        public ChannelHandler handler() {
          return this;
        }        

添加和删除ChannelHandler

添加ChannelHandler

   serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    //流水线管理子通道中的Handler处理器
                    //向子通道流水线中添加一个Handler处理器
                    ch.pipeline().addLast(new SimpleNettyServerHandler());
                }
            });

跟进addlast方法,到达DefaultChannelPipeline.addlast():

  • 判断是否重复添加checkMultiplicity(handler);
    如果未添加并且不是可共享(利用反射获取注解Sharable 判断),则将added属性置为true
  • 创建节点并添加至链表
  1. 创建节点
 newCtx = newContext(group, filterName(name, handler), handler);

filterName方法

private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
           //创建一个名字
            return generateName(handler);
        }
        //检查是否重复
        checkDuplicateName(name);
        return name;
    }

newContext方法返回DefaultChannelHandlerContext.

new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
  1. 添加节点
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
  1. 回调完成添加事件
 //在此处,若executor 为null则获取channel().eventLoop()赋值给executor
 EventExecutor executor = newCtx.executor();
            //判断是否为当前线程
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
            //在当前线程则直接执行
            callHandlerAdded0(newCtx);

在此处添加handler,

   ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();

最终调用的为ChannelInitializer的handlerAdded方法

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
        try {
             initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }

initChannel(ctx)回调至初始化添加代码,将自定义的Handler添加到Channel上。
最终移除ChannelHandlerContext

删除ChannelHandler

  • 找到节点
getContextOrDie(handler)---//遍历查找节点并返回
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);

  • 链表删除节点
assert ctx != head && ctx != tail;
//标准链表删除
private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
  • 回调删除Handler事件
   //获取当前executor
   EventExecutor executor = ctx.executor();
          //判断是否在当前线程
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
             callHandlerRemoved0(ctx);
             ----》回调到Handler的handlerRemoved方法
              try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }

事件的传播

Context 包装 handler,多个 Context 在 pipeline 中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。而节点中间的传递通过 AbstractChannelHandlerContext 类内部的 fire 系列方法,找到当前节点的下一个节点不断的循环传播。
Handler涉及的环节包括:数据包解码、业务处理、目标数据编码、把数据写入到通道中,有出站和入站两种操作类型:
如下图:
在这里插入图片描述

  • 入站处理,触发的方向为:自底向上,由Netty的内部(如通道)到ChannelInboundHandler入站处理器。
  • 出站方向,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
    按照这种方向来分,前面数据包解码、业务处理这两个环节属于入站处理器的工作,后面目标数据编码、把数据包写到通道中属于出站处理器的工作。

ChannelInboundHandler通道入站处理器

当数据或者信息如占到Netty通道时,Netty将触发入站处理器ChannelInboundHandler对应的入站API,进行入站操作。
ChannelInboundHandler的主要操作如下:
在这里插入图片描述

inbound事件的传播

  • 以ChannelRead事件为例
    handler之间的传播信息通过fireXXX方法:其区别是从哪个节点开始传播。
  1. ctx.fireChannelRead(msg); 从当前节点往下传播事件
  2. ctx.channel().pipeline().fireChannelRead(msg);从头节点HeadContext开始传播
    新建一个ChannelInboundHandler,如下:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandlerA:  "+msg);
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello world");
    }
}

在ChannelActive处打上断点,进入:
从head开始传播:

 AbstractChannelHandlerContext.invokeChannelRead(head, msg)....
//此处next为headContext,调用headContext的involveChannelRead方法:
next.invokeChannelRead(m);
....
 ((ChannelInboundHandler) handler()).channelRead(this, msg);
....
 //没有做处理,继续向下传播
ctx.fireChannelRead(msg);
....
invokeChannelRead(findContextInbound(), msg);

看一下findContextInbound()方法,找到下一个Inbound的ctx

  AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.next;
       } while (!ctx.inbound);
       return ctx;

继续向下:

next.invokeChannelRead(m);
....
//此处调用的为InboundHandlerA 的ChannelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg)

继续向下,可以看到
此处findContextInbound返回的为TailContext

invokeChannelRead(findContextInbound(), msg);
....
//未处理信息的处理
onUnhandledInboundMessage(msg){
  ....
  try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
        //释放资源
            ReferenceCountUtil.release(msg);
        }
....
}

Tips: SimpleInboundHandler会帮你自动释放资源

outbound事件的传播

当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层的操作。比方说,建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作,如下:
在这里插入图片描述
出站处理的方向是通过上层Netty通道去操作底层Java IO通道。

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerA:  " + msg);
        ctx.write(msg, promise);
    }
    //模拟向客户端发送消息
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
           //从头结点开始传播
            ctx.channel().write("hello world");
          //从当前节点开始往下传播ctx.write("hello world");  
        },3, TimeUnit.SECONDS);
    }
}

在handlerAdded处打上断点,跟进:

pipeline.write(msg);
....
//调用tailcontext的write方法
tail.write(msg);
....
//newPromise包装Channel
write(msg, newPromise());
....
write(msg, false, promise);
....
AbstractChannelHandlerContext next = findContextOutbound(){
     AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
        }
....
next.invokeWrite(m, promise);

调用到自定义的OutboundHandlerA的write方法。
当处理完后,继续ctx.write方法则会调用到HeadContext的unsafe.write方法:

  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
原文地址:https://www.cnblogs.com/demo-alen/p/13547220.html