多线程中集合的处理线程处理完后添加到集合不成功的问题解决

昨天有个同事(我招进来的,我负责带)问我说,多线程处理中添加不进集合咋办?

他的源码就不上了,我就直接上解决方案

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.tianbo.tcp.TcpBootstrap;
import com.tianbo.tcp.vo.EquVo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

/**
* @author jianyongchao
* @date: 2021-11-16 09:21:58
* @company: tianbo
* @description:
*/
//@RunWith(SpringRunner.class)
//@SpringBootTest(classes = TcpBootstrap.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ThreadPoolTaskExecutorTest {


private static final Integer CORE_SIZE = 20;
private static final Integer MAX_SIZE = 50;
private static final Integer QUEUE_CAPACITY = 2000;
private static final Integer KEEP_ALIVE = 300;



public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

// ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("tb-cloud-pool-%d").build();
// ThreadPoolExecutor.CallerRunsPolicy handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
executor.setCorePoolSize(CORE_SIZE);
executor.setMaxPoolSize(MAX_SIZE);
executor.setKeepAliveSeconds(KEEP_ALIVE);
executor.setQueueCapacity(QUEUE_CAPACITY);
// executor.setThreadFactory(threadFactory);
// executor.setRejectedExecutionHandler(handler);

return executor;
}

@Test
public void test() {
ThreadPoolTaskExecutorTest test = new ThreadPoolTaskExecutorTest();
ThreadPoolTaskExecutor executor = test.threadPoolTaskExecutor();

List<EquVo> lists = Collections.synchronizedList(new ArrayList<>());

//CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,
// 再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。
// 当每一个线程完成自己任务后,计数器的值就会减一。
// 当计数器的值为0时,表示所有的线程都已经完成一些任务,
// 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务
long start = System.currentTimeMillis();
int threadCount = 10000;
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++){
executor.submitListenable(() -> {
EquVo equVo = new EquVo();

equVo.setEquId(1L);
return equVo;
}).addCallback(new ListenableFutureCallback<EquVo>() {
@Override
public void onSuccess(EquVo equVo) {
// lists.add(equVo);
latch.countDown();
lists.add(equVo);
}

@Override
public void onFailure(Throwable throwable) {
latch.countDown();
}
});
}


try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.err.println(lists.size());
System.out.println(end - start);//200 以内
}
}

处理10000,花费的时间200以内还是可以哈
-------------------------------------

package com.tianbo.tcp;

import com.alibaba.fastjson.JSONObject;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

public class Main {
public static void main(String[] args) throws InterruptedException {
//Vector list = new Vector();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();


List<Object> list = Collections.synchronizedList(new ArrayList<>());

//CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,
// 再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。
// 当每一个线程完成自己任务后,计数器的值就会减一。
// 当计数器的值为0时,表示所有的线程都已经完成一些任务,
// 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务
long start = System.currentTimeMillis();
int threadCount = 100;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
int finalI = i;
new Thread(new Runnable() {
JSONObject json = new JSONObject();

@Override
public void run() {
json.put("i", finalI);
list.add(json);
//计数器减一
latch.countDown();
}
}).start();
}
//阻塞当前线程,直到计数器的值为0
latch.await();
long end = System.currentTimeMillis();
System.err.println(list.size());
System.out.println(end-start);
}
}

希望大家有有所帮助,写作不易







原文地址:https://www.cnblogs.com/qq3245792286/p/15560121.html