Netty: How You Are Notified After a Write Succeeds

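  Sometimes two messages sent through Netty depend on each other: the second may only be sent after the first has been sent successfully. In code, that can be written like this: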

ChannelFuture channelFuture = ch.writeAndFlush(line + "\n");
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
        if (future.isSuccess()) {
            // The first write succeeded, so the dependent second message can be sent now.
            System.out.print("first message send success" + "\n");
            ch.writeAndFlush("first message send success" + "\n");
        }
    }
});

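  The trailing "\n" above is required, because the channel initializer on the client side installs new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()), which cuts the byte stream into frames at line delimiters. A message without a line delimiter is not emitted as a complete frame.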
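
  To make the role of the delimiter concrete, here is a minimal plain-Java sketch of line-based framing. It is not Netty's DelimiterBasedFrameDecoder: the class LineFramer and its feed method are hypothetical names used only for illustration, and the sketch works on strings rather than ByteBufs and only recognizes "\n".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a line-delimited frame decoder; not Netty code. */
class LineFramer {
    // Text received so far that does not yet form a complete frame.
    private final StringBuilder pending = new StringBuilder();

    /** Feeds received text and returns every frame completed by a "\n". */
    List<String> feed(String received) {
        pending.append(received);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = pending.indexOf("\n")) >= 0) {
            frames.add(pending.substring(0, idx)); // the delimiter itself is stripped
            pending.delete(0, idx + 1);
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // No delimiter yet, so nothing is handed to the next handler.
        System.out.println(framer.feed("hello"));    // prints []
        // The delimiter arrives and the buffered text becomes one frame.
        System.out.println(framer.feed(" world\n")); // prints [hello world]
    }
}
```

  Without the "\n", the text simply stays in the pending buffer, which is why the receiving side would appear to get nothing.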

  This article walks through the source code to see how this listener mechanism is implemented.

2 Source Code Analysis

  The version used is:

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.9.Final</version>
        </dependency>

  The previous article analyzed how the client writes a message out: https://www.cnblogs.com/juniorMa/p/14301756.html

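  Writing a message takes two steps:

  1 Wrap the message in an Entry and put it into the buffer queue, ChannelOutboundBuffer

  2 Execute flush, which performs the actual write by calling channel.write from the JDK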

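  When the message is added to the buffer queue and wrapped in an Entry, the Promise is passed in as an argument. In other words, every Entry has a member variable that holds a reference to the Promise supplied when the write was submitted.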

  Entry entry = Entry.newInstance(msg, size, total(msg), promise);
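
  The relationship between the queue, the Entry, and the Promise can be sketched in plain Java. This is a simplified model under stated assumptions, not Netty's ChannelOutboundBuffer: the class OutboundBufferModel is hypothetical, java.util.concurrent.CompletableFuture stands in for ChannelPromise, and flush pretends that every queued message is written completely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Simplified, hypothetical model of an outbound buffer; not Netty code. */
class OutboundBufferModel {
    /** Each queued message keeps a reference to the promise created for its write. */
    static final class Entry {
        final String msg;
        final CompletableFuture<Void> promise;

        Entry(String msg, CompletableFuture<Void> promise) {
            this.msg = msg;
            this.promise = promise;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    /** Step 1: wrap the message and its promise in an Entry and queue it. */
    CompletableFuture<Void> write(String msg) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        queue.addLast(new Entry(msg, promise));
        return promise;
    }

    /** Step 2: pretend each queued message was written, then complete its promise. */
    int flush() {
        int flushed = 0;
        Entry e;
        while ((e = queue.pollFirst()) != null) {
            e.promise.complete(null); // the promise is reachable only through the Entry
            flushed++;
        }
        return flushed;
    }

    public static void main(String[] args) {
        OutboundBufferModel buf = new OutboundBufferModel();
        CompletableFuture<Void> first = buf.write("first");
        first.thenRun(() -> System.out.println("first message sent"));
        System.out.println("done before flush: " + first.isDone()); // false
        buf.flush();
        System.out.println("done after flush: " + first.isDone());  // true
    }
}
```

  The point of the model is that the code doing the write never needs to know who is waiting: it only needs the Entry, and the Entry carries the promise.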

  

  After flush is executed, the doWrite method of the concrete channel type is called, for example:

   NioSocketChannel.doWrite() 

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

  Note the last line of the code:

// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
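
  How writtenBytes is accumulated before it reaches removeBytes can be modeled without a real socket. The sketch below mirrors only the spin loop of the single-buffer case. WriteSpinModel, Result, and the socket function are hypothetical names; the socket function stands in for ch.write(...) and returns how many bytes it accepted.

```java
import java.util.function.LongUnaryOperator;

/** Hypothetical model of the write spin loop; not Netty code. */
class WriteSpinModel {
    /** Outcome of one doWrite-style pass. */
    static final class Result {
        final long writtenBytes;
        final boolean done;
        final boolean setOpWrite;

        Result(long writtenBytes, boolean done, boolean setOpWrite) {
            this.writtenBytes = writtenBytes;
            this.done = done;
            this.setOpWrite = setOpWrite;
        }
    }

    /**
     * @param expected  bytes waiting to be written
     * @param spinCount maximum number of write attempts
     * @param socket    given the remaining bytes, returns how many were accepted
     */
    static Result spin(long expected, int spinCount, LongUnaryOperator socket) {
        long written = 0;
        boolean done = false;
        boolean setOpWrite = false;
        for (int i = spinCount - 1; i >= 0; i--) {
            long n = socket.applyAsLong(expected);
            if (n == 0) {
                setOpWrite = true; // nothing accepted: wait for a writable event
                break;
            }
            expected -= n;
            written += n;
            if (expected == 0) {
                done = true; // everything pending has been written
                break;
            }
        }
        return new Result(written, done, setOpWrite);
    }

    public static void main(String[] args) {
        // A socket that accepts at most 4 bytes per call.
        Result r = spin(10, 16, remaining -> Math.min(remaining, 4));
        System.out.println(r.writtenBytes + " " + r.done); // prints 10 true
    }
}
```

  Whatever the loop managed to write, whether everything or only part of it, is the number later handed to removeBytes.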

  removeBytes processes each Entry in turn:

public void removeBytes(long writtenBytes) {
        for (;;) {
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
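
  The accounting in removeBytes can be illustrated with plain integers instead of ByteBufs. In this sketch, RemoveBytesModel and its methods are hypothetical, each queued number is the readable size of one buffer, and the counter completed marks the place where remove() would be called. The branch for a partially written buffer is not part of the excerpt above; the version here is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the removeBytes bookkeeping; not Netty code. */
class RemoveBytesModel {
    // Readable bytes of each pending buffer, head of the queue first.
    private final Deque<Integer> readable = new ArrayDeque<>();
    private int completed;

    void add(int readableBytes) {
        readable.addLast(readableBytes);
    }

    void removeBytes(long writtenBytes) {
        while (!readable.isEmpty()) {
            int readableBytes = readable.peekFirst();
            if (readableBytes <= writtenBytes) {
                // Fully written: drop the entry. This is where remove() would run.
                writtenBytes -= readableBytes;
                readable.pollFirst();
                completed++;
            } else {
                // Partially written (assumed behavior): keep the entry, shrink it.
                if (writtenBytes != 0) {
                    readable.pollFirst();
                    readable.addFirst(readableBytes - (int) writtenBytes);
                }
                break;
            }
        }
    }

    int completed() {
        return completed;
    }

    int headRemaining() {
        return readable.isEmpty() ? 0 : readable.peekFirst();
    }

    public static void main(String[] args) {
        RemoveBytesModel model = new RemoveBytesModel();
        model.add(5);
        model.add(3);
        model.add(4);
        model.removeBytes(9); // covers the first two buffers and 1 byte of the third
        System.out.println(model.completed() + " " + model.headRemaining()); // prints 2 3
    }
}
```

  Only the entries whose bytes were written completely are removed, so only their promises are notified; a partially written entry stays queued until a later write finishes it.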

  

public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

  safeSuccess is the method that runs the logic for notifying every listener. This is the point at which the logic we added in our listener gets executed.
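
  What "notifying every listener" amounts to can be shown with a very small promise. This is a single-threaded sketch, not Netty's DefaultPromise: SimplePromise is a hypothetical class, and it leaves out threading, failure, and cancellation, all of which the real implementation handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-threaded promise; not Netty code. */
class SimplePromise {
    private final List<Consumer<SimplePromise>> listeners = new ArrayList<>();
    private boolean success;

    boolean isSuccess() {
        return success;
    }

    /** Registers a listener; if the promise already succeeded, it runs right away. */
    void addListener(Consumer<SimplePromise> listener) {
        if (success) {
            listener.accept(this);
        } else {
            listeners.add(listener);
        }
    }

    /** Marks success once and notifies every registered listener in order. */
    boolean trySuccess() {
        if (success) {
            return false; // already completed, nothing to notify
        }
        success = true;
        for (Consumer<SimplePromise> listener : listeners) {
            listener.accept(this);
        }
        listeners.clear();
        return true;
    }

    public static void main(String[] args) {
        SimplePromise first = new SimplePromise();
        first.addListener(p -> System.out.println("first message send success, sending second"));
        System.out.println("before: " + first.isSuccess()); // prints before: false
        first.trySuccess(); // plays the role of safeSuccess(promise) in remove()
        System.out.println("after: " + first.isSuccess());  // prints after: true
    }
}
```

  In this model the listener runs inside trySuccess, in the same call chain as the write bookkeeping, which matches the flow traced above: doWrite, removeBytes, remove, safeSuccess, and finally the listener.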

  

Original article: https://www.cnblogs.com/juniorMa/p/14308000.html