04Channel 与 ChannelPipeline

Channel 与 ChannelPipeline
相信大家都知道了, 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:
head---->handler----->tail
通过上图我们可以看到, 一个 Channel 包含了一个 ChannelPipeline,
而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表.
这个链表的头是 HeadContext, 链表的尾是 TailContext, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler.
上面的图示给了我们一个对 ChannelPipeline 的直观认识, 但是实际上 Netty 实现的 Channel 是否真的是这样的呢? 我们继续用源码说话.


在第一章 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 中, 我们已经知道了一个 Channel 的初始化的基本过程,
下面我们再回顾一下.
下面的代码是 AbstractChannel 构造器:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

在newChannelPipeline函数中把this也就是NioSocketChannel传过去
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

在channelPipeline里面持有一个channel对象
bstractChannel 有一个 pipeline 字段, 在构造器中会初始化它为 DefaultChannelPipeline的实例. 这里的代码就印证了一点: 每个 Channel 都有一个 ChannelPipeline.
接着我们跟踪一下 DefaultChannelPipeline 的初始化过程.
首先进入到 DefaultChannelPipeline 构造器中:

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中, 然后实例化两个 ChannelHandlerContext,
一个是 HeadContext 实例 head, 另一个是 TailContext 实例 tail. 接着将 head 和 tail 互相指向, 构成一个双向链表.
特别注意到, 我们在开始的示意图中, head 和 tail 并没有包含 ChannelHandler,
这是因为 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了,
因此它们有 Context 和 Handler 的双重属性.

headContext和tailContext是ChannelPipeline的内部类
headContext和tailContext不仅都继承了AbstractChannelHandlerContext还都实现了ChannelHandler接口
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler

ChannelInBoundHandler或者ChannelOutboundHandler就是继承了ChannelHandler接口
public interface ChannelInboundHandler extends ChannelHandler
public interface ChannelOutboundHandler extends ChannelHandler

接着看一下 HeadContext 的构造器:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
接着调用父类AbstractChannelHandlerContext的构造函数
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}

ChannelInitializer 的添加
上面一小节中, 我们已经分析了 Channel 的组成, 其中我们了解到,
最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler),
但是这个 Pipeline并不能实现什么特殊的功能, 因为我们还没有给它添加自定义的 ChannelHandler.

通常来说, 我们在初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例吧:
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

上面代码的初始化过程, 相信大家都不陌生. 在调用 handler 时, 传入了 ChannelInitializer 对象,
它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢? 下面我们就来揭开它的神秘面纱.

ChannelInitializer 实现了 ChannelHandler,
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

那么它是在什么时候添加到 ChannelPipeline 中的呢?
进行了一番搜索后, 我们发现它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

return regFuture;
}

init方法里面传入的是NioSocketChannel
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());

final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}

从NioSocketChannel里面拿到pipeline
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());

config.handler()返回的就是bootstrap.handler()其实就是 ChannelInitializer

config是bootstrap的成员变量,而且吧bootstrap传给他做参数
private final BootstrapConfig config = new BootstrapConfig(this);

addLast方法要一步一步的跟踪,进入DefaultChannelPipeline
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}

在进入重载函数
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}

for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

在进入重载函数

newContext是AbstractChannelHandlerContext对象,把handler也就是ChannelInitializer作为参数传入进去
在newCtx的时候把handler也就是ChannelInitializer封装在AbstractChannelHandlerContext里面

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

在 addLast0函数中进行添加
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

有朋友可能就有疑惑了, 我明明插入的是一个 ChannelInitializer 实例,
为什么在 ChannelPipeline 中的双向链表中的元素却是一个 ChannelHandlerContext? 为了解答这个问题, 我们继续在代码中寻找答案吧.
我们刚才提到, 在 Bootstrap.init 中会调用 p.addLast() 方法, 将 ChannelInitializer 插入到链表末端,
而 ChannelInitializer是封装在AbstractChannelHandlerContext里面的

