Netty Source Code — II. Server Startup (2)

When using Netty, the initialization code typically looks like this:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    // Configure the server's NIO thread groups
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChildChannelHandler());

    // Bind the port and wait synchronously until it succeeds
    ChannelFuture f = b.bind(port).sync();
    // Wait until the server's listening port is closed
    f.channel().closeFuture().sync();
} finally {
    // Shut down gracefully, releasing the thread pool resources
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

The startup of the thread pools was covered earlier, and the startup parameters are then assembled via the builder pattern, so next let's look at the bind process. Both channel registration and the ip:port binding happen inside the bind method, whose main logic consists of the two steps below (a simplified doBind sketch follows the list):

  1. Initialize the channel
  2. Register the channel with the selector
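
These two steps are driven by io.netty.bootstrap.AbstractBootstrap#doBind. A simplified sketch is shown below (based on Netty 4.1; the branch that attaches a listener when registration has not completed yet is left out):

// io.netty.bootstrap.AbstractBootstrap#doBind (simplified)
private ChannelFuture doBind(final SocketAddress localAddress) {
    // steps 1 and 2: create the channel, init it and register it with the selector
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // once registration is done, bind ip:port on the channel's event loop
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
}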

NioServerSocketChannel

Let's start with channel initialization. On the server side, NioServerSocketChannel wraps the JDK's ServerSocketChannel; the initialization goes as follows:

// When the channel class is configured, the corresponding channelFactory is set up
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // channelFactory is a ReflectiveChannelFactory
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

The channel above is created by io.netty.channel.ReflectiveChannelFactory#newChannel, which uses reflection to instantiate NioServerSocketChannel through its no-arg constructor; that no-arg constructor calls newSocket and then the constructor below.
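
For reference, a condensed sketch of that factory (modeled on Netty 4.1; newer versions cache the constructor instead of looking it up on every call):

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            // reflectively invoke the no-arg constructor of the configured channel class
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}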

// When the server channel is created, newSocket is called first, followed by the constructor below
public NioServerSocketChannel(ServerSocketChannel channel) {
    // Set the event this socket listens for; since this is a server, the ACCEPT event must be registered
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
            "Failed to open a server socket.", e);
    }
}

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // Create a socket; the return value is the file descriptor for that socket
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ST_INUSE;
}

// sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    // socket0 is a native method returning a Linux file descriptor as an int; newFD wraps it into a Java FileDescriptor
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true));
}

// jdk/src/solaris/native/sun/nio/ch/Net.c
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse)
{
    // intermediate code omitted ...
    // Call socket() to create a socket and return the corresponding file descriptor
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

    // intermediate code omitted ...
    return fd;
}
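
Putting the pieces together, the initialization above corresponds roughly to the following plain JDK NIO calls (illustration only, error handling omitted):

import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// open the server socket channel through the SelectorProvider, as newSocket does above
ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
// Netty's AbstractNioChannel constructor additionally switches the channel to non-blocking mode
ch.configureBlocking(false);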

It is not hard to see that channel initialization boils down to creating a socket. Next, let's look at channel registration.

// config() returns the ServerBootstrapConfig
// group() returns the parentGroup, which in the example at the top is bossGroup, i.e. a NioEventLoopGroup
// so this calls NioEventLoopGroup.register, a method inherited from MultithreadEventLoopGroup
ChannelFuture regFuture = config().group().register(channel);

// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    // bossGroup is used here; next() selects one of its NioEventLoops (round-robin) and calls NioEventLoop.register, a method inherited from SingleThreadEventLoop
    return next().register(channel);
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    // Registration again goes through a promise, so it can complete asynchronously
    return register(new DefaultChannelPromise(channel, this));
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // channel() returns the NioServerSocketChannel
    // unsafe() returns io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
    // so this calls NioMessageUnsafe.register, a method inherited from AbstractUnsafe
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // intermediate code omitted ...
    // The current thread is the main thread while eventLoop is a thread from bossGroup,
    // so inEventLoop() returns false here and register0 is executed on the eventLoop thread
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // Execute on the eventLoop
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // intermediate code omitted ...
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        // intermediate code omitted ...
        // This mainly calls register on the JDK's ServerSocketChannelImpl; during registration the file
        // descriptor to be monitored is added to the EPollArrayWrapper
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // intermediate code omitted ...
    }
}

Now let's look at what happens during the registration.
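
The doRegister call inside register0 is implemented in io.netty.channel.nio.AbstractNioChannel; abridged from Netty 4.1:

// io.netty.channel.nio.AbstractNioChannel#doRegister (abridged)
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // register with ops = 0 and the Netty channel itself as the attachment;
            // the real interest ops are set later in doBeginRead
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // force the selector to flush its cancelled-key set, then retry once
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

That register call then lands in the JDK: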

// sun.nio.ch.SelectorImpl#register
// ch here is the ServerSocketChannelImpl
// attachment is the NioServerSocketChannel
// ops is 0, so no events to listen for are registered at this point
// invoked as: selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops,
                                      Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    // Create a SelectionKeyImpl
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        // Calls sun.nio.ch.EPollSelectorImpl#implRegister
        implRegister(k);
    }
    // Set the events this channel is interested in
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    // pollWrapper is the Java-side counterpart of the data structures epoll needs for event monitoring
    // its add method calls setUpdateEvents to record the events this socket listens for
    pollWrapper.add(fd);
    keys.add(ski);
}

/**
 * struct epoll_event {
 *     __uint32_t events;
 *     epoll_data_t data;
 * };
 * Since the number of sockets to be monitored is unknown up front, the JDK defines MAX_UPDATE_ARRAY_SIZE:
 * for fds below MAX_UPDATE_ARRAY_SIZE the events of each socket are stored in the array eventsLow,
 * indexed by the socket's file descriptor; for fds at or above MAX_UPDATE_ARRAY_SIZE a map,
 * EPollArrayWrapper#eventsHigh, holds each socket's events.
 *
 * Note that at this point setUpdateEvents is called with events == 0, i.e. no event types are being
 * listened for yet.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

Note that the code above still has not set the events the channel listens for. The interest ops are actually set in the beginRead method, which is called when the channel becomes active.

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // readInterestOp is 16, i.e. SelectionKey.OP_ACCEPT, specified in the NioServerSocketChannel constructor
        // this is where the events the socket listens for are actually set
        // the call below eventually reaches sun.nio.ch.EPollArrayWrapper#setInterest
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

// sun.nio.ch.EPollArrayWrapper#setInterest
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        // record the file descriptor and events
        int oldCapacity = updateDescriptors.length;
        if (updateCount == oldCapacity) {
            int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
            int[] newDescriptors = new int[newCapacity];
            System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
            updateDescriptors = newDescriptors;
        }
        updateDescriptors[updateCount++] = fd;

        // events are stored as bytes for efficiency reasons
        byte b = (byte)mask;
        assert (b == mask) && (b != KILLED);
        // setUpdateEvents was described above; it sets the events monitored for this socket's file descriptor to b
        setUpdateEvents(fd, b, false);
    }
}
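
For completeness, here is how doBeginRead gets triggered: when the channel becomes active, the head of the pipeline issues a read as long as autoRead (the default) is enabled. Abridged from io.netty.channel.DefaultChannelPipeline.HeadContext in Netty 4.1:

// io.netty.channel.DefaultChannelPipeline.HeadContext (abridged)
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    // with autoRead == true this calls channel.read(), which travels
    // through the pipeline to HeadContext#read below
    readIfIsAutoRead();
}

public void read(ChannelHandlerContext ctx) {
    // ends up in AbstractNioChannel#doBeginRead shown above
    unsafe.beginRead();
}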

At this point the server socket channel has been registered and its interest events set; what remains is the ip:port binding.
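
On the Netty side, AbstractUnsafe.bind eventually calls NioServerSocketChannel#doBind, which simply delegates to the JDK channel (Netty 4.1, abridged):

// io.netty.channel.socket.nio.NioServerSocketChannel#doBind (abridged)
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // ServerSocketChannel#bind exists since Java 7
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

The JDK side, sun.nio.ch.ServerSocketChannelImpl#bind, then does the actual work: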

public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        // intermediate code omitted ...
        // Calls the native bind, which ultimately invokes the Linux bind system call
        Net.bind(fd, isa.getAddress(), isa.getPort());
        // Then listen is invoked so that the server socket's file descriptor starts listening
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}

Summary

During bind the server mainly initializes the NioServerSocketChannel, registers the channel with the selector, and adds the events the channel needs to listen for; after that the socket channel can listen on the port and accept client connections.
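
Reduced to plain JDK NIO, the whole startup is the classic sequence below (illustration only; the port 8080 and backlog 1024 are placeholder values):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

Selector selector = Selector.open();
ServerSocketChannel ch = ServerSocketChannel.open();  // socket()
ch.configureBlocking(false);
SelectionKey key = ch.register(selector, 0);          // register with ops = 0
ch.bind(new InetSocketAddress(8080), 1024);           // bind() + listen()
key.interestOps(SelectionKey.OP_ACCEPT);              // doBeginRead: now watch for ACCEPT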

Original article: https://www.cnblogs.com/sunshine-2015/p/9357760.html