并发容器

并发容器

From Vector To Queue
1.原始的ArrayList
/**
 * 有N张火车票  每张票都有一个编号
 * 同时有10个窗口对外售票
 *
 */
package com.bingfarongqi;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class TicketSeller1 {
        //定义一个容器 tickets 总票数
            static List<String> tickets=new ArrayList<>();
            static {
                //总共1000张票 定义每张票的编号
                for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
            }

    public static void main(String[] args) {
        for (int i = 0; i <10 ; i++) {
            new Thread(()->{
            while (tickets.size()>0){
                try {
                    TimeUnit.MICROSECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //把容器内第0个位置的票remove
                System.out.println("销售了--"+tickets.remove(0));
            }
            }).start();
        }
    }
}

这种最原始的集合容器 一眼可以看出 没有加锁 问题就在这 10个线程同时运行没有加锁肯定会出问题

size大于0 的时候没有问题 因为只要还有剩余的票我就往外卖,取一张往外remove

但是当剩下最后一张票的时候问题就出现了 好几个线程执行到这都发现size大于0 所有线程都往外卖一张票,那么就会发生只有一个线程拿到了票,其余线程就会输出空null,所以没有加锁 线程不安全。

2.Vector
/**
 * 有N张火车票  每张票都有一个编号
 * 同时有10个窗口对外售票
 *
 */
package com.bingfarongqi;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class TicketSeller2 {
    //定义一个容器 tickets 总票数
    static Vector<String> tickets=new Vector<>();
    static {
        //总共1000张票 定义每张票的编号
        for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
    }

    public static void main(String[] args) {
        for (int i = 0; i <10 ; i++) {
            new Thread(()->{
                while (tickets.size()>0){
                    try {
                        TimeUnit.MICROSECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //把容器内第0个位置的票remove
                    System.out.println("销售了--"+tickets.remove(0));
                }
            }).start();
        }
    }
}

第二种是容器Vetor 是自带内部锁

读它的内部方法时会看到synchronized

虽然Vetor上来先加了锁保证了线程的安全性 但是当我们读这个程序的时候发现还是不对,锁为了线程的安全,就是当我们调用size方法的时候他加锁了,调用了remove的时候也加锁了,可不幸的是你这两个中间没加 那好多个线程依然断定这个size是大于0 的 又出现了超卖的现象。

3.Linkenlist

/**
 * 有N张火车票  每张票都有一个编号
 * 同时有10个窗口对外售票
 *
 */
package com.bingfarongqi;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class TicketSeller3 {
    //定义一个容器 tickets 总票数
    static List<String> tickets=new LinkedList<>();
    static {
        //总共1000张票 定义每张票的编号
        for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
    }

    public static void main(String[] args) {
        for(int i = 0; i <10 ; i++){
            new Thread(()->{
                while (true){
                    //要在外层单独加一把锁 不然还是出问题
                    synchronized (tickets){
                        if (tickets.size()<=0) break;

                    try {
                        TimeUnit.MICROSECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //把容器内第0个位置的票remove
                    System.out.println("销售了--"+tickets.remove(0));
                }
                }
            }).start();
        }
    }
}

这个容器虽然是用了加锁 但由于子啊调用并发容器的时候你是调用了两个原子方法,所以你在外层还是得再加上一把锁 这个不会出现什么问题 它会踏踏实实的执行 但是效率不是最高的。

4.Queue
/**
 * 有N张火车票  每张票都有一个编号
 * 同时有10个窗口对外售票
 *
 */
package com.bingfarongqi;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

public class TicketSeller4 {
    //定义一个容器 tickets 总票数
    static Queue<String> tickets=new ConcurrentLinkedQueue<>();
    static {
        //总共1000张票 定义每张票的编号
        for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
    }

    public static void main(String[] args) {
        for(int i = 0; i <10 ; i++){
            new Thread(()->{
                while (true){
                    String s=tickets.poll();
                    if (s==null) break;
                        //把容器内第0个位置的票remove
                        else System.out.println("销售了--"+s);
                    }
            }).start();
        }
    }
}

这个容器是最新的一个接口 主要是为了多线程用的 所以以后多线程这种单个元素的时候多考虑Queue 少用List呵呵Set

这里调用了个poll方法 poll的意思就是我从ticket去取值,这个值什么时候空了说明里面的值已经没了,所以while(true)不断往外销售,一直等到票没了

它的源码 :

意思是说从queue的头部取值 等取完了 也就是值为null的时候我就返回null值。

ConcurrentMAP

这里会出现一个跳表结构

CopyOnWrite

CopyOnWrite的意思就是写时复制

主要是在写等待时候很少 读的时候很多的情况 这个时候就可以考虑用CopyOnWrite方式来提高效率了。

Vector是在写的时候加锁 读的时候也加锁,但是CopyOnWriteList用的时候读的时候不加锁 写的时候会在原来的基础上拷贝一个,拷贝的时候扩展出来一个新元素来,然后把你添加的这个扔到最后的位置,与此同时把指向老的容器的一个引用指向新的,这就叫写时复制。

BlockingQueue

BlockingQueue的概念重点在Blocking上 Blocking阻塞 Queue队列,是阻塞队列。

这个Queue提供了一些可以给多线程比较友好的接口 :

1.offer 对应的就是原来的add 与add的区别就在于 add如果加不进去是会抛出异常 ,所以我们用的最多的是Queue里面的offer 它会概念一个返回值。

2.poll 用来取数据 取出数据并且remove

3.peek 拿出来数据 取出数据而不remove

package com.bingfarongqi.f_03;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BlockingQueue {
    public static void main(String[] args) {
        Queue<String> strs=new ConcurrentLinkedQueue<>();
        for (int i = 0; i <10 ; i++) {
            strs.offer("a"+i);//add
        }
        //10个数据
        System.out.println(strs);
        //size是10
        System.out.println(strs.size());
            //取出a0 并且remove掉 所以size还剩9
        System.out.println(strs.poll());
        System.out.println(strs.size());
        //因为刚才poll取出a0并且remove 所以peek取出的只能是a1 并且size还是9
        //因为peek不remove 所以size 9 不变
        System.out.println(strs.peek());
        System.out.println(strs.size());
    }
}

LinkedBlockingQueue

LinkedBlockingQueue是在Queue的基础上又添加了两个方法 一个叫put 一个叫take

这两个方法真正的实现了阻塞,put往里面装 如果满了的话就会让这个线程阻塞住,take往外取,如果空了的话就会阻塞住。 所以这是真正的实现了 生产者和消费者的容器。

ArrayBlockingQueue(扩展-面试常问问题)

ArrayBlockingQueue是有界的 可以界定容器的大小。

当往容器里面仍数据的时候 一旦满了 ,这个put方法就会阻塞住,offer用返回值来判断到底加没加成功。

扩展- 面试常问问题:Queue和list的区别到底在哪里?

主要是在这,添加里一些对线程友好的API :offer peek poll put take

package com.bingfarongqi.f_03;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueue_test {
    static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);
    static Random r=new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i <10 ; i++) {
            strs.put("a"+i);
        }
        //strs.put("aaa");//满了就会等待,程序阻塞(无返回值)
        //strs.add("aaa");//满了就会报出异常
        strs.offer("aaa");//满了就会返回值判定成功
        strs.offer("aaa",1, TimeUnit.SECONDS);

        System.out.println(strs);
    }
}

