JUC并发学习笔记

JUC并发

什么是JUC

JUC就是java.util .concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

业务:普通的线程代码,Thread。

Runable 没有返回值,效率比 Callable 低。

进程和线程

进程:操作系统中运行的程序就是线程,一个进程可以包含多个线程,是系统资源分配的的单位。Java默认线程:main线程,GC线程。

线程:系统运算调度的单位,是进程中实际的运作单位。

Java可以开启线程吗?

Thread部分源码:

public synchronized void start() {
   
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

private native void start0();

Java是不能开启线程的,start调用了本地方法,底层是C++。

并发,并行

并发:多个线程操作一个资源,不是同时执行的(单核,模拟多个线程,快速交替)

并行:多个线程同时执行(多核下)

package com.zr.demo1;

public class Test01 {
    public static void main(String[] args) {
        //获取cpu核心数
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}

并发编程的本质:充分利用cpu的资源。

线程的状态

线程的六个状态:可从源码中得到

Thread部分源码

public enum State {
  
    //创建
    NEW,

   //运行
    RUNNABLE,

   //阻塞
    BLOCKED,

    //等待
    WAITING,

    //超时等待
    TIMED_WAITING,

    //终止
    TERMINATED;
}

wait/sleep的区别

wait:Object类下的。会释放锁。只能在同步代码块中使用。不需要捕获异常。

sleep:Thread类下的。不会释放锁。可以在任何地方使用。必须要捕获异常。

Lock(锁)

lock

package com.zr.demo1;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SaleTicketDemo02 {
    public static void main(String[] args) {

        Ticket2 ticket2 = new Ticket2();

        new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"A").start();

        new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"B").start();

        new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"C").start();
    }
}
//资源类  OOP
class Ticket2{
    //属性  方法
    private int number = 30;

    Lock lock = new ReentrantLock();

    public void sale(){

        lock.lock();  //加锁

        try {  //业务代码
            if (number>0){
                System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票,剩余"+number+"张");
            }
        } catch (Exception e) {
            lock.unlock();  //解锁
        }
    }
}

Synchronized / Lock区别

  1. Synchronized 是Java关键字,Lock是Java一个类。
  2. Synchronized 无法获取锁的状态,Lock可以判断释放获取到了锁。
  3. Synchronized 会自动释放锁,Lock必须手动释放锁,如果不释放锁,死锁。
  4. Synchronized 阻塞会死等,Lock就不一定。
  5. Synchronized 可重入锁,不可以中断,非公平锁。Lock 可重入锁,可以判断锁,默认非公平锁(可以自己设置)。
  6. Synchronized 适合少量的代码同步问题。Lock 适合大量的代码同步代码。

生产者消费者问题

package com.zr.pc;
//生产者,消费者

import javax.activation.DataHandler;

/**
 * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
 * 线程A,B操作同一个变量
 */
public class Test1 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}

class Data{
    private int number = 0;

    //+1
    public synchronized  void increment() throws InterruptedException {
        if (number!=0){
            this.wait();  //等待
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"===>"+number);
        this.notifyAll();  //通知
    }

    //-1
    public synchronized  void decrement() throws InterruptedException {
        if (number==0){
            this.wait();  //等待
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"===>"+number);
        this.notifyAll();  //通知
    }
}

如果再增加 两个线程C,D(一个增加,一个减少)。此时测试会发现不安全。 会同时唤醒两个等待的线程,同时加1或者减1。

虚假唤醒,lang包下的Object类下的wait方法(API)

应改为: if改为while防止虚假唤醒

package com.zr.pc;
//生产者,消费者

import javax.activation.DataHandler;

/**
 * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
 * 线程A,B操作同一个变量
 */
public class Test1 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Data{
    private int number = 0;

    //+1
    public synchronized  void increment() throws InterruptedException {
        while (number!=0){
            this.wait();  //等待
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"===>"+number);
        this.notifyAll();  //通知
    }

    //-1
    public synchronized  void decrement() throws InterruptedException {
        while (number==0){
            this.wait();  //等待
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"===>"+number);
        this.notifyAll();  //通知
    }
}

JUC版本的生产者消费者问题

代码实现:

package com.zr.pc;
//生产者,消费者

import javax.activation.DataHandler;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
 * 线程A,B操作同一个变量
 */
