并发容器

ThreadLocal

ThreadLocal 线程局部变量,只对当前线程范围有效,比如下面例子,在第一个线程设置的值,第二个线程是使用不了的。

public class TLDemo2 {

    private static ThreadLocal<User> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
            threadLocal.set(new User());
            System.out.println(threadLocal.get());
        }).start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                e.printStackTrace();
            }
           threadLocal.set(new User());
//            threadLocal.remove();
            System.out.println(threadLocal.get());
        }).start();

    }

}

有10000张火车票,同时有10个窗口对外售票

请写一个模拟程序

public class SaleOfTickets1 {

    private static List<Integer> tickets = new ArrayList<>();
    
    static{
        for (int i = 0; i < 10000; i++) {
            tickets.add(i);
        }
    }

    /**
     * 当我们卖到最后一张票的时候,tickets是大于0的,第一个线程看到他大于0,进入判断,
     * 第二个线程同样操作区remove的时候票就没有了,在高并发的情况下,还可能会卖重,
     * 几百万线程去卖票的时候,remove方法不是同步的,第一个线程卖了这张票,第二个线程可能也买了。
     * @param args
     */
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(tickets.size() > 0){
                    System.out.println(Thread.currentThread().getName()+"销售票编号:" + tickets.remove(0));
                }
            }).start();
        }
    }
}

vector

public class SaleOfTickets2 {

    private static Vector<Integer> tickets = new Vector<>();
    
    static {
        for (int i = 0; i < 10000; i++) {
            tickets.add(i);
        }
    }


    /**
     * vector是一个同步容器
     * 这里是判断和操作分离了,虽然说在vector里面remove方法是原子性的,但是他的判断和remove方法是分离的,
     * 但是可能在判断到remove的过程当中线程可能会被打断。我们可以加一个模拟性的睡眠,因为在你实际开发的
     * 时候,可能在这中间有些判断代码逻辑代码。
     * 如果剩了最后一个了,很多线程去抢票,虽然size是原子性的,remove是原子性的,但是在他们的中间,
     * 线程还是有可能被打断
     */
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(tickets.size() > 0){
                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("销售票编号:" + tickets.remove(0));
                }
            }).start();
        }
    }
}

synchronized

public class SaleOfTickets3 {

    private static List<Integer> tickets = new LinkedList<>();
    
    static {
        for (int i = 0; i < 10000; i++) {
            tickets.add(i);
        }
    }
    
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(true){
                    //这里使用synchronized,使两个操作具备了原子性,不会出问题
                    synchronized(tickets){
                        if(tickets.size() <= 0){
                            break;
                        }
                        System.out.println("销售票编号:" + tickets.remove(0));
                    }
                }
            }).start();
        }
    }

}

ConcurrentLinkedQueue

在JDK1.5以后,java里面提供了很多的并发容器,这里我们用的是一个queue,队列。
* 所谓队列其实就是一个容器,就是站成一对,不管票还是人都在里面排成一堆,队列有几种,有先进先出的,
* 还有两端的队列,还有就是栈,先进后出,先加进去的后出来。
* 这里用了一个concurrentlinkedqueue,并发的链表队列。线程里面调用了一个poll方法,
* 意思是往外面拿一个数据,相当于在尾巴里面拿一个,如果没有拿到,他的返回值就是空,那么就中断线程。
* 这里面没有加锁,同样有判断,但是不会出问题。完成卖票功能这种效率是比较高的。queue里面是不能装空值。
* 这里虽然判断和操作是一起的,但是我们没有在判断里面有任何操作,大不了反过头来再拿一边,
* poll底层实现是cas,这里我们就不用加锁了。

public class SaleOfTickets4 {

    private static Queue<Integer> tickets = new ConcurrentLinkedQueue<>();
    
    static {
        for (int i = 0; i < 10000; i++) {
            tickets.add(i);
        }
    }
    
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(true){
                    Integer poll = tickets.poll();
                    if(poll == null){
                        break;
                    }
                    System.out.println("销售票编号:" + poll);
                }
            }).start();
        }
    }
}

CopyOnWriteArrayList

写时复制容器
*
* 在往集合中添加数据的时候,先拷贝存储的数组,然后添加元素到拷贝好的数组中,
* 然后用现在的数组去替换成员变量的数组(就是get等读取操作读取的数组)。
* 这个机制和读写锁是一样的,但是比读写锁有改进的地方,那就是读取的时候可以写入的 ,
* 这样省去了读写之间的竞争,看了这个过程,你也发现了问题,同时写入的时候怎么办呢,当然果断还是加锁。
* 读多写少可以用copyonwritelist
public class Demo {

