Java核心技术 卷1 多线程----线程安全的集合(4)

  如果多线程要并发的修改一个数据结构,例如散列表,那么很容易会破坏这个数据结构。一个线程可能要开始向表中插入一个新元素。假定在调整散列表各个桶之间的链接关系的过程中,被剥夺了控制权。如果另一个线程也开始遍历同一个链表,可能使用无效的链接并造成混乱,会抛出异常或者陷入死循环。

  可以通过提供锁来保护共享数据结构,但是选择线程安全的实现作为替代可能更容易些。上一篇讨论的阻塞队列就是线程安全的集合。接下来讨论java类库提供的另外一些线程安全的集合。

  高效的映射表、集合和队列

  java.util.concurrent包提供了映射表、有序集合和队列的高效实现:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet和ConcurrentLinkedQueue。

java.util.concurrent.ConcurrentLinkedQueue<E>
  • ConcurrentLinkedQueue<E>() 构造一个可以被多线程安全访问的无边界非阻塞的队列。
  • ConcurrentSkipListSet<E>(Comparateor<? super E> comp)    构造一个可以被多线程安全访问的有序集
java.util.concurrent.ConcurrentSkipListMap<K,V>
  • ConcurrentHashMap<K,V>()
  • ConcurrentHashMap<K,V>(int initialCapacity)
  • ConcurrentHashMap<K,V>(int initialCapacity,float loadFactor,int concurrencyLevel)

  参数:initialCapacity 集合的初始容量。默认值16

       loadFactor      控制调整:如果每一个桶的平均负载超过这个因子,表的大小会被重新调整。默认值为0.75.

     concurrencyLevel 并发写者结束 的估计数目。

  • ConcurrentSkipListMap<K,V>
  • ConcurrentSkipListSet<K,V>(Comparator<? super k) comp)

  构造一个可以被多线程安全访问的有序的映象表。第一个构造器要求键实现Comparable接口。

  • V putIfAbsent(K key,V value)

  如果该键没有在映像表中出现,则将给定的值同给定的键关联起来,并返回null。否则返回与该键关键的现有值。

  • boolean remove(K key,V value)

  如果给定的键与给定的值关联,删除给定的键与值并返回真。否则,返回false.

  • boolean replace(K key,V oldValue,V newValue)

  如果给定的键当前与oldvalue相关联,用它与newValue关联。否则返回false

写数组的拷贝

  CopyOnWriteArrayList和CopyOnWriteArraySet是线程安全的集合。其中所有的修改线程对底层数组进行复制。如果在集合上进行迭代的线程数超过修改线程数,这样的安排是很有用的。当构建一个迭代器的时候,它包含一个对当前数组的引用。如果数组后来被修改了,迭代器仍然引用旧数组,但是,集合的数组已经被替换了。因为旧的迭代器拥有一致的视图,访问无须任何同步开销。

Callable与Future

Runnable封装了一个异步的任务,可以把它想像成为一个没有参数和返回值的异步方法。Callable与Runnable类似,但是有返回值。Callable接口是一个参数化的类型,只有一个方法call。

public interface Callable<V>
{ 
    V call() throws Exception;
}

  类型参数是返回值的类型。例如:Callable<Integer>表示一个最终返回Integer对象的异步计算。

  Future保存异步计算的结果。可以启动一个计算,将Future对象交给某个线程,然后忘掉它。Future对象的所有者在结果计算 好之后就可以获得它。

  Future接口具有下面的方法:

public interface Future<V>
{
      V get() throws ...;
      V get(long timeout,TimeUnit unit) throws ...;
      void cancel(boolean mayInterrupt);
      boolean isCancelled();
      boolean isDone();    
}    

  第一个get方法的调用被阻塞,直到计算完成。如果在计算完成之前,第二个方法的调用超时,抛出一个TimeoutException异常。如果运行该计算的线程被中断,两个方法都将抛出InterruptedException。 如果计算完成,那么get方法立即返回。

  如果计算还在进行,isDone方法返回false;如果完成了,则么回true.

  可以用cancel方法取消该计算。如果计算还没有开始,它被取消且不再开始。如果计算处于运行中,那么如果mayInterrupt参数为true,它就被中断。

  FutureTask包装器是一种非常便利的机制,可以将Callable转换成Future转换成Future和Runnable,它同时实现二者的接口。

  以下这个例子与上一篇寻找包含指定关键字的文件的例子相似。然而,现在我们仅仅计算匹配的文件数目。因此,我们有了一个需要长时间运行的任务,它产生一个数例,一个Callable<Integer>的例子。

class MatchCouner implements Callable<Integer>
{
       public MatchCounter(Fille directory,String keyWord){...}
       public Integer call(){...} //返回匹配文件的数
}    

  然后我们利用MatchCounter创建一个FutureTask对象,并用来启动一个线程。

FutureTask<Integer> task =new FutureTask<Integer>(counter);
Thread t=new Thread(task);
t.start();

  最后,打印结果: 

