JUC 相关

同步监视器

  • condition同步监视器
  • condition类方法awaitsignalAll可替换Object方法wait,notify
/**
 *
 *  线程A执行+1操作,线程B执行-1操作。。。
 *
 A execute add ,num = 1
 D execute sub ,num = 0
 A execute add ,num = 1
 D execute sub ,num = 0
 A execute add ,num = 1
 D execute sub ,num = 0
 A execute add ,num = 1
 *
 *
 */
public class ConditionDemo {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(() ->{
            try {
                for (int i = 0; i < 10; i++) {
                    data.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();

        new Thread(() ->{
            try {
                for (int i = 0; i < 10; i++) {
                    data.sub();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

        new Thread(() ->{
            try {
                for (int i = 0; i < 10; i++) {
                    data.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"C").start();

        new Thread(() ->{
            try {
                for (int i = 0; i < 10; i++) {
                    data.sub();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"D").start();
    }
}

class Data {
    private int num = 0;

    private ReentrantLock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    //+1
    public void add() throws InterruptedException {
        lock.lock();
        try {
            // 使用while防止虚唤醒
            while (num != 0) {
                condition.await(); // 等待
            }
            System.out.println(Thread.currentThread().getName()+" execute add ,num = " + (++num));
            condition.signalAll();  // 唤醒全部线程
        } finally {
            lock.unlock();
        }

    }

    //-1
    public void sub() throws InterruptedException {
        lock.lock();
        try {
            // 使用while防止虚唤醒
            while (num == 0) {
                condition.await(); // 等待
            }
            System.out.println(Thread.currentThread().getName()+" execute sub ,num = " + (--num));
            condition.signalAll(); // 唤醒所有线程
        } finally {
            lock.unlock();
        }
    }
}

精准唤醒

  • 为每个线程添加Condition监视器
  • 使用Condition.single方法唤醒指定线程
package com.thread.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 
 *  实现多个线程顺序执行,Condition.single 唤醒指定线程
 *
     A :: exec..
     B :: exec..
     C :: exec..
     A :: exec..
     B :: exec..
     C :: exec..
     A :: exec..
 * 
 */
public class Sequence {
    public static void main(String[] args) {
        Executer executer = new Executer();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                executer.ExecA();
            }
        },"A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                executer.ExecB();
            }

        },"B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                executer.ExecC();
            }

        },"C").start();
    }
}

class Executer{
    // 标记位,确定当前线程和下一个线程
    private int flag = 1;

    // 使用同一个锁对象
    private ReentrantLock lock = new ReentrantLock();

    // 为每个线程添加监视器
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void ExecA() {
        lock.lock();
        try {
            while (1 != flag) {
                condition1.await();
            }
            flag = 2;
            System.out.println(Thread.currentThread().getName()+" :: exec..");
            condition2.signal(); // 唤醒指定线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void ExecB() {
        lock.lock();
        try {
            while (2 != flag) {
                condition2.await();
            }
            flag = 3;
            System.out.println(Thread.currentThread().getName()+" :: exec..");
            condition3.signal(); // 唤醒指定线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void ExecC() {
        lock.lock();
        try {
            while (3 != flag) {
                condition3.await();
            }
            flag = 1;
            System.out.println(Thread.currentThread().getName()+" :: exec..");
            condition1.signal(); // 唤醒指定线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

CopyOnWrite

  • ConcurrentModificationException:并发修改异常
        // ConcurrentModificationException
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println("list = " + list);
            }).start();
        }

ArrayList线程不安全问题解决:

  1. Vector:方法锁
  2. Collections工具类:同步方法
  3. CopyOnWriteArrayList:写入时复制

CopyOnWrite: COW (写入时复制)

  1. CopyOnWriteArrayList:list
  2. CopyOnWriteArraySet:set
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println("list = " + list);
            }).start();
        }

        // 同理Set集合
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println("set = " + set);
            }).start();
        }

Callable创建线程

  • 实现Callable泛型接口
  • 实例化适配器类FutureTask,传入Callable接口实现类
  • new Thread(futureTask, "A").start():执行线程
  • FutureTask.get():获取线程返回值
/**
 *   FutureTask 执行存在缓存,多个线程调用同一个实列使用同一个status
 *
 *
     Call()..
     call = 1
 *
 *
 */
public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread myThread = new MyThread();

        // 适配器类
        FutureTask futureTask = new FutureTask<>(myThread);

        // 将适配类作为Thread的参数,执行线程
        new Thread(futureTask, "A").start();
        new Thread(futureTask, "B").start(); // 被缓存,故call未执行

        System.out.println("call = " + futureTask.get());
    }
}

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("Call()..");
        return 1;
    }
}