可以清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口,
因此这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.
不就是 inbound 和 outbound 两个字段嘛, 为什么需要这么大费周章地分析一番? 其实这两个字段关系到 pipeline 的事件的流向与分类,
因此是十分关键的, 不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起的作用. 在这里, 读者只需要记住,
ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 即可.

当创建好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中

------------------------------------------------------------------

自定义 ChannelHandler 的添加过程

我们已经分析了一个 ChannelInitializer 如何插入到 Pipeline 中的, 接下来就来探讨一下 ChannelInitializer 在哪里被调用,
ChannelInitializer 的作用, 以及我们自定义的 ChannelHandler 是如何插入到 Pipeline 中的.

首先在 AbstractBootstrap.initAndRegister中, 通过 group().register(channel), 调用 MultithreadEventLoopGroup.register 方法
MultithreadEventLoopGroup是NioEventloopGroup的父类

public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next返回的是NioeventLoop,NioEventLoop是SingleThreadEventLoop, regiser方法在SingleThreadEventLoop里面


在MultithreadEventLoopGroup.register 中, 通过 next() 获取一个可用的 SingleThreadEventLoop, 然后调用它的 register

在 SingleThreadEventLoop.register 中, 通过 channel.unsafe().register(this, promise) 来获取 channel 的 unsafe() 底层操作对象,
然后调用它的 register.
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
promise.channel().unsafe().register(this, promise);
unsafe()是在NioSocketChannel里面创建出来的NioSocketChannelUnsafe


register方法是AbstractUnsafe, AbstractUnsafe是NioSockketChannelUnsafe的曾祖父
参数eventLoop是nioEventLoop,
promise是上面new DefaultChannelPromise 里面持有channel

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel

在 AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法

AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.

---------------------------------------------------------------------------------------------------------

而我们自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中,
在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现如下:

private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}


调用的是DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}

next.executor()返回的是channel里面的eventLoop,

@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}

public Channel channel() {
return pipeline.channel();
}

eventLoop是在AbstractUnsafe.register函数里面赋值给AbstractChannel。this.eventLoop对象的
AbstractUnsafe是AbstractChannel的内部类可以调用外部类

AbstractUnsafe.register(EventLoop eventLoop, ChannelPromise promise)

还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
并且它没有重写 fireChannelRegistered 方法,
因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:

上面的代码很简单, 就是调用了 head.fireChannelRegistered() 方法而已.

关于上面代码的 head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的常用方式, 我们以后会经常看到.

还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
并且它没有重写 fireChannelRegistered 方法,
因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}

executor返回的就是channel里面的NioEventLoop
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}

我们已经强调过了, 每个 ChannelHandler 都与一个 ChannelHandlerContext 关联,
我们可以通过 ChannelHandlerContext 获取到对应的 ChannelHandler.
因此很显然了, 这里 handler() 返回的, 其实就是 head 对象,
并接着调用了 head.channelRegistered 方法.

private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}


ctx是head对象
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
调用head的 fireChannelRegistered 函数
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}

很显然, 这个代码会从 head 开始遍历 Pipeline 的双向链表,
然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例.
想起来了没? 我们在前面分析 ChannelInitializer 时, 花了大量的笔墨来分析了 inbound 和 outbound 属性,
你看现在这里就用上了. 回想一下, ChannelInitializer 实现了 ChannelInboudHandler,
因此它所对应的 ChannelHandlerContext 的 inbound 属性就是 true,
因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:

返回的就是DefaultChannelHandlerContext
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

接着调用DefaultChannelHandlerContext的 invokeChannelRegistered, handler 返回的就是ChannelInitializer
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}

调用 ChannelInitializer 的channelRegistered 函数
参数 ctx 是 this也就是 DefaultChannelHandlerContext
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}

@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}

initChannel 这个方法我们很熟悉了吧, 它就是我们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

因此当调用了这个方法后, 我们自定义的 ChannelHandler 就插入到 Pipeline 了,

原文地址:https://www.cnblogs.com/handsome1013/p/10038337.html