JUC

一、volatile关键字与内存可见性

首先看一段代码

public class VolatileTest {
    public static void main(String[] args) {
        ThreadDemp td = new ThreadDemp();
        new Thread(td).start();
        while(true){
            if(td.isFlag()){
                System.out.println("--------------");
                break;
            }
        }
    }
}

class ThreadDemp implements Runnable{
    private boolean flag = false;
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag = true;
        System.out.println("flag="+flag);
    }

    public boolean isFlag(){
        return flag;
    }
}

正常来说,在td线程运行后将flag变为true后,程序结束。然而运行后会发现,程序确实打印出来flag=true这条语句,但是却不能结束。

这是因为JVM会为每一线程都分配一个独立的缓存。flag是主存当中的线程共享数据,在它分别被加载到td线程和main线程中后,td线程会对其进行更改再把结果写回主存。但是,对于main线程来讲,因为td线程进行了延迟,所以main线程获取的flag一定是false,又因为main线程中while(true)循环运行效率非常快,以至于main线程无暇再从主存读取flag,因此flag的更改也对其无影响,因此程序无法退出。

内存可见性问题是,当多个线程操作共享数据时,彼此不可见。

要解决这个问题,一个方法就是加锁,但是加锁会导致效率降低。另一个方法就是使用volatile关键字,使得数据是可见的。也可以理解为线程对volatile修饰的数据的读与写都是直接在主存中进行的。

相较于synchronized,volatile是一种轻量级的同步策略。但:

  • volatile不具有“互斥性”
  • volatile不能保证变量的“原子性”

二、原子变量与CAS算法

i++的原子性问题,在底层i++其实相当于

int temp = i;
i = i + 1;
i = temp;

先阅读下面代码

public class AtomicTest {
    public static void main(String[] args) {
        AtomicDemo ad = new AtomicDemo();
        for (int i = 0; i < 10; ++i) {
            new Thread(ad).start();
        }

    }
}

class AtomicDemo implements Runnable {
    private volatile int serialNumber = 0;

    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(getSerialNumber());
    }

    public int getSerialNumber() {
        return serialNumber++;
    }
}

运行的结果为

2
0
0
1
3
4
5
6
7
8

因为i++相当于三步操作,所以volatile只能保证这三步操作都在主存中进行,但是每个线程可以执行不同的步骤,所以volatile不能保证原子性。

原子变量

为了解决上述问题,使用原子变量。

在jdk1.5后java.util.concurrent.atomic包下提供了常用的原子变量:

  • 使用volatile保证内存可见性
  • CAS(Compare-And-Swap)算法保证数据的原子性。CAS算法是硬件对并发操作共享数据的支持。CAS包括了三个操作数:内存值V,预估值A,更新值B,当且仅当V==A时,V=B,否则,不进行任何操作。

三、同步容器类ConcurrentHashMap

ConcurrentHashMap采用“锁分段机制”。

我们知道HashMap与HashTable的不同就在于HashTable是线程安全的,原因就是HashTable使用了锁,但是HashTable是全表锁,一旦对其中一个内容进行操作,整张表全被都被锁,效率非常低。

而ConcurrentHashMap默认将表分为16段,每段都有独立的锁。不同线程访问不同段是不会相互干扰的。

java.util.concurrent包中提供了用于多线程上下文中的Collection实现:ConcurrentHashMap、ConcurrentSkipListMap、ConCurrentSkipSet、CopyOnWriteArrayList和CopyOnWriteArraySet。当期望许多线程访问一个给定collection时,ConcurrentHashMap通常优于同步的HashMap,ConcurentSkipListMap通常优于同步的TreeMap。当期望的读数和遍历源源大于列表的更新数时,copyOnWriteArrayList优于同步的ArrayList。

四、CountDownLatch闭锁

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度知道其达到终止状态,闭锁可以用来确保某些活动知道其他活动都完成才继续执行:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  • 确保某个服务在其依赖的所有其他服务器都已经启动之后才启动;
  • 等待直到某个操作所有参与者都准备就绪在继续执行。
public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch count = new CountDownLatch(5);
        LatchDemo ld = new LatchDemo(count);
        long start = System.currentTimeMillis();
        for(int i = 0; i < 5; i++){
            new Thread(ld).start();
        }

        try {
            //阻塞等待,直到count的数目为0
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //得出所有线程执行完毕的总时间
        long end = System.currentTimeMillis();
        System.out.println("time="+(end-start));
    }
}

class LatchDemo implements Runnable{
    private CountDownLatch count;

    public LatchDemo(CountDownLatch count) {
        this.count = count;
    }

    @Override
    public void run() {
        synchronized(this){
            for(int i = 0; i < 50000; ++i){
                if(i % 2 == 0){
                    System.out.println(i);
                }
            }
            //每执行一次,数量减一
            count.countDown();
        }
    }
}

五、实现Callable接口

创建线程一共有四种方式,继承Thread类,实现Runnable接口、实现Callable接口、通过线程池。

Callable相较于Runnable接口,方法可以有返回值,并且可以抛出异常。

执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果。FutureTask是Future接口的实现类。