public class Test2 {
    public static void main(String[] args) {
        Data2 data = new Data2();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Data2{
    private int number = 0;

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    //condition.await();  //等待
    //condition.signalAll();  //唤醒全部

    //+1
    public void increment() throws InterruptedException {

        lock.lock();
        try {
            while (number!=0){
                condition.await();  //等待
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            condition.signalAll();  //通知
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    //-1
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number==0){
                condition.await();  //等待
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            condition.signalAll();  //通知
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

结果是随机的状态:

此时Condition的作用就体现出来了,可以精准的通知和唤醒线程。

测试:

package com.zr.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Zhour  813794474@qq.com
 * A调B,B调C,C调A
 */
public class Test3 {
    public static void main(String[] args) {
        Data3 data = new Data3();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printA();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printB();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
               data.printC();
            }
        },"C").start();
    }
}

class Data3{  //资源类 lock

    private Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();
    int num = 1;

    public void printA(){
        lock.lock();
        try {
            //业务
            while (num!=1){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName() + "==>AAAAA");
            num = 2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printB(){
        lock.lock();
        try {
            //业务
            while (num!=2){
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"==>BBBBB");
            num = 3;
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printC(){
        lock.lock();
        try {
            //业务
            while (num!=3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"==>CCCCC");
            num = 1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

测试结果:

八锁现象

如何判断锁的是谁。

package com.zr.lock8;

import java.util.concurrent.TimeUnit;

public class Test1 {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(()->{
            phone.sendMassage();
        },"A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            phone.call();
        },"B").start();
    }
}

class Phone{
    //synchronized 锁的对象是方法的调用者
    //两个方法是一把锁,谁先拿到谁先执行
    public synchronized void sendMassage(){
        System.out.println("发短信");
    }
    public synchronized void call(){
        System.out.println("打电话");
    }
}
package com.zr.lock8;

import java.util.concurrent.TimeUnit;

/**
 *没有锁的方法不受影响
 */
public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        //现在有两把锁
        Phone2 phone = new Phone2();
        Phone2 phone1 = new Phone2();
        new Thread(()->{
            phone.sendMassage();
        },"A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            phone1.call();
        },"B").start();


        new Thread(()->{
            phone.hello();
        },"B").start();
    }
}

class Phone2{
    //synchronized 锁的对象是方法的调用者
    //两个方法是一把锁,谁先拿到谁先执行
    public synchronized void sendMassage(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    public synchronized void call(){
        System.out.println("打电话");
    }
    //这里没有锁,不受锁的影响
    public void hello(){
        System.out.println("hello");
    }
}
package com.zr.lock8;

import java.util.concurrent.TimeUnit;

/**
 * 静态的同步方法,锁的是phone.class!!!
 */

public class Test3 {
    public static void main(String[] args) throws InterruptedException {

        //两个对象的class类模板只有一个,两个锁都是锁的class
        Phone3 phone = new Phone3();
        Phone3 phone1 = new Phone3();

        new Thread(()->{
            phone.sendMassage();
        },"A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            phone1.call();
        },"B").start();

    }
}

class Phone3{
    //synchronized 锁的对象是方法的调用者
    //两个方法是一把锁,谁先拿到谁先执行
    //static 类一加载就有了 锁的是class
    public static synchronized void sendMassage(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    public static synchronized void call(){
        System.out.println("打电话");
    }
}
package com.zr.lock8;

import java.util.concurrent.TimeUnit;

/**
 * 一个静态同步方法,一个普通同步方法
 */
public class Test4 {
    public static void main(String[] args) throws InterruptedException {

        Phone4 phone = new Phone4();

        new Thread(()->{
            phone.sendMassage();
        },"A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            phone.call();
        },"B").start();

    }
}

class Phone4{
    //synchronized 锁的对象是方法的调用者
    //两个方法是一把锁,谁先拿到谁先执行
    //static 类一加载就有了 锁的是class
    //静态同步方法  锁class类模板
    public static synchronized void sendMassage(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    //普通同步方法  锁的是调用者
    public synchronized void call(){
        System.out.println("打电话");
    }
}

小结

new this 具体的一个实例

static class 唯一的一个模板

不安全集合

ArrayList多线程下 add 方法不安全。

CopyOnWriteArrayList(安全)

package com.zr.unsafe;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

//java.util.ConcurrentModificationException  使用ArrayList并发修改异常
public class ListTest {
    public static void main(String[] args) {
        //并发下ArrayList是不安全的
        /**
         * 解决方案
         * 1.List<String> arrayList = new Vector(); 底层synchronized
         * 2.List<String> arrayList = Collections.synchronizedList(new ArrayList<>());
         * 3.List<String> arrayList = new CopyOnWriteArrayList();
         */
        //CopyOnWrite 写入并复制  底层 lock
        
        List<String> arrayList = new CopyOnWriteArrayList();
        
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                arrayList.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(arrayList);
            },String.valueOf(i)).start();
        }
    }
}

HashSet不安全。

CopyOnWriteArraySet(安全)

package com.zr.unsafe;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetTest {
    public static void main(String[] args) {
        // Set<String> set = new HashSet();  不安全
        // 解决
        //1.Set set = Collections.synchronizedSet(new HashSet());
        //2. Set set = new CopyOnWriteArraySet();

        Set set = new CopyOnWriteArraySet();
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(Thread.currentThread().getName()+set);
            },String.valueOf(i)).start();
        }
    }
}

