Java并发编程原理

线程与进程区别

 每个正在系统上运行的程序都是一个进程。每个进程包含一到多个线程。线程是一组指令的集合,或者是程序的特殊段,它可以在程序里独立执行。也可以把它理解为代码运行的上下文。所以线程基本上是轻量级的进程,它负责在单个程序里执行多任务。通常由操作系统负责多个线程的调度和执行。

 使用线程可以把占据时间长的程序中的任务放到后台去处理,程序的运行速度可能加快,在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下可以释放一些珍贵的资源如内存占用等等。

 如果有大量的线程,会影响性能,因为操作系统需要在它们之间切换,更多的线程需要更多的内存空间,线程的中止需要考虑其对程序运行的影响。通常块模型数据是在多个线程间共享的,需要防止线程死锁情况的发生。

 总结:进程是所有线程的集合,每一个线程是进程中的一条执行路径。

  

Java中线程的状态分为6

1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
3.阻塞(BLOCKED):表示线程阻塞于锁。
4.等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
5.超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。

6. 终止(TERMINATED):表示该线程已经执行完毕。

线程的状态图

     

 

1. 初始状态

实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态。

2.1. 就绪状态

  1. 就绪状态只是说你资格运行,调度程序没有挑选到你,你就永远是就绪状态。
  2. 调用线程的start()方法,此线程进入就绪状态。
  3. 当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入就绪状态。
  4. 当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。
  5. 锁池里的线程拿到对象锁后,进入就绪状态。

2.2. 运行中状态

线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。

3. 阻塞状态

阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态。

4. 等待

处于这种状态的线程不会被分配CPU执行时间,它们要等待被显式地唤醒,否则会处于无限期等待的状态。

5. 超时等待

处于这种状态的线程不会被分配CPU执行时间,不过无须无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。

6. 终止状态

  1. 当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
  2. 在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

等待队列

  • 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) 代码段内。
  • 与等待队列相关的步骤和图

 

  • 1.线程1获取对象A的锁,正在使用对象A。
  • 2.线程1调用对象A的wait()方法。
  • 3.线程1释放对象A的锁,并马上进入等待队列。
  • 4.锁池里面的对象争抢对象A的锁。
  • 5.线程5获得对象A的锁,进入synchronized块,使用对象A。
  • 6.线程5调用对象A的notifyAll()方法,唤醒所有线程,所有线程进入同步队列。若线程5调用对象A的notify()方法,则唤醒一个线程,不知道会唤醒谁,被唤醒的那个线程进入同步队列。
  • 7.notifyAll()方法所在synchronized结束,线程5释放对象A的锁。
  • 8.同步队列的线程争抢对象锁,但线程1什么时候能抢到就不知道了。 

同步队列状态

  • 当前线程想调用对象A的同步方法时,发现对象A的锁被别的线程占有,此时当前线程进入同步队列。简言之,同步队列里面放的都是想争夺对象锁的线程。
  • 当一个线程1被另外一个线程2唤醒时,1线程进入同步队列,去争夺对象锁。
  • 同步队列是在同步的环境下才有的概念,一个对象对应一个同步队列
  • 线程等待时间到了或被notify/notifyAll唤醒后,会进入同步队列竞争锁,如果获得锁,进入RUNNABLE状态,否则进入BLOCKED状态等待获取锁。

几个方法的比较

  1. Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。作用:给其它线程执行机会的最佳方式。
  2. Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
  3. t.join()/t.join(long millis),当前线程里调用其它线程t的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程t执行完毕或者millis时间到,当前线程进入就绪状态。
  4. obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
  5. obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。

疑问

  1. 等待队列里许许多多的线程都wait()在一个对象上,此时某一线程调用了对象的notify()方法,那唤醒的到底是哪个线程?随机?队列FIFO?or sth else?Java文档就简单的写了句:选择是任意性的(The choice is arbitrary and occurs at the discretion of the implementation)。

 

传统线程技术

传统是相对于JDK1.5而言的

传统线程技术与JDK1.5的线程并发库

线程就是程序的一条执行线索/线路。

创建线程的两种传统方式

 

  • 创建Thread的子类,覆盖其中的run方法,运行这个子类的start方法即可开启线程

Thread thread = new Thread(){    
        @Override
    public void run(){
        while (true){
            //获取当前线程对象        获取线程名字
            Thread.currentThread()        threadObj.getName()
             //让线程暂停,休眠,此方法会抛出中断异常InterruptedException
             Thread.sleep(毫秒值);
        }
    }
};
thread.start();        
  • 创建Thread时传递一个实现Runnable接口的对象实例

Thread thread = new Thread(new Runnable(){
    public void run(){
   
    }
});
thread.start();    

 

问题:下边的线程运行的是Thread子类中的方法还是实现Runnable接口类的方法 

  new Thread(Runnable.run()){run()}.start();

由Thread类中的run方法源代码中看出,两种传统创建线程的方式都是在调用Thread对象的run方法,如果Thread对象的run方法没有被覆盖,并且像上边的问题那样为Thread对象传递了一个Runnable对象,就会调用Runnable对象的run方法。

多线程并不一定会提高程序的运行效率。

 

常用线程api方法

start()

启动线程

currentThread()

获取当前线程对象

getID()

获取当前线程ID      Thread-编号  该编号从0开始

getName()

获取当前线程名称

sleep(long mill)

休眠线程

Stop()

停止线程,

常用线程构造函数

