Java笔记(十四) 并发基础知识

并发基础知识

一、线程的基本概念

线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。

1.创建线程

1)继承Thread

Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:

public class MyThread extends Thread{
    @Override
    public void run() {
        System.out.println("thread name: " + Thread.currentThread().getName() +
                " thread id: " + Thread.currentThread().getId());
        System.out.println("Running my thread!");
    }

    public static void main(String[] args) {
        MyThread thread = new MyThread();
        //启动线程
        thread.start();
        /*thread name: Thread-0 thread id: 11
        Running my thread!*/
    }
}

2)实现Runnable接口

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println("thread name:" + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        System.out.println("Main thread name : "
                + Thread.currentThread().getName()); //Main thread name : main
        Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0
        thread.start();
    }
}

2.线程的基本属性和方法

1)id和name

id: 一个递增整数,每创建一个线程就加一

name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。

2)优先级

优先级从1到10,默认为5,相关方法:

public final void setPriority(int newPriority)
public final int getPriority()

在编程中不要过分依赖优先级。

3)状态

线程有一个状态概念,Thread获取状态方法:

public State getState()

返回值类型为Thread.State,它是一个枚举值:

public enum State {
NEW, //线程还没调用start
//调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,
//不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在
//等待操作系统分配时间片,只是它在等待其他条件。
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED; //线程已经结束运行
}

Thread还有一个方法,返回线程是否alive:

//线程被启动后,run方法运行结束前,返回值都是true
public final native boolean isAlive()

4.是否是daemon线程 

一般情况下整个程序只有在所有的线程都结束后,线程才会退出。

而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。

public final void setDaemon(boolean on)
public final boolean isDaemon()

5.sleep方法

Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。

public static native void sleep(long millis) throws InterruptedException;

在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。

睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。

6.yield方法 

public static native void yield();

调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。

不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。

7.join方法 

在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。

Thread有一个join方法,可以让调用join的线程等待该线程的结束。

public final void join() throws InterruptedException
//限定等待的最长时间
public final synchronized void join(long millis) throws InterruptedException
MyThread thread = new MyThread();
thread.start();
//让main线程在子线程调用结束后再退出,相当于阻塞了main线程
thread.join();

3.共享内存及可能出现的问题

虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,

但线程之间可以共享内存,它们可以访问和操作相同的对象。

public class ShareMemoryDemo {
    private static int shared = 0;
    private static void incrShared() {
        shared ++;
    }
    static class ChildThread extends Thread {
        List<String> list;
        public ChildThread(List<String> list) {
            this.list = list;
        }
        @Override
        public void run() {
            incrShared();
            list.add(Thread.currentThread().getName());
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> list = new ArrayList<>();
        ChildThread t1 = new ChildThread(list);
        ChildThread t2 = new ChildThread(list);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(shared);
        System.out.println(list);
        /*2
        [Thread-0, Thread-1]*/
    }
}

ChildThread的run方法访问了共享变量shared和list。

执行流、内存和程序代码之间的关系:

1)不同的执行流可以访问和操作相同的变量

2)不同的执行流可以执行相同的程序代码

所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的

3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中

的参数和局部变量都有自己的一份。

当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。

1.竞态条件 

所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序

有关,可能正确也可能不正确。

public class CounterThread extends Thread{
    private static int counter = 0;
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            counter++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        int num = 1000;
        Thread[] threads = new Thread[num];
        for (int i = 0; i< num; i++) {
            threads[i] = new CounterThread();
            threads[i].start();
        }
        for (int i = 0; i < 1000; i++) {
            threads[i].join();
        }
        System.out.println(counter); //998400
    }
}

期望结果应该是10000000,实际结果为998400,为什么呢?

因为counter++这个操作不是原子操作,它分为了3个步骤:

1)去counter的当前值

2)在当前值上加1

3)将新值重新赋值给counter

两个线程可能同时执行第一步,取到了相同的counter值。

2.内存的可见性 

多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,

另一个线程不一定能马上见到,甚至永远见不到。

public class VisibilityDemo {
    private static boolean shutdown = false;
    static class MyThread extends Thread {
        @Override
        public void run() {
            while (!shutdown) {
                //do nothing
            }
            System.out.println("exit myThread");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new MyThread().start();
        Thread.sleep(1000);
        shutdown = true;
        System.out.println("exit main!");
    }
}

期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,

也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。

这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及

各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当

修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这

一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对

内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。

二、理解synchronized(同步)

1.用法和基本原理

synchronized可以用于:

1)实例方法:

