多线程常用处理

// 多线程处理
/**
 * Demo scheduler that hands out consecutive line ranges to worker tasks.
 *
 * <p>Every call to {@link #getLine()} atomically advances a shared counter by
 * {@code BATCH_SIZE}, so concurrent workers each receive a distinct end-line value.
 */
public class DySchedule {
    /** Number of lines claimed by a single call to {@link #getLine()}. */
    private static final int BATCH_SIZE = 1000;
    /** Number of worker threads in the shared pool. */
    private static final int POOL_SIZE = 100;
    /** Number of tasks submitted by {@link #doJob()}. */
    private static final int TASK_COUNT = 100;

    /** Shared end-of-range counter; starts at 0 and only ever grows. */
    private static final AtomicInteger line = new AtomicInteger(0);
    static ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);

    /**
     * Claims the next batch of lines.
     *
     * @return the exclusive upper bound of the claimed batch (1000, 2000, ...)
     */
    public static int getLine() {
        return line.addAndGet(BATCH_SIZE);
    }

    /**
     * Submits {@code TASK_COUNT} worker tasks to the shared pool and then shuts the pool down.
     *
     * <p>Already-submitted tasks still run after {@code shutdown()}, but the pool accepts no
     * further work, so this method is effectively one-shot: a second call is rejected by the
     * executor.
     */
    public static void doJob() {
        for (int i = 0; i < TASK_COUNT; i++) {
            // The executor only needs a Runnable; the pool's own threads run the task.
            Runnable task = new MyThread();
            pool.execute(task);
        }
        pool.shutdown();
    }

    public static void main(String[] args) {
        DySchedule.doJob();
    }
}

/**
 * Worker task that claims the next line range from {@code DySchedule} and prints it
 * together with the name of the thread that executed it.
 */
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程:" + Thread.currentThread().getName());
        // getLine() returns a primitive int, so keep it unboxed.
        final int endLine = DySchedule.getLine();
        // getLine() advances the shared counter by 1000 per call, so the range
        // starts 1000 below the returned end value.
        final int startLine = endLine - 1000;
        System.out.println("startline = " + startLine + ",endline = " + endLine);
    }
}



/**
 * Runs the same data load twice, first split across a fixed thread pool and then as a
 * single {@code selectAll()} call, and prints the elapsed milliseconds of each.
 *
 * <p>The multi-threaded timing covers submission AND completion of all tasks: the
 * futures are joined before the elapsed time is printed.
 */
@PostConstruct
private void multithreadingGetData() {
	// Total number of rows.
	int count = 312;
	// Size the pool to the number of available CPUs.
	int availableProcessors = Runtime.getRuntime().availableProcessors();
	// Rows handled per task.
	int num = count / availableProcessors;

	ExecutorService executorService = Executors.newFixedThreadPool(availableProcessors);
	List<CompletableFuture<?>> futures = new ArrayList<>();

	long start = System.currentTimeMillis();
	try {
		// Walk the id boundaries num, 2*num, ... up to count. Bounding the loop by the data
		// range (not by the chunk size) keeps every full chunk covered on machines where
		// num * num < count. When num == 0 (more CPUs than rows) nothing is submitted.
		// NOTE(review): if count is not a multiple of num, the trailing count % num rows get
		// no task of their own; whether that matters depends on how the mapper uses id - confirm.
		for (int id = num; num > 0 && id <= count; id += num) {
			final int boundary = id; // lambdas may only capture effectively final locals
			futures.add(CompletableFuture.supplyAsync(
					() -> officeResponsibilityMapper.multithreadingGetData(boundary), executorService));
		}
		// Block until every task has finished so the timing below is meaningful.
		CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
	} catch (RuntimeException e) {
		// join() surfaces a failed task as an unchecked exception; report it instead of
		// dropping it silently, but still run the single-threaded comparison.
		System.out.println("多线程查询出现异常: " + e);
	} finally {
		executorService.shutdown();
	}
	System.out.println("多线程查询共用时间" + (System.currentTimeMillis() - start));

	long start2 = System.currentTimeMillis();
	officeResponsibilityMapper.selectAll();
	System.out.println("单线程查询共用时间" + (System.currentTimeMillis() - start2));
}




List<String> stuIds = stuService.getStuIds();
 //将集合分成n等份
 List<List<String>> lists = xxUtil.averageAssign(departmentIds, n);
 final List<CompletableFuture<Map<String, Integer>>> futures =  lists.parallelStream().map(subIds-> CompletableFuture.supplyAsync(() -> {
            Map<String, Integer> map = new HashMap<>();
            try {
                for (String id: subIds) { 
                    //根据id去查询数据库
                    Integer count = xxDao.getData(id);
                    map.put(id, count);
                }
            } catch (Exception e) {
                log.error("执行sql失败", e.getMessage(), e);
            }
            return map;
       //asyncTaskExecutor是自定义的线程池,这里也可以不传这个参数,默认使用jdk线程池
        }, asyncTaskExecutor)).collect(Collectors.toList());
 
        CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        CompletableFuture<List<Map<String, Integer>>> listCompletableFuture = completableFuture.thenApply(v ->
                futures.stream().map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        log.error("线程执行发生异常{}", e);
                    }
                    return new HashMap<>();
                }).collect(Collectors.toList()));
        try {
            //从线程中获取结果,拿到结果后,自行处理
            //get方法会等到所有的线程执行完毕,才会返回结果;换句话说,多线程的执行效率取决于最慢的那条线程的执行时间
           List<Map<String,Integer>> listMap =  listCompletableFuture.get();
        } catch (Exception e) {
            log.error("从线程中获取执行结果失败", e);
        }
每天一点点,惊喜不间断
原文地址:https://www.cnblogs.com/wszn-java/p/14985135.html