test

package com.suning.sntcscase.controller.thread;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.*;
import com.suning.sntcscase.entity.User;
import com.suning.sntcscase.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("con")
public class ConController {
private final static Logger logger = LoggerFactory.getLogger(Logger.class);

@Resource
private UserService userService;

@RequestMapping(value = "test1", method = RequestMethod.GET)
public String test1() {
try {
//10万条数据
List<User> list = userService.getAll();
List<User> list2 = new ArrayList<>();

long st = System.currentTimeMillis();
//每条线程处理的数据尺寸
int size = 250;
int count = list.size() / size;
if (count * size != list.size()) {
count++;
}
int countNum = 0;
final CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(8);
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
while (countNum < list.size()) {
countNum += size;
ConCallable callable = new ConCallable(userService, countNum);
//截取list的数据,分给不同线程处理
callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
Futures.addCallback(listenableFuture, new FutureCallback<List<User>>() {
@Override
public void onSuccess(List<User> list1) {
countDownLatch.countDown();
list2.addAll(list1);
}

@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
logger.info("处理出错:", throwable);

}
});
}
countDownLatch.await(30, TimeUnit.MINUTES);
logger.info("符合条件的返回数据个数为:" + list2.size());
logger.info("回调函数:" + list2.toString());
long et = System.currentTimeMillis();

System.out.println(et - st);
} catch (Exception ex) {
ex.printStackTrace();
}
return "正在处理......";

}
}



package com.suning.sntcscase.controller.thread;

import com.suning.sntcscase.entity.User;
import com.suning.sntcscase.service.UserService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;


public class ConCallable implements Callable {
private List<User> list;

public ConCallable(UserService userService, int end) {
this.userService = userService;
// this.begin = begin;
this.end = end;
}

private UserService userService;

private int begin;
private int end;



@Override
public Object call() throws Exception {
List<User> listRe = new ArrayList<>();
for(long i = end-250;i <end;i++){
User entity = new User();
entity.setAge(155);
entity.setId(i);
entity.setPassword("7777");
entity.setUserName("saasdsad");
// int xx = (int)i;
userService.update(entity);
listRe.add(entity);
}

return listRe;
}

public void setList(List<User> list) {
this.list = list;
}
}







原文地址:https://www.cnblogs.com/alamps/p/11343399.html