Netty ChannelPipeline组件作用

ChannelPipeline是ChannelHandler的容器,类似于Servlet的Filter过滤器,它负责处理和拦截 inbound(入站) 或者 outbound(出站) 的事件和操作;(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权);

每一个新创建的 Channel(也可以说是每创建一个新的连接) 都将会被分配一个新的 ChannelPipeline,这项关联是永久性的; Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的;在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预;

//把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addFirst(ChannelHandler... handlers);

//把一个业务处理类(handler)添加到链中的最后一个位置
ChannelPipeline addLast(ChannelHandler... handlers);

  

事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler,处理;随后, 通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler;

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互;保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象;即ChannelHandlerContext中包含一个具体的事件处理器 ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和channel的信息,方便对 ChannelHandler进行调用;

  

通过 ChannelHandlerContext 触发的操作的事件流,ChannelHandler可以通知其所属的ChannelPipeline 中 的下一 个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline;

虽然被调用的 Channel ChannelPipeline 上的 write()方法将一直传播事件通过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的;类似下面这张图;

对于客户端和服务端,入站和出站是相对的;以客户端应用程序为例,如果事件的传播方向是从客户端到服务端的(发送请求到服务端),那么对于客户端为出站的(即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理),对于服务端是入站的;

Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者的事件传播会经过整个ChannelPipeline,而ChannelHandlerContext就只会在后续的Handler里面传播;

ChannelInboundHandler之间的传递,主要通过调用ChannelHandlerContext里面的FireXXX()方法来实现下个Handler的调用;

关于有多个入站出站ChannelHandler的执行顺序,测试如下;

  • 服务端

  服务端启动类添加Handler

serverBootstrap.group(bossGroup, workGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,1024)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new InboundHandler1());
            ch.pipeline().addLast(new InboundHandler2());
            ch.pipeline().addLast(new OutboundHandler1());
            ch.pipeline().addLast(new OutboundHandler2());
        }
    });

  OutboundHandler1

public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(OutboundHandler1.class);

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

        ByteBuf data = (ByteBuf) msg;
        log.info("|OutboundHandler1 write : "+data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutboundHandler1 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        ctx.flush();
    }
}

  OutboundHandler2

public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(OutboundHandler2.class);

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

        ByteBuf data = (ByteBuf) msg;
        log.info("|OutboundHandler2 write : "+data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        ctx.flush();
    }
}

  InboundHandler1

public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(InboundHandler1.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("|InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));

        // 执行下一个InboundHandler
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  InboundHandler2

public class InboundHandler2 extends ChannelInboundHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(InboundHandler2.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("|InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 客户端

  客户端启动类

bootstrap.group(group)
	.channel(NioSocketChannel.class)
	.remoteAddress(new InetSocketAddress(host, port))
	.option(ChannelOption.TCP_NODELAY,true)
	.handler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ch.pipeline().addLast(new EchoClientHandler());
		}
	});

  

  EchoClientHandler

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private final static Logger logger = LoggerFactory.getLogger(EchoServerHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        logger.info("Client received: " + msg.toString(CharsetUtil.UTF_8));

    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Active");
        ctx.writeAndFlush(Unpooled.copiedBuffer("echo test...",CharsetUtil.UTF_8));
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("EchoClientHandler channelReadComplete");
        ctx.close();
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  

执行结果如下:

两个OutboundHandler并没有执行,很奇怪,翻看源码,发现是InboundHandler的使用不对; 

pipeline上添加handler最终会往执行下面的流程;

io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)

io.netty.channel.DefaultChannelPipeline#addLast0

最终pipeline上的实例如下:

如下图

 

  • 使用pipeline或channel发送数据

 将上面的InboundHandler2的channelRead方法中的发送数据方式改成如下方式:

ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));

  

当在InboundHandler执行write方法,执行流程如下;

调用链路如下:

io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object)

 ->io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)

  ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)

   ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise)

    ->io.netty.channel.AbstractChannelHandlerContext#findContextOutbound

  

io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)

findContextOutbound会从tail节点往前找OutboundHandler,如下图;

最终的执行结果如下:

服务端:

客户端:

  • 使用ctx发送数据的方式

而InboundHandler2的channelRead方法的发送数据是下面这种方式:

ctx.writeAndFlush(xxx);

与pipeline或channel的发送不同的是在执行 io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise) 前会执行io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object);

此时ctx为InboundHandler2;

 

此时会从ctx往前找OutboundHandler,从pipeline上看ctx往前找是没有的,如下图;

 如果不想改变InboundHandler2的channelRead方法,可以将服务端的handler添加顺序更改就可以了,如下;

serverBootstrap.group(bossGroup, workGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,1024)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
		    ch.pipeline().addLast(new OutboundHandler1());
            ch.pipeline().addLast(new OutboundHandler2());
            ch.pipeline().addLast(new InboundHandler1());
            ch.pipeline().addLast(new InboundHandler2());
        }
    });

执行效果如下:

服务端

客户端:

最终handler间的事件传播方向,也就是执行方向,InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)InboundHandler顺序执行,OutboundHandler逆序执行,如下;

传播的数据也会像包装一样,InboundHandler2会将InboundHandler1的数据打包,传递到OutboundHandler2,再打包,在传递到OutboundHandler1;感觉这个有点像StringBuilder的append方法;

 
原文地址:https://www.cnblogs.com/coder-zyc/p/14404454.html