[编织消息框架][netty源码分析]8 Channel 实现类NioSocketChannel职责与实现

Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的

Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内

NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel

NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象

NioSocketChannel同NioSocketChannel的继承关系

NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel

NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel

小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t 

channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成

对于channel我们主要分析 I/O read/write操作

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    //构造时就绑定SelectorProvider,然后注册OP_ACCEPT
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
 
    /**
    server read操作对应的为readMessages
    参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到
    这里比较重要,当有接收到socket时,生成NioSocketChannel对象
   读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察
*/ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    
    //////////////////////////////这部分是unsafe底层调用上层的实现//////////////////////////////////////////////
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        //这里设置byteBuf写入数据坐标
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            //没有数据退出
            if (size == 0) {
                clearOpWrite();
                break;
            }
            
            long writtenBytes = 0;    //记录写数据size
            boolean done = false;    //是否完成
            boolean setOpWrite = false;


            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            //这里有三种分支处理
            //如果没有ByteBuffer 有可能只发送几个byte
            //1跟default逻辑其实是一样的
            switch (nioBufferCnt) {
                case 0:
                    //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。
                    super.doWrite(in);
                    return;
                case 1:
                    //这里只循环16次,可以看出是复制下面代码的哈。。。
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    //多个ByteBuffer时跟上面逻辑一样
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    //生成NioSocketChannel时就绑定 unsafe pipeline
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
}
protected abstract class AbstractUnsafe implements Unsafe {
    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            neverRegistered = false;
            registered = true;
            //这里是添加 Handler 每个Handler会生成一个Context
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            //通知Handler Registered
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    //通知Handler Active
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            //.......
        }
    }
}

小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了

1.server每个client连接转换成NioSocketChannel对象

2.构建NioSocketChannel时就已经生成 unsafe、pipeline

原文地址:https://www.cnblogs.com/solq111/p/7066208.html