多线程的对照与案例(计算文件夹下文件的大小)

本人使用的是mac 所以有usr文件夹。把下面的几种情况分别贴出来给大家分析下各自有什么优缺点!

问题:计算一个可能含有数千个文件的文件夹的大小,我们可能将计算任务分解成若干个子任务。在全部子任务完毕之后,我们还须要把全部返回的结果累加起来!

1.顺序计算文件夹大小code:

package jvm;

import java.io.File;

/**
 * 第一版
 * 顺序计算文件夹大小
 * @author zeuskingzb
 *
 */
public class TotalFileSizeSequential {
	private long getTotalSizeOfFilesInDir(File file){
		if (file.isFile()) {
			return file.length();
		}
		final File[] children = file.listFiles();
		long total = 0;
		if (children != null) {
			for (File child : children) {
				total += getTotalSizeOfFilesInDir(child);
			}
		}
		return total;
	}
	public static void main(String[] args) {
		final long  start = System.nanoTime();
		final long total = new TotalFileSizeSequential().getTotalSizeOfFilesInDir(new File("/usr"));
		final long end = System.nanoTime();
		System.out.println("Total size :"+total);
		System.out.println("Time taken:"+ (end-start)/1.0e9);
	}
}

第一次执行的结果:

Total size :2912829118

Time taken:6.263514

第二次执行的结果:

Total size :2912829118

Time taken:0.747395


在本代码中,首先我们给定一个文件夹,然后递归地累加当中全部文件/子文件夹的大小。

和本书中其它计算文件夹大小的程序一样,上面的代码第一次运行须要花非常长时间,但假设在几分钟内再运行一次的话则耗时将会有所下降。

这里由于在程序运行一次之后,文件系统的文件/文件夹信息缓存在内存里,从而加快了程序的运行速度,我忽略了程序第一次运行的时间,以便全部的比較測试都利用系统缓存的加速效果。



2.我们希望通过将上述code进行并发改造来加快运行速度。于是我们将问题拆分成若干子任务。当中每一个子任务负责一个子文件夹/文件的计算并返回其统计结果。Callable接口是个不错的选择,由于该接口在call()函数能够在任务完毕之后返回一个结果值。当程序循环扫描文件夹下的每一个文件/文件夹的时候,我们就能够使用ExecutorService的submit()函数来调度子任务运行计算工作。随后我们能够调用Future对象的get()函数来获取子任务的计算结果,该对象主要起到一个托付的作用,即子任务一完毕它就将结果返回给我们。

package jvm;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 第二版
 * 运行 出现了 超时
 * 速度慢的原因都把时间花在线程调度上面了
 * 待扫描的文件夹结构非常深,则程序 就会卡在这个地方
 * 
 * 
 * 
 *   Callable接口相似于Runnable。从名字就能够看出来了,
 *   可是Runnable不会返回结果,而且无法抛出返回结果的异常。
 *   而Callable功能更强大一些,被线程运行后。能够返回值,
 *   这个返回值能够被Future拿到,也就是说,Future能够拿到异步运行任务的返回值
 * @author zeuskingzb
 *
 */
public class NaivelyConcurrentTotalFileSize {
	private long getTotalSizeOfFilesInDir(final ExecutorService service,final File file) throws InterruptedException, ExecutionException, TimeoutException{
		if (file.isFile()) {
			return file.length();
		}
		long total = 0;
		final File[] children = file.listFiles();
		if (children != null) {
			
			final List<Future<Long>> partialTotalFutures = new ArrayList<Future<Long>>();
			for (final File child : children) {
				partialTotalFutures.add(service.submit(new Callable<Long>() {
					@Override
					public Long call() throws Exception {
						// TODO Auto-generated method stub
						
						return getTotalSizeOfFilesInDir(service, child);
					}
				}));
			}
			for (Future<Long> partialTotalFuture : partialTotalFutures) {
				//设置 超时时间100ms
					total += partialTotalFuture.get(100, TimeUnit.SECONDS);
			}
			
		}
		return total;
	}
	private long getTotalSizeOfFile(final String fileName) throws InterruptedException, ExecutionException, TimeoutException{
		// 100 线程池
		final ExecutorService service = Executors.newFixedThreadPool(100);
			return getTotalSizeOfFilesInDir(service, new File(fileName));
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
		final long start = System.nanoTime();
		final long total = new NaivelyConcurrentTotalFileSize().getTotalSizeOfFile("/usr");
		final long end = System.nanoTime();
		System.out.println("Total size :"+total);
		System.out.println("Time taken:"+ (end-start)/1.0e9);
	}
}