public class Counter {
    private int count;
    public synchronized void incre() {
        count ++;
    }
    public synchronized int getCount() {
        return count;
    }
}
public class CounterThread extends Thread {
    private Counter counter;
    public CounterThread(Counter counter) {
        this.counter = counter;
    }
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            counter.incre();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        int num = 1000;
        Counter counter = new Counter();
        Thread[] threads = new Thread[num];
        for (int i = 0; i < num; i++) {
            threads[i] = new CounterThread(counter);
            threads[i].start();
            threads[i].join();
        }
        System.out.println(counter.getCount());
    }
}

看上去,synchronized使得同时只能有一个线程执行实例方法,

但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,

只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个

对象方法的调用,确保同时只能有一个线程执行。大致过程如下:

1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;

2)执行实例方法代码;

3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒

哪一个是不一定的,不保证公平性。

当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.

注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。

再次强调:synchronized保护的是对象而非代码,只要访问的是同一个对象的synchronized方法,

即使是不同的代码,也会被同步顺序访问。比如,Counter中的两个实例方法,incre和getCount,

对于同一个Counter对象,一个线程想去执行incre,另一个线程想执行getCount,它们是不能同时执行的,

会被synchronized同步顺序执行。

2)静态方法 

synchronized保护的是类对象,而非实例对象(实际上,每个对象都有一个锁和等待队列,类对象也不例外)。

public class StaticCounter {
    private static int count = 0;
    public static void incr() {
        synchronized (StaticCounter.class) {
            count ++;
        }
    }
    public static int getCount() {
        synchronized (StaticCounter.class) {
            return count;
        }
    }
}

3)代码块 

public class Counter {
    private int count;
    public void incre() {
        //synchronized括号里面的就是保护对象{}里就是同步执行代码
        synchronized (this) {
            count ++;
        }
    }
    public synchronized int getCount() {
        synchronized (this) {
            return count;
        }
    }
}
public class Counter {
    private int count;
    private Object obj = new Object();
    public void incre() {
        //synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列
        synchronized (obj) {
            count ++;
        }
    }
    public synchronized int getCount() {
        synchronized (obj) {
            return count;
        }
    }
}

2.进一步理解synchronized

1)可重入性 

对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。

2)内存可见性 

synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,

在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取最新数据。

不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。

加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。

3)死锁 

死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,

a、b陷入了互相等待,最后谁都执行不下去。

public class DeadLockDemo {
    private static Object lockA = new Object();
    private static Object lockB = new Object();
    public static void startThreadA() {
        Thread t = new Thread(() -> {
            synchronized (lockA) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lockB) {}
            }
        });
        t.start();
    }
    public static void startThreadB() {
        Thread t = new Thread(() -> {
            synchronized (lockB) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lockA) {}
            }
        });
        t.start();
    }
    public static void main(String[] args) {
        startThreadA();
        startThreadA();
    }
}

如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,

如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

3.同步容器

Collections类中有一些方法,可以返回线程安全的同步容器:

public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)

它们是给所有容器方法加上synchronized来实现安全的,例如:

static class SynchronizedCollection<E> implements Collection<E> {
    final Collection<E> c; //Backing Collection
    final Object mutex; //Object on which to synchronize
    SynchronizedCollection(Collection<E> c) {
        if(c==null)
            throw new NullPointerException();
        this.c = c;
        mutex = this;
    }

public int size() { synchronized (mutex) {return c.size();} } public boolean add(E e) { synchronized (mutex) {return c.add(e);} } public boolean remove(Object o) { synchronized (mutex) {return c.remove(o);} } // }

这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,  

不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是

的我们还需要注意以下情况:

1)复合操作 

public class EnhancedMap <K, V> {
    Map<K, V> map;
    public EnhancedMap(Map<K,V> map){
        this.map = Collections.synchronizedMap(map);
    }
    public V putIfAbsent(K key, V value){
        V old = map.get(key);
        if(old!=null){
            return old;
        }
        return map.put(key, value);
    }
    public V put(K key, V value){
        return map.put(key, value);
    }
//
}

