第一种:使用 ThreadPoolExecutor 提交 Runnable 任务(异步执行,不获取返回值):
1 package com.xxl.job.executor.service.datatask.warnconf.sender;
2
3 import java.net.URLEncoder;
4 import java.util.Map;
5 import java.util.Set;
6 import java.util.concurrent.ArrayBlockingQueue;
7 import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit;
9 import javax.annotation.Resource;
10 import org.apache.commons.lang.StringUtils;
11 import org.springframework.stereotype.Service;
12 import com.grand.common.Enum.Type;
13 import com.grand.common.utils.HttpUtils;
14 import com.grand.common.utils.SystemUtils;
15 import com.xxl.job.executor.service.datatask.SysUserPushService;
16 import com.xxl.job.executor.service.datatask.warnconf.confservice.WxConfService;
17 import lombok.extern.slf4j.Slf4j;
18 import net.sf.json.JSONObject;
19
20
21 //调用微信服务号的接口推送消息
22 @Slf4j
23 @Service
24 public class WxMsgSender {
25
26 @Resource
27 private WxConfService wxConfService;
28 @Resource
29 private SysUserPushService sysUserPushService;
30 private final static ArrayBlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<>(500);
31 private static ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 10, 20, TimeUnit.SECONDS, WORK_QUEUE);
32
33 public String sendWxMsg(String title, String content, String remark, String id, Type warnType, Set<String> openidSet,Object time) {
34 if(StringUtils.isEmpty(title)
35 || StringUtils.isEmpty(content)
36 || StringUtils.isEmpty(remark)
37 || StringUtils.isEmpty(id)
38 || warnType==null
39 || openidSet==null || openidSet.size()==0
40 ) {
41 return "error";
42 }
43 JSONObject json = new JSONObject();
44 if(time==null || time.toString().trim().equals("")) {
45 time = SystemUtils.getNowDate("yyyy-MM-dd HH:mm:ss");
46 }
47 json.put("time", time);
48 json.put("title", title);
49 json.put("content", content);
50 json.put("remark", remark);
51 json.put("id", id);
52 json.put("warnType", warnType.getValue());
53
54 // 微信告警消息推送
55 for (String openid : openidSet) {
56 // 循环调用微信下发消息服务
57 pool.execute(()->{
58 try {
59 String passport = wxConfService.getPassportByOpenid(openid);
60 json.put("openid", openid);
61 json.put("passport", passport);
62 Map<String,Object> pushConf = sysUserPushService.getPushConf(passport);
63 if(pushConf!=null && pushConf.get("weixin").toString().equals("1")) {
64 HttpUtils.sendGet("http://xxx.xxx.xxx.cn/weichat/wx/pushMessage", toParam(json));
65 }
66 } catch (Exception e) {
67 e.printStackTrace();
68 log.error(e.getMessage(),e);
69 }
70 });
71
72 }
73 return "success";
74 }
75
76 private String toParam(JSONObject json) {
77 StringBuffer sb = new StringBuffer();
78 String value = null;
79 for (Object key : json.keySet()) {
80 try {
81 value = json.get(key).toString();
82 sb.append("&"+key+"="+URLEncoder.encode(value, "utf-8"));
83 } catch (Exception e) {
84 e.printStackTrace();
85 log.error(e.getMessage(),e);
86 }
87 }
88 if(sb.length()>0) {
89 return sb.substring(1);
90 }
91 return null;
92 }
93
94 }
第二种:使用 ExecutorService 提交 Callable 任务,并通过 Future 获取返回值:
1 package demo.syn;
2
3 import java.util.concurrent.Callable;
4 import java.util.concurrent.ExecutionException;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7 import java.util.concurrent.Future;
/**
 * Demonstrates the difference between {@code Callable} and {@code Runnable}:
 * <ul>
 *   <li>{@code Runnable.run()} returns nothing, so the submitting thread cannot obtain a
 *       result from the task.</li>
 *   <li>{@code Callable.call()} returns a value, but the submitting thread blocks in
 *       {@code Future.get()} until the task has finished.</li>
 * </ul>
 */
public class SynDemo {

    /**
     * Submits ten tasks to a four-thread pool, then prints each task's result.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for a result
     * @throws ExecutionException   if a task terminates with an exception
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Fully qualified so this block does not depend on additional import lines.
            java.util.List<Future<String>> futures = new java.util.ArrayList<Future<String>>();

            // Submit every task before waiting on any of them. Calling get() right after
            // each submit() would make the loop wait for that task before submitting the
            // next one, so the pool would never run tasks in parallel.
            for (int i = 0; i < 10; i++) {
                futures.add(pool.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Thread.sleep(5000);
                        System.out.println("b--" + Thread.currentThread().getName() + " 我在线程里面执行");
                        return "b--" + Thread.currentThread().getName();
                    }
                }));
                // submit() returns immediately, so this line prints without waiting for the task.
                System.out.println("我不是阻塞的,啦啦啦啦");
            }

            // Future.get() blocks until the corresponding task has produced its result.
            for (Future<String> future : futures) {
                System.out.println(future.get());
            }
        } finally {
            // Always shut the pool down; otherwise its non-daemon worker threads keep the
            // JVM alive when get() throws.
            pool.shutdown();
        }
    }

}