HashSet底层,就是HashMap

public HashSet() {
    map = new HashMap<>();
}
//add  key是无法重复的
 public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }
  private static final Object PRESENT = new Object();  //常量

HashMap不安全。

回顾map

实现:

package com.zr.unsafe;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class MapTest {
    public static void main(String[] args) {
        //Map<String,Object> map = new HashMap();  不安全
        //解决线程不安全
        Map<String,Object> map = new ConcurrentHashMap();

        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

Callable

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同,call()

Runable

FutureTask

package com.zr.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //new Thread().start(); 怎么启动callable

        myThread myThread = new myThread();
        FutureTask futureTask = new FutureTask(myThread);
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();  //结果会被缓存

        //获取callable的返回结果
        String o = (String) futureTask.get();  //这个get方法可能会产生阻塞 放到最后一行或者使用异步通信
        System.out.println(o);
    }
}
class myThread implements Callable<String> {

    @Override
    public String call(){
        System.out.println("call");
        //如果是耗时的操作
        return "123";
    }
}

注意:

  1. 有缓存
  2. 结果可能需要等待,会阻塞

常用的辅助类(必会)

CountDownLatch

测试代码

package com.zr.add;

import java.util.concurrent.CountDownLatch;

//计数器
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //总数是 6  有必须要执行完的任务时使用
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 1; i <=5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"go out");
                countDownLatch.countDown();  //计数器减一
            },String.valueOf(i)).start();
        }
        countDownLatch.await();  //等待计数器归0,再向下执行
        System.out.println("clsoe");
    }
}

原理:

countDownLatch.countDown():数量减一

countDownLatch.await():直到计数器归0后,才向下执行

CyclicBarrier

测试代码:(可以想象成加法计数器)

package com.zr.add;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("集齐七个了");
        });
        for (int i = 1; i <= 7; i++) {
            final int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"第"+temp+"个");
                try {
                    cyclicBarrier.await();  //等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

信号量

测试代码

package com.zr.add;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        //停车位 限流 
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();  //获取
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();  //释放
                }
            },String.valueOf(i)).start();
        }
    }
}

原理:

semaphore.acquire(); 获取,假如满了,就等待释放后再执行。

semaphore.release(); 释放,将得到的信号量释放。

共享资源互斥的时候使用,并发限流,控制最大线程数。

读写锁

测试代码