线程休眠

  TimeUnit.SECONDS.sleep(2);

辅助类

CountDownLatch

  • CountDownLatch: 实现计数器递减
  • CountDownLatch.countDown():计数器递减
  • CountDownLatch..await():等待计数器归零
/**
 *
     0 down..
     3 down..
     4 down..
     6 down..
     1 down..
     5 down..
     2 down..
     8 down..
     9 down..
     7 down..
     10个线程执行完毕!
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch downLatch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                downLatch.countDown(); // 计数器递减
                System.out.println(Thread.currentThread().getName()+" down..");
            }, String.valueOf(i)).start();
        }

        downLatch.await();  // 等待计数器归零
        System.out.println("10个线程执行完毕!");
    }
}

CyclicBarrier

  • CyclicBarrier:实现计数器递增
  • CyclicBarrier(int p, Runnable r):构造器接收一个线程数量Runnable,到达线程数量时执行Runnable(需调用await)
  • CyclicBarrier.await():等待到达指定的线程数(当第10个线程时结束)
/**
 *
     0 add..
     7 add..
     1 add..
     4 add..
     9 add..
     5 add..
     6 add..
     2 add..
     8 add..
     3 add..
     10个线程执行完了
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () ->{
            System.out.println("10个线程执行完了");
        });

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" add..");

                try {
                    cyclicBarrier.await();  // 等待到达指定的线程数(当第10个线程时结束)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

Semaphore

  • 控制进入线程执行的数量
  • new Semaphore(3):接收一个线程数目
  • Semaphore.acquire():控制数量
  • Semaphore.release():释放
/**
 *
 *  模拟限流操作(控制进入线程执行的上限线程数)
 *
     线程Thread-0进入。
     线程Thread-5进入。
     线程Thread-1进入。
     线程Thread-1退出。
     线程Thread-0退出。
     线程Thread-4进入。
     线程Thread-5退出。
     线程Thread-3进入。
     线程Thread-2进入。
     线程Thread-2退出。
     线程Thread-3退出。
     线程Thread-4退出。
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 接收线程数量
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try {
                    // 限制数量
                    semaphore.acquire();
                    System.out.println("线程"+Thread.currentThread().getName()+"进入。");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("线程"+Thread.currentThread().getName()+"退出。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放
                }
            }).start();
        }
    }
}

AQS

公平锁和非公平锁

  • 公平锁: 不允许插队,遵守先来后到
  • 非公平锁: 允许线程插队

synchronized和ReentrantLock默认都是非公平锁,ReentrantLock通过构造方法设置为公平锁

独占锁和共享锁

ReadWriteLock 独占锁、共享锁

  • ReadWriteLock.readLock() : 共享锁,可以多个线程占有
  • ReadWriteLock.writeLock() : 独占锁,只能一个线程占有

实现读写操作(读时多个线程读,写入时一个线程写)

/**
 *
     线程2写入中
     线程2写入完成
     线程1写入中
     线程1写入完成
     线程3写入中
     线程3写入完成
     线程0写入中
     线程0写入完成
     线程4写入中
     线程4写入完成
     线程0读取完成
     线程1读取完成
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();

        // 写入
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                myCache.write("x", "");
            }, String.valueOf(i)).start();
        }

        // 读取
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                myCache.read("x");
            }, String.valueOf(i)).start();
        }
    }
}

class MyCache{
    private volatile Map<String, String> map = new HashMap<String, String>();
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void read(String key) {
        rwLock.readLock().lock();
        try {
            String s = map.get(key);
            System.out.println("线程"+Thread.currentThread().getName()+"读取完成");
        } catch (Exception e) {
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void write(String key, String data) {
        rwLock.writeLock().lock();
        try {
            System.out.println("线程"+Thread.currentThread().getName()+"写入中");
            //TimeUnit.SECONDS.sleep(1);
            map.put(key, data);
            System.out.println("线程"+Thread.currentThread().getName()+"写入完成");
        } catch (Exception e) {
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

自旋锁

  • 获得锁后死循环,直到解锁再退出循环
/**
 * 
 output:
A LOCK
A out loop
B LOCK
B locking...
B locking...
B locking...
B locking...
A UNLOCK
B locking...
B out loop
B UNLOCK
 */
public class spinlockDemo {
    public static void main(String[] args) throws InterruptedException {
        Spinlock spinlock = new Spinlock();

        new Thread(()->{
            spinlock.lock();
            try {
                TimeUnit.MILLISECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinlock.unLock();
            }
        }, "A").start();


        new Thread(()->{
            spinlock.lock();
            try {
                TimeUnit.MILLISECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinlock.unLock();
            }
        },"B").start();


    }
}

