Juc进阶

JUC

1、进程和线程

  • 进程:一个程序,qq.exe,Music.exe程序的集合

  • 一个进程往往可以包含多个线程,至少要包含一个

  • Java默认有两个线程,main线程和gc线程

  • 创建线程的方式:Thread ,runable ,callable

  • Java真的能开启线程吗?

    • 不能,其实Java开启线程的strat0方法使用了native关键字调用了本地方法库中的方法开启线程

并发和并行

并发:多个线程操作同一个资源

并行:多个线程同时执行

wait跟sleep的区别

  • 来自不同的类
    • wait==>object
    • sleep==> Thread
  • 关于锁的释放
    • wait在线程等待的时候,会释放锁
    • sleep则不会
  • 使用范围
    • wait必须在同步代码块中使用
    • sleep中可以在任何地方使用

2、Lock锁

2.1 synchronized锁

package com.hao.synchronizedLock;

public class BuyTick {
    public static void main(String[] args) {
        Tick tick = new Tick();
        new Thread(()->{
            for (int i = 0; i <40 ; i++) {
                try {
                    tick.buy();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"小明").start();
        new Thread(()->{
            for (int i = 0; i <40 ; i++) {
                try {
                    tick.buy();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"小红").start();
        new Thread(()->{
            for (int i = 0; i <40 ; i++) {
                try {
                    tick.buy();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"小张").start();
    }
}
class Tick{
    private int number=40;
    public synchronized void buy() throws InterruptedException {
        if (number>0){
            Thread.sleep(100);
            System.out.println(Thread.currentThread().getName()+"买了第"+(number--)+"票,还剩"+number+"张票");
        }
    }
}

2.2Lock锁

image-20201126133956924

image-20201126134126578

image-20201126142304976

公平锁:先来后到

非公平锁:可以插队(默认是非公平锁)

package com.hao.synchronizedLock;

import sun.awt.windows.ThemeReader;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BuyTick2 {
    public static void main(String[] args) {
        Tick2 tick2 = new Tick2();
        new Thread(()->{for (int i = 0; i <40 ; i++) tick2.buy(); },"A").start();
        new Thread(()->{for (int i = 0; i <40 ; i++) tick2.buy(); },"B").start();
        new Thread(()->{for (int i = 0; i <40 ; i++) tick2.buy(); },"C").start();
    }
}
class Tick2{
    private int number=40;
    Lock lock=new ReentrantLock();
    public  void buy() {
        lock.lock();  //加锁
        try {
            //放业务逻辑代码
            if (number>0){
                System.out.println(Thread.currentThread().getName()+"买了第"+(number--)+"票,还剩"+number+"张票");
            }
        }catch (Exception e){

        }finally {
            lock.unlock(); //释放锁
        }

    }
}

2.3 synchronized和lock的区别

1、synchronized是Java关键字,lock是Java类

2、synchronized不能判断锁的状态,而lock锁可以判断是否获取到了锁

3、synchronized会自动释放锁,而lock锁必须手动加锁,释放锁。

4、synchronized在线程阻塞时,其他线程会一直等待下去。而在使用lock锁时,其他线程可以调用trylock()方法,判断是否可以获取到锁,如果不可以,就会终止线程。

5、synchronized可以锁方法和代码块,一般用于锁少量的代码。lock只能锁代码块,一般用于锁大量的代码块。

6、synchronized时可重入锁,不可中断,非公平的。lock是 可重入锁,可以去判断,公不公平(可以去设置)。

3、生产者和消费者问题(线程同步)

3.1synchronized版

package com.hao.pc;

public class A {
    public static void main(String[] args) {
        Num num = new Num();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    num.jia();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    num.jian();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    num.jia();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    num.jian();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}
class Num{
    private int num=0;
    public synchronized void jia() throws InterruptedException {
        while (num!=0){
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName()+"在加完后num为"+num);
        this.notifyAll();
    }
    public synchronized  void jian() throws InterruptedException {
        while (num==0){
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName()+"在减完后num为"+num);
        this.notifyAll();
    }
}

注意:这里使用的判断为while,要是使用if的话,就会造成虚拟唤醒,多个线程同时进入该方法。

3.2 lock版

package com.hao.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class B {
    public static void main(String[] args) {
        Num2 num2 = new Num2();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                    num2.jia();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                    num2.jian();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                    num2.jia();
            }
        },"C").start();
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                num2.jian();
            }
        },"D").start();
    }
}
class Num2{
    private int num=0;
    Lock lock=new ReentrantLock();
    Condition condition = lock.newCondition();
    public  void jia() {
        lock.lock();
        try {
            while (num!=0){
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName()+"在加完后num为"+num);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
    public  void jian() {
            lock.lock();
        try {
            while (num==0){
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName()+"在减完后num为"+num);
           condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}

这里使用的condition中的await()和signalAll()方法代替synchronized中的等待唤醒的方法。

其中,lock中的condition的唤醒机制与notify不同的是,condition可以精准的唤醒哪一个线程。

package com.hao.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class C {
    public static void main(String[] args) {
        next next = new next();
        new Thread(()->{
            for (int i = 0; i <5 ; i++) {
                next.printlf1();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i <5 ; i++) {
                next.printlf2();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i <5 ; i++) {
                next.printlf3();
            }
        },"C").start();
    }
}
class next{
    Lock lock=new ReentrantLock();
    Condition condition1 =lock.newCondition();
    Condition condition2 =lock.newCondition();
    Condition condition3 =lock.newCondition();
    private int num=1;
    public void printlf1(){
        lock.lock();
        try {
            while(num!=1){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"======");
            num=2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printlf2(){
        lock.lock();
        try {
            while(num!=2){
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"======");
            num=3;
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printlf3(){
        lock.lock();
        try {
            while(num!=3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"======");
            num=1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

4、集合类不安全

4.1 arrayList不安全

image-20201127192726128

package com.hao.unsafe;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class alist {
    public static void main(String[] args) {
      // List<String> list = new ArrayList<>(); 问题:在多线程下,arrayList不安全,容易出现并发修改异常。
        //解决方式一: Vector<String> list = new Vector<>(); 改为vector实现类
        //方式二:List<String> list = Collections.synchronizedList(new ArrayList<>()); 调用Collections工具类中的synchronizedList方法。
        //方式三:List<String> list = new CopyOnWriteArrayList<>(); 其中的add方法是COW方式,读是固定读,写是先复制ArrayList集合,再添加,
        //防止添加时,被其他线程覆盖。
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 1; i <10 ; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,3));
                System.out.println(list);
            },String.valueOf(i)).start();
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

4.2HashSet不安全

package com.hao.unsafe;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetTest {
    public static void main(String[] args) {
        //一样的hashSet也是线程不安全的,会造成ConcurrentModificationException并发修改异常
        //解决方式一:Set<String> set = Collections.synchronizedSet(new HashSet<>());
        //方式二:Set<String> set = new CopyOnWriteArraySet<>();
       Set<String> set = new HashSet<>();
        for (int i = 1; i <30 ; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,3));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

其实hashSet的底层原理就是new 了个hashmap,使用hashmap中的key做对象的存储。

image-20201127195237265

image-20201127195449908

hashSet的add方法就是调用了hashmap中的put方法。

4.3 hashMap不安全

package com.hao.unsafe;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class mapTest {
    public static void main(String[] args) {

        //一样的.ConcurrentModificationException异常出现
        //解决方法:Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
        //二:Map<String, String> map = new ConcurrentHashMap<>();
        Map<String, String> map = new ConcurrentHashMap<>();
        for (int i = 1; i <30 ; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,3));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

5、callable

package com.hao.unsafe;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread thread = new MyThread();
        FutureTask futureTask = new FutureTask(thread);//创建runnable接口的实现类,把callable的实现类放进去,再将futureTask放入线程进行执行
        new Thread(futureTask,"A").start();
        String o = (String) futureTask.get();//调用call()方法的返回值,可能会造成阻塞,应该放到后面执行
        System.out.println(o);
    }
}
class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("努力学习,天天向上");
        return "小胖学JUC";
    }
}

注意:

使用callable创建线程不能直接使用,需要创建runnable接口的实现类来进行适配,才能创建相应的线程

image-20201127212757795

futureTask中的get()方法返回的是call()方法中的返回值。get()可能回造成阻塞。

futureTask中的会存在缓存。

6、常用的辅助类

6.1 CountDownLatch(减法计数器)

image-20201127214250979

package com.hao.add;

import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLachDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);//创建一个初始值为5的计数器
        for (int i = 1; i <6 ; i++) {
           new Thread(()->{
               System.out.println(Thread.currentThread().getName());
               latch.countDown();//计数器数量减一
           },String.valueOf(i)).start();
        }
        latch.await(); //等待计数器为零,才往下执行。
        System.out.println("OVER");
    }
}

原理:

countDownLatch.countDown(),计数器数量减一。

countDownLatch.await(); //等待计数器归零,然后再向下执行

每次有线程调用countDown()数量减一,假设计数器变为0,countDownLatch.await()就会被唤醒,继续向下执行。

6.2 CyclicBarrier(加法计数器)

image-20201127215615000

package com.hao.add;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召集神龙成功!!");
        });
        for (int i = 1; i <8 ; i++) {
            final int temp=i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"召唤了第"+temp+"龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

这里调用cyclicBarrier.await()方法,程序计数器数量加一,需要等待线程的数量达到CyclicBarrier中的初始值,才能执行CyclicBarrier中的线程。

6.3 Semaphore

image-20201127222051826

举例:抢车位,只有3个车位,但有6个车想停车!

package com.hao.add;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        //设置只有3个停车位,限流
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <7 ; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//获得,假设如果已经满了,需要等待,等待其他线程释放
                    System.out.println(Thread.currentThread().getName()+"抢到了车位");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();//释放,会将当前的信号量加一,然后唤醒等待的线程。
                }
            },String.valueOf(i)).start();
        }
    }
}

semaphore.acquire();获得,假设如果已经满了,需要等待,等待其他线程释放。

semaphore.release();释放,会将当前的信号量加一,然后唤醒等待的线程。

作用:并发限流,控制最大的线程数!多个共享资源互斥使用!

7、读写锁

image-20201127232911376

package com.hao.synchronizedLock;

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//独占锁、共享锁
public class ReadWriteLock {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        for (int i = 1; i <6 ; i++) {
            final  int temp=i;
            new Thread(()->{
                myCache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        for (int i = 1; i <6 ; i++) {
            final  int temp=i;
            new Thread(()->{
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
class MyCache{
   private volatile HashMap<String, Object> map = new HashMap<>();
   private   ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   public void put(String key,Object value){

       try {
           readWriteLock.writeLock().lock();
           System.out.println(Thread.currentThread().getName()+"在写入"+key);
           map.put(key,value);
           System.out.println(Thread.currentThread().getName()+"写入成功");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           readWriteLock.writeLock().unlock();
       }

   }
   public void get(String key){
       readWriteLock.readLock().lock();
       try {
           System.out.println(Thread.currentThread().getName()+"在读取"+key);
           map.get(key);
           System.out.println(Thread.currentThread().getName()+"读取成功");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           readWriteLock.readLock().unlock();
       }

   }
}

8、阻塞队列

image-20201128000845476

image-20201128001152312

什么情况下我们会使用 阻塞队列:多线程并发处理,线程池。

学会使用队列:添加、移除

四种API

方式 抛出异常 不抛出异常,有返回值 阻塞等待 超时等待
添加 add offer() put offer(添加的值,时间值,时间单位)
移除 remove poll() take poll(时间值,时间单位)
查看队首元素 element peek -- --
 public static void test01(){
        /*
        * 抛出异常
        * */
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.element());//查看队首元素
        //抛出异常 java.lang.IllegalStateException: Queue full
       // System.out.println(blockingQueue.add("d"));
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //抛出异常 java.util.NoSuchElementException
       // System.out.println(blockingQueue.remove());

    }
  public static void test02(){
        /*
         * 不抛出异常,有返回值
         * */
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.peek());//查看队首元素
        //  System.out.println(blockingQueue.offer("d"));  //不抛出异常 返回false
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
       // System.out.println(blockingQueue.poll());    //不抛出异常 返回null
    }
 public static void test03() throws InterruptedException {
        /*
         * 阻塞等待 ,(会一直等待下去)
         * */
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
       blockingQueue.put("a");
       blockingQueue.put("b");
       blockingQueue.put("c");
      // blockingQueue.put("d");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        //System.out.println(blockingQueue.take());
    }
 public static void test04() throws InterruptedException {
        /*
         * 阻塞等待 ,(等待一段时间就不会等待了)
         * */
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
       blockingQueue.offer("a");
       blockingQueue.offer("b");
       blockingQueue.offer("c");
      // blockingQueue.offer("d",2, TimeUnit.SECONDS);
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
    }

同步队列 SynchronousQueue

没有容量

put进去一个元素,必须要等待取出来take之后,才能继续往里面添加元素。(相当于容量为一的阻塞队列)

package com.hao.pq;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<Object> synchronousQueue = new SynchronousQueue<>();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"put ==1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"put ==2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"put ==3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"小明").start();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"取出来了"+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"取出来了"+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"取出来了"+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"小红").start();
    }
}

9、线程池(重点)

池化技术

程序的运行,本质:占用系统的资源!优化资源的使用==》池化技术

线程池、链接池、内存池、对象 //创建、销毁。十分浪费资源。

池化技术:事先准备好一些资源,有人要用,就来我这拿,用完之后还给我。

线程池的好处:

1、降低资源的消耗

2、提高响应的速度

3、方便管理

线程复用、可以控制最大并发数、管理线程

创建线程池的三大方法

package com.hao.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Exctous {
    public static void main(String[] args) {
       // ExecutorService pool = Executors.newSingleThreadExecutor(); 单个线程
       // ExecutorService pool =Executors.newFixedThreadPool(5);  创建一个固定大小的线程池
      //  ExecutorService pool = Executors.newCachedThreadPool(); 可伸缩的,遇强则强,遇弱则弱
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 1; i <10 ; i++) {
            pool.execute(()->{
                System.out.println(Thread.currentThread().getName()+"====OK");
            });
        }
    } 
}

使用Executors工具类创建的线程本质就是new ThreadPoolExecutor();创建线程,其中有7大参数

image-20201128202851622

在使用new ThreadPoolExecutor()方法创建线程池时,要设置的7大参数:

image-20201128203707821

package com.hao.pool;

import java.util.concurrent.*;

public class Exctous {
    public static void main(String[] args) {

        ExecutorService pool = new ThreadPoolExecutor(3, //核心线程池大小
                6,    //最大核心线程池大小
                5,      //超时时间,超时了没有人调用就会释放
                TimeUnit.SECONDS,    //超时时间单位
                new ArrayBlockingQueue<>(4),  //阻塞队列,当线程数超过最大线程池大小数时,在队列可以阻塞存区的线程数。
                Executors.defaultThreadFactory(),  //线程工厂,创建线程的,一般不用动
                new ThreadPoolExecutor.AbortPolicy());  //当线程数超过最大线程池大小和阻塞队列阻塞的线程数时,拒绝线程的策略。

        try {
            for (int i = 1; i <11 ; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"====OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }

    }
}

当线程数超过最大线程池的大小和阻塞队列中的线程数时,就会有拒绝策略拒绝多余的线程。其中有4种拒绝策略。

image-20201128204732448

package com.hao.pool;

import java.util.concurrent.*;
//1.new ThreadPoolExecutor.AbortPolicy(),当阻塞队列满了时,就会抛出异常RejectedExecutionException
//2.new ThreadPoolExecutor.CallerRunsPolicy() 当阻塞队列满了时,不会抛出异常,哪里来的回到哪里去
//3.new ThreadPoolExecutor.DiscardPolicy() 当阻塞队列满了时,不会抛出异常,丢掉该任务
//4.new ThreadPoolExecutor.DiscardPolicy() 当阻塞队列满了时,会尝试跟最早的线程竞争,也不会抛出异常
public class Exctous {
    public static void main(String[] args) {

        ExecutorService pool = new ThreadPoolExecutor(3, //核心线程池大小
                6,    //最大核心线程池大小
                5,      //超时时间,超时了没有人调用就会释放
                TimeUnit.SECONDS,    //超时时间单位
                new ArrayBlockingQueue<>(4),  //阻塞队列,当线程数超过最大线程池大小数时,在队列可以阻塞存区的线程数。
                Executors.defaultThreadFactory(),  //线程工厂,创建线程的,一般不用动
                new ThreadPoolExecutor.DiscardOldestPolicy());  //当线程数超过最大线程池大小和阻塞队列阻塞的线程数时,拒绝线程的策略。

        try {
            for (int i = 1; i <12 ; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"====OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }

    }
}

10、四大函数式接口

函数式接口:只有一个方法的接口

@FunctionalInterface
public interface Runnable {
  
    public abstract void run();
}

10.1 Function函数型接口

package com.hao.function;

import java.util.function.Function;

//函数型接口,有一个输入参数,有一个输出
public class demo01 {
    public static void main(String[] args) {
       /* Function function = new Function<String,String>() {
            @Override
            public String apply(String str) {
                return str;
            }
        };*/
       //简化
        Function function=(str)->{return str;};
        System.out.println(function.apply("sdaad"));
    }
}

10.2 Predicate断定型接口

package com.hao.function;

import java.util.function.Predicate;
//断定型接口,有一个输入参数,返回值只能是布尔值
public class demo02 {
    public static void main(String[] args) {
      /*  Predicate predicate=new Predicate<String>() {
            @Override
            public boolean test(String str) {
                return str.isEmpty();
            }
        };*/
      Predicate<String> predicate=(str)->{return str.isEmpty();};
        System.out.println(predicate.test("sss"));
    }
}

10.3 Consumer消费型接口

package com.hao.function;

import java.util.function.Consumer;
//消费型接口,给定一个参数,没有返回值
public class demo03 {
    public static void main(String[] args) {
      /*  Consumer consumer=new Consumer<String>() {
            @Override
            public void accept(String str) {
                System.out.println(str);
            }
        };*/
      //简化
        Consumer consumer=(str)->{};
        consumer.accept("sssss");
    }
}

10.4 Supplier供给型接口

package com.hao.function;

import java.util.function.Supplier;
//供给型接口 ,没有参数,有返回值
public class demo04 {
    public static void main(String[] args) {
     /*   Supplier supplier=new Supplier() {
            @Override
            public Object get() {
                System.out.println("fet");
                return 123;
            }
        };*/
        Supplier supplier=()->{return 123;};
        System.out.println(supplier.get());
    }
}

11、Stream流式计算

image-20201128222137721

package com.hao.Steam;

import java.util.ArrayList;
/*
* 题目要求:
* 1、ID必须为偶数
* 2、年龄必须大于23
* 3、用户名要转为大写
* 4、用户名字母倒序输出
* 5、只输出一个用户
* */
public class test01 {
    public static void main(String[] args) {
        User u1=new User(1,"a",21);
        User u2=new User(2,"b",22);
        User u3=new User(3,"c",23);
        User u4=new User(4,"d",24);
        User u5=new User(6,"e",25);
        ArrayList<User> list = new ArrayList<User>();
        list.add(u1);
        list.add(u2);
        list.add(u3);
        list.add(u4);
        list.add(u5);
        list.stream().filter((u)->{ return u.getId()%2==0;})
                .filter((u)->{return u.getAge()>23;})
                .map((u)->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach((u)->{ System.out.println(u);});
        }
}

12、ForkJoin

什么是ForkJoin?

在数据量很大时,把大任务拆分成多个小任务,并发执行任务,提高效率。

ForkJoin的特点:工作窃取

ForkJoin的使用

1、首先需要new 一个ForkJoinPool,通过它来执行

2、计算任务,ForkJoinPool.execute(ForkJoinTask task)

3、计算类要继承ForkJoinTask的子类RecursiveTask<>才能进行计算

package com.hao.ForkJoin;

import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo   extends RecursiveTask<Long> {
    private Long start;
    private Long end;
    private Long temp=1000000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if((end-start)<temp){
            Long sum=0L;
            for (Long i = start; i <=end ; i++) {
                sum +=i;
            }
            return sum;
        }else {
           Long middle= (end+start)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork();
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();
            return task1.join()+task2.join();
        }

    }
}

测试:

package com.hao.ForkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
      // test01(); //sum=500000000500000000时间为:3781   34017
   test02();   //sum=500000000500000000时间为:4415    40538
     //   test03();//sum=500000000500000000时间为:386
    }
    public static void test01(){
        long sum=0L;
        long start =System.currentTimeMillis();
        for (Long i = 1L; i <=10_0000_0000L ; i++) {
            sum +=i;
        }
        long end=System.currentTimeMillis();
        System.out.println("sum="+sum+"时间为:"+(end-start));
    }
    public static void test02() throws ExecutionException, InterruptedException {
        long sum=0L;
        long start =System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinDemo joinDemo = new ForkJoinDemo(0L,100_0000_0000L);
        ForkJoinTask<Long> task = forkJoinPool.submit(joinDemo);
        sum=task.get();
        long end=System.currentTimeMillis();
        System.out.println("sum="+sum+"时间为:"+(end-start));
    }
    public static void test03(){
        long start =System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end=System.currentTimeMillis();
        System.out.println("sum="+sum+"时间为:"+(end-start));
    }
}

13、异步回调

对将来的某个事件结果进行建模

package com.hao.ForkJoin;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //发送一个请求,等待该线程被调用。没有返回值
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"runAsync");
        });
        System.out.println("1111111111");
        runAsync.get();
    }
}
package com.hao.ForkJoin;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
       //发送一个请求,等待被调用,有返回值
        CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
                 return 1024;
        });
        System.out.println(uCompletableFuture.whenComplete((t, u) -> {
            System.out.println(t); //正常的返回结果
            System.out.println(u);  //java.util.concurrent.CompletableFuture@5fd0d5ae[Completed normally]
        }).exceptionally((e)->{
            System.out.println(e.getMessage()); //错误时,返回的异常信息。
            return 233;    //返回错误的结果
        }));
    }
}