DelayQueue(扩展PriorityQueue)

DelayQueue可以实现时间上的排序,主要是按照在里面等待的时间进行排序。

就看站着线程 正常情况下按照顺序 t1 >2>3>4>5

但是这里看的是后面指定的时间 t1是1秒 t2是2秒 t3是1.5 t4是2.5 t5是0.5

所以根据这个时间 运行顺序就是t5>1>3>2>4

这个主要是用在调度时间的案例 ,比如 谁谁谁一个小时后干什么 三小时后干什么.....

扩展--:

DelayQueue的本质上用的是一个PriorityQueue,PriorityQueue是从AbstractQueue继承的。

PriorityQueue的特点是它内部你往里装的时候并不是按顺序,而是内部进行了一个排序,按照优先级,最小的优先。它内部的结构其实是一个二叉树 .

package com.bingfarongqi.f_03;

import java.util.PriorityQueue;

public class PriorityQueue_test {
    public static void main(String[] args) {
        PriorityQueue<String> q=new PriorityQueue<>();

        q.add("c");
        q.add("e");
        q.add("a");
        q.add("d");
        q.add("z");
        for (int i = 0; i <5 ; i++) {
            System.out.println(q.poll());
            //输出结果是 a c d e z
        }
    }
}

这个例子就很明显的展现胡了二叉树 优先级的问题 最小的在上面 。

SynchronusQueue

这个容器的容量是0 它不是用来装东西的,而是专门用来两个线程传内容的,第三节的最好有一个容器叫Exchanger 就是那个两个人交换装备的,他们两个本质上概念是一样的。

package com.bingfarongqi.f_03;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronusQueue_test {
    //容量为0
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strs=new SynchronousQueue<>();
        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
            strs.put("aaa");//阻塞等待消费者
            //strs.put("bbb");
            //strs.add("aaa");

        System.out.println(strs.size());
    }
}

SynchronusQueue在线程池的作用特别大 ,很多线程取任务,互相之间进行任务调度的时候都用它。

TransferQueue

TransferQueue添加了一个新方法 叫transfer 这个方法和put相似 但是和put唯一的不同是put是把数据放进来就走 然后接着放 知道满了 然后等待,transfer是撞进来一个数据就在这等着 ,直到消费者把这个数据拿走了我才回去再做我的事。

package com.bingfarongqi.f_03;

import java.util.concurrent.LinkedTransferQueue;

public class TransferQueue_test {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();

        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        strs.transfer("aaa");
//        strs.put("aaa");
//        new Thread(()->{
//            try {
//                System.out.println(strs.take());
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }).start();
    }
}

一般使用场景:我付了钱,这个订单我付款完成了,但是我一直要等这个付款的结果完成才可以给客户反馈。

原文地址:https://www.cnblogs.com/beizhai/p/13796277.html