Thread()

分配一个新的 Thread 对象

Thread(String name)

分配一个新的 Thread对象,具有指定的 name正如其名。

Thread(Runable r)

分配一个新的 Thread对象

Thread(Runable r, String name)

分配一个新的 Thread对象

守护线程

 Java中有两种线程,一种是用户线程,另一种是守护线程。用户线程是指用户自定义创建的线程,主线程停止,用户线程不会停止。守护线程当进程不存在或主线程停止,守护线程也会被停止。

 使用setDaemon(true)方法设置为守护线程

/**
 * 什么是守护线程? 守护线程 进程线程(主线程挂了) 守护线程也会被自动销毁.
 * 
 * @classDesc: 功能描述:(守护线程)*/
public class DaemonThread {

    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                    System.out.println("我是子线程...");
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {

            }
            System.out.println("我是主线程");
        }
        System.out.println("主线程执行完毕!");
    }

}

传统定时器技术

传统定时器的创建:直接使用定时器类Timer

  • 定时执行
new Timer().schedule(TimerTask定时任务, Date time定的时间);
  • 延迟,固定频率执行
new Timer().schedule(TimerTask定时任务, Long延迟(第一次执行)时间, Long间隔时间);

 

TimerTask与Runnable类似,有一个run方法,Timer是定时器对象,到时间后会触发炸弹(TimerTask)对象

new Timer().schedule(
    new TimerTask()定时执行的任务
    {
    public void run()
    {
        SOP(“bombing”);
        }
        显示计时信息
        while (true)
        {
        SOP(new Date().getSeconds());
        Thread.sleep(1000);
        }
    },10    定好的延迟时间,10秒以后执行任务
);    

 

  • 定时任务中创建定时任务
public class T {

    static class T2 extends TimerTask{
        
        @Override
        public void run() {
            System.out.println("T1~boom~~~");
            new Timer().schedule(new T1(), 2000);
        } 
    }
    static class T1 extends TimerTask{
        
        @Override
        public void run() {
            System.out.println("T2~boom~~~");
            new Timer().schedule(new T2(), 5000);
        }     
    }
    