class Spinlock{
    // 原子引用类
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    // 加锁
    public void lock() {
        System.out.println(Thread.currentThread().getName()+" LOCK");

        // 自旋锁, 如果是null,赋值为当前线程,否则一直循环
        while (!atomicReference.compareAndSet(null, Thread.currentThread())) {
            System.out.println(Thread.currentThread().getName()+" locking...");
        }
        System.out.println(Thread.currentThread().getName()+" out loop");
    }

    // 解锁
    public void unLock() {
        System.out.println(Thread.currentThread().getName()+" UNLOCK");
        // 如果是当前线程,则设置为null
        atomicReference.compareAndSet(Thread.currentThread(), null);
    }
}

队列

阻塞队列

add / remove
add和remove失败 直接抛出异常

 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);

        queue.add("a");
        queue.add("b");
        queue.add("c");
        // add方法抛出异常
        // queue.add("d"); // IllegalStateException: Queue full

        queue.remove();
        queue.remove();
        queue.remove();
        // remove方法抛出异常
        //queue.remove(); // java.util.NoSuchElementException

offer / poll
offer和poll操作队列时,有返回值不抛异常

 ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        // 入队成功返回True,失败返回false
        System.out.println(queue.offer("d")); // false

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        // 出队失败返回null
        System.out.println(queue.poll());  // null

获取队头元素

  • peek失败时返回null
  • element失败时抛出异常
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(0);
        // 获取队首元素
        System.out.println(queue.peek());  //null
        //System.out.println(queue.element()); // java.util.NoSuchElementException

阻塞等待
put和take出入队列时,如果队列为空或队列已满,则会一直等待

        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        queue.put("a");
        queue.put("b");
        queue.put("c");
        //queue.put("d"); // 会一直阻塞,直到可以入队

        queue.take();
        queue.take();
        queue.take();
        //queue.take();  // 一直阻塞,直到可以出队

阻塞超时
offer和poll也可以指定超时参数,到期返回

        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        // 入队成功返回True,失败返回false
        // 入队时阻塞指定时间,超时退出
        System.out.println(queue.offer("d", 3, TimeUnit.SECONDS)); // false

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        // 出队失败返回null
        // 出队时阻塞指定时间,超时退出
        System.out.println(queue.poll(3, TimeUnit.SECONDS));  // null

延时队列

  • DelayQueue存放实现了Delayed接口的对象
  • DelayQueue存放的元素只有当元素到期时才能从队列中取走
/**
 *
     MyDelayTask{data=d, startTime=1607999309154, expireTime=1000}
     MyDelayTask{data=c, startTime=1607999309154, expireTime=2000}
     MyDelayTask{data=b, startTime=1607999309154, expireTime=3000}
     MyDelayTask{data=a, startTime=1607999309154, expireTime=4000}
 */
public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Delayed> delayeds = new DelayQueue<>();

        // 添加元素和过期时长
        delayeds.offer(new MyDelayTask("a", 4000));
        delayeds.offer(new MyDelayTask("b", 3000));
        delayeds.offer(new MyDelayTask("c", 2000));
        delayeds.offer(new MyDelayTask("d", 1000));

        // 获取队列直到队列为空
        while (!delayeds.isEmpty()) {
            Delayed take = delayeds.take();
            System.out.println(take);
        }
    }
}

class MyDelayTask implements Delayed{

    private Object data;
    // 开始时间
    private long startTime = System.currentTimeMillis();
    // 到期时间
    private long expireTime;

    public MyDelayTask(Object data, long expireTime) {
        this.data = data;
        this.expireTime = expireTime;
    }

    /**
     * 获取延时时间(过期时间 - 当前时间)
     *
     * 只有getDelay也就是剩余时间为0的时候,该元素才有资格被消费者从队列中取出
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((startTime + expireTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 内部排序(当前对象的延时时间 - 比较对象的延时时间)
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "MyDelayTask{" +
                "data=" + data +
                ", startTime=" + startTime +
                ", expireTime=" + expireTime +
                '}';
    }
}

同步队列

SynchronousQueue:同步队列

  • 没有容量,是无缓冲等待队列,是不存储元素的阻塞队列
  • 直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素
package com.thread.juc.queue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 *
 *
     producer thread put 1
     consumed thread get 1
     producer thread put 2
     consumed thread get 2
     producer thread put 3
     consumed thread get 3
 */
public class SyncQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();

            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+" thread put 1");
                    queue.put("1");
                    System.out.println(Thread.currentThread().getName()+" thread put 2");
                    queue.put("2");
                    System.out.println(Thread.currentThread().getName()+" thread put 3");
                    queue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "producer").start();

            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+" thread get "+queue.take());
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+" thread get "+queue.take());
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+" thread get "+queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "consumed").start();

    }
}

