JavaNIO

1. 描述

可异步关闭和中断的Channel。

(1)实现InterruptibleChannel接口的Channel支持异步关闭:如果一个线程IO阻塞在一个可中断的channel,另一个线程可以执行channel的close方法。这将导致阻塞线程收到AsynchronousCloseException异常。

(2)实现InterruptibleChannel接口的Channel支持中断:如果一个线程IO阻塞在一个可中断的Channel,另一个线程可以执行阻塞线程的interrupt方法。这将导致Channel关闭,阻塞线程收到ClosedByInterruptException异常,阻塞线程将是interrupted状态。

(3)如果线程已经中断,然后在Channel执行阻塞IO操作,channel将关闭,线程将立刻收到ClosedInterruptException异常,且中断状态会保持。

(4)只有实现InterruptibleChannel的Channel才支持异步关闭和中断。

2. Interruptible,InterruptibleChannel,AbstractInterruptibleChannel 

2.1 API

package sun.nio.ch;
public interface Interruptible {

    public void interrupt(Thread t);

}

package java.nio.channels;
public interface InterruptibleChannel
    extends Channel
{

     // 关闭Channel
     // 任何线程在channel上面IO阻塞,都会收到AsynchronousCloseException异常
    public void close() throws IOException;

}

package java.nio.channels.spi;
 // 可中断Channel的基本实现
 // 这个类封装底层机制去实现Channel异步关闭和中断。一个具体Channel必须在可能导致IO阻塞之前执行begin方法,之后执行end方法;其中为了保证end必须执行,必须放在try{}finally{}
 // 如:
 // boolean completed = false;
 // try {
 //        begin();
 //        completed = ...; // 执行IO阻塞操作
 //        return;   
 // } finally {
 //        end(completed);
 // }      
 // completed标示IO操作是否完成,即对调用者是可见的。
public abstract class AbstractInterruptibleChannel
    implements Channel, InterruptibleChannel
{

    private final Object closeLock = new Object(); // 关闭需要加锁
    private volatile boolean open = true; // 是否打开

    protected AbstractInterruptibleChannel() { }

    // 关闭Channel
    // 如果Channel已经关闭,这个方法立刻返回;否则标记channel为关闭状态,然后执行implCloseChannel去执行真实的关闭操作。
    public final void close() throws IOException {
        synchronized (closeLock) {
            if (!open)
                return;
            open = false;
            implCloseChannel();
        }
    }

    // 这个方法被close调用去执行真实的channel关闭工作。该方法只有在channel还没有关闭的情况下被调用,从来不会被调用多次。
    // 此方法的实现必须设定:在调用关闭方法的时候,在此通道上的I/O操作中阻塞的任何其他线程立即返回,要么抛出异常,要么返回正常。
    protected abstract void implCloseChannel() throws IOException;

    public final boolean isOpen() {
        return open;
    }


    // -- Interruption machinery --

    private Interruptible interruptor; 
    private volatile Thread interrupted; // 被中断的线程

    // 标记可能IO阻塞的开始,这个方法必须和end匹配使用。
    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread target) {
                        synchronized (closeLock) {
                            if (!open)
                                return;
                            open = false;
                            interrupted = target;
                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) { }
                        }
                    }};
        }
        blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }

    // 标记可能IO阻塞的结束,这个方法必须和begin匹配使用。
    // completed: IO操作是否完成。
    // 如果Channel被异步关闭,抛出AsynchronousCloseException
    // 如果阻塞线程被中断,抛出ClosedByInterruptException
    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        blockedOn(null);
        Thread interrupted = this.interrupted;
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }


    // 设置当前线程实例的blocker字段值为intr。
    static void blockedOn(Interruptible intr) {         // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),
                                                             intr);
    }
}

AbstractInterruptibleChannel定义了调用模式: 

boolean completed = false;  
try {  
    begin();  
    completed = ...;    // Perform blocking I/O operation  
    return ...;         // Return result  
} finally {  
    end(completed);  

(1)ClosedByInterruptException

怎么实现线程中断时关闭channel呢?不能指望调用者调用thread.interrupt()后,在调用Channel.close()

能够想到的办法就是将channel的close方法放到thread.interrupt()方法中,JDK即使这么做的,下面会对Thread的interrupt方法做介绍。  

(2)AsynchronousCloseException

当一个线程阻塞在channel上,另一个线程调用channel的close方法,怎么实现阻塞线程抛出异常呢?难点在channle的close方法必须通知阻塞的线程,让它中断。

具体可以看看sun.nio.ch.ServerSocketChannelImpl里面的implCloseSelectableChannel实现。  

2.2 类图

2.3 AbstractInterruptibleChannel源码分析

 首先看看Thread类的interrupt方法实现:

class Thread {
    // 该线程在可中断的I/O操作中被阻塞的对象,如果有的话。
    // blocker的interrupt方法在设置线程中断状态之后必须被执行。 
    private volatile Interruptible blocker;
    private final Object blockerLock = new Object();

    void blockedOn(Interruptible b) { // 设置blocker字段
        synchronized (blockerLock) {
            blocker = b;
        }
    }
 public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this); // 回调blocker的interrupt方法
                return;
            }
        }
        interrupt0();
    }
    

}  

2.3.1 begin

下面看看AbstractInterruptibleChannel的begin方法:

protected final void begin() {
        if (interruptor == null) { // 初始化Interruptible对象
            interruptor = new Interruptible() {
                    public void interrupt(Thread target) {
                        synchronized (closeLock) {
                            if (!open)
                                return;
                            open = false;
                            interrupted = target;
                            try { // 关闭可中断的Channel
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) { }
                        }
                    }};
        }
        // 设置当前线程实例的blocker字段值为interruptor,保证在调用当前线程的interrupt方法时,可以回调前面初始化Interruptible对象的interrupt方法,从而关闭可中断Channel
        blockedOn(interruptor); 
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }  

补充一点:AbstractInterruptibleChannel的blockedOn(Interruptible intr)方法的实现:

static void blockedOn(Interruptible intr) {         // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),
                                                             intr);
    }  

JavaLangAccess是接口,里面有方法blockedOn

/** Set thread's blocker field. */
void blockedOn(Thread t, Interruptible b);  

 而在System.setJavaLangAccess()方法中有个JavaLangAccess的匿名内部类实现,其中blockedOn方法的实现如下:

public void blockedOn(Thread t, Interruptible b) {
                t.blockedOn(b); // 调用Thread类的方法设置blocker
            }

2.3.2 end

protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        blockedOn(null);// 清空线程的blocker字段
        // 如果线程被中断,则在begin里面初始化的Interruptible对象的interrupt方法里面设置了interrupted变量为被中断的线程。
        Thread interrupted = this.interrupted; 
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException(); // 如果被中断,则抛出ClosedByInterruptException异常
        }
        if (!completed && !open) // 如果被close,则抛出AsynchronousCloseException异常
            throw new AsynchronousCloseException();
    }

3. 参考资料

1. JDK1.8.0_111

2. http://jnullpointer.iteye.com/blog/2119982  

原文地址:https://www.cnblogs.com/lujiango/p/8478154.html