001 服务端Channel的创建

一 .入口

在我们的服务器启动的代码之中,存在如下的代码:

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

我们调用channel方法传入来一个NioServerSocketChannel的字节码对象.

服务端的channel正是使用这个字节码对象通过反射的方式进行创建的.

我们回到主程序的入口bind()方法.

二 .初始化Channel对象

ChannelFuture regFuture = initAndRegister();

  在bind()方法之中的第一行代码如上.

在该方法之中包含核心的逻辑分成三个:

创建Channel对象

初始化Channel对象

注册Channel对象.

我们下面分析Channel的创建.

channel = channelFactory.newChannel();

  通过ChannelFactory对象调用newChannel()方法创建得到Channel对象.

public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

我们发现是通过反射创建的对象.

下面我们需要分析一下Channel的默认构造函数.

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

我们首先看看newSocket()方法的实现.

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

实际上直接调用jdk的方法创建一个ServerSocketChannel对象.

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 创建NioServerSocketChannelConfig 与当前的对象进行绑定,后面都使用这个配置对象进行Channel的配置
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在该构造函数之中包含两个逻辑,一个调用父类的构造函数完成构造,另外一个就是创建conifg对象,保存我们对服务端channel的配置信息.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        // 保存jdk的Channel
        this.ch = ch;
        // 保存感兴趣的事件
        this.readInterestOp = readInterestOp;
        try {
            // 配置Channel为非阻塞的模式
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

我们需要看看父类的构造函数.

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // 创建Channel的id
        id = newId();
        // 创建unsafe()对象,这个对象在后面会介绍
        unsafe = newUnsafe();
        // 创建ChannelPipeline对象,也就是说一个Channel对象会和一个ChannelPipeline进行绑定
        pipeline = newChannelPipeline();
    }

构造了一个Channel最为核心的组件,unsafe对象和pipeline对象.

好了现在我们完成了Channel的创建过程,下面分析Channel的初始化工作.

init(channel);

这个方法实际上是一个抽象方法,由ServerBootStrap进行实现.

void init(Channel channel) throws Exception {
        // 保存属性
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        // 这个也就是我们说的workerGroup对象
        final EventLoopGroup currentChildGroup = childGroup;
        // 这个就是我们保存的childChannel,也就是初始化的ChannelHandler.
        final ChannelHandler currentChildHandler = childHandler;

        // 保存childChannel的属性,也就是那些tcp属性
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        // 向管道之中增加一个Channel的初始化器
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // 我们通过handler()方法,存储的Handler对象
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // 会向pipeline对象之中增加一个ServerBootstrapAcceptor 对象
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                // channel , 当前的处理器 子channel的配置属性
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

代码比较多,但是核心的代码吗主要分成下面的几个部分.

[1] 保存服务端channel的配置信息

[2]保存客户端连接的配置信息.

[3]如果服务器端拥有handle,那么配置进去.

[4] 向服务端channel之中添加一个ServerBootstrapAcceptor对象,携带的参数都是客户端连接信息.

在这个方法之中我们调用了execute()方法,在后面的内容之中我们会分析事件循环组的内容.

服务端channel的注册:

doBind0(regFuture, channel, localAddress, promise);

下面分析注册的过程.

channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 这个操作是在boss线程完成的
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });

实际上是向事件循环组之中放入了一个任务,我们现在不管事件循环组的功能,就当做一个线程池就好了,就向向线程组之中提交任务一样.

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
pipeline.bind(localAddress, promise);
tail.bind(localAddress, promise);
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        // 找寻outbound的处理器
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

找到一个ountbind()处理器,然后会调用next.invokeBind()方法.

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

现在pipeline之中仅仅只有tail,head,he之前增加的一个ServerbootAcceptor.

三个之中唯一的ountbind()处理器就是Head.

        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

我们发现现在调用了unsafe()对象进行绑定.

 protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

现在我们终于找到实际调用jdk的底层帮助实现了channel的注册.

以上就是我们通过bind()方法分析得出的结论,下文我们分析事件循环组的功能.

原文地址:https://www.cnblogs.com/trekxu/p/13586811.html