JAVA线程&&JUC

线程

一、线程创建的3种方式

1、继承Thread 类
public class Thread implements Runnable

重写run方法即可

public class ThreadTest extends Thread {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("继承Thread方法实现多线程");
    }
}

//调用
    ThreadTest th1=new ThreadTest();
    th1.start();
2、实现Runnable接口

本质上和继承Thread类一样,因为Thread类也实现了Rannable接口

public class ThreadTest1 implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("实现Runable的线程开启方式");
    }
}
3、实现Callable接口并返回值
public class ThreadTest2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(2000);
        return "hello";
    }
}

//调用
Callable<String> callable = new ThreadTest2();
FutureTask<String> task=new FutureTask<>(callable);
Thread th4 = new Thread(task);
th4.start();
//task.get() 获取返回值
//这个get方法可能会产生阻塞
System.out.println("从子线程中获取:"+task.get());

分析:FutureTask 实现RunnableFuture,RunnableFuture继承了Runnable接口

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

二、线程池

//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool =Executors.newFixedThreadPool(5);//创建固定的线程池大小
//ExecutorService threadPool =Executors.newCachedThreadPool();//可伸缩的线程池

/**
 * 最大线程该如何定义
 *
 * 1. CPU密集型,几核CPU就定义为几,可以保证CPU效率最高 Runtime.getRuntime().availableProcessors()
 * 2. IO 密集型 判断你程序中十分消耗IO的线程数
 */
System.out.println("CPU核心数:"+Runtime.getRuntime().availableProcessors());
//自定义线程池  工作中,使用ThreadPoolExecutor 自己创建
ExecutorService threadPool=new ThreadPoolExecutor(2,//核心线程池大小
        5,//最大线程池大小
        3,//非核心线程闲置存活时间
        TimeUnit.SECONDS,//超时单位
        new LinkedBlockingQueue<>(5),//阻塞队列
        Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不修改
        new ThreadPoolExecutor.AbortPolicy()//拒绝策略:对新来的线程不做处理,抛出异常
);
//阻塞队列存放个数+最大线程池个数=同时允许使用线程池的个数
for(int i=0;i<10;i++){
    threadPool.execute(()->{
        System.out.println(Thread.currentThread().getName());
    });
}
//线程池使用完,程序结束,要关闭线程池
threadPool.shutdown();
拒绝策略
AbortPolicy()//对新来的线程不做处理,抛出异常
CallerRunsPolicy()//哪来的回哪里,用线程池的线程处理
DiscardPolicy()//队列满了,丢弃当前任务,不会抛出异常
DiscardOldestPolicay()//队列满了,把队列首个poll丢出,自己加入队列,不抛出异常 e.getQueue().poll();

并发

1、synchronized

非静态方法上

​ 锁的对象是 该方法的类的实例

静态方法上

​ 锁的对象是类的字节码 如:Person.class

同步代码块

​ 锁的对象是自己定义的

2、ReentrantLock

创建锁
Lock lock = new ReentrantLock();
加锁
lock.lock();
解锁
lock.unlock();

3、CAS

java.util.concurrent.atomic
//CAS  compareAndSet() 比较并交换
public static void main(String[] args) {
    AtomicInteger atomicInteger = new AtomicInteger(2020);

    //如果我期望的值达到了,就更新
    atomicInteger.compareAndSet(2020,2021);

    System.out.println(atomicInteger.get());
}

线程之间的通信

等待唤醒 synchronized

public synchronized void incrnumber() throws InterruptedException {
    while (number!=0){//不要用if判断,应该用while防止虚假唤醒
        //等待
       this.wait();
    }
    number ++;
    System.out.println(Thread.currentThread().getName()+"number="+number);
    //通知其他线程
    this.notifyAll();
}

等待唤醒 ReentrantLock

	private int number=0;
    final Lock lock=new ReentrantLock();
    final Condition conditionA = lock.newCondition();
    final Condition conditionB = lock.newCondition();
    //condition.wait();//等待
    //condition.signalAll();//唤醒全部

    //+1
    public  void incrnumber() throws InterruptedException {
            //1、加锁
            lock.lock();

            try{
                while (number!=0){//不要用if判断,应该用while防止虚假唤醒
                    // A 等待
                    conditionA.await();
                }
                number ++;
                System.out.println(Thread.currentThread().getName()+" number="+number);
                //唤醒 B
                conditionB.signal();
            }catch (Exception e){}finally {
                //解锁
                lock.unlock();
            }
    }

    //-1
    public  void decrnumber() throws InterruptedException {
        //加锁
            lock.lock();
            try{
                while(number==0){
                    //B 等待
                    conditionB.await();
                }
                number-- ;
                System.out.println(Thread.currentThread().getName()+" number="+number);
                //唤醒 A
                conditionA.signal();
            }catch (Exception e){}
            finally {
                //解锁
                lock.unlock();
            }
    }