14、JMM

什么是JMM

JMM:Java内存模型,不存在的东西,是一个概念,约定!

关于JMM的一些同步的约定:

1、线程解锁前,必须把共享变量立刻刷新回主存。

2、线程加锁前,必须读取主存中的最新值到线程的工作内存中。

3、加锁和解锁是同一把锁

img

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
  • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

JMM对这八种指令的使用,制定了如下规则:

  • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  • 不允许一个线程将没有assign的数据从工作内存同步回主内存
  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
  • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

15、Volatile

什么是Volatile?

Volatile是Java虚拟机提供的轻量级的同步机制。

1、保证可见性

线程修改了共享变量,其他线程也能够知道。及时做出修改

2、不保证原子性

3、禁止指令重排

什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的。

源代码---》编译器优化重排--》指令并行也可能会重排--》内存系统也会重排--》执行

volatile之所以能够禁止指令重排,是因为存在内存屏障。能保证特定的操作的执行顺序。还可以保证某些变量的内存可见性。

16、锁的理解

16.1 公平锁、非公平锁

公平锁:线程不能插队,先来后到

非公平锁:线程可以插队,

16.2 可重入锁

可重入锁(递归锁)

什么是 “可重入”,可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。

package com.hao.cas;

public class CASdemo {
    public static void main(String[] args) {
        phone phone = new phone();
        new Thread(()->{
            phone.Sms();
        },"A").start();
        new Thread(()->{
            phone.Sms();
        },"B").start();
    }

}
class phone{
    public synchronized void Sms(){
        System.out.println(Thread.currentThread().getName()+"    Sms");
        call();
    }