其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,

这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程

执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。

2.伪同步 

那么给该方法加上synchronized就线程安全了吗?

public synchronized V putIfAbsent(K key, V value){
    V old = map.get(key);
    if(old!=null){
        return old;
    }
    return map.put(key, value);
}

答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,

而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须

使用相同的锁。所以可以改为:

public V putIfAbsent(K key, V value){
    synchronized(map){
        V old = map.get(key);
        if(old!=null){
            return old;
        }
        return map.put(key, value);
    }
}

3.迭代 

对于同步容器对象,虽然单个操作是安全的,但迭代并不是:

public class DeadLockDemo {

    private static void startModifyThread(final List<String> list) {
        Thread modifyThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 100; i++) {
                    list.add("item " + i);
                    try {
                        Thread.sleep((int) (Math.random() * 10));
                    } catch (InterruptedException e) {
                    }
                }
            }
        });
        modifyThread.start();
    }

    private static void startIteratorThread(final List<String> list) {
        Thread iteratorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    for(String str : list) {
                    }
                }
            }
        });
        iteratorThread.start();
    }

    public static void main(String[] args) {
        final List<String> list = Collections
                .synchronizedList(new ArrayList<String>());
        startIteratorThread(list);
        startModifyThread(list);
    }
    /*Exception in thread "Thread-0" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34)
    at java.lang.Thread.run(Thread.java:748)*/
}

如果在遍历的同时容器发生了结构性变化就会抛出该异常。

可以改为(在遍历的时候给整个容器对象加锁):

private static void startIteratorThread2(final List<String> list) {
    Thread iteratorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(true) {
                synchronized(list){
                    for(String str : list) {
                    }
                }
            }
        }
    });
    iteratorThread.start();
}

二、线程的基本协作机制

一)协作的场景

1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到

队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,

在队列为空的时候,消费者需要等待。

2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。

3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他

任务之前需要等待每个子任务执行完毕。

4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装

为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。

5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程

到齐后,交还数据和计算结果,再进行下一次迭代。

二、wait/notify

在Java根父类中定义了一些线程协作的基本方法,这些方法

分为两类:wait和notify。wait主要有两个方法:

public final void wait() throws InterruptedException
//单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0)
public final native void wait(long timeout) throws InterruptedException;

每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,

会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了

用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列

用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件

队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由

它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。

//notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒
public final native void notify();
//notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒
public final native void notifyAll();

简单的协作示例:

