使用多线程提高restfull 服务的吞吐量
@PostMapping(value = "/request") public Callable<CommonResponse> getrequest(HttpServletRequest request, @RequestBody JSONObject data) { Callable<CommonResponse> result = new Callable<CommonResponse>() { @Override public CommonResponse call() throws Exception { // get all header info Enumeration<String> requestHeader = request.getHeaderNames(); JSONObject header = new JSONObject(); while (requestHeader.hasMoreElements()) { String headerKey = requestHeader.nextElement().toString(); // System.err.println("headerKey="+headerKey+";value="+request.getHeader(headerKey)); header.put(headerKey, request.getHeader(headerKey)); } String platform = header.getString(HEADER_KEY_PLATFORM); if (StringUtils.isBlank(platform)) { String message = "request haader must have headerKey:platform"; return ResultUtils.resultFail(HttpServletResponse.SC_CREATED, message, null); } dataDistributionService.dataDistribution(platform, data); return ResultUtils.resultOk("success", null); } }; return result; }
FutureTask
FutureTask 实现了 RunnableFuture 接口,这个接口的定义如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
可以看到这个接口实现了 Runnable 和 Future 接口,接口中的具体实现由 FutureTask 来实现。这个类的两个构造方法如下 :
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
如上提供了两个构造函数,一个以 Callable 为参数,另外一个以 Runnable 为参数。这些类之间的关联对于任务建模的办法非常灵活,允许你基于 FutureTask 的 Runnable 特性(因为它实现了 Runnable 接口),把任务写成 Callable,然后封装进一个由执行者调度并在必要时可以取消的 FutureTask。
FutureTask 可以由执行者调度,这一点很关键。它对外提供的方法基本上就是 Future 和 Runnable 接口的组合:get()、cancel、isDone()、isCancelled() 和 run(),而 run() 方法通常都是由执行者调用,我们基本上不需要直接调用它。
一个 FutureTask 的例子:
//建议用于controller.提高接口的吞吐量
public class MyCallable implements Callable<String> { private long waitTime; public MyCallable(int timeInMillis){ this.waitTime=timeInMillis; } @Override public String call() throws Exception { Thread.sleep(waitTime); //return the thread name executing this callable task return Thread.currentThread().getName(); } }
public class FutureTaskExample { public static void main(String[] args) { MyCallable callable1 = new MyCallable(1000); // 要执行的任务 MyCallable callable2 = new MyCallable(2000); FutureTask<String> futureTask1 = new FutureTask<String>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象 FutureTask<String> futureTask2 = new FutureTask<String>(callable2); ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例 executor.execute(futureTask1); // 执行任务 executor.execute(futureTask2); while (true) { try { if(futureTask1.isDone() && futureTask2.isDone()){// 两个任务都完成 System.out.println("Done"); executor.shutdown(); // 关闭线程池和服务 return; } if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成 System.out.println("FutureTask1 output="+futureTask1.get()); } System.out.println("Waiting for FutureTask2 to complete"); String s = futureTask2.get(200L, TimeUnit.MILLISECONDS); if(s !=null){ System.out.println("FutureTask2 output="+s); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }catch(TimeoutException e){ //do nothing } } } }