减法计数器CountDownLatch

CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i=0;i<6;i++){
    new Thread(()->{
        System.out.println(Thread.currentThread().getName()+"---go---out");
        //数量 减1
        countDownLatch.countDown();
    },String.valueOf(i)).start();
}

//等待计数器归零 再向下执行
countDownLatch.await();

System.out.println("执行 完成 6");

加法计数器CyclicBarrier

/**
 * 集齐7颗龙珠召唤神龙
 */

//计数,线程
//如果收集不齐,会阻塞
CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
    System.out.println("召唤神龙成功");
});

for(int i=0;i<7;i++){
    new Thread(()->{
        //这里面如果要拿外面的变量,需要拿 final 类型变量
        System.out.println(Thread.currentThread().getName()+":收集龙珠");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }).start();
}

读写锁ReadWriteLock

/**
 * 独占锁:写锁 一次只能被一个线程占有
 * 共享锁:读锁 可以被多个线程拥有
 *
 * 读-读:共存
 * 读-写:不能共存
 * 写-写:不能共存
 */
class MyCacheLock{
    private volatile Map<String,Object> map=new HashMap<>();
    //读写锁
    private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();

    //存 写入的时候只希望同时只有一个线程 写
    public void put(String key,Object value){
        //写锁
        readWriteLock.writeLock().lock();

        try {
            System.out.println(Thread.currentThread().getName()+"写入");
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            readWriteLock.writeLock().unlock();
        }
    }

    //取 所有人都可以读
    public void get(String key){
        //读锁
        readWriteLock.readLock().lock();
        try {
            map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            readWriteLock.readLock().unlock();
        }

    }
}

信号量Semaphore

Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

Semaphore的主要方法摘要:

  void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

  void release():释放一个许可,将其返回给信号量。

  int availablePermits():返回此信号量中当前可用的许可数。

  boolean hasQueuedThreads():查询是否有线程正在等待获取。

/**
 * 抢车位
 * 如果已经满了,一直等待直到释放为止
 */
//作用:多个共享资源互斥的使用  并发限流,控制最大线程数
public static void main(String[] args) {
    //线程数量  有3个停车位
    final Semaphore semaphore = new Semaphore(3);
    for (int i=0;i<6;i++){
        new Thread(()->{
            //得到/获取  semaphore.acquire();

            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"抢到车位");
                //停两秒钟
                TimeUnit.SECONDS.sleep(2);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //离开车位
                semaphore.release();
            }
            //释放 semaphore.release();

        },String.valueOf("线程"+i)).start();
    }
}

并发集合

1、List

 //并发下arraylist是不安全的
// List<String> list = new ArrayList<>();

 //1、解决方案 1
 //List<String> list = new Vector<>();

 //解决方案 2
// List<String> list =Collections.synchronizedList(new ArrayList<>());

 //解决方案 3 写入时复制COW 计算机程序设计领域的一种优化策略
 //CopyOnWriteArrayList 比Vector好,Vector有syncthronized,效率会降低
 List<String> list=new CopyOnWriteArrayList<>();
 for(int i=0;i<10;i++){
     new Thread(()->{
         list.add(UUID.randomUUID().toString().substring(0,5));
         System.out.println(list);
     },String.valueOf(i)).start();
 }

2、Set

//Set集合在多线程下 也是不安全的
// Set<String> set = new HashSet<>();

 //1解决方案
 Set<String> set= Collections.synchronizedSet(new HashSet<>());

 //2解决方案
// Set<String> set=new CopyOnWriteArraySet<>();

 for(int i=0;i<30;i++){
     new Thread(()->{
         set.add(UUID.randomUUID().toString().substring(0,5));
         System.out.println(set);
     },String.valueOf(i)).start();
 }

3、Map

//Map集合在多线程下 也是不安全的
Map<String,String> map = new HashMap<>();

//1解决方案
//Map<String,String> map= Collections.synchronizedMap(new HashMap<>());

//2解决方案
//Map<String,String> map=new ConcurrentHashMap<>();