执行结果:

Exception in thread "main" java.util.concurrent.TimeoutException

at java.util.concurrent.FutureTask.get(FutureTask.java:201)

at jvm.NaivelyConcurrentTotalFileSize.getTotalSizeOfFilesInDir(NaivelyConcurrentTotalFileSize.java:51)

at jvm.NaivelyConcurrentTotalFileSize.getTotalSizeOfFile(NaivelyConcurrentTotalFileSize.java:60)

at jvm.NaivelyConcurrentTotalFileSize.main(NaivelyConcurrentTotalFileSize.java:65)




此任务easy超时,速度比上个版本号的慢,这是由于非常多时间被消耗在线程调度上.
在getTotalSizeOfFileInDir()函数中有堵塞线程池的操作。每当扫描到一个子文件夹的时候getTotalSizeOfFileInDir()函数就将扫描该子文件夹的任务调度给其它线程。一旦它调度完了全部任务,该函数就等待不论什么一个任务的响应。当子文件夹数据不是非常多的时候,这样的做法不会有什么问题。可是假设待扫描的文件夹结构非常深, 则程序就会卡在这个地方。即线程池内的线程在等待某些任务的响应,而这些任务却在ExecutorService的队列中等待运行机会(因为程序是递归且线程池大小是固定的,所以当子文件夹数超过线程池大小时,就会发生全部线程都 在等待最底层文件夹的计算结果,而最底层子文件夹的计算任务又没有额外的线程来执行,以至形成死锁。

)假设我们没有设置超时的话,这将演变成一种潜在的"线程池诱发型死锁"。

因为设置了超时时间,所以我们至少 可以在出问题的时候中断程序执行而不是无休止地等下去。看来顺序版本号的代码并发化并非一件简单的事。


3.令每一个任务都返回给文件夹的子文件夹列表而非该文件夹的大小。于是在主任务中,我们就能够分派其它任务来扫描列表中的子文件夹。

该方法的优点是使线程被堵住的时候不会走超过扫描给其实文件夹的直接子文件夹的时间。当每一个任务返回给定文件夹的子文件夹列表的时候,也会把该文件夹中所含文件的大小算好一并返回



package jvm;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;