package com.zr.rw;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReadWriteLock
 * 独占锁(写锁)一次只能被一个线程占有
 * 共享锁(读锁)多个线程可以同时占有
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCacheLock myCache = new MyCacheLock();
        //写入
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
//加锁
class MyCacheLock{
    private volatile Map<String,Object> map = new HashMap<>();
    //读写锁 更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    //存
    public void put(String key,Object value){
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入ok");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
    //取
    public void get(String key){
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取的key是"+o);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
/**
 * 自定义缓存
 */
class MyCache{
    private volatile Map<String,Object> map = new HashMap<>();
    //存
    public void put(String key,Object value){
        System.out.println(Thread.currentThread().getName()+"写入"+key);
        map.put(key,value);
        System.out.println(Thread.currentThread().getName()+"写入ok");

    }
    //取
    public void get(String key){
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取的key是"+o);
    }
}

阻塞队列

BlockingQueue

阻塞队列:多线程并发处理,线程池。

四组API

方式 抛出异常 不抛出异常,有返回值 阻塞等待 超时等待
添加 add offer put offer
移除 remove poll take poll
判断队列首 element peek

测试一:

package com.zr.queue;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    public static void main(String[] args) {
        test1();
    }
    /**
     * 抛出异常
     */
    public static void test1(){
        //队列的大小
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        System.out.println(queue.add("a"));
        System.out.println(queue.add("b"));
        System.out.println(queue.add("c"));
         System.out.println(queue.element());  //队首元素

        //抛出异常 java.lang.IllegalStateException: Queue full
        //System.out.println(queue.add("c"));

        System.out.println(queue.remove());  //FIFO
        System.out.println(queue.remove());
        System.out.println(queue.remove());
    }
}

测试二:

 /**
     * 不怕抛出异常  有返回值
     */
    public static void test2(){
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        System.out.println(queue.offer("d"));  //返回布尔值  不抛异常
		System.out.println(queue.peek());  //队首元素
        
        System.out.println("===============");
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());  //返回null 不抛出异常
    }

测试三:

/**
 * 等待,阻塞(一直阻塞)
 */
public static void test3() throws InterruptedException {
    //队列的大小
    ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

    queue.put("a");
    queue.put("b");
    queue.put("c");
    //queue.put("d");  //队列没有位置了  会一直阻塞

    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    // System.out.println(queue.take());  //没有这个元素 也会一直阻塞

}

测试四:

/**
 *等待超时(过时不候)
 */
public static void test4() throws InterruptedException {
    //队列的大小
    ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

    System.out.println(queue.offer("a"));
    System.out.println(queue.offer("b"));
    System.out.println(queue.offer("c"));
    System.out.println(queue.offer("d",2, TimeUnit.SECONDS));  //超时退出
    System.out.println("==========");
    System.out.println(queue.poll());
    System.out.println(queue.poll());
    System.out.println(queue.poll());
    System.out.println(queue.poll(2,TimeUnit.SECONDS));
}

SynchronousQueue

同步队列,没有容量,进去一个元素必须取出来,才能再放一个元素。

package com.zr.queue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * 同步队列
 * SynchronousQueue不存储元素 put进去一个值 必须toke出来 才能在put
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
        new Thread(()->{

            try {
                System.out.println(Thread.currentThread().getName()+"put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}

线程池(重点)

池化技术!

程序的运行,本质会占用系统的资源!优化资源的使用,统一管理分配--->池化技术 !

线程池,连接池,内存池,对象池....

线程池的好处:

  1. 降低资源的消耗
  2. 提高响应的速度,创建和销毁十分浪费资源
  3. 方便统一管理

线程可以复用,控制最大并发数,统一管理!

线程池:三大方法,七大参数,四种拒绝策略!(可参考阿里巴巴开发手册)

测试代码

package com.zr.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *Executors 工具类 三大方法、
 */
public class Demo01 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
        // Executors.newFixedThreadPool(5);  //固定的大小
        //Executors.newCachedThreadPool();  //可伸缩的

        try {
            for (int i = 0; i < 10; i++) {
                //使用了线程池之后要使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+":ok!");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池使用完后要关机
            threadPool.shutdown();
        }
    }
}

七大参数

源码分析

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

本质:ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小
                          int maximumPoolSize,  //最大核心线程池的大小
                          long keepAliveTime,  //最大存活时间(超时未调用释放)
                          TimeUnit unit,  //超时单位
                          BlockingQueue<Runnable> workQueue,  //阻塞队列
                          ThreadFactory threadFactory,  //线程工厂,一般不用动
                          RejectedExecutionHandler handler //拒绝策略) { 
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

四种拒绝策略

手动创建线程池

package com.zr.pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * new ThreadPoolExecutor.AbortPolicy() //不处理,队列满了 抛出异常
 * new ThreadPoolExecutor.CallerRunsPolicy()  //main线程执行
 * new ThreadPoolExecutor.DiscardPolicy() //队列满了 丢掉任务 不会抛出异常
 * new ThreadPoolExecutor.DiscardOldestPolicy());  //将最老的任务丢弃,尝试提交新的任务,不会跑异常
 */
public class Demo02 {
    //自定义创建线程池
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());  //将最老的任务丢弃,尝试提交新的任务,不会跑异常
        try {
            //最大承载 queue+max
            // 超出最大承载 使用拒绝处理
            for (int i = 0; i < 10; i++) {
                //使用了线程池之后要使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+":ok!");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池使用完后要关机
            threadPool.shutdown();
        }
    }
}

最大线程该如何设置

CPU密集型:计算量大的,线程数量可设置为电脑核心数+1,Runtime.getRuntime().availableProcessors()获取电脑核心数。

IO密集型:读写操作非常多的,线程数量可设置为电脑核心数*2。

四大函数式接口

现在必须掌握的:lambda表达式,链式编程,函数式接口,Stream流式计算。

函数式接口:只有一个方法的接口。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
//很多@FunctionalInterface
//简化编程模型,新版本的框架底层大量应用
//foreach(消费者类型的函数式接口)

测试代码:Function

package com.zr.function;

import java.util.function.Function;

/**
 * function  函数型接口
 * 有一个输入参数,有一个输出
 * 只要是函数式接口,可以用lambda表达式简化
 */
public class Demo01 {
    public static void main(String[] args) {
        //工具类  输出出入的值
        // Function function = new Function<String,String>() {
        //     @Override
        //     public String apply(String str) {
        //         return str;
        //     }
        // };
        Function<String,String> function = (str)->{return str;};
        System.out.println(function.apply("123"));
    }
}

测试代码:断定型接口 Predicate

package com.zr.function;

import java.util.function.Predicate;

/**
 * 断定型接口,有一个输入值,返回值是 布尔值
 */
public class Demo02 {
    public static void main(String[] args) {
        //判断字符串是否为空
        // Predicate<String> predicate = new Predicate<String>() {
        //     @Override
        //     public boolean test(String str) {
        //         return str.isEmpty();
        //     }
        // };
        Predicate<String> predicate = (str)->{return str.isEmpty();};  //(str)的()可以省略
        System.out.println(predicate.test("asdf"));
    }
}

Consumer:消费型接口

测试代码:

package com.zr.function;

import java.util.function.Consumer;

/**
 * 消费型接口  只有输入 没有返回值
 */
public class Demo03 {
    public static void main(String[] args) {
        // Consumer<String> consumer = new Consumer<String>() {
        //     @Override
        //     public void accept(String str) {
        //         System.out.println(str);
        //     }
        // };
        Consumer<String> consumer = (str)->{
            System.out.println(str);
        };
        consumer.accept("abc");
    }
}

Supplier:供给型接口

测试代码:

package com.zr.function;

import java.util.function.Supplier;

/**
 * 供给型接口 只返回 不输入
 */
public class Demo04 {
    public static void main(String[] args) {
        // Supplier supplier = new Supplier<Integer>() {
        //     @Override
        //     public Integer get() {
        //         return 1024;
        //     }
        // };
        Supplier supplier = ()->{
            return 1024;
        };
        System.out.println(supplier.get());
    }
}

Stream流式计算

大数据:存储+计算

集合,数据库是来存储的。

计算都应该交给流来计算。

测试代码:User

package com.zr.stream;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private int id;
    private String name;
    private int age;
}

Test

package com.zr.stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * 一行代码实现,现有五个用户,筛选
 * 1.id是偶数的
 * 2.年龄大于22
 * 3.用户名转化为大写字母
 * 4.用户名字母倒着排序
 * 5.只输出一个用户
 */
public class Test {
    public static void main(String[] args) {
        User u1 = new User(1,"a",20);
        User u2 = new User(2,"b",21);
        User u3 = new User(3,"c",22);
        User u4 = new User(4,"d",23);
        User u5 = new User(6,"e",24);
        //转化为list
        List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
        //计算交给流处理
        // lambda表达式,链式编程,函数式接口,Stream流式计算
        list.stream()
                .filter(u->{return u.getId()%2==0;})
                .filter(u->{return u.getAge()>22;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);
    }
}

ForkJoin

分支合并

在1.7之后出来的,并行执行任务,提高效率的!大数据量的。

ForkJoin的特点:工作窃取。

维护的是双端队列,A线程任务执行完后会把B线程还未执行完的任务拿过来一部分执行。

ForkJoin操作:ForkJoinPool 通过它来执行

测试代码:

package com.zr.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * 求和计算的任务
 * 使用 ForkJoinPool 来执行
 * 计算任务 forkJoinPool.execute(forkJoinTask task)
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private long start;
    private long end;