线程池

Executors

/**
 * 使用工具类Executors创建线程池
 *
 * Executors.newSingleThreadExecutor():单个线程
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
     pool-1-thread-1
 *
 *Executors.newFixedThreadPool():指定线程池大小的线程池
     pool-1-thread-1
     pool-1-thread-5
     pool-1-thread-3
     pool-1-thread-4
     pool-1-thread-2
     pool-1-thread-4
     pool-1-thread-3
     pool-1-thread-5
     pool-1-thread-1
     pool-1-thread-2

 Executors.newCachedThreadPool():可伸缩池大小的线程池
     pool-1-thread-1
     pool-1-thread-4
     pool-1-thread-3
     pool-1-thread-2
     pool-1-thread-5
     pool-1-thread-6
     pool-1-thread-6
     pool-1-thread-5
     pool-1-thread-7
     pool-1-thread-8
 */
public class ExecutorsDemo {
    public static void main(String[] args) {
//        ExecutorService  threadPool= Executors.newSingleThreadExecutor(); // 单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 指定大小线程池
        ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩线程池

        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }

    }
}

ThreadPoolExecutor

自定义线程池
工具类Executors底层调用的是ThreadPoolExecutor创建线程池

四种拒绝策略(队列满了,如何处理这些线程):

  • ThreadPoolExecutor.AbortPolicy():线程池满时还有线程进来,不处理并抛出异常RejectedExecutionException
  • ThreadPoolExecutor.DiscardPolicy():线程池满时,丢弃任务,不会抛出异常
  • ThreadPoolExecutor.CallerRunsPolicy():队列满了,使用调用线程处理(此例中为main线程)
  • ThreadPoolExecutor.DiscardOldestPolicy():队列满时,丢弃掉队列头
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, // 核心线程大小
                5,  // 核心线程最大
                3,  // 超过核心线程池时的闲置线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingDeque<>(3), // 阻塞队列
                Executors.defaultThreadFactory(), // 创建线程的工厂类
                new ThreadPoolExecutor.DiscardOldestPolicy() //拒绝策略(四种)
        );

        try {
            for (int i = 0; i < 134; i++) {
                threadPoolExecutor.execute(()->{
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize 核心线程池大小
     * @param maximumPoolSize 最大核心线程池大小
     * @param keepAliveTime 超过核心线程池时的闲置线程存活时间
     * @param unit 时间单位
     * @param workQueue 阻塞队列
     * @param threadFactory 创建线程的工厂类
     * @param handler 拒绝策略
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

定义最大线程数

  1. cpu密集型

主要消耗CPU资源, 计算密集型任务同时进行的数量应当等于CPU的核心数

  1. io密集型

其消耗的主要资源为IO, CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程

Volatile

Volatile是Java虚拟机提供的轻量级同步机制
特性:

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

使用volatile让线程感知到主线程中num的修改

public class VolatileDemo1 {
    private volatile static int num = 0;

    public static void main(String[] args) {
        new Thread(()->{
            while (num == 0) {
            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        num = 1;
    }
}

使用原子类保证原子性

num++不是原子操作,字节码中有多个操作

public class VolatileDemo2 {
    private volatile static AtomicInteger num = new AtomicInteger();

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int i1 = 0; i1 < 1000; i1++) {
                    num.getAndIncrement(); // 自增
                }
            }).start();
        }

        while (Thread.activeCount() > 2) {
            Thread.yield();
        }

        System.out.println(num);
    }
}

CAS

Compare And Swap 比较并交换
CAS(V,A,B)
1:V表示内存中的地址
2:A表示预期值
3:B表示要修改的新值
比较当前工作内存中的值和主存中的值,如果这个值是期望的值,则执行操作,否则就一直循环等待(自旋锁机制)

原子类的cas操作

/**
 output:
     100
     true
     110
     false
 */
public class CompareAndSwap {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(100);

        System.out.println(atomicInteger);
        System.out.println(atomicInteger.compareAndSet(100, 110));
        System.out.println(atomicInteger);
        System.out.println(atomicInteger.compareAndSet(100, 100));
    }
}

cas解决aba问题


ABA问题导致的原因
CAS过程中只简单进行了“值”的校验,再有些情况下,“值”相同不会引入错误的业务逻辑(例如库存),有些情况下,“值”虽然相同,却已经不是原来的数据了
CAS不能只比对“值”,还必须确保的是原来的数据,才能修改成功。
解决方案
“版本号”的比对,一个数据一个版本,版本变化,即使值相同,也不应该修改成功。

原文地址:https://www.cnblogs.com/xiongyungang/p/14095040.html