    public static void main(String[] args) {
        
        new Timer().schedule(new T1(), 2000);
        while(true) {
            System.out.println(new Date().getSeconds());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

传统线程互斥技术

  互斥方法:

  • a、同步代码块synchronized (lock){}
  • b、同步方法,方法返回值前加synchronized

  同步方法上边用的锁就是this对象

  静态同步方法使用的锁是该方法所在的class文件对象

  使用synchronized关键字实现互斥,要保证同步的地方使用的是同一个锁对象

 

传统线程同步通信技术

/**
 * 子线程循环10次,回主线程循环10次,
 * 再到子线程循环10次,再回主线程循环10次
 * 如此循环50次     
 */
class Business {
    private boolean bShouleSub = true;

    public synchronized void sub(int x) throws InterruptedException {
        while (bShouleSub) {
            for (int i = 1; i < 11; i++)
                System.out.println("sub~x:" + x + ",i:" + i);
            bShouleSub = false;
            this.notify();
        }this.wait();
    }

    public synchronized void main(int x) throws InterruptedException {
        while (!bShouleSub) {
            for (int i = 1; i < 11; i++)
                System.out.println("main~x:" + x + ",i:" + i);
            bShouleSub = true;
            this.notify();
        }
        this.wait();
    }
}            

 

public class T {

    public static void main(String[] args) throws InterruptedException {
        Business b = new Business();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 50; i++) {
                    try {
                        b.sub(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 50; i++) {
                    try {
                        b.main(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

要用到共同数据(包括同步锁)或相同算法的多个方法要封装在一个类中

 锁是上在代表要操作的资源类的内部方法中的,而不是上在线程代码中的。这样写出来的类就是天然同步的,只要使用的是同一个new出来的对象,那么这个对象就具有同步互斥特性

 判断唤醒等待标记时使用while增加程序健壮性,防止伪唤醒

 

线程范围内共享变量的概念与作用

线程范围内共享数据图解:

 

class ThreadScopeShareData{
    三个模块共享数据,主线程模块和AB模块
    private static int data = 0;    准备共享的数据
    存放各个线程对应的数据
    private Map<Thread, Integer> threadData = new HashMap<Thread, Integer>();
    public static void main(String[] args){    
      创建两个线程
for (int i=0; i<2; i++){     new Thread( new Runnable(){     public void run(){
          现在当前线程中修改一下数据,给出修改信息       data
= new Random().nextInt();       SOP(Thread.currentThread().getName()+将数据改为+data);       将线程信息和对应数据存储起来       threadData.put(Thread.currentThread(), data);       使用两个不同的模块操作这个数据,看结果       new A().get();       new B().get();           }       }).start();     }   } static class A{    public void get(){    data = threadData.get(Thread.currentThread());      SOP(A+Thread.currentThread().getName()+拿到的数据+data);     }   } static class B{    public void get(){   data = threadData.get(Thread.currentThread());      SOP(B+Thread.currentThread().getName()+拿到的数据+data);     }   } }

 

多个线程访问共享对象和数据的方式

如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。

如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享:

  将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。

public class Test {
    
    public static void main(String[] args) {
        ShareData shareDate = new ShareData();
        new Thread(new MyRunnable1(shareDate)).start();
        new Thread(new MyRunnable2(shareDate)).start();        
    }
}


class MyRunnable1 implements Runnable{
    private ShareData shareData;
    
    public MyRunnable1(ShareData shareData) {
        this.shareData = shareData;
    }
    @Override
    public void run() {
        while(true) {
            shareData.increase();
        }
    }
}

class MyRunnable2 implements Runnable{
    private ShareData shareData;
    
    public MyRunnable2(ShareData shareData) {
        this.shareData = shareData;
    }
    @Override
    public void run() {
        while(true) {
            shareData.decrease();
        }
    }
}


class ShareData{
    int count = 0;
    synchronized void  increase() {
        count++;
    }
    synchronized void decrease() {
        count--;
    }
}

 

  将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。

  上面两种方式的组合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,对象作为这个外部类中的成员变量或方法中的局部变量,每个线程的Runnable对象作为外部类中的成员内部类或局部内部类。

  总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。

极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享。

 

ThreadLocal实现线程范围的共享变量

ThreadLocal的作用和目的:用于实现线程内的数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。

每个线程调用全局ThreadLocal对象的set方法,就相当于往其内部的map中增加一条记录,key分别是各自的线程,value是各自的set方法传进去的值。在线程结束时可以调用ThreadLocal.clear()方法,这样会更快释放内存,不调用也可以,因为线程结束后也可以自动释放相关的ThreadLocal变量。

ThreadLocal的应用场景:

  订单处理包含一系列操作:减少库存量、增加一条流水台账、修改总账,这几个操作要在同一个事务中完成,通常也即同一个线程中进行处理,如果累加公司应收款的操作失败了,则应该把前面的操作回滚,否则,提交所有操作,这要求这些操作使用相同的数据库连接对象,而这些操作的代码分别位于不同的模块类中。

  银行转账包含一系列操作: 把转出帐户的余额减少,把转入帐户的余额增加,这两个操作要在同一个事务中完成,它们必须使用相同的数据库连接对象,转入和转出操作的代码分别是两个不同的帐户对象的方法。

  例如Strut2的ActionContext,同一段代码被不同的线程调用运行时,该代码操作的数据是每个线程各自的状态和数据,对于不同的线程来说,getContext方法拿到的对象都不相同,对同一个线程来说,不管调用getContext方法多少次和在哪个模块中getContext方法,拿到的都是同一个。

实验案例:定义一个全局共享的ThreadLocal变量,然后启动多个线程向该ThreadLocal变量中存储一个随机值,接着各个线程调用另外其他多个类的方法,这多个类的方法中读取这个ThreadLocal变量的值,就可以看到多个类在同一个线程中共享同一份数据。

实现对ThreadLocal变量的封装,让外界不要直接操作ThreadLocal变量。

  对基本类型的数据的封装,这种应用相对很少见。

  对对象类型的数据的封装,比较常见,即让某个类针对不同线程分别创建一个独立的实例对象。

一个ThreadLocal对象只能记录一个线程内部的一个共享变量,需要记录多个共享数据,可以创建多个ThreadLocal对象,或者将这些数据进行封装,将封装后的数据对象存入ThreadLocal对象中。

 

将数据对象封装成单例,同时提供线程范围内的共享数据的设置和获取方法,提供已经封装好了的线程范围内的对象实例,使用时只需获取实例对象即可实现数据的线程范围内的共享,因为该对象已经是当前线程范围内的对象了。

import java.util.Random;

public class ThreadLocalShareDataDemo {
    /**
     * ThreadLocal类及应用技巧 将线程范围内共享数据进行封装,封装到一个单独的数据类中,提供设置获取方法
     * 将该类单例化,提供获取实例对象的方法,获取到的实例对象是已经封装好的当前线程范围内的对象
     */
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                public void run() {
                    int data = new Random().nextInt(889);
                    System.out.println(Thread.currentThread().getName() + "产生数据:" + data);
                    MyData myData = MyData.getInstance();
                    myData.setAge(data);
                    myData.setName("Name:" + data);
                    new A().get();
                    new B().get();
                }
            }).start();
        }
    }

    static class A { // 可以直接使用获取到的线程范围内的对象实例调用相应方法
        String name = MyData.getInstance().getName();
        int age = MyData.getInstance().getAge();

        public void get() {
            System.out.println(Thread.currentThread().getName() + "-- AA name:" + name + "...age:" + age);
        }
    }

    static class B {
        // 可以直接使用获取到的线程范围内的对象实例调用相应方法
        String name = MyData.getInstance().getName();
        int age = MyData.getInstance().getAge();

        public void get() {
            System.out.println(Thread.currentThread().getName() + "-- BB name:" + name + "...age:" + age);
        }
    }

    static class MyData {
        private String name;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        // 单例
        private MyData() {
        };

        // 提供获取实例方法
        public static MyData getInstance() {
            // 从当前线程范围内数据集中获取实例对象
            MyData instance = threadLocal.get();
            if (instance == null) {
                instance = new MyData();
                threadLocal.set(instance);
            }
            return instance;
        }

        // 将实例对象存入当前线程范围内数据集中
        static ThreadLocal<MyData> threadLocal = new ThreadLocal<MyData>();
    }
}

 

线程池

  • 线程池的概念与Executors类的应用
    • 创建固定大小的线程池
    • ExecutorService threadPool = Executors.newFixedThreadPool(3);

      创建缓存线程池

    • ExecutorService threadPool = Executors.newCacheThreadPool();

      创建单一线程池

    • ExecutorService threadPool = Executors.newSingleThreadExector();
  • 关闭线程池
    • shutdown与shutdownNow的比较

threadPool.shutdown() 线程全部空闲,没有任务就关闭线程池;threadPool.shutdownNow()  不管任务有没有做完,都关掉

 

  • 用线程池启动定时器
    • a、创建调度线程池,提交任务 延迟指定时间后执行任务

    Executors.newScheduledThreadPool(线程数).schedule(Runnable, 延迟时间,时间单位);

    • b、创建调度线程池,提交任务, 延迟指定时间执行任务后,间隔指定时间循环执行

    Executors.newScheduledThreadPool(线程数). scheduleAtFixedRate (Runnable, 延迟时间,间隔时间,时间单位);

Callable&Future

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。

Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 

public static void main(String[] args) {
        
        ExecutorService threadPool = Executors.newScheduledThreadPool(5);
        
        Future<String> future = threadPool.submit(
            new Callable<String>() {

                @Override
                public String call() throws Exception {
                    return "result";
                }
                
            });
        System.out.println("等待结果~~~");
        try {
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
    }

CompletionService用于提交一组Callable任务,其take方法返回一个已完成的Callable任务对应的Future对象。好比同时种几块麦子等待收割,收割时哪块先熟先收哪块。

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

public static void main(String[] args) {
        
        ExecutorService service = Executors.newCachedThreadPool();
        
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
        //向线程池提交一组任务
        for (int i = 0; i < 10; i++) {
            final int j = i;
            
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(new Random().nextInt(5000));
                    return j;
                }
            });
        }
        //哪个线程先执行完,哪个就输出结果
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

 

Lock&Condition实现线程同步通信

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。

一个锁内部可以有多个Condition,即有多路等待和通知,可以参看jdk1.5提供的Lock与Condition实现的可阻塞队列的应用案例,从中除了要体味算法,还要体味面向对象的封装。在传统的线程机制中一个监视器对象上只能有一路等待和通知,要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。(如果只用一个Condition,两个放的都在等,一旦一个放的进去了,那么它通知可能会导致另一个放接着往下走。)

 

java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类

接口摘要

Condition

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。

Lock

Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。

ReadWriteLock

ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。

类摘要

AbstractOwnableSynchronizer

可以由线程以独占方式拥有的同步器。

AbstractQueuedLongSynchronizer

以 long 形式维护同步状态的一个 AbstractQueuedSynchronizer 版本。

AbstractQueuedSynchronizer

为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。

LockSupport

用来创建锁和其他同步类的基本线程阻塞原语。

ReentrantLock

一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

ReentrantReadWriteLock

支持与 ReentrantLock 类似语义的 ReadWriteLock 实现。

ReentrantReadWriteLock.ReadLock

ReentrantReadWriteLock.readLock() 方法返回的锁。

ReentrantReadWriteLock.WriteLock

ReentrantReadWriteLock.writeLock() 方法返回的锁。

Lock比传统线程模型中的synchronized更加面向对象,锁本身也是一个对象,两个线程执行的代码要实现同步互斥效果,就要使用同一个锁对象。锁要上在要操作的资源类的内部方法中,而不是线程代码中。

public interface Lock

所有已知实现类:

  • ReentrantLock,
  • ReentrantReadWriteLock.
  • ReadLock,
  • ReentrantReadWriteLock.
  • WriteLock 

随着灵活性的增加,也带来了更多的责任。不使用块结构锁就失去了使用 synchronized 方法和语句时会出现的锁自动释放功能。在大多数情况下,应该使用以下语句: 

     Lock l = ...; 
     l.lock();
     try {
         // access the resource protected by this lock
     } finally {
         l.unlock();
     }

锁定和取消锁定出现在不同作用范围中时,必须谨慎地确保保持锁定时所执行的所有代码用 try-finally 或 try-catch 加以保护,以确保在必要时释放锁。

 

方法摘要

 void

lock()           获取锁。

 void

lockInterruptibly()           如果当前线程未被中断,则获取锁。

 Condition

newCondition()           返回绑定到此 Lock 实例的新 Condition 实例。

 boolean

tryLock()           仅在调用时锁为空闲状态才获取该锁。

 boolean

tryLock(long time, TimeUnit unit)           如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。

 void

unlock()           释放锁。

 

Lock与synchronized对比,打印字符串例子

class Outputer{
    public void outpur(String name){
      int len = name.length();
      synchronized (Outputer.class){
         for (int i=0; i<len; i++){
        //不换行逐个打印字符    SO(name.charAt[i]);       }       SOP()换行 }   } }

 

//使用Lock的代码
class Outputer { 
  //Lock是接口,需new实现类对象 Lock lock
= new ReentrantLock(); public void outpur(String name){    int len = name.length();    lock.lock() 锁上 for (int i=0; i<len; i++){
       //不换行逐个打印字符   SO(name.charAt[i]);      }      SOP()换行 lock.unlock(); 解锁
  } }

 

读写锁

例子1:

import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * -----------------------读写锁的例子---------------------------
 */
public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Queue3 q3 = new Queue3();
        for(int i=0;i<3;i++)
        {
            new Thread(){
                public void run(){
                    while(true){
                        q3.get();                        
                    }
                }
                
            }.start();
        }
        for(int i=0;i<3;i++)
        {        
            new Thread(){
                public void run(){
                    while(true){
                        q3.put(new Random().nextInt(10000));
                    }
                }            
                
            }.start();    
        }
    }
}

class Queue3{
    private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    public void get(){
        rwl.readLock().lock();
        System.out.println(Thread.currentThread().getName() + " be ready to read data!");
        try {
            Thread.sleep((long)(Math.random()*1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "have read data :" + data);        
        rwl.readLock().unlock();
    }
    
    public void put(Object data){

        rwl.writeLock().lock();
        System.out.println(Thread.currentThread().getName() + " be ready to write data!");                    
        try {
            Thread.sleep((long)(Math.random()*1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.data = data;        
        System.out.println(Thread.currentThread().getName() + " have write data: " + data);                    
        
        rwl.writeLock().unlock();        
    }
}

例子2:

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
          // Recheck state because another thread might have
          // acquired write lock and changed state before we did.
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
          // Downgrade by acquiring read lock before releasing write lock
          rwl.readLock().lock();
        } finally {
          rwl.writeLock().unlock(); // Unlock write, still hold read
        }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

 

Condition的例子:实现两个线程交替执行

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        final Business2 business = new Business2();
        service.execute(new Runnable(){

            public void run() {
                for(int i=0;i<50;i++){
                    business.sub();
                }
            }
            
        });
        
        for(int i=0;i<50;i++){
            business.main();
        }
    }

}

class Business2{
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    boolean bShouldSub = true;
    public void sub(){
        lock.lock();
        if(!bShouldSub)
            try {
                //等待
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        try
        {
            for(int i=0;i<10;i++){
                System.out.println(Thread.currentThread().getName() + " : " + i);
            }
            bShouldSub = false;
            //唤醒
            condition.signal();
        }finally{
            lock.unlock();
        }
    }
    
    public void main(){
        lock.lock();
        if(bShouldSub)
            try {
                //等待
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
        try
        {
            for(int i=0;i<5;i++){
                System.out.println(Thread.currentThread().getName() + " : " + i);
            }
            bShouldSub = true;
            //唤醒
            condition.signal();            
        }finally{
            lock.unlock();
        }        
    }
}

 

Condition的例子:2个condition实现可阻塞队列

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

 

Condition的例子:实现三个线程交替运行的效果

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreeCondition {

    public static void main(String[] args) {

        new ThreeCondition().init();

    }

    private void init() {
        final Business b = new Business();
        new Thread() {
            public void run() {
                for (int i = 0; i < 50; i++)
                    b.sub1();
            }

        }.start();
        new Thread() {
            public void run() {
                for (int i = 0; i < 50; i++)
                    b.sub2();
            }
        }.start();

        new Thread() {
            public void run() {
                for (int i = 0; i < 50; i++)
                    b.sub3();
            }
        }.start();
    }

    private class Business {
        int status = 1;
        Lock lock = new ReentrantLock();
        Condition cond1 = lock.newCondition();
        Condition cond2 = lock.newCondition();
        Condition cond3 = lock.newCondition();

        public void sub1() {
            lock.lock();
            while (status != 1) {
                try {
                    cond1.await();
                } catch (Exception e) {
                }
            }
            for (int i = 1; i <= 5; i++) {
                try {
                    Thread.sleep(200);
                } catch (Exception e) {
                }
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
            status = 2;
            cond2.signal();
            lock.unlock();
        }

        public void sub2() {
            lock.lock();
            while (status != 2) {
                try {
                    cond2.await();
                } catch (Exception e) {
                }
            }
            for (int i = 1; i <= 10; i++) {
                try {
                    Thread.sleep(200);
                } catch (Exception e) {
                }
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
            status = 3;
            cond3.signal();
            lock.unlock();
        }

        public void sub3() {
            lock.lock();
            while (status != 3) {
                try {
                    cond3.await();
                } catch (Exception e) {
                }
            }
            for (int i = 1; i <= 10; i++) {
                try {
                    Thread.sleep(200);
                } catch (Exception e) {
                }
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
            status = 1;
            cond1.signal();
            lock.unlock();
        }
    }
}

 

Semaphore实现信号灯

Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

构造方法摘要

Semaphore(int permits)           创建具有给定的许可数和非公平的公平设置的 Semaphore。

Semaphore(int permits, boolean fair)           创建具有给定的许可数和给定的公平设置的 Semaphore。

方法摘要

 void

acquire()           从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

 void

acquire(int permits)           从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。

 void

acquireUninterruptibly()           从此信号量中获取许可,在有可用的许可前将其阻塞。

 void

acquireUninterruptibly(int permits)           从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。

 int

availablePermits()           返回此信号量中当前可用的许可数。

 int

drainPermits()           获取并返回立即可用的所有许可。

protected  Collection<Thread>

getQueuedThreads()           返回一个 collection,包含可能等待获取的线程。

 int

getQueueLength()           返回正在等待获取的线程的估计数目。

 boolean

hasQueuedThreads()           查询是否有线程正在等待获取。

 boolean

isFair()           如果此信号量的公平设置为 true,则返回 true。

protected  void

reducePermits(int reduction)           根据指定的缩减量减小可用许可的数目。

 void

release()           释放一个许可,将其返回给信号量。

 void

release(int permits)           释放给定数目的许可,将其返回到信号量。

 String

toString()           返回标识此信号量的字符串,以及信号量的状态。

 boolean

tryAcquire()          仅在调用时此信号量存在一个可用许可,才从信号量获取许可。

 boolean

tryAcquire(int permits)           仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。

 boolean

tryAcquire(int permits, long timeout, TimeUnit unit)           如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。

 boolean

tryAcquire(long timeout, TimeUnit unit)           如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  Semaphore sp = new Semaphore(3);
        for(int i=0;i<10;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        sp.acquire();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
                    try {
                        Thread.sleep((long)(Math.random()*10000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "即将离开");                    
                    sp.release();
                    //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");                    
                }
            };
            service.execute(runnable);            
        }
    }

}

 

其他同步工具类

  • CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

构造方法摘要

CyclicBarrier(int parties)           创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

CyclicBarrier(int parties, Runnable barrierAction)           创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

方法摘要

 int

await()     在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

 int

await(long timeout, TimeUnit unit)           在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。

 int

getNumberWaiting()           返回当前在屏障处等待的参与者数目。

 int

getParties()           返回要求启动此 barrier 的参与者数目。

 boolean

isBroken()           查询此屏障是否处于损坏状态。

 void

reset()           将屏障重置为其初始状态。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  CyclicBarrier cb = new CyclicBarrier(3);
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();
                        
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();    
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }                
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

 

  • CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。

犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。

构造方法摘要

CountDownLatch(int count)           构造一个用给定计数初始化的 CountDownLatch。

方法摘要

 void

await()           使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。

 boolean

await(long timeout, TimeUnit unit)           使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。

 void

countDown()           递减锁存器的计数,如果计数到达零,则释放所有等待的线程。

 long

getCount()           返回当前计数。

 String

toString()           返回标识此锁存器及其状态的字符串。

 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1);
        final CountDownLatch cdAnswer = new CountDownLatch(3);        
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令");                        
                        cdOrder.await();
                        System.out.println("线程" + Thread.currentThread().getName() + "已接受命令");                                
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果");                        
                        cdAnswer.countDown();                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }                
                }
            };
            service.execute(runnable);
        }        
        try {
            Thread.sleep((long)(Math.random()*10000));
        
            System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令");                        
            cdOrder.countDown();
            System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果");    
            cdAnswer.await();
            System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果");    
        } catch (Exception e) {
            e.printStackTrace();
        }                
        service.shutdown();
    }
}

 

  • Exchanger

用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据。

构造方法摘要

Exchanger()           创建一个新的 Exchanger。

方法摘要

 V

exchange(V x)           等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

 V

exchange(V x, long timeout, TimeUnit unit)           等待另一个线程到达此交换点(除非当前线程被中断,或者超出了指定的等待时间),然后将给定的对象传送给该线程,同时接收该线程的对象。

 

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final Exchanger exchanger = new Exchanger();
        service.execute(new Runnable(){
            public void run() {
                try {                
                    Thread.sleep((long)(Math.random()*10000));
                    String data1 = "zxx";
                    System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去");
                    String data2 = (String)exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2);
                }catch(Exception e){
                    
                }
            }    
        });
        service.execute(new Runnable(){
            public void run() {
                try {                
                    Thread.sleep((long)(Math.random()*10000));
                    String data1 = "lhm";
                    System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去");
                    String data2 = (String)exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2);
                }catch(Exception e){
                    
                }                
            }    
        });        
    }
}

 