    //临界值
    private long temp = 100000L;

    public ForkJoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //计算方法
    @Override
    protected Long compute() {
        long sum = 0;
        if ((end-start)<temp){
            for (long i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        }else {  //forkjoin
            long mid = (start+end)/2;  //中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, mid);
            task1.fork();  //拆分任务,把线程压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(mid+1, end);
            task2.fork();
            return task1.join() + task2.join();
        }
    }
}

test

package com.zr.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //test1();
        //test2();
        test3();
    }

    public static void test1(){
        long sum = 0;
        long start = System.currentTimeMillis();
        for (long i = 0; i < 10_0000_0000; i++) {
            sum+=i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+"时间:"+(end-start));
    }
    //forkjoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinDemo task = new ForkJoinDemo(0, 10_0000_0000);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+"时间:"+(end-start));
    }
    //流计算
    public static void test3(){

        long start = System.currentTimeMillis();

        long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum="+"时间:"+(end-start));
    }
}

异步回调

Future设计的初衷,是对将来某个时间的结果建模。

测试代码:

package com.zr.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 异步调用 CompletableFuture
 * 异步执行
 * 成功回调
 * 失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //没有返回值的异步回调
        // CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
        //     try {
        //         TimeUnit.SECONDS.sleep(2);
        //     } catch (InterruptedException e) {
        //         e.printStackTrace();
        //     }
        //     System.out.println(Thread.currentThread().getName()+"runAsync==>Void");
        // });
        // System.out.println("111");
        // future.get();

        //有返回值的异步回调
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"supplyAsync==>Integer");
            //int i = 10/0;
            return 1024;
        });
        System.out.println(future1.whenComplete((t,u)->{
            System.out.println("t=>"+t+"; u=>"+u);  //t 正常的返回结果 u 错误的信息
        }).exceptionally((e)->{
            System.out.println(e.getMessage());
            return 233;
        }).get());
    }
}

