批量处理接口 Demo

1.接收请求的控制类:

package ..CompletableFuture;

import java.util.Map;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class CompleController {

    /**
     * 批量处理接口
     * @param request
     * @param response
     * @throws Exception
     */
    @RequestMapping("/comple.do")
    public void  index(HttpServletRequest request,HttpServletResponse response) throws Exception {
        request.setCharacterEncoding("UTF-8");
        response.setContentType("text/piain;charset=UTF-8");
        String st_fj_id = "keys";
        String serialNo = UUID.randomUUID().toString();//给每个请求加个标识
        Map<String ,Object> map = CompletableFutureService.query(st_fj_id,serialNo);
        String fhserialNo = map.get("serialNo").toString();
        if(!serialNo.equals(fhserialNo)){
            System.out.println("请求发送和结果返回的标识是否一致---->: "+serialNo+"---"+fhserialNo);
        }
        response.getWriter().print(serialNo+"----"+fhserialNo);
    }

}

2.批量请求处理实现类

package ..CompletableFuture;

import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

@Service
public class CompletableFutureService {

static class Reques{

    String orderCode;
    String serialNo;    
    CompletableFuture<Map<String ,Object>> future;
}

    static LinkedBlockingDeque<Reques> queue = new LinkedBlockingDeque<>();

    public static Map<String ,Object> query(String orderCode,String serialNo) throws ExecutionException, InterruptedException {
       //设置回调监听
       CompletableFuture<Map<String ,Object>> future = new CompletableFuture<>();

        Reques request = new Reques();
        request.future = future;
        request.serialNo = serialNo;
        request.orderCode = orderCode;
        queue.put(request);
        //监听结果
        return future.get();
    }

    @PostConstruct
    public void init(){
        System.out.println("进入定时任务...");
        //创建执行定时任务的线程池
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
               int size =queue.size();
               if(size ==0){
                   return;
               }
               System.out.println("size-------"+size); 
               JSONObject json = new JSONObject();
               ArrayList<Reques> req = new ArrayList<>();
               for (int i=0;i<size;i++){
                    try {

                           json.put("serialNo", reques.serialNo);
                           json.put("orderCode",reques.orderCode);
                           req.add(reques);
                           
                           List<Map<String,Object>> res = sendDB(json);
                           for(Reques requ : req){
                               String serialNo = requ.serialNo;
                               for (Map<String, Object> reque : res) {
//                                   System.out.println("判断    serialNo   ===  "+serialNo.equals(reque.get("serialNo").toString()));
                                if(serialNo.equals(reque.get("serialNo").toString())){
                                    requ.future.complete(reque);
                                    break;//判断相等就跳出循环,节省性能
                                }
                            }
                         } 
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        },0,10,TimeUnit.SECONDS);//10秒查看一下队列
    }

    
    public static List<Map<String,Object>> sendDB(JSONObject json) {
        
        List<Map<String,Object>> list = new ArrayList<>();
        Map<String,Object> fhMap = new HashMap<>();
        
        //业务逻辑
         
        fhMap.put("serialNo",json.get("serialNo"));
        list.add(fhMap);
        return list;
    }
    
}

3.测试类

package ..CompletableFuture;

import ..utils.requestUtils.HttpApiClient;
import java.util.concurrent.CountDownLatch;

public class Main {

    private static final int  Thread_num   = 2000;
    private static final CountDownLatch cd = new CountDownLatch(Thread_num);


    public static void cdSendData() throws InterruptedException {
        for (int i=0;i<Thread_num;i++){
            Thread t  = new Thread(()->{
                try {
                    cd.countDown();
                    cd.await();
                    String url = "http://localhost:8080/comple.do";
                    String result = HttpApiClient.getData(url);
                    System.out.println("result:   "+result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            t.start();
        }
        Thread.sleep(5000);
    }

    public static void main(String[] args) throws InterruptedException {
        cdSendData();

    }

}
原文地址:https://www.cnblogs.com/lifan12589/p/13935456.html