并发队列

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列非阻塞队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

1.ArrayDeque, (数组双端队列) 
2.PriorityQueue, (优先级队列) 
3.ConcurrentLinkedQueue, (基于链表的并发队列) 
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 
9.SynchronousQueue (并发同步阻塞队列)

 

ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。
ConcurrentLinkedQueue重要方法:
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别)poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。

    ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.offer("a");
    q.offer("b");
    q.offer("c");
    q.offer("d");
    q.offer("e");
    //从头获取元素,删除该元素
    System.out.println(q.poll());
    //从头获取元素,不刪除该元素
    System.out.println(q.peek());
    //获取总长度
    System.out.println(q.size());

BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

在队列为空时,获取元素的线程会等待队列变为非空。

当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

1. 当队列满了的时候进行入队列操作

2. 当队列空了的时候进行出队列操作

因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。

在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。

在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为快速搭建高质量的多线程程序带来极大的便利。。

 

常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)

  先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。

  后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

      多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面是一个初始化和使用ArrayBlockingQueue的例子:

<String> arrays = new ArrayBlockingQueue<String>(3);
arrays.add("李四");
arrays.add(
"张军"); arrays.add("张军"); // 添加阻塞队列 arrays.offer("张三", 1, TimeUnit.SECONDS);

LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面是一个初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("张三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size());

PriorityBlockingQueue

PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就 是按照我们对这个接口的实现来定义的。另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺 序进行迭代。

下面举个例子来说明一下,首先定义一个对象类型,这个对象需要实现Comparable接口:

SynchronousQueue

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

使用BlockingQueue模拟生产者与消费

class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生产者开始启动....");
        while (FLAG) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败..");
                }
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        }
        System.out.println(Thread.currentThread().getName() + ",生产者线程停止...");
    }

    public void stop() {
        this.FLAG = false;
    }

}