/**
 * 第三版
 * 速度比第四版要快点 
 * 
 * 此版本号的代码逻辑比顺序运行的实现要复杂非常多,相比于旧的Naively,这个版本号在设计上更N x
 * 因为这个版本号的避免了潜在在死锁的问题,并能高速获取了文件夹列表,所以我们能够独立地调度线程来逐个遍历这些个文件夹。

* @author zeuskingzb * */ public class ConcurrentTotalFileSize { //内部类 class SubDirectoriesAndSize{ final public long size; final public List<File> subDirectories;//了文件列表 public SubDirectoriesAndSize(final long totalSize,final List<File> theSubDirs){ size = totalSize; subDirectories = Collections.unmodifiableList(theSubDirs);//得到数据,也就是个仅仅读的数据,不能更改 } } /** * 得到一个包括该文件夹下所以子文件夹的列表,和该文件夹下全部文件大小的SubDirectoriesAndSize对象 * @param file * @return */ private SubDirectoriesAndSize getTotalAndSubDirs(final File file){ long total = 0; final List<File> subDirectories = new ArrayList<File>(); if (file.isDirectory()) {//假设是文件夹 final File[] children = file.listFiles(); if (children != null) { for (File child : children) { if (child.isFile()) { total += child.length();//怎样是文件则统计总和 }else{ subDirectories.add(child);//假设是文件夹的话放入文件夹列表 } } } } return new SubDirectoriesAndSize(total, subDirectories); } private long getTotalSizeOfFilesInDir(final File file) throws InterruptedException, ExecutionException, TimeoutException{ final ExecutorService serive = Executors.newFixedThreadPool(100); try { long total = 0; final List<File> directories = new ArrayList<File>(); directories.add(file);//文件夹 while (!directories.isEmpty()) {//不为空 //接收内部类的列表 final List<Future<SubDirectoriesAndSize>> partialResults = new ArrayList<Future<SubDirectoriesAndSize>>(); for (final File directory : directories) { //多线程 partialResults.add(serive.submit(new Callable<SubDirectoriesAndSize>() { @Override public SubDirectoriesAndSize call() throws Exception { // TODO Auto-generated method stub return getTotalAndSubDirs(directory); } } )); } //清列表,directories仅仅是工具,数据在partialResults里面 directories.clear(); for (Future<SubDirectoriesAndSize> partialResultFuture : partialResults) { final SubDirectoriesAndSize subDirectoriesAndSize = partialResultFuture.get(100, TimeUnit.SECONDS); //System.out.println(directories.size()); //把文件夹列表给文件夹 directories.addAll(subDirectoriesAndSize.subDirectories); System.out.println("size: "+subDirectoriesAndSize.size); total += subDirectoriesAndSize.size; } } return total; } finally{ serive.shutdown(); } } public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSize().getTotalSizeOfFilesInDir(new File("/usr")); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }


运行结果 :

Total size :2912829118

Time taken:0.498055

4.以下的实现中,我们不再像之前那样返回子文件夹列表和文件大小,而是令每一个线程都去更新一个共享变量。

因为没有不论什么返回值,代码较之前大大简化。相同。我们必须保证主线程要等待全部子文件夹遍历完毕之后才干结束。

为此。我们能够使用CountDownLatch作为等待结束的标记。线程闩的作用是作为一个或多个线程等待其它线程到达其完毕位置的同步点。这里我们简单地将其作为一个开关来使用。

package jvm;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 第四版
 * 每一个线程都去更新一个共享变量。

* 我们必须保证主线程要等待全部了文件夹遍历完毕之后才干结束。 * 我们在这里使用 CountDownLatch 作为等待结束的标记,Latch的作用是作为一个 * 或多个线程等待其它线程到达其完毕位置的同步点,这里我们简单地将其作为一个开关使用 * * 我们递归地将扫描子文件夹的任务托付给不同的线程.当扫描到一个文件时,线程不再返回一个计算结果,而是去更新一个AtomicLong类型的 * 共享变量totalSize,AtomicLong提供了更改并取回一个简单long型变量的线程安全的方法.此外,我们还会用到还有一个叫做pendingFileVisists的AtomicLong * 型变量,其作用是保存当前待訪问文件的数量。当变量为0时,我们就调用countDown()来释放线程闩. * * CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,假设没有到达0,就仅仅有堵塞等待了。

* @author zeuskingzb * */ public class ConcurrentTotalFileSizeWLatch { private ExecutorService executorService; //保存当前待訪问文件的数量 final private AtomicLong pendingFileVisits = new AtomicLong(); final private AtomicLong totalSize = new AtomicLong(); final private CountDownLatch latch = new CountDownLatch(1); private void updateTotalSizeOfFilesInDir(final File file){ long fileSize = 0; if (file.isFile()) { fileSize = file.length(); }else{ final File[] children = file.listFiles(); if (children != null) { for (final File child : children) { if (child.isFile()) { fileSize +=child.length(); }else{ pendingFileVisits.incrementAndGet(); System.out.println("pendingFileVisits.incrementAndGet()"+pendingFileVisits+"thread:"+Thread.currentThread().getName()); executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub updateTotalSizeOfFilesInDir(child); } }); } } } } totalSize.addAndGet(fileSize); System.out.println("pendingFileVisits.decrementAndGet()"+pendingFileVisits+"thread:"+Thread.currentThread().getName()); if (pendingFileVisits.decrementAndGet() == 0) { //能够运行下一个动作 latch.countDown(); } } private long getTotalSizeOfFile(final String fileName) throws InterruptedException{ executorService = Executors.newFixedThreadPool(100); pendingFileVisits.incrementAndGet(); try{ updateTotalSizeOfFilesInDir(new File(fileName)); latch.await(100,TimeUnit.SECONDS); return totalSize.longValue(); }finally{ executorService.shutdown(); } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWLatch().getTotalSizeOfFile("/usr"); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }



执行结果:

Total size :2912899381

Time taken:0.601758

分析:我们递归扫描子文件夹的任务托付给不同的线程。当扫描到一个文件时,线程不再返回一个计算结果,而是去更新一个AtomicLong类型的共享变量totalSize.AtomicLong 提供了更改并取回一个简单long型变量值的安全的方法。此外,我们还会用到还有一个叫做pendingFileVisits的AtomicLong型变量,其作用是保存当前待訪问文件(或子文件夹)的数量。当该变量为0时。我们就调用countDown()来释放纯程闩。



5.假设想要在线程间互发多组数据,则BlockingQueue接口能够派上用场.顾名思义,该接口的特点是:假设队列里没有可用空间,则插入操作将会被堵塞;而假设队列里没有可用数据,则删除操作将被堵塞。JDK提供了好几种功能各异的BlockingQueue。

比如,若想要使插入操作将被堵塞。

JDK提供了好几种功能各异的BlockingQueue.比如,若想要使插入操作和删除操作一一相应,能够使用SynchronousQueue类。

该类的作用是将本线程的每个插入操作与其它线程相应的删除操作相匹配,以完毕类似于手递手形式的传输数据。

而假设希望数据能够依据某种优先级在队列中上下浮动。则能够使用PriorityBlockingQueue.另外,假设仅仅是想要一个简单的堵塞队列。我们能够选择用链表实现的LinkedBlockingQueue或数组方式实现的ArrayBlockingQueue.





package jvm;

import java.io.File;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 第五版
 * 这个版本号的程序在性能 方面与前一版本号相仿,但在代码简化方面又提升了一个档次,这主要归功于堵塞队列帮我们完毕了线程间的数据
 * 交换和同步操作。
 * @author zeuskingzb
 *
 */
public class ConcurrentTotalFileSizeWQueue {
	private ExecutorService service;
	final private BlockingQueue<Long> fileSizes = new ArrayBlockingQueue<>(500);
	final AtomicLong pendingFileVisits = new AtomicLong();
	/**
	 * 多线程浏览文件
	 * @param file
	 */
	private void startExploreDir(final File file) {
		pendingFileVisits.incrementAndGet();
		//System.out.println("pendingFileVisits.incrementAndGet()"+pendingFileVisits);
		service.execute(new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				exploreDir(file);
			}
		});
	}
	/**
	 * 浏览文件,文件的执行过程
	 * @param file
	 */
	private void exploreDir(File file) {
		long fileSize = 0;
		if (file.isFile()) {
			fileSize = file.length();
		} else {
			File[] children = file.listFiles();
			if (children != null) {
				for (final File child : children) {
					if (child.isFile()) {
						fileSize += child.length();
					} else {
						startExploreDir(child);
					}
				}
			}
			try {
				// 把fileSize加到BlockingQueue里,假设BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
				fileSizes.put(fileSize);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			pendingFileVisits.decrementAndGet();
			//System.out.println("pendingFileVisits.decrementAndGet()"+pendingFileVisits);
		}
	}
	private long getTotalSizeOfFile(String fileName) throws InterruptedException{
		service = Executors.newFixedThreadPool(100);
		try {
			startExploreDir(new File(fileName));
			long totalSize = 0;
			while (pendingFileVisits.get() >0 || fileSizes.size()>0) {
				// 从BlockingQueue取出一个队首的对象,假设在指定时间内,队列一旦有数据可取,则马上返回队列中的数据
				//否则知道时间超时还没有数据可取,返回失败。

final long size = fileSizes.poll(10, TimeUnit.SECONDS); //System.out.println(size); totalSize += size; } return totalSize; }finally{ service.shutdown(); } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWQueue().getTotalSizeOfFile("/usr"); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }

结果:

Total size :2912899381

Time taken:0.594396

分析:我们为每一个子文件夹的遍历工作都分配一个独立的任务 。

每一个任务负责计算给定文件夹下全部文件大小之和,并在计算结束之后调用堵塞函数put()将该值插入到队列其中.每一个子文件夹的遍历都是在一个独立的线程中进行的。即线程之间互不影响。

主线程则首先启动文件夹遍历任务,随后便仅仅需简单地循环读堵塞队列将其文件大小值累加起来。直至全部子文件夹遍历完毕为止。




6.我们能够使用ExecutorService来管理线程并调度线程池中的线程来运行任务。然而线程池所含的线程数量是由程序猿决定的。而程序猿调度运行的任务和任务在运行过程中所创建的子任务之间也是没有不论什么差别的。所以为了提高效率和性能,Java7为我们带来了专门针对ExecutorService效率和性能的改进版的fork-join API
ForkJoinPool类能够依据可用的处理器数量和任务需求动态地对线程进行管理。

Fork-join使用了work-stealing策略,即线程在完毕自己的任务之后,发现其它线程还有活没有干完,就主动帮其它人一起干。

该策略的使用不但提升了API的性能,并且还有助于提高线程利用率。

在Fork-join API  中,活动任务(active task) 所创建的子任务是由创建主任务所不同的还有一套函数来负责调度的。通常我们在一个应用程序中仅仅会使用一个fork-join池来调度任务,且因为该池使用了守护线程,所以用过之后也无需运行关闭操作。
为了更好地调度任务,我们提供了一些ForkJoinTask的实例(一般是其某个子类的实例)来配合ForkJoinPool函数的使用。我们能够用ForkJoinTask来创建(fork)任务,然后再将主线程join到任务的完毕点上。

ForkJoinTask有两个子类:RecursiveAction,RecursiveTask的子类则主要用于运行须要返回值的任务。


package jvm;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * 第六版 用Fork-join api jdk 1.7
 * 
 *这里我们假定要遍历一个文件文件夹,由于文件的文件夹能够包括嵌套若干层的文件夹或者文件,从某种角度来说
 *构成了一个树形结构。

我们再遍历到每一个文件的时候,能够将文件夹作为一个子task来处理,这里就能够形成一个完整的 *fork/join pool应用 * * 这个版本号的的性能要比本章前面其它并发版本号好非常多。同一时候我们也注意要,对于大型的分层文件夹,程序也没有重蹈navie版本号的覆辙 * * 在本例中,我们递归地将归扫描任务进行分解,直至任务无法再拆分,但一般来说,拆分粒度过细会显著添加线程调度的开销,所以我们并不希望将问题拆分得过小。 * java.util.concurrent包中定义了非常多线程安全的集合类,这些集合类即保证了并发编程环境下的数据安全性,同一时候也能够被当成同步点来使用。

尽管线程安全是非常重要的, * 但我们也不想为此牺牲太多性能。 * @author zeuskingzb * */ public class FileSize { private final static ForkJoinPool forkjoinPool = new ForkJoinPool(); private static class FileSizeFinder extends RecursiveTask<Long>{ final File file; public FileSizeFinder(final File theFile) { // TODO Auto-generated constructor stub file = theFile; } @Override protected Long compute() { // TODO Auto-generated method stub long size = 0; if (file.isFile()) { size = file.length(); }else{ File[] children = file.listFiles(); if (children != null) { List<ForkJoinTask<Long>> tasks = new ArrayList<ForkJoinTask<Long>>(); for (File child : children) { if (child.isFile()) { size += child.length(); }else{ tasks.add(new FileSizeFinder(child)); } } for (ForkJoinTask<Long> forkJoinTask : invokeAll(tasks)) { size += forkJoinTask.join(); } } } return size; } } public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = forkjoinPool.invoke(new FileSizeFinder(new File("/usr"))); final long end = System.nanoTime(); System.out.println("Total size :"+total); System.out.println("Time taken:"+ (end-start)/1.0e9); } }


结果:

Total size :2912899381

Time taken:0.498283


分析:Fork-join API通过working-stealing 完美地攻克了这一问题.当一个任务处于等待其子任务结束的状态时,该任务的运行线程能够将该任务挂起,然后转去运行其它任务。
在FileSize类中,我们创建了一个ForkJoinPool实例的引用,该实例使用keywordstatic定义。即它能够在整个应用程序中共享.随后我们定义了一个名为FileSizeFinder的静态内部类.该类继承了RecursiveTask并实现了compute()函数,我们能够用它来作为任务的运行引擎。

在compute()函数中.我们将给它文件夹下的全部文件大小累加起来,并将扫描和计算文件夹大小的工作托付给其它任务(即其它FileSizeFinder的实例)来完毕。

invokeAll()函数将等待全部子任务完毕之后才会运行下一步循环累加操作。在任务被堵塞期间,其运行线程并不是什么也不做一直傻等全部子任务结束(就像一个优秀的团队中那些有高度责任感的成员所做的那样),而是能够被调度去做其它任务。

最后。每一个任务都将在compute()函数结束时返回给定文件夹下全部文件和文件夹的大小!





通过以上的6种方式。带大家了解了 多线程的奥妙!我还会再后期发一些优秀的案例来和大家分享!




原文地址:https://www.cnblogs.com/blfbuaa/p/6946975.html