System.out.println(task.get()+" mathcing files.");

  当然对get的调用会发生阻塞,直到有可获得的结果为止。

  在call方法内部,使用相同的递归机制。对于每一个子目录,产生一个新的MatchCounter并为它启动一个线程。把FutureTask对象隐藏在ArrayList<Future<Integer>>中。最后所有结果加起来:

for(Future<Integer> result:results)
    count+=result.get()

  每一次对get的调用都会发生阻塞直到结果可获得为止。当然线程是并行运行的,因此,很可能在大致相同的时候所有的结果都可获得。完整代码如下

 1 package test.Future;
 2 
 3 import java.io.File;
 4 import java.util.Scanner;
 5 import java.util.concurrent.ExecutionException;
 6 import java.util.concurrent.FutureTask;
 7 
 8 /**
 9  * Created by Administrator on 2017/11/23.
10  */
11 public class FutureTest {
12     public static void main(String[] args) {
13         Scanner in =new Scanner(System.in);
14         System.out.print("输入查询路径:");
15         String directory=in.nextLine();
16         System.out.print("输入关键字:");
17         String keyword=in.nextLine();
18 
19         MatchCounter counter=new MatchCounter(new File(directory),keyword);
20         FutureTask<Integer> task=new FutureTask<>(counter);
21         Thread t=new Thread(task);
22         t.start();
23         try {
24             System.out.println(task.get()+" 匹配文件");
25         } catch (InterruptedException e) {
26             e.printStackTrace();
27         } catch (ExecutionException e) {
28             e.printStackTrace();
29         }
30 
31     }
32 }
View Code
 1 package test.Future;
 2 
 3 import java.io.*;
 4 import java.util.ArrayList;
 5 import java.util.List;
 6 import java.util.Scanner;
 7 import java.util.concurrent.Callable;
 8 import java.util.concurrent.ExecutionException;
 9 import java.util.concurrent.Future;
10 import java.util.concurrent.FutureTask;
11 
12 /**
13  * 此任务对包含给定关键字的目录和子目录中的文件进行计数
14  */
15 public class MatchCounter implements Callable<Integer> {
16     private File directory;
17     private String keyword;
18     private int count;
19 
20     /**
21      * @param directory 开始搜索目录
22      * @param keyword   寻找关键字
23      */
24     public MatchCounter(File directory, String keyword) {
25         this.directory = directory;
26         this.keyword = keyword;
27     }
28 
29     @Override
30     public Integer call() throws Exception {
31         count = 0;
32         try {
33             File[] files = directory.listFiles();
34             if (files == null) {
35                 return count;
36             }
37             List<Future<Integer>> results = new ArrayList<>();
38             for (File file : files
39                     ) {
40                 if (file.isDirectory()) {
41                     MatchCounter counter = new MatchCounter(file, keyword);
42                     FutureTask<Integer> task = new FutureTask<>(counter);
43                     results.add(task);
44                     Thread t = new Thread(task);
45                     t.start();
46                 } else {
47                     if (search(file)) {
48                         count++;
49                     }
50                 }
51                 for (Future<Integer> result : results) {
52                     try {
53                         count += result.get();
54                     } catch (ExecutionException e) {
55                         e.printStackTrace();
56                     }
57                 }
58             }
59         } catch (InterruptedException e) {
60         }
61         return count;
62     }
63 
64     /**
65      * 搜索一个给定关键字的文件
66      *
67      * @param file
68      * @return
69      */
70     public boolean search(File file) {
71         try{
72             try(Scanner in= new Scanner(file,"gbk")){
73                 boolean found=false;
74                 while (!found&&in.hasNextLine()){
75                     String line=in.nextLine();
76                     if (line.contains(keyword)){
77                         found=true;
78                         System.out.println(file.toString());
79                     }
80                 }
81                 return found;
82             }
83         }
84         catch (IOException e){
85             return false;
86         }
87     }
88 }
View Code
java.util.concurrent.Callable<V>
  •  V call() 运行将产生结果的任务
  •  V get()
  •  V get(long time,TimeUnit unit)

  获取结果,如果没有结果可用,则阻塞直到得到结果超过指定的时间为止。如果不成功,第二个方法会抛出TimeoutException异常。

  • boolean cancel(boolean mayInterrupt)

  尝试取消这一任务的运行。如果任务已经开始,并且mayInterrupt参数值为true,它就会被中断如果成功执行取消操作,返回true。

  • boolean isCancelled() 如果任务在完成前被取消了,则返回true。
  • boolean isDone() 如果任务结束,无论是正常结束、中途取消或发生异常,都返回true。
java.util.concurrent.FutureTask<V>
  • FutureTask(Callable<V> task)
  • FutureTask(Runnable task,V result) 构造一个既是Future<V>又是Runnable的对象。

  

原文地址:https://www.cnblogs.com/gousheng107/p/7885681.html