class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消费者开始启动....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消费者超过2秒时间未获取到消息.");
                    return;
                }
                System.out.println("消费者获取到队列信息成功,data:" + data);

            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

}

public class Test0008 {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(consumerThread);
        t1.start();
        t2.start();
        //10秒后 停止线程..
        try {
            Thread.sleep(10*1000);
            producerThread.stop();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

}

 

 

抛出异常

特殊值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除

remove()

poll()

take()

poll(time, unit)

检查

element()

peek()

不可用

不可用

 

 

构造方法摘要

ArrayBlockingQueue(int capacity)           创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。

ArrayBlockingQueue(int capacity, boolean fair)           创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。

ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)           创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。

方法摘要

 boolean

add(E e)           将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。

 void

clear()           自动移除此队列中的所有元素。

 boolean

contains(Object o)           如果此队列包含指定的元素,则返回 true。

 int

drainTo(Collection<? super E> c)           移除此队列中所有可用的元素,并将它们添加到给定 collection 中。

 int

drainTo(Collection<? super E> c, int maxElements)           最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。

 Iterator<E>

iterator()           返回在此队列中的元素上按适当顺序进行迭代的迭代器。

 boolean

offer(E e)           将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。

 boolean

offer(E e, long timeout, TimeUnit unit)           将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。

 E

