Java里的并发容器与安全共享策略总结

一、并发容器

ArrayList --> CopyOnWriteArrayList

概念 : 简单的讲就是写操作时赋值,当有新元素添加到CopyOnWriteArrayList时,它先从原有的数组里边Copy一份出来然后在新的数组上做些操作,操作完成以后在将引用指向新的数组;CopyOnWriteArrayList所有的操作都是在锁的保护下进行的,这样做的目的主要是为了在多线程并发做add操作的时候复制出多个副本出来导致数据混乱;

缺点 :

① 由于是copy的操作所以比较消耗内存,如果元素的内容较多的时候可能会触发GC,

② 不能用于实时读的场景,它比较适合读多写少的场景;

思想 :

① 读写分离;

② 最终一致性;

③ 另外开辟空间解决并发冲突;

// CopyOnWriteArrayList
@Slf4j
@ThreadSafe
public class CopyOnWriteArrayListExample {
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    private static List<Integer> list = new CopyOnWriteArrayList<>();
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }
 
    private static void update(int i) {
        list.add(i);
    }
}


HashSet --> CopyOnWriteArraySet 与 TreeSet --> ConcurrentSkipListSet

概念 :

CopyOnWriteArraySet它是线程安全,底层实现是使用CopyOnWriteArrayList,它的很多特性都与CopyOnWriteArrayList相似包括适用场景;

ConcurrentSkipListSet是jdk6新增的类,支持自然排序,可以在构造的时候自己定义比较器,它是基于Map集合的,在多线程环境下ConcurrentSkipListSet它里边的remote add 等方法都是线程安全的,但是对于批量操作并不能保证以原子方式进行操作,在批量操作的时候只能保证每一次的操作是原子性的;ConcurrentSkipListSet在使用批量操作的时候可能需要手动处理一下;

// CopyOnWriteArraySet
@Slf4j
@ThreadSafe
public class CopyOnWriteArraySetExample {
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    private static Set<Integer> set = new CopyOnWriteArraySet<>();
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }
 
    private static void update(int i) {
        set.add(i);
    }
}

// ConcurrentSkipListSet
@Slf4j
@ThreadSafe
public class ConcurrentSkipListSetExample {
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    private static Set<Integer> set = new ConcurrentSkipListSet<>();
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }
 
    private static void update(int i) {
        set.add(i);
    }
}


HashMap --> ConcurrentHashMap 与 TreeMap --> ConcurrentSkipListMap

概念 :

ConcurrentHashMap是HashMap线程安全的版本,ConcurrentHashMap不允许空值,在实际的应用中除了少数的插入操作和删除操作外,绝大多数操作都是读取操作,而且读操作大多数都是成功的,基于这个前提ConcurrentHashMap针对读操作多了特别多的优化,具有特别高的并发性;

ConcurrentSkipListMap是TreeMap线程安全的版本,ConcurrentSkipListMap底层是使用SkipList这种跳表的结构实现的;

// ConcurrentHashMap
@Slf4j
@ThreadSafe
public class ConcurrentHashMapExample {
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }
 
    private static void update(int i) {
        map.put(i, i);
    }
}

// ConcurrentSkipListMap
@Slf4j
@ThreadSafe
public class ConcurrentSkipListMapExample {
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }
 
    private static void update(int i) {
        map.put(i, i);
    }
}

二、J.U.C的实际构成

 

三、安全共享对象策略总结

1 线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改;

2 共享只读 : 一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它;

3 线程安全对象 : 一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它;

4 被守护对象 : 被守护对象只能通过获取特定的锁来访问;

原文地址:https://www.cnblogs.com/shamo89/p/10221094.html