for(int i=0;i<30;i++){
    new Thread(()->{
        map.put(UUID.randomUUID().toString().substring(0,5),"AAAA");
        System.out.println(map);
    },String.valueOf(i)).start();
}

BlockingQueue

ArrayBlockingQueue

public static void main(String[] args) {
    //collection
    //List
    //Set
    //BlockingQueue 不是新的东西
    /**
     * 什么情况下使用 阻塞队列
     * 多线程:线程池,多线程并发处理 A 调用 B
     *
     */
}

//抛出异常
@Test
public void test1(){
    //要设置队列的大小
    ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(arrayBlockingQueue.add("a"));
    System.out.println(arrayBlockingQueue.add("b"));
    System.out.println(arrayBlockingQueue.add("c"));

    //java.lang.IllegalStateException: Queue full
    //超过大小会抛出异常
    //System.out.println(arrayBlockingQueue.add("c"));


    //移除元素并返回 移除的元素
    System.out.println(arrayBlockingQueue.remove());
    System.out.println(arrayBlockingQueue.remove());
    System.out.println(arrayBlockingQueue.remove());

    //java.util.NoSuchElementException
    //没有元素后移除会抛出异常
    //System.out.println(arrayBlockingQueue.remove());

}

/**
 * 不抛出异常
 */
@Test
public  void test2(){
    ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);

    //element()监测 队列首个元素,要抛出异常
    //arrayBlockingQueue.element();

    //peek监测 队列首个元素 没有返回null 不抛出异常
    //System.out.println(arrayBlockingQueue.peek());

    //存值 offer
    System.out.println(arrayBlockingQueue.offer("a"));
    System.out.println(arrayBlockingQueue.offer("b"));
    System.out.println(arrayBlockingQueue.offer("c"));
   //满了  不抛出异常,返回 Boolean 值
    System.out.println(arrayBlockingQueue.offer("d"));

    // element()  返回队列首个元素值,不做移除
    System.out.println(arrayBlockingQueue.element());
    System.out.println(arrayBlockingQueue.peek());

    //移除 poll 返回移除的值
    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());

    //若移除空了 没有值返回null
    System.out.println(arrayBlockingQueue.poll());
}

/**
 * 等待,阻塞(一直阻塞)
 */
public void test3() throws InterruptedException {
    ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);

    //存值 put
    arrayBlockingQueue.put("a");
    arrayBlockingQueue.put("b");
    arrayBlockingQueue.put("c");
    //队列没有位置了,会一直等待 阻塞
    //arrayBlockingQueue.put("d");

    //取值 take()
    System.out.println(arrayBlockingQueue.take());
    System.out.println(arrayBlockingQueue.take());
    System.out.println(arrayBlockingQueue.take());
    //如果没有元素 会一直阻塞
    System.out.println(arrayBlockingQueue.take());

}

/**
 * 等待,阻塞(等待超时)
 */
@Test
public void test4() throws InterruptedException {
    ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);

    //存 offer
    arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
    arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
    arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
    //队列满了等待2秒 等待超过两秒就退出
    arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);

    //取 poll
    arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
    arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
    arrayBlockingQueue.poll(2,TimeUnit.SECONDS);

    //等待超过两秒 队列中还没有值就 超时退出
    arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
}
总结:

1、add() remove() 添加 移除,当添加时超过大小,移除时没有元素时抛出异常

2、offer() poll() 存值、取值,存值时空间不够返回false,取值时没有时返回Null

3、put() take() 存值、取值 ,存值时空间不够会阻塞,取值时没有元素会一直阻塞

4、offer() pool(),设置阻塞等待的时间,超过时间没有存值或取值就放弃等待退出

SynchronousQueue

/**
 * 同步队列
 * 和其他的BlockingQueue 不一样 ,SynchronousQueue不存元素
 * put 了一个元素,必须从里面先take取出来,否则不能在put进去值
 */
public class SynchronousQueueDemo {
    @Test
    public void Test1() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);

        BlockingQueue<String> queue = new SynchronousQueue<>();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"put 1");
                queue.put("1");
                System.out.println(Thread.currentThread().getName()+"put 2");
                queue.put("2");
                System.out.println(Thread.currentThread().getName()+"put 3");
                queue.put("3");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        },"t1").start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take"+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take"+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take"+queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        },"t2").start();

        countDownLatch.await();
    }
}
原文地址:https://www.cnblogs.com/harriets-zhang/p/14652255.html