peek()           获取但不移除此队列的头;如果此队列为空,则返回 null。

 E

poll()           获取并移除此队列的头,如果此队列为空,则返回 null。

 E

poll(long timeout, TimeUnit unit)           获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。

 void

put(E e)           将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。

 int

remainingCapacity()           返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。

 boolean

remove(Object o)           从此队列中移除指定元素的单个实例(如果存在)。

 int

size()           返回此队列中元素的数量。

 E

take()  获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。

 Object[]

toArray()           返回一个按适当顺序包含此队列中所有元素的数组。

<T> T[]

toArray(T[] a)           返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。

 String

toString()           返回此 collection 的字符串表示形式。

阻塞队列的实现原理(Condition锁中有提到await signal)

 

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    public static void main(String[] args) {
        final BlockingQueue queue = new ArrayBlockingQueue(3);
        for(int i=0;i<2;i++){
            new Thread(){
                public void run(){
                    while(true){
                        try {
                            Thread.sleep((long)(Math.random()*1000));
                            System.out.println(Thread.currentThread().getName() + "准备放数据!");                            
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }
                
            }.start();
        }
        
        new Thread(){
            public void run(){
                while(true){
                    try {
                        //将此处的睡眠时间分别改为100和1000,观察运行结果
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "准备取数据!");
                        queue.take();
                        System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据");                    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        }.start();            
    }
}

 

线程

什么是线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池作用

线程池是为突然大量爆发的线程设计的,通过有限的几个固定线程为大量的操作服务,减少了创建和销毁线程所需的时间,从而提高效率。

如果一个线程的时间非常长,就没必要用线程池了(不是不能作长时间操作,而是不宜。),况且我们还不能控制线程池中线程的开始、挂起、和中止。

线程池的分类

ThreadPoolExecutor

Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布),但是很多程序员对于其中的一些原理还是不熟悉,因此写这篇文章来介绍下Executor接口,同时巩固下自己的知识。如果文章中有出现错误,欢迎大家指出。

Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其实也只是ThreadPoolExecutor的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。

 

corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。
unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

线程池四种创建方式

Java通过Executors(jdk1.5并发包)提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

        // 无限大小线程池 jvm自动回收
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                    System.out.println(Thread.currentThread().getName() + ",i:" + temp);

                }
            });
        }