    public synchronized void call(){
        System.out.println(Thread.currentThread().getName()+"    call");
    }
}

Lock版

package com.hao.cas;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo02{
    public static void main(String[] args) {
        phone2 phone = new phone2();
        new Thread(()->{
            phone.Sms();
        },"A").start();
        new Thread(()->{
            phone.Sms();
        },"B").start();
    }

}
class phone2{
    Lock lock=new ReentrantLock();
    public  void Sms(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+"    Sms");
            call();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public  void call(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+"    call");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}

ReentrantLock 和 synchronized 不一样,需要手动释放锁,所以使用 ReentrantLock的时候一定要手动释放锁,并且加锁次数和释放次数要一样

16.3自旋锁

自旋锁的定义:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)

自旋锁的原理比较简单,如果持有锁的线程能在短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户进程和内核切换的消耗。

自定义自旋锁:

public class SpinLockTest {

    private AtomicBoolean available = new AtomicBoolean(false);

    public void lock(){

        // 循环检测尝试获取锁
        while (!tryLock()){
            // doSomething...
        }

    }

    public boolean tryLock(){
        // 尝试获取锁,成功返回true,失败返回false
        return available.compareAndSet(false,true);
    }

    public void unLock(){
        if(!available.compareAndSet(true,false)){
            throw new RuntimeException("释放锁失败");
        }
    }

}

16.4 死锁

概念:多个线程各自占有一些共享资源,并且互相等待其他线程占有的资源才能运行,而导致两个或多个线程都在等待对方释放资源,都停止执行的情形。某一个同步块同时拥有“两个以上对象的锁”时,就可能会发生死锁的问题。

  • 产生死锁的四个必要条件:
    • 互斥条件:一个资源每次只能被一个进程使用。
    • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得资源保持不放。
    • 不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺。
    • 循环等待条件:若干个进程之间形成一种头尾相接的循环等待资源关系。
原文地址:https://www.cnblogs.com/xiaopanjava/p/14064064.html