JMM

Volatile是Java虚拟机的轻量级同步机制。

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

JMM:Java内存模型。不真实存在,概念,约定!

关于JMM的一些同步约定:

  1. 线程加锁前:必须把主存中变量最新值读取到工作内存中
  2. 线程解锁前:必须把共享变量立即同步到主存
  3. 加锁和解锁是同一把锁

如果线程A修改了值,线程B还在使用之前读到的值,此时就需要引入Volatile!

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

    • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
    • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
    • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
    • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
    • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
    • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
    • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

  JMM对这八种指令的使用,制定了如下规则:

    • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
    • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
    • 不允许一个线程将没有assign的数据从工作内存同步回主内存
    • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
    • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
    • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
    • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
    • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

Volatile

保证可见性

package com.zr.testvolatile;

import java.sql.Time;
import java.util.concurrent.TimeUnit;

public class JMMDemo {
    //未加volatile,while会死循环
    private static volatile int num = 0;
    public static void main(String[] args) throws InterruptedException {  //main线程

        new Thread(()->{  //未加volatile前,线程1对主内存的变化是不知道的
            while (num == 0){
                
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        num = 1;
        System.out.println(num);
    }
}

不保证原子性

原子性,不可分割

线程A在执行任务的时候是不能被打扰的,也不能变为分割,要么同时成功,要么同时失败。

package com.zr.testvolatile;
//不保证原子性
public class Demo02 {
    //volatile不保证原子性
    private volatile static int num = 0;
    public static void main(String[] args) {
        //理论上的值应该为20000
        for (int i = 1; i <= 20; i++) {
            new Thread(()->{
                for (int j = 1; j <= 1000; j++) {
                    add();
                }
            }).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(num);
    }
    public static void add(){
        num++;
    }
}

如果不加 Lock 或者 Synchronized 怎么保证原子性。

cmd进入编译编译后的文件夹中,java -c Demo02.class可以查看字节码。可以看到num++不是原子性的操作。

使用原子类,解决原子性问题。

测试代码:AtomicInteger

package com.zr.testvolatile;

import java.util.concurrent.atomic.AtomicInteger;

//不保证原子性
public class Demo03 {
    //volatile不保证原子性
    //原子类的
    private volatile static AtomicInteger num = new AtomicInteger();

    public static void main(String[] args) {
        //理论上的值应该为20000
        for (int i = 1; i <= 20; i++) {
            new Thread(()->{
                for (int j = 1; j <= 1000; j++) {
                    add();
                }
            }).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(num);
    }
    public synchronized static void add(){
        num.getAndIncrement();  //AtomicInteger的加 1 方法  CAS
    }
}

这些原子类的底层直接和操作系统挂钩!在内存中修改值。unsafe类是一个特殊的存在(CAS)!

指令重排:自己写的程序,计算机并不会按照程序写的顺序来执行。

源代码--->编译器优化重排--->指令并行也可能会重排--->内存系统也会重排--->执行

处理器在指令重排的时候会考虑数据之间的依赖性。

volatile可以避免指令重排:

内存屏障!cpu指令,作用:

  1. 保证特定操作的执行顺序
  2. 保证某些变量的内存可见性(volatile利用这个保证了可见性)

单例模式

饿汉式

package classes.com.zr.single;
//饿汉式单例
public class Hungry {
    //构造器私有
    private Hungry(){

    }
    private final static Hungry hungry = new Hungry();

    public static Hungry getHangry(){
        return hungry;
    }
}

懒汉式 :DCL懒汉式

package classes.com.zr.single;
//懒汉式单例
public class Lazy01 {
    private Lazy01(){
        System.out.println(Thread.currentThread().getName()+"ok");
    }
    //volatile  防止创建对象时指令重排
    private volatile static Lazy01 lazy01;

    //不安全的单例
    // public static Lazy01 getInstance(){
    //     if (lazy01==null){
    //         lazy01 = new Lazy01();
    //     }
    //     return lazy01;
    //}

    //双重检查锁  DCL 懒汉式单例
    public static Lazy01 getInstance(){
        if (lazy01==null){
            synchronized (Lazy01.class){
                if (lazy01==null){
                    lazy01 = new Lazy01();  //不是原子性操作
                    /**
                     *  1.分配内存空间
                     *  2.执行构造方法,初始化对象
                     *  3.把对象指向内存空间
                      */
                }
            }
        }
        return lazy01;
    }

    public static void main(String[] args) {
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                Lazy01.getInstance();
            }
        }).start();
    }
}

使用反射破坏:

package classes.com.zr.single;

import java.lang.reflect.Constructor;

//懒汉式单例  反射破坏单例
public class Lazy02 {
    private Lazy02(){
        synchronized (Lazy02.class){
            if (lazy02!=null){
                throw new RuntimeException("不要使用反射来破坏单例");
            }
        }
    }
    //volatile  防止创建对象时指令重排
    private volatile static Lazy02 lazy02;


    //双重检查锁  检查 DCL 懒汉式单例
    public static Lazy02 getInstance(){
        if (lazy02==null){
            synchronized (Lazy01.class){
                if (lazy02==null){
                    lazy02 = new Lazy02();  //不是原子性操作
                }
            }
        }
        return lazy02;
    }

    public static void main(String[] args) throws Exception {
        Lazy02 instance1 = Lazy02.getInstance();
        Constructor<Lazy02> constructor = Lazy02.class.getDeclaredConstructor(null);
        constructor.setAccessible(true);  //可以访问
        Lazy02 instance2 = constructor.newInstance();  //创建实例
        System.out.println(instance1);
        System.out.println(instance2);

    }
}

静态内部类

package com.zr.single;
//静态内部类实现
public class Holder {
    private Holder(){

    }

    public static Holder getInstance(){
        return Test.HOLDER;
    }

    public static class Test{
        private static final Holder HOLDER = new Holder();
    }
}

枚举:反射无法破坏枚举

package com.zr.single;


import java.lang.reflect.Constructor;

//枚举  jdk 1.5 有的  本身也是一个类
public enum EnumSingle {

    INSTANCE;

    public EnumSingle getInstance(){
        return INSTANCE;
    }
}

class Test{
    public static void main(String[] args) throws Exception {
         EnumSingle instance1 = EnumSingle.INSTANCE;

         //String.class,int.class 通过jad反编译看到构造方法是有参数的
        Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
        declaredConstructor.setAccessible(true);
        EnumSingle instance2 = declaredConstructor.newInstance();

        System.out.println(instance1.hashCode());
        System.out.println(instance2.hashCode());
    }
}

无法破坏枚举:

深入理解CAS

什么是cas:compareAndSet 比较并交换,如果期望的值达到了就更新,否则就不更新,CAS cpu的并发原语。

package com.zr.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {

    //CAS  compareAndSet 比较并交换
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2021);

        // public final boolean compareAndSet(int expect, int update)
        //如果期望的值达到了就更新,否则就不更新,CAS  cpu的并发原语
        System.out.println(atomicInteger.compareAndSet(2021, 2022));
        System.out.println(atomicInteger.get());
    }
}

