本人使用的是mac 所以有usr文件夹。把下面的几种情况分别贴出来给大家分析下各自有什么优缺点!
问题:计算一个可能含有数千个文件的文件夹的大小,我们可能将计算任务分解成若干个子任务。在全部子任务完毕之后,我们还须要把全部返回的结果累加起来!
1.顺序计算文件夹大小code:
package jvm; import java.io.File; /** * 第一版 * 顺序计算文件夹大小 * @author zeuskingzb * */ public class TotalFileSizeSequential { private long getTotalSizeOfFilesInDir(File file){ if (file.isFile()) { return file.length(); } final File[] children = file.listFiles(); long total = 0; if (children != null) { for (File child : children) { total += getTotalSizeOfFilesInDir(child); } } return total; } public static void main(String[] args) { final long start = System.nanoTime(); final long total = new TotalFileSizeSequential().getTotalSizeOfFilesInDir(new File("/usr")); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
第一次执行的结果:
Total size :2912829118
Time taken:6.263514
第二次执行的结果:
Total size :2912829118
Time taken:0.747395
这里由于在程序运行一次之后,文件系统的文件/文件夹信息缓存在内存里,从而加快了程序的运行速度,我忽略了程序第一次运行的时间,以便全部的比較測试都利用系统缓存的加速效果。
package jvm; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 第二版 * 运行 出现了 超时 * 速度慢的原因都把时间花在线程调度上面了 * 待扫描的文件夹结构非常深,则程序 就会卡在这个地方 * * * * Callable接口相似于Runnable。从名字就能够看出来了, * 可是Runnable不会返回结果,而且无法抛出返回结果的异常。 * 而Callable功能更强大一些,被线程运行后。能够返回值, * 这个返回值能够被Future拿到,也就是说,Future能够拿到异步运行任务的返回值 * @author zeuskingzb * */ public class NaivelyConcurrentTotalFileSize { private long getTotalSizeOfFilesInDir(final ExecutorService service,final File file) throws InterruptedException, ExecutionException, TimeoutException{ if (file.isFile()) { return file.length(); } long total = 0; final File[] children = file.listFiles(); if (children != null) { final List<Future<Long>> partialTotalFutures = new ArrayList<Future<Long>>(); for (final File child : children) { partialTotalFutures.add(service.submit(new Callable<Long>() { @Override public Long call() throws Exception { // TODO Auto-generated method stub return getTotalSizeOfFilesInDir(service, child); } })); } for (Future<Long> partialTotalFuture : partialTotalFutures) { //设置 超时时间100ms total += partialTotalFuture.get(100, TimeUnit.SECONDS); } } return total; } private long getTotalSizeOfFile(final String fileName) throws InterruptedException, ExecutionException, TimeoutException{ // 100 线程池 final ExecutorService service = Executors.newFixedThreadPool(100); return getTotalSizeOfFilesInDir(service, new File(fileName)); } public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { final long start = System.nanoTime(); final long total = new NaivelyConcurrentTotalFileSize().getTotalSizeOfFile("/usr"); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
执行结果:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
at jvm.NaivelyConcurrentTotalFileSize.getTotalSizeOfFilesInDir(NaivelyConcurrentTotalFileSize.java:51)
at jvm.NaivelyConcurrentTotalFileSize.getTotalSizeOfFile(NaivelyConcurrentTotalFileSize.java:60)
at jvm.NaivelyConcurrentTotalFileSize.main(NaivelyConcurrentTotalFileSize.java:65)
)假设我们没有设置超时的话,这将演变成一种潜在的"线程池诱发型死锁"。
因为设置了超时时间,所以我们至少 可以在出问题的时候中断程序执行而不是无休止地等下去。看来顺序版本号的代码并发化并非一件简单的事。
该方法的优点是使线程被堵住的时候不会走超过扫描给其实文件夹的直接子文件夹的时间。当每一个任务返回给定文件夹的子文件夹列表的时候,也会把该文件夹中所含文件的大小算好一并返回
package jvm; import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 第三版 * 速度比第四版要快点 * * 此版本号的代码逻辑比顺序运行的实现要复杂非常多,相比于旧的Naively,这个版本号在设计上更N x * 因为这个版本号的避免了潜在在死锁的问题,并能高速获取了文件夹列表,所以我们能够独立地调度线程来逐个遍历这些个文件夹。* @author zeuskingzb * */ public class ConcurrentTotalFileSize { //内部类 class SubDirectoriesAndSize{ final public long size; final public List<File> subDirectories;//了文件列表 public SubDirectoriesAndSize(final long totalSize,final List<File> theSubDirs){ size = totalSize; subDirectories = Collections.unmodifiableList(theSubDirs);//得到数据,也就是个仅仅读的数据,不能更改 } } /** * 得到一个包括该文件夹下所以子文件夹的列表,和该文件夹下全部文件大小的SubDirectoriesAndSize对象 * @param file * @return */ private SubDirectoriesAndSize getTotalAndSubDirs(final File file){ long total = 0; final List<File> subDirectories = new ArrayList<File>(); if (file.isDirectory()) {//假设是文件夹 final File[] children = file.listFiles(); if (children != null) { for (File child : children) { if (child.isFile()) { total += child.length();//怎样是文件则统计总和 }else{ subDirectories.add(child);//假设是文件夹的话放入文件夹列表 } } } } return new SubDirectoriesAndSize(total, subDirectories); } private long getTotalSizeOfFilesInDir(final File file) throws InterruptedException, ExecutionException, TimeoutException{ final ExecutorService serive = Executors.newFixedThreadPool(100); try { long total = 0; final List<File> directories = new ArrayList<File>(); directories.add(file);//文件夹 while (!directories.isEmpty()) {//不为空 //接收内部类的列表 final List<Future<SubDirectoriesAndSize>> partialResults = new ArrayList<Future<SubDirectoriesAndSize>>(); for (final File directory : directories) { //多线程 partialResults.add(serive.submit(new Callable<SubDirectoriesAndSize>() { @Override public SubDirectoriesAndSize call() throws Exception { // TODO Auto-generated method stub return getTotalAndSubDirs(directory); } } )); } //清列表,directories仅仅是工具,数据在partialResults里面 directories.clear(); for (Future<SubDirectoriesAndSize> partialResultFuture : partialResults) { final SubDirectoriesAndSize subDirectoriesAndSize = partialResultFuture.get(100, TimeUnit.SECONDS); //System.out.println(directories.size()); //把文件夹列表给文件夹 directories.addAll(subDirectoriesAndSize.subDirectories); System.out.println("size: "+subDirectoriesAndSize.size); total += subDirectoriesAndSize.size; } } return total; } finally{ serive.shutdown(); } } public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSize().getTotalSizeOfFilesInDir(new File("/usr")); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
运行结果 :
Total size :2912829118
Time taken:0.498055
4.以下的实现中,我们不再像之前那样返回子文件夹列表和文件大小,而是令每一个线程都去更新一个共享变量。
因为没有不论什么返回值,代码较之前大大简化。相同。我们必须保证主线程要等待全部子文件夹遍历完毕之后才干结束。
为此。我们能够使用CountDownLatch作为等待结束的标记。线程闩的作用是作为一个或多个线程等待其它线程到达其完毕位置的同步点。这里我们简单地将其作为一个开关来使用。
package jvm; import java.io.File; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 第四版 * 每一个线程都去更新一个共享变量。* 我们必须保证主线程要等待全部了文件夹遍历完毕之后才干结束。 * 我们在这里使用 CountDownLatch 作为等待结束的标记,Latch的作用是作为一个 * 或多个线程等待其它线程到达其完毕位置的同步点,这里我们简单地将其作为一个开关使用 * * 我们递归地将扫描子文件夹的任务托付给不同的线程.当扫描到一个文件时,线程不再返回一个计算结果,而是去更新一个AtomicLong类型的 * 共享变量totalSize,AtomicLong提供了更改并取回一个简单long型变量的线程安全的方法.此外,我们还会用到还有一个叫做pendingFileVisists的AtomicLong * 型变量,其作用是保存当前待訪问文件的数量。当变量为0时,我们就调用countDown()来释放线程闩. * * CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,假设没有到达0,就仅仅有堵塞等待了。
* @author zeuskingzb * */ public class ConcurrentTotalFileSizeWLatch { private ExecutorService executorService; //保存当前待訪问文件的数量 final private AtomicLong pendingFileVisits = new AtomicLong(); final private AtomicLong totalSize = new AtomicLong(); final private CountDownLatch latch = new CountDownLatch(1); private void updateTotalSizeOfFilesInDir(final File file){ long fileSize = 0; if (file.isFile()) { fileSize = file.length(); }else{ final File[] children = file.listFiles(); if (children != null) { for (final File child : children) { if (child.isFile()) { fileSize +=child.length(); }else{ pendingFileVisits.incrementAndGet(); System.out.println("pendingFileVisits.incrementAndGet()"+pendingFileVisits+"thread:"+Thread.currentThread().getName()); executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub updateTotalSizeOfFilesInDir(child); } }); } } } } totalSize.addAndGet(fileSize); System.out.println("pendingFileVisits.decrementAndGet()"+pendingFileVisits+"thread:"+Thread.currentThread().getName()); if (pendingFileVisits.decrementAndGet() == 0) { //能够运行下一个动作 latch.countDown(); } } private long getTotalSizeOfFile(final String fileName) throws InterruptedException{ executorService = Executors.newFixedThreadPool(100); pendingFileVisits.incrementAndGet(); try{ updateTotalSizeOfFilesInDir(new File(fileName)); latch.await(100,TimeUnit.SECONDS); return totalSize.longValue(); }finally{ executorService.shutdown(); } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWLatch().getTotalSizeOfFile("/usr"); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
执行结果:
Total size :2912899381
Time taken:0.601758
分析:我们递归扫描子文件夹的任务托付给不同的线程。当扫描到一个文件时,线程不再返回一个计算结果,而是去更新一个AtomicLong类型的共享变量totalSize.AtomicLong 提供了更改并取回一个简单long型变量值的安全的方法。此外,我们还会用到还有一个叫做pendingFileVisits的AtomicLong型变量,其作用是保存当前待訪问文件(或子文件夹)的数量。当该变量为0时。我们就调用countDown()来释放纯程闩。比如,若想要使插入操作将被堵塞。
JDK提供了好几种功能各异的BlockingQueue.比如,若想要使插入操作和删除操作一一相应,能够使用SynchronousQueue类。
该类的作用是将本线程的每个插入操作与其它线程相应的删除操作相匹配,以完毕类似于手递手形式的传输数据。
而假设希望数据能够依据某种优先级在队列中上下浮动。则能够使用PriorityBlockingQueue.另外,假设仅仅是想要一个简单的堵塞队列。我们能够选择用链表实现的LinkedBlockingQueue或数组方式实现的ArrayBlockingQueue.
package jvm; import java.io.File; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 第五版 * 这个版本号的程序在性能 方面与前一版本号相仿,但在代码简化方面又提升了一个档次,这主要归功于堵塞队列帮我们完毕了线程间的数据 * 交换和同步操作。 * @author zeuskingzb * */ public class ConcurrentTotalFileSizeWQueue { private ExecutorService service; final private BlockingQueue<Long> fileSizes = new ArrayBlockingQueue<>(500); final AtomicLong pendingFileVisits = new AtomicLong(); /** * 多线程浏览文件 * @param file */ private void startExploreDir(final File file) { pendingFileVisits.incrementAndGet(); //System.out.println("pendingFileVisits.incrementAndGet()"+pendingFileVisits); service.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub exploreDir(file); } }); } /** * 浏览文件,文件的执行过程 * @param file */ private void exploreDir(File file) { long fileSize = 0; if (file.isFile()) { fileSize = file.length(); } else { File[] children = file.listFiles(); if (children != null) { for (final File child : children) { if (child.isFile()) { fileSize += child.length(); } else { startExploreDir(child); } } } try { // 把fileSize加到BlockingQueue里,假设BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续 fileSizes.put(fileSize); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } pendingFileVisits.decrementAndGet(); //System.out.println("pendingFileVisits.decrementAndGet()"+pendingFileVisits); } } private long getTotalSizeOfFile(String fileName) throws InterruptedException{ service = Executors.newFixedThreadPool(100); try { startExploreDir(new File(fileName)); long totalSize = 0; while (pendingFileVisits.get() >0 || fileSizes.size()>0) { // 从BlockingQueue取出一个队首的对象,假设在指定时间内,队列一旦有数据可取,则马上返回队列中的数据 //否则知道时间超时还没有数据可取,返回失败。结果:final long size = fileSizes.poll(10, TimeUnit.SECONDS); //System.out.println(size); totalSize += size; } return totalSize; }finally{ service.shutdown(); } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWQueue().getTotalSizeOfFile("/usr"); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
Total size :2912899381
Time taken:0.594396
分析:我们为每一个子文件夹的遍历工作都分配一个独立的任务 。每一个任务负责计算给定文件夹下全部文件大小之和,并在计算结束之后调用堵塞函数put()将该值插入到队列其中.每一个子文件夹的遍历都是在一个独立的线程中进行的。即线程之间互不影响。
Fork-join使用了work-stealing策略,即线程在完毕自己的任务之后,发现其它线程还有活没有干完,就主动帮其它人一起干。
该策略的使用不但提升了API的性能,并且还有助于提高线程利用率。
ForkJoinTask有两个子类:RecursiveAction,RecursiveTask的子类则主要用于运行须要返回值的任务。
package jvm; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * 第六版 用Fork-join api jdk 1.7 * *这里我们假定要遍历一个文件文件夹,由于文件的文件夹能够包括嵌套若干层的文件夹或者文件,从某种角度来说 *构成了一个树形结构。我们再遍历到每一个文件的时候,能够将文件夹作为一个子task来处理,这里就能够形成一个完整的 *fork/join pool应用 * * 这个版本号的的性能要比本章前面其它并发版本号好非常多。同一时候我们也注意要,对于大型的分层文件夹,程序也没有重蹈navie版本号的覆辙 * * 在本例中,我们递归地将归扫描任务进行分解,直至任务无法再拆分,但一般来说,拆分粒度过细会显著添加线程调度的开销,所以我们并不希望将问题拆分得过小。 * java.util.concurrent包中定义了非常多线程安全的集合类,这些集合类即保证了并发编程环境下的数据安全性,同一时候也能够被当成同步点来使用。
尽管线程安全是非常重要的, * 但我们也不想为此牺牲太多性能。 * @author zeuskingzb * */ public class FileSize { private final static ForkJoinPool forkjoinPool = new ForkJoinPool(); private static class FileSizeFinder extends RecursiveTask<Long>{ final File file; public FileSizeFinder(final File theFile) { // TODO Auto-generated constructor stub file = theFile; } @Override protected Long compute() { // TODO Auto-generated method stub long size = 0; if (file.isFile()) { size = file.length(); }else{ File[] children = file.listFiles(); if (children != null) { List<ForkJoinTask<Long>> tasks = new ArrayList<ForkJoinTask<Long>>(); for (File child : children) { if (child.isFile()) { size += child.length(); }else{ tasks.add(new FileSizeFinder(child)); } } for (ForkJoinTask<Long> forkJoinTask : invokeAll(tasks)) { size += forkJoinTask.join(); } } } return size; } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = forkjoinPool.invoke(new FileSizeFinder(new File("/usr"))); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }
结果:
Total size :2912899381
Time taken:0.498283
在compute()函数中.我们将给它文件夹下的全部文件大小累加起来,并将扫描和计算文件夹大小的工作托付给其它任务(即其它FileSizeFinder的实例)来完毕。
invokeAll()函数将等待全部子任务完毕之后才会运行下一步循环累加操作。在任务被堵塞期间,其运行线程并不是什么也不做一直傻等全部子任务结束(就像一个优秀的团队中那些有高度责任感的成员所做的那样),而是能够被调度去做其它任务。
最后。每一个任务都将在compute()函数结束时返回给定文件夹下全部文件和文件夹的大小!