总结: 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newFixedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getId() + ",i:" + temp);

                }
            });
        }

总结:因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newScheduledThreadPool.schedule(new Runnable() {
                public void run() {
                    System.out.println("i:" + temp);
                }
            }, 3, TimeUnit.SECONDS);
}

表示延迟3秒执行。

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

    ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            newSingleThreadExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("index:" + index);
                    try {
                        Thread.sleep(200);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            });
        }

注意: 结果依次输出,相当于顺序执行各个任务。

线程原理剖析

提交一个任务到线程池中,线程池的处理流程如下:

1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。

2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

自定义线程线程池

如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程

如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

public class Test0007 {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        for (int i = 1; i <= 6; i++) {
            TaskThred t1 = new TaskThred("任务" + i);
            executor.execute(t1);
        }
        executor.shutdown();
    }
}

class TaskThred implements Runnable {
    private String taskName;

    public TaskThred(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+taskName);
    }

}

合理配置线程池

CPU密集

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为CPU总的运算能力就那些。

IO密集

IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即时在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

 

如何合理的设置线程池大小

要想合理的配置线程池的大小,首先得分析任务的特性,可以从以下几个角度分析:

1. 任务的性质:CPU密集型任务、IO密集型任务、混合型任务。

2. 任务的优先级:高、中、低。

3. 任务的执行时间:长、中、短。

4. 任务的依赖性:是否依赖其他系统资源,如数据库连接等。

性质不同的任务可以交给不同规模的线程池执行。

对于不同性质的任务来说,CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,而对于混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。

若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。

当然具体合理线程池值大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是前人总结的规律。

 

最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:

最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

可以得出一个结论: 
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。以上公式与之前的CPU和IO密集型任务设置线程数基本吻合。

CPU密集型时,任务可以少配置线程数,大概和机器的cpu核数相当,这样可以使得每个线程都在执行任务IO密集型时,大部分线程都阻塞,故需要多配置线程数,2*cpu核数

操作系统之名称解释:

某些进程花费了绝大多数时间在计算上,而其他则在等待I/O上花费了大多是时间,前者称为计算密集型(CPU密集型)computer-bound,后者称为I/O密集型,I/O-bound。

 

 

Java锁的深度化

 

悲观锁、乐观锁、排他锁

 