unsafe:

getAndIncrement:增加1

getAndAddInt:自旋锁

CAS缺点:自旋锁循环会耗时,一次只能保证一个共享变量的原子性,会导致ABA问题。

CAS:ABA问题,狸猫换太子。

A,B两个线程访问内存中的变量1,A拿到后将变量变为2,再把2变为1,此时B并不知道这个1是不是原来访问的那个1。

package com.zr.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class ABADemo {
    //CAS  compareAndSet 比较并交换
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2021);
        
        // public final boolean compareAndSet(int expect, int update)
        //如果期望的值达到了就更新,否则就不更新,CAS  cpu的并发原语
        System.out.println(atomicInteger.compareAndSet(2021, 2022));
        System.out.println(atomicInteger.get());

        System.out.println(atomicInteger.compareAndSet(2022, 2021));
        System.out.println(atomicInteger.get());

        System.out.println(atomicInteger.compareAndSet(2021, 2022));
        System.out.println(atomicInteger.get());
    }
}

原子引用

解决ABA问题:引入原子引用,对应的思想,乐观锁!

带版本号的原子操作。

package com.zr.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class RefrenceDemo {
    //注意,如果泛型是一个包装类,注意对象引用问题!!!
    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);

    public static void main(String[] args) {

        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();  //获得版本号
            System.out.println("A线程拿到的是"+stamp);

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.compareAndSet(1, 2,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
            System.out.println("A2:"+atomicStampedReference.getStamp());

            System.out.println(atomicStampedReference.compareAndSet(2, 1,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
            System.out.println("A3:"+atomicStampedReference.getStamp());
        },"A").start();

        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();  //获得版本号
            System.out.println("B线程拿到的是"+stamp);

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.compareAndSet(1, 6,
                    stamp, stamp + 1));
            System.out.println("B2:"+atomicStampedReference.getStamp());
        },"B").start();
    }
}

