[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的

public interface Future<V> extends java.util.concurrent.Future<V> {
   Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
}
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        //已完成任务直接忽略
        if (isDone()) {
            return true;
        }
        //没有等侍时间返回处理记录
        if (timeoutNanos <= 0) {
            return isDone();
        }
        //已中断抛异常
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        //checkDeadLock();
        //netty 认为是当前线程是死锁状态
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
        
        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false; 
        try {
            for (;;) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    //最大检查次数为 Short.MAX_VALUE
                    //很奇怪的逻辑,处理完后又自减
                    if (waiters == Short.MAX_VALUE) {
                        throw new IllegalStateException("too many waiters: " + this);
                    }
                    ++waiters;
                    try {
                        //阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        waiters--;
                    }
                }
                //这里是double check跟并发无影响的逻辑放在synchronized外面
                if (isDone()) {
                    return true;
                } else {
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    public DefaultChannelPromise(Channel channel) {
        this.channel = channel;
    }
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}
原文地址:https://www.cnblogs.com/solq111/p/7071177.html