当多个请求同时操作数据库时,首先将订单状态改为已支付,在金额加上200,在同时并发场景查询条件下,会造成重复通知。

 

悲观锁与乐观锁

 

悲观锁:悲观锁悲观的认为每一次操作都会造成更新丢失问题,在每次查询时加上排他锁。

 

每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

 

Select * from xxx for update;

 

乐观锁:乐观锁会乐观的认为每次查询都不会造成更新丢失,利用版本字段控制

 

重入锁

 

锁作为并发共享数据,保证一致性的工具,在JAVA平台有多种实现(如 synchronized 和 ReentrantLock等等 ) 。这些已经写好提供的锁为我们开发提供了便利。

 

重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。
在JAVA环境下 ReentrantLock 和synchronized 都是可重入锁

public class Test implements Runnable {
    public  synchronized void get() {
        System.out.println("name:" + Thread.currentThread().getName() + " get();");
        set();
    }

    public synchronized  void set() {
        System.out.println("name:" + Thread.currentThread().getName() + " set();");
    }

    @Override

    public void run() {
        get();
    }

    public static void main(String[] args) {
        Test ss = new Test();
        new Thread(ss).start();
        new Thread(ss).start();
        new Thread(ss).start();
        new Thread(ss).start();
    }
}

 

public class Test02 extends Thread {
    ReentrantLock lock = new ReentrantLock();
    public void get() {
        lock.lock();
        System.out.println(Thread.currentThread().getId());
        set();
        lock.unlock();
    }
    public void set() {
        lock.lock();
        System.out.println(Thread.currentThread().getId());
        lock.unlock();
    }
    @Override
    public void run() {
        get();
    }
    public static void main(String[] args) {
        Test ss = new Test();
        new Thread(ss).start();
        new Thread(ss).start();
        new Thread(ss).start();
    }

}

 

 

 

CAS无锁机制

 

(1)与锁相比,使用比较交换(下文简称CAS)会使程序看起来更加复杂一些。但由于其非阻塞性,它对死锁问题天生免疫,并且,线程间的相互影响也远远比基于锁的方式要小。更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销,因此,它要比基于锁的方式拥有更优越的性能。

 

(2)无锁的好处:

 

第一,在高并发的情况下,它比有锁的程序拥有更好的性能;

 

第二,它天生就是死锁免疫的。

 

就凭借这两个优势,就值得我们冒险尝试使用无锁的并发。

 

(3)CAS算法的过程是这样:它包含三个参数CAS(V,E,N): V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。

 

(4)CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

 

(5)简单地说,CAS需要你额外给出一个期望值,也就是你认为这个变量现在应该是什么样子的。如果变量不是你想象的那样,那说明它已经被别人修改过了。你就重新读取,再次尝试修改就好了。

 

(6)在硬件层面,大部分的现代处理器都已经支持原子化的CAS指令。在JDK 5.0以后,虚拟机便可以使用这个指令来实现并发操作和并发数据结构,并且,这种操作在虚拟机中可以说是无处不在。

 

 

/** 
     * Atomically increments by one the current value. 
     * 
     * @return the updated value 
     */  
    public final int incrementAndGet() {  
        for (;;) {  
            //获取当前值  
            int current = get();  
            //设置期望值  
            int next = current + 1;  
            //调用Native方法compareAndSet,执行CAS操作  
            if (compareAndSet(current, next))  
                //成功后才会返回期望值,否则无线循环  
                return next;  
        }  
    }  

 

 

 

自旋锁

 

自旋锁是采用让当前线程不停地的在循环体内执行实现的,当循环的条件被其他线程改变时 才能进入临界区。如下

 

    private AtomicReference<Thread> sign =new AtomicReference<>();
    public void lock() {
        Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {
          }
    }
    public void unlock() {
        Thread current = Thread.currentThread();
        sign.compareAndSet(current, null);
    }

 

 

public class Test implements Runnable {
    static int sum;
    private SpinLock lock;

    public Test(SpinLock lock) {
        this.lock = lock;
    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        SpinLock lock = new SpinLock();
        for (int i = 0; i < 100; i++) {
            Test test = new Test(lock);
            Thread t = new Thread(test);
            t.start();
        }

        Thread.currentThread().sleep(1000);
        System.out.println(sum);
    }

    @Override
    public void run() {
        this.lock.lock();
        this.lock.lock();
        sum++;
        this.lock.unlock();
        this.lock.unlock();
    }

}

当一个线程 调用这个不可重入的自旋锁去加锁的时候没问题,当再次调用lock()的时候,因为自旋锁的持有引用已经不为空了,该线程对象会误认为是别人的线程持有了自旋锁

使用了CAS原子操作,lock函数将owner设置为当前线程,并且预测原来的值为空。unlock函数将owner设置为null,并且预测值为当前线程。

当有第二个线程调用lock操作时由于owner值不为空,导致循环一直被执行,直至第一个线程调用unlock函数将owner设置为null,第二个线程才能进入临界区。

由于自旋锁只是将当前线程不停地执行循环体,不进行线程状态的改变,所以响应速度更快。但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。如果线程竞争不激烈,并且保持锁的时间段。适合使用自旋锁。

 

分布式锁

如果想在不同的jvm中保证数据同步,使用分布式锁技术。

有数据库实现、缓存实现、Zookeeper分布式锁

 

原文地址:https://www.cnblogs.com/aaron911/p/9949494.html