public class CallableTest {
    public static void main(String[] args) {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask<Integer> result = new FutureTask<>(callableDemo);

        new Thread(result).start();

        try {
            Integer sum = result.get();
            System.out.println(sum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class CallableDemo implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

FutureTask的get方法可以获取到call()方法的返回值,并且此方法要等线程结束才能获取返回值,故也可实现闭锁。

六、同步锁Lock

用于解决多线程安全问题,jdk1.5以后提供的一种方法。synchronized是隐式锁,而Lock是显示锁,需要通过lock()方法上锁,通过unlock()方法解锁

使用condition进行线程间的通信

Condition condition = lock.newCondition();

condition有方法await()、signal()和signalAll()作用与wait、notify和notifyAll相同。

线程按序交替执行

三个线程按照指定的顺序执行

public class ABCAternateTest {
    public static void main(String[] args) {
        AlternateDemp ad = new AlternateDemp();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<20;++i){
                    ad.loopA(i);
                }

            }
        },"A").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<20;++i){
                    ad.loopB(i);
                }
            }
        },"B").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<20;++i){
                    ad.loopC(i);
                }
            }
        },"C").start();
    }
}

class AlternateDemp{
    //标识
    private int num = 1;

    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void loopA(int totalLoop){
        lock.lock();
        try {
            if (num != 1){
                condition1.await();
            }
            for (int i = 0; i < 1; ++i){
                System.out.println(Thread.currentThread().getName()+"	"+i+"	"+totalLoop);
            }
            num = 2;
            condition2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
    public void loopB(int totalLoop){
        lock.lock();
        try {
            if (num != 2){
                condition2.await();
            }
            for (int i = 0; i < 1; ++i){
                System.out.println(Thread.currentThread().getName()+"	"+i+"	"+totalLoop);
            }
            num = 3;
            condition3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
    public void loopC(int totalLoop){
        lock.lock();
        try {
            if (num != 3){
                condition3.await();
            }
            for (int i = 0; i < 1; ++i){
                System.out.println(Thread.currentThread().getName()+"	"+i+"	"+totalLoop);
            }
            num = 1;
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

}

七、读写锁ReadWriteLock

ReadWriteLock内部维护两个锁,一个读锁,一个写锁。读锁可以同时被多个读线程持有,而写锁一次只能由一个线程持有。

写与写互斥,读与写也互斥,只有读操作可以同时进行

public class ReadWriteLockTest {
    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.set((int) (Math.random()*101));
            }
        },"Write").start();
        for(int i=0;i<100;++i){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.get();
                }
            }).start();
        }

    }
}

class ReadWriteLockDemo {
    private int num = 0;
    private ReadWriteLock lock = new ReentrantReadWriteLock();


    public void get() {
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() +"	"+ num);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void set(int num){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName());
            this.num = num;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

八、线程八锁

  1. 非静态方法的锁默认为this,静态方法的锁为对应的Class实例
  2. 某一个时刻内,只能有一个线程持有锁,无论几个方法。

九、线程池

提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁的额外开销,提高了响应速度。

线程池的体系结构

java.util.concurrent.Executor:负责线程的使用与调度的根接口

  • ExecutorService子接口:线程池的主要接口
    • ThreadPoolExecutor实现类。
    • ScheduledExecutorService子接口:负责线程的调度。
      • ScheduledThreadPoolExecutor:继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口

工具类:Executors

  • ExecutorService newFixedThreadPool():创建固定大小的线程池
  • ExecutorService newCacheThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
  • ExecutorService newSingleThreadExecutor():创建单个线程池。线程池中只有一个线程。

ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务。

public class ThreadPoolTest {
    public static void main(String[] args) {
        //1.创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
        ThreadDemo td = new ThreadDemo();

        //2.分配任务
        for (int i=0;i<10;++i){
            pool.submit(td);
        }
        //3.关闭线程池
        pool.shutdown();
    }
}

class ThreadDemo implements Runnable{
    private int num = 0;
    @Override
    public void run() {
        while(num < 100) {
            System.out.println(Thread.currentThread().getName()+"	"+ ++num);
        }
    }
}

十、ForkJoinPool分支合并框架

Fork/Join框架:就是在必要的情况下,将一个大人物,进行拆分(fork)成若干个小任务,再将一个个的小任务运算的结果进行join汇总。

public class ForkJoinTest {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L,100000000L);
        Long sum = pool.invoke(task);
        System.out.println(sum);
    }
}

class ForkJoinSumCalculate extends RecursiveTask<Long> {
    private long start;
    private long end;

    private static final long THURSHOLD = 100000L;

    public ForkJoinSumCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }


    @Override
    protected Long compute() {
        long length = end - start;
        //一次只计算10000个数
        if (length <= THURSHOLD) {
            long sum = 0;

            for (long i = 0; i <= end; ++i) {
                sum += i;
            }
            return sum;
        } else {
            //拆分
            long middle = (start + end) / 2;
            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
            left.fork();//进行拆分,同时压入线程队列
            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
            right.fork();
            //合并
            return left.join() + right.join();
        }
    }
}
原文地址:https://www.cnblogs.com/ylcc-zyq/p/12811684.html