Netty核心之ChannelHandler-netty学习笔记(6)-20210804

ChannelHandler 的生命周期

handlerAdded(): 添加到 ChannelPipeline 时调用.

handlerRemoved(): 从 ChannelPipeline 中移除时调用.

exceptionCaught(): 处理过程中在 ChannelPipeline 中有错误产生时调用.

处理 I/O 事件或截获 I/O 操作, 并将其转发到 ChannelPipeline 中的下一个处理程序. ChannelHandler 本身不提供许多方法, 但通常必须实现其子类型之一:

    • ChannelInboundHandler: 处理入站数据以及各种状态变化.
    • ChannelOutboundHandler: 处理出站数据并且允许拦截所有的操作.

Netty 提供了一个简单的 ChannelHandler 框架实现,给所有声明方法签名。这个类 ChannelHandlerAdapter 的方法,主要推送事件 到 pipeline 下个 ChannelHandler 直到 pipeline 的结束。这个类 也作为 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter 的基础。所有三个适配器类的目的是作为自己的实现的起点;您可以扩展它们,覆盖你需要自定义的方法。

ChannelInboundHandler接口

以下方法将会在数据被接收时或者 与其对应的Channel状态发生改变时被调用

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {
 
    /**
     * 当Channel已经注册到它的EventLoop并且能够处理I/O时被调用
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当Channel离开活动状态并且不再连接它的远程节点时被调用
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当从Channel读取数据时被调用
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
 
    /**
     * 当Channel上的一个读操作完成时被调用
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个POJO被传经了ChannelPipeline
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
 
    /**
     * 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。可以通过调用Channel的isWritable()方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWater-Mark()方法来设置
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 如果抛出一个可抛出的异常对象,则调用。
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler处理。它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * 当请求将Channel绑定到本地地址时被调用
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求将Channel连接到远程节点时被调用
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求将Channel从远程节点断开时被调用
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求关闭Channel时被调用
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求将Channel从它的EventLoop注销时被调用
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求从Channel读取更多的数据时被调用
     */
    void read(ChannelHandlerContext ctx) throws Exception;
 
    /**
     * 当请求通过Channel将数据写到远程节点时被调用
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
 
    /**
     * 当请求通过Channel将入队数据冲刷到远程节点时被调用
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中异步I/O操作的结果, 在 netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回, 可以通过 ChannelFuture 来获取 I/O 操作的结果或者状态信息.

当 I/O 操作开始时, 将创建一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 因为 I/O 操作还没有完成.

如果 I/O 操作已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障原因.

请注意, 即使失败和取消属于已完成状态.

ChannelPromise 是 ChannelFuture 的一个子接口, 其定义了一些可写的方法, 如 setSuccess() 和 setFailure(), 从而使 ChannelFuture 不可变.

优先使用addListener(GenericFutureListener),而非await()

当做了一个 I/O 操作并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener) 的方式来获得通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它会把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 线程会在 I/O 操作相关的 future 完成的时候通知监听器.

ChannelFutureListener 会利于最佳的性能和资源的利用, 因为它一点阻塞都没有. 而且不会造成死锁.

ChannelHandler适配器

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler的适配器类。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法。生成的类的层次结构如:

ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么 这个方法将返回true,表示它可以被添加到多个ChannelPipeline中。(@Sharable注解)

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

资源管理

为了诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。

Netty目前定义了4种泄漏检测级别:

级别

描述

DISABLED

禁用泄漏检测。只有在详尽的测试之后才应使用

SIMPLE

使用1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分情况。

ADVANCED

使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID

类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大影响,在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java-Dio.netty.leakDetectionLevel=ADVANCED

如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

channelRead()和channelReadComplete() 方法的区别:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    //将消息记录到控制台
    System.out.println( "Server received: " + in.toString(CharsetUtil.UTF_8) + ",size is " + in.toString(CharsetUtil.UTF_8).length());
    ctx.write(in);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    System.out.println("channelReadComplete");
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

channelRead表示接收消息,可以看到msg转换成了ByteBuf,然后打印,也就是把Client传过来的消息打印了一下,你会发现每次打印完后,channelReadComplete也会调用,如果你试着传一个超长的字符串过来,超过1024个字母长度,你会发现channelRead会调用多次,而channelReadComplete只调用一次。

所以这就比较清晰了吧,因为ByteBuf是有长度限制的,所以超长了,就会多次读取,也就是调用多次channelRead,而channelReadComplete则是每条消息只会调用一次,无论你多长,分多少次读取,只在该条消息最后一次读取完成的时候调用,所以这段代码把关闭Channel的操作放在channelReadComplete里,放到channelRead里可能消息太长了,结果第一次读完就关掉连接了,后面的消息全丢了。

ChannelPipeline接口

ChannelPipeline是一个拦截流经Channel的入站和出站事件的Channel-Handler实例链,那么就很容易看出这些ChannelHandler之间的交互是组成一个应用程序数据和事件处理逻辑的核心。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后,通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler。

ChannelHandlerContext:

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。 ChannelHandlerContext具有丰富的用于处理事件和执行I/O 操作的API。

从事件途径 ChannelPilpeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的还是出站的.

而 Netty 总是将 ChannelPilpeline 的入站口 (左侧) 作为头部, 将出站口 (右侧) 作为尾端.

当通过调用 ChannelPilpeline.add*() 方法将入站处理器和出站处理器混合添加到 ChannelPilpeline 之后, 每一个 ChannelHandler 从头部到尾端的顺序就是我们添加的顺序.

在 ChannelPilpeline 传播事件时, 它会测试 ChannelPilpeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配. 如果不匹配, ChannelPilpeline 将跳过该 ChannelHandler 并前进到下一个, 直到它找到和该事件期望的方向相匹配的为止.

修改 ChannelPipeline

这里指修改 ChannelPipeline 中的 ChannelHandler 的编排.

通过调用 ChannelPipeline 上的相关方法, ChannelHandler 可以添加, 删除或者替换其他的 ChannelHandler, 从而实时地修改 ChannelPipeline 的布局.

addFirst  // 将 ChannelHandler 插入第一个位置
addBefore // 在某个 ChannelHandler 之前添加一个
addAfter  // 在某个 ChannelHandler 之后添加一个
addLast   // 将 ChannelHandler 插入最后一个位置
remove    // 移除某个 ChannelHandler
replace   // 将某个 ChannelHandler 替换成指定 ChannelHandler

ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChanelHandler 和 ChannelPipeline 之间的关联, 每当有 ChanelHandler 添加到 ChannelPipeline 中, 都会创建 ChannelHandlerContext.

ChannelHandlerContext 的主要功能是管理它所关联的 ChannelPipeline 和同一个 ChannelPipeline 中的其他 ChanelHandler 之间的交互.

ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 Channel 和 ChannelPipeline 上, 但是有一点重要的不同.

如果调用 Channel 和 ChannelPipeline 上的这些方法将沿着 ChannelPipeline 进行传播(从头或尾开始).

而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler.

这样做可以减少 ChannelHandler 的调用开销.

  • 1、Channel绑定到ChannelPipeline
  • 2、ChannelPipeline绑定到包含ChannelHandler的Channel
  • 3、ChannelHandler
  • 4、当添加ChannelHandler到ChannelPipeline时,ChannelHandlerContext被创建

Channel中信息的传递:

  • Channel或者ChannelPipeline上调用write()方法,会让整个事件在管道中进行传递。
  • ChannelHandler之间的数据传递则通过ChannelHandlerContext调用方法来实现。

  1. 事件传递给 ChannelPipeline 的第一个 ChannelHandler
  2. ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
  3. ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个

想要实现从一个特定的 ChannelHandler 开始处理,你必须引用与 此ChannelHandler的前一个ChannelHandler 关联的 ChannelHandlerContext 。这个ChannelHandlerContext 将会调用与自身关联的 ChannelHandler 的下一个ChannelHandler 。

使用ChannelHandlerContext的API的时候,请牢记以下两点:

  1).ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
  2).相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

 

被调用的Channel或ChannelPipeline上的write()方法将一直传播事件通过整个ChannelPipeline,但是在ChannelHandler的级别上,事件从一个ChannelHandler到下一个ChannelHandler的移动是由ChannelHandlerContext上的调用完成的。

要想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext。这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler。

消息将从下一个ChannelHandler开始流经ChannelPipeline,绕过了所有前面的ChannelHandler。

因为一个ChannelHandler可以从 属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。 对于这种用法 指在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

异常处理

处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现中重写下面的方法。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("服务器异常..");
        cause.printStackTrace();
        ctx.close();
    }

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在ChannelPipeline中的什么位置。

ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler;

如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;

要想定义自定义的处理逻辑,你需要重写exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。

处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

    每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了 。

    几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。但是,ChannelPromise还具有提供立即通知的可写方法:

ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

添加ChannelFutureListener只需要调用ChannelFuture实例上的addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是,调用出站操作(如write()方法)所返回的ChannelFuture上的addListener()方法。

原文地址:https://www.cnblogs.com/sfnz/p/15098670.html