    public static void main(String[] args) {
//        List<String> lists = new ArrayList<>();
//        List<String> lists = new Vector<>();
        List<String> lists = new CopyOnWriteArrayList<>();
        Random r = new Random();
        Thread[] threads = new Thread[100];
        
        for (int i = 0; i < threads.length; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        lists.add("A" + r.nextInt(10000));
                    }
                }
            };
            threads[i] = new Thread(task);
        }
        
        run(threads);
        
        System.out.println(lists.size());
    }

    private static void run(Thread[] threads) {
        long start = System.currentTimeMillis();
        Arrays.asList(threads).forEach(t->t.start());
        Arrays.asList(threads).forEach(t->{
            try {
                t.join();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }
}

Collections

collections是java里面一个集合处理类,里面有给容器加锁的方法,通过调用api可以返回一个加了锁的容器。

public static void main(String[] args) {
        ArrayList<String> arrayList = new ArrayList<>();
        List<String> synchronizedList = Collections.synchronizedList(arrayList);
    }

ConcurrentLinkedQueue

public class Demo {

    public static void main(String[] args) {
        Queue<String> strings = new ConcurrentLinkedQueue<>();
        
        for (int i = 0; i < 10; i++) {
            //offer,类似于add方法,add会出一些问题,比如容量限制,
            //超出限制会抛异常,offer有返回值可以判断是否加成功了
            strings.offer("a" + i);
        }
        
        System.out.println(strings);
        
        System.out.println(strings.size());

        System.out.println(strings.poll());//拿了就没了
        System.out.println(strings.size());

        System.out.println(strings.peek());//用一下不删
        System.out.println(strings.size());
    }
}

ConcurrentHashMap

并发的hashmap,这个例子测试一下效率
*
* 第一种用hashtable,hashtable所有方法都加了锁了,第二种concurrenthashmap,
* 大致能看出来他的效率要比hashtable要高一些,在多线程的情况下。
* 为什么呢,因为hashtable往里面加任何数据的时候都是要锁定整个对象,
* 而concurrenthashmap,是分成十六个段,每次插数据的时候,只会锁住一小段,1.8之后实现不同。
public class Demo {

    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentHashMap<>();
        //Map<String, String> map = new ConcurrentSkipListMap<>();
//        Map<String, String> map = new Hashtable<>();

//        Map<String, String> map = new HashMap<>();
//        Map<String, String> map1 = Collections.synchronizedMap(map);

        Random random = new Random();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        long start_time = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for(int j=0; j<10000;j++) {
                    map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
//                    map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
                }
                latch.countDown();
            });
        }
        Arrays.asList(threads).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end_time = System.currentTimeMillis();
        System.out.println(end_time-start_time);
    }
}

 换成hashtable看下结果

LinkedBlockingQueue

阻塞式的容器
public class Demo {

    private static BlockingQueue<String> strings = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {
        new Thread(()->{
            for (int i = 0; i < 100; i++) {
                try {
                    // 在阻塞式容器里面加了一个方法,put,也就是如果满了就会等待,对应的方法叫take,如果空了就会等待。
                    // 这种容器我们去用的时候自动就实现了阻塞式的生产者消费者。
                    strings.put("商品" + i);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "producer").start();

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                for(;;){
                    try {
                        // take,拿,如果空了也会阻塞
                        System.out.println(Thread.currentThread().getName() + " take " + strings.take()); //如果空了,就会等待
                    } catch (Exception e) {
                        e.printStackTrace();
                    } 
                }
            },"consumer" + i).start();
        }

    }

}

ArrayBlockingQueue

* 有界队列,意思就是说这个队列能装的元素的个数是固定的,后面讲线程池的时候,里面装的其实是一个个任务。
* 这里只能装10个,如果超过了可能会出问题可能会阻塞,这里看你调用什么方法。
* add会报异常
* offer不会报异常,他只通过布尔类型的返回值来告诉你是加成功了还是没有加成功。
* offer可以设置时间,如果这段时间加不进去就不加了也就是返回false
* put方法是满了会阻塞住。
public class Demo {

    private static BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
    
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strings.put("a" + i);
        }
        strings.add("aaaa");
//        strings.put("aaaa");
//        strings.offer("aaaa");
        strings.offer("aaaa",1, TimeUnit.SECONDS);
        System.out.println(strings);
    }
}

DelayQueue

容器里每一个元素都设置了一个时间,时间到了才能从中提取元素

public class Demo {

    private static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    static class MyTask implements Delayed{

        long runningTime;
        
        public MyTask(long rt) {
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return -1;
            }else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            }else {
                return 0;
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public String toString() {
            return "" + runningTime;
        }
        
        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask(now+1000);
            MyTask t2 = new MyTask(now+2000);
            MyTask t3 = new MyTask(now+1500);
            MyTask t4 = new MyTask(now+2500);
            MyTask t5 = new MyTask(now+500);
            
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
            
            System.out.println(tasks);
            
            for (int i = 0; i < 5; i++) {
                System.out.println(tasks.take());
            }
        }

    }

}

TransferQueue

* 和普通的queue的方法差不多,多了一个transfer方法。
* 如果你用这种队列的话,往往是消费者先启动,生产者生产一个东西的时候,他先是去找消费者,
* 如果有消费者就直接丢给消费者。
public class Demo {

    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strings = new LinkedTransferQueue<>();
        
        new Thread(()->{
            try {
                System.out.println("t1"+strings.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{
            try {
                System.out.println("t2"+strings.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        strings.transfer("aaa");
//        strings.put("aaa");
        System.out.println(strings.size());
//        new Thread(()->{
//            try {
//                System.out.println(strings.take());
//            } catch (Exception e) {
//                e.printStackTrace();
//            }
//        }).start();
    }

}

SynchronousQueue

同步队列
* 同步队列是容量为0,也就是来的东西必须给消费掉.
* 首先启动一个消费者,调用add方法,他报错了
* 只能调用put,意思就是阻塞等待消费者消费。put里面其实用的是transfer,任何东西必须消费,不能往容器里面扔。
public class Demo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strings = new SynchronousQueue<>();
        
        new Thread(()->{
            try {
                System.out.println(strings.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
//        strings.add("aaa");
        strings.put("aaa");
        strings.put("aaa");
        strings.put("aaa");
        strings.put("aaa");
        strings.put("aaa");
        System.out.println(strings.size());
    }
}

 

原文地址:https://www.cnblogs.com/lusaisai/p/12741456.html