AtomicStampedReference

public AtomicStampedReference(V initialRef, int initialStamp) {
    pair = Pair.of(initialRef, initialStamp);
}

因为 Integer 有缓存值,这里的 initialRef 测试应在-128到127之间,超过会在堆中重新创建对象,是个大坑!!

各种锁的理解

公平锁/非公平锁

公平锁:非常公平,不能插队,线程讲究先来先到。

非公平锁:非常不公平,可以插队,线程不讲究先来先到。

Synchronzied,ReentrantLock默认是非公平的。

 public ReentrantLock() {
        sync = new NonfairSync();
    }

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁

可重入锁(递归锁)

Synchronized

package com.zr.lock;

import java.awt.*;

//Synchronized
public class Demo01 {
    public static void main(String[] args) {
        Phone1 phone1 = new Phone1();
        new Thread(()->{
            phone1.sms();
        },"A").start();

        new Thread(()->{
            phone1.sms();
        },"B").start();
    }
}

class Phone1{
    public synchronized void sms(){
        System.out.println(Thread.currentThread().getName()+"==>sms");
        call();
    }

    public synchronized void call(){
        System.out.println(Thread.currentThread().getName()+"==>call");
    }
}

Lock

package com.zr.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo02 {
    public static void main(String[] args) {
        Phone2 phone2 = new Phone2();
        new Thread(()->{
            phone2.sms();
        },"A").start();

        new Thread(()->{
            phone2.sms();
        },"B").start();
    }
}

class Phone2{
    Lock lock = new ReentrantLock();

    public synchronized void sms(){
        lock.lock();  //lock 加锁解锁必须成对出现
        try {
            System.out.println(Thread.currentThread().getName()+"==>sms");
            call();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public synchronized void call(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+"==>call");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

自旋锁

Unsafe.getAndAddInt:这里就应用了自旋锁

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

测试代码:利用CAS自己定义锁

package com.zr.lock;

import java.util.concurrent.atomic.AtomicReference;

//自旋锁
public class SpinLockDemo {

    //Thread null
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
    //加锁
    public void myLock(){
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"==>myLock");
        //自旋锁
        while(!atomicReference.compareAndSet(null,thread)){
            //System.out.println(Thread.currentThread().getName()+"自旋中...");
        }
    }

    //解锁
    public void myUnLock(){
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"==>myUnLock");
        atomicReference.compareAndSet(thread,null);
    }
}

测试:

package com.zr.lock;

import java.util.concurrent.TimeUnit;

public class TestSpinLock {
    public static void main(String[] args) throws InterruptedException {
        SpinLockDemo lock = new SpinLockDemo();

        new Thread(()->{
            lock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.myUnLock();
            }
        },"A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            lock.myLock();
            try {

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.myUnLock();
            }
        },"B").start();
    }
}

死锁

死锁:互斥,不可剥夺,请求保持,循环等待。

测试:

package com.zr.lock;

import java.util.concurrent.TimeUnit;

//模拟死锁
public class DeadLock {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";

        new Thread(new myThread(lockA,lockB),"T1").start();
        new Thread(new myThread(lockB,lockA),"T2").start();
    }
}

class myThread implements Runnable{
    private String lockA;
    private String lockB;

    public myThread(String lockA, String lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public void run() {
        synchronized (lockA){
            System.out.println(Thread.currentThread().getName()+"=lock:"+lockA+"==>get "+lockB);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lockB){
                System.out.println(Thread.currentThread().getName()+"=lock:"+lockB+"==>get "+lockA);
            }
        }
    }
}

解决问题

  1. 使用 jps -l 定位进程号

  2. jstack 进程号(24380)

通过日志和堆栈信息排查问题!

文字截图来自 API 文档!

原文地址:https://www.cnblogs.com/zhou-zr/p/14698535.html