public class WaitThread extends Thread{
    private volatile boolean fire = false;
    @Override
    public void run() {
        try {
            synchronized(this) {
                while (!fire) {
                    wait();
                }
            }
            System.out.println("fired!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public synchronized void fire() {
        this.fire = true;
        notify();
    }
    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被

synchronized保护。实际上,wait/notify方法只能在synchronized代码

块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。

看看另一种情况:

public class WaitThread extends Thread{
    private volatile boolean fire = false;
    private volatile int count = 0;
    @Override
    public void run() {
        try {
            synchronized(this) {
                while (!fire) {
                    sleep(1000);
                    System.out.println(++count);
                }
            }
            System.out.println("fired");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //实例对象的锁不会被释放,该方法永远不会被执行
    public synchronized void fire() {
        System.out.println("fire it"); 
        this.fire = true;
    }
    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程

怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?

它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用

wait时,线程会释放对象锁。wait的具体过程是:

1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。

2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。

否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。

线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般

调用模式是:

synchronized (obj) {
while(条件不成立)
obj.wait();
…//执行条件满足后的操作
}

比如上例中的代码:

synchronized (this) {
    while(!fire) {
    wait();
    }
}

调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会

释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的

synchronized代码块执行完毕后,等待线程才会从wait调用中返回。

三、生产者消费者模式

public class MyQueue<E> {
    private Queue<E> queue = null;
    private int limit;
    public MyQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<E>(limit);
    }
    //给生产者用的方法,往队列上放数据,满了就wait
    public synchronized void put(E e) throws InterruptedException {
        while (queue.size() == limit) wait();
        queue.add(e);
        //由于条件不同但又使用相同的的等待队列,所以
        //要用notifyAll而不能调用notify
        notifyAll();
    }
    //take给消费者用的方法空了就wait
    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty()) wait();
        E e = queue.poll();
        notifyAll();
        return e;
    }
}

只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。

public class Producer extends Thread{
    MyQueue<String> queue;
    public Producer(MyQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        int num = 0;
        try {
            while (true) {
                String task = String.valueOf(num);
                queue.put(task);
                System.out.println("produce: " + task);
                num ++;
                Thread.sleep(3000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Consumer extends Thread{
    MyQueue<String> queue;
    public Consumer(MyQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String task = queue.take();
                System.out.println("consumer: " + task);
                Thread.sleep(3000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、同时开始

public class FireFlag {
    private volatile boolean fired = false;
    public synchronized void waitForFire() throws InterruptedException {
        while (!fired) {
            wait();
        }
    }
    public synchronized void fire() {
        this.fired = true;
        notifyAll();
    }
}
public class Racer extends Thread{
    private FireFlag flag;
    public Racer(FireFlag fireFlag) {
        this.flag = fireFlag;
    }
    @Override
    public void run() {
        try {
            flag.waitForFire();
            System.out.println("start run " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        FireFlag flag = new FireFlag();
        Thread[] threads = new Thread[num];
        for (int i = 0; i < num; i++) {
            threads[i] = new Racer(flag);
            threads[i].start();
        }
        Thread.sleep(1000);
        flag.fire();
    }
}

四、等待结束

join方法实际上就是调用了wait方法:

while (isAlive()) {
    wait(0);
}

只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?

当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要

主线程逐一等待每个子线程。这里我们展示一种新写法:

public class MyLatch {
    //未完成的线程个数,初始值为子线程总个数
    private int threadsCount;
    public MyLatch(int threadsCount) {
        this.threadsCount = threadsCount;
    }
    public synchronized void await() throws InterruptedException {
        while (threadsCount > 0) wait();
    }
    public synchronized void countDown() {
        threadsCount --;
        if (threadsCount <= 0) this.notifyAll();
    }
}
public class Worker extends Thread{
    MyLatch myLatch;
    public Worker(MyLatch myLatch) {
        this.myLatch = myLatch;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            myLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        MyLatch myLatch = new MyLatch(num);
        Thread[] threads = new Thread[num];
        for (int i = 0; i < num; i++) {
            threads[i] = new Worker(myLatch);
            threads[i].start();
        }
        myLatch.await();
        System.out.println("ready to end !");
    }
}

五、异步结果

在Java中表示子任务的接口是Callable,声明为:

public interface Callable<V> {
    V call() throws Exception;
}

为表示异步调用的结果,定义一个MyFuture接口:

public interface MyFuture <V> {
    //get方法返回真正的结果,如果结果没有计算完成,
    //get方法会阻塞直到计算完成
    V get() throws Exception ;
}

为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute

方法表示执行子任务并返回调用结果。

//利用该方法,对于主线程,就不需要创建并管理子线程了,
//并且可以方便地获取异步调用的结果
public <V> MyFuture<V> execute(final Callable<V> task)

实例:

//表示子任务得接口
interface MyCallable<V> {
    V call() throws Exception;
}
//表示异步调用的结果
interface MyFuture<V> {
    //返回真正的结果
    V get() throws Exception;
}
interface MyExecutor {
    <V> MyFuture<V> execute(final MyCallable<V> task);
}
//执行任务的子线程
public class ExecuteThread<V> extends Thread{

    private V result = null;
    private Exception exception = null;
    private volatile boolean done = false;
    private MyCallable<V> task;
    private Object lock;

    public ExecuteThread(MyCallable<V> task, Object lock) {
        this.task = task;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            result = task.call();
        } catch (Exception e) {
            exception = e;
        } finally {
            synchronized (lock) {
                done = true;
                lock.notifyAll();
            }
        }
    }

    public V getResult() {
        return result;
    }
    public boolean isDone() {
        return done;
    }

    public Exception getException() {
        return exception;
    }
}
public class MyExecutorImpl implements MyExecutor{
    @Override
    public <V> MyFuture<V> execute(MyCallable<V> task) {
        final Object lock = new Object();
        final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
        MyFuture<V> myFuture = new MyFuture<V>(){
            @Override
            public V get() throws Exception{
                synchronized (lock) {
                    System.out.println("Try to get result");
                    while (!thread.isDone()) {
                        try {
                            //阻塞直到任务执行完毕
                            lock.wait();
                        } catch (Exception e) {}
                    }
                    if (thread.getException() != null) {
                        throw thread.getException();
                    }
                }
                System.out.println("Got it!!!");
                return thread.getResult();
            }
        };
        thread.start();
        return myFuture;
    }
}
public class MainClass {
    public static void main(String[] args) {
        MyExecutor executor = new MyExecutorImpl();
        MyCallable<Integer> task = new MyCallable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Start do the call.");
                int result = (int) (Math.random() * 1000);
                Thread.sleep(5000);
                System.out.println("Do the call done.");
                return result;
            }
        };
        //异步调用,返回一个MyFuture对象
        MyFuture<Integer> future = executor.execute(task);
        //执行其他操作
        try {
            System.out.println("Start do other things.");
            Thread.sleep(3000);
            System.out.println("Other things done. ");
            System.out.println("Ready to get.");
            //获取异步调用结果
            Integer result = future.get();
            System.out.println("The result is " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*Start do other things.
        Do the call done.
        Other things done.
        The result is 802*/
    }
}

六、集合点

public class AssemblePoint {
    //未到达集合点的线程数量,初始为所有线程数
    private int threadsCount;
    public AssemblePoint(int threadsCount) {
        this.threadsCount = threadsCount;
    }
    public synchronized void await() throws InterruptedException {
        System.out.println("Now the count is " + threadsCount);
        if (threadsCount > 0) {
            threadsCount --;
            if (threadsCount == 0 ) {
                notifyAll();
                System.out.println("All the threads done.");
            }
            else {
                while (threadsCount !=0) {
                    this.wait();
                }
            }
        }
    }
}
public class AssemblePointDemo {
    static class Tourist extends Thread {
        AssemblePoint point;
        public Tourist(AssemblePoint point) {
            this.point = point;
        }
        @Override
        public void run() {
            try {
                //模拟各自独自运行
                Thread.sleep((long) (Math.random() * 1000));
                //该线程运行完毕,集合
                point.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int num = 10;
        Tourist[] threads = new Tourist[num];
        AssemblePoint point = new AssemblePoint(num);
        for (int i = 0; i < num; i++) {
            threads[i] = new Tourist(point);
            threads[i].start();
        }
    }
}

三、线程的中断

 一)取消/关闭机制

在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,

是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。

//返回线程的中断标志位
public boolean isInterrupted()
//中断对应的线程
public void interrupt()
//该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空
public static boolean interrupted()

 二)线程中断的反应

interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,

我们主要讨论线程状态:

1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;

2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;

3)BLOCKED:线程在等待锁,试图进入同步块

4)NEW/TERMINATED:线程还未启动或者已经结束。

1.RUNNABLE 

如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,

没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:

public class InterruptRunnableDemo extends Thread {
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
        }
        System.out.println("done ");
    }
}

2.WAITING/TIME_WAITING 

线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,

在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。

抛出异常后,中断标志位会被清空,而不是设置。比如:

public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            System.out.println(Thread.currentThread().isInterrupted()); //false
        }
    });
    thread.start();
    try {
        Thread.sleep(100);
    } catch (Exception e) {}
    thread.interrupt();
}

捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:

1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;

2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能

抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调

用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。

3.BLOCKED 

如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程

仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。

public class InterruptSynchronizedDemo {
    private static Object lock = new Object();
    private static class MyThread extends Thread {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("Coming in "); //never
                while (!Thread.currentThread().isInterrupted()) {
                }
            }
            System.out.println("exit");//never
        }
    }
    public static void test() throws InterruptedException {
        synchronized (lock) {
            MyThread thread = new MyThread();
            thread.start();
            Thread.sleep(1000);
            thread.interrupt();
            thread.join();
        }
    }
    public static void main(String[] args) {
        //test方法在持有锁的情况下启动线程thread,而线程thread
        //也尝试获得锁,所以会进入等待队列。
        try {
            test();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。

4.NEW/TERMINATE

调用interrupt无任何效果。

三)如何正确地取消/关闭线程

interrupt方法不一定会真正中断线程,它只是一种协作机制。

对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供

单独的取消/关闭方法给调用者。

原文地址:https://www.cnblogs.com/Shadowplay/p/10035987.html