并发处理-线程池

之前写过一个HTTP Client 请求,刷新主机缓存,之前实现通过为前台获取主机地址 通过 for循环进行调用,通过测试之后就没在理,现在发现性能不足,

遇到timeout情况会产生严重延迟效果,无法使用,现在将其改造成并发处理。

之前有学习过并发,只是简单的了解线程,线程状态,线程安全等基本知识,联系过抢票等一下简单实例,具体开发没用到过,之前开发业务逻辑,也不需要,

简单的逻辑即可,突然进行并发处理有点懵,话不多说,直接贴代码。下面详细解释。

// HTTPClient     
public static  String doGet(String url){
        String result = "";
        BufferedReader in = null;
        try {
            URL realUrl = new URL(url);
            // 打开和URL之间的连接
            HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            //设置超时
            System.setProperty("sun.net.client.defaultConnectTimeout", "5000");
            System.setProperty("sun.net.client.defaultReadTime", "5000");
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(5000);
            // 建立实际的连接
            connection.connect();
            // 获取所有响应头字段
            Map<String, List<String>> map = connection.getHeaderFields();
            // 遍历所有的响应头字段
            for (String key : map.keySet()) {
                log.debug(key + "--->" + map.get(key));
            }
            // 定义 BufferedReader输入流来读取URL的响应
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (MalformedURLException e) {
            return "FAILD";
        } catch (IOException e) {
            return "FAILD";
        }finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }

  

HTTP请求是无状态的请求,所以很容易造成超时处理,必须设定合适的相应控制范围,由于公司主机网络延迟比较严重

设置5秒延迟时间

            System.setProperty("sun.net.client.defaultConnectTimeout", "5000"); //JDK 1.5之后推荐这么写,JVM层

            System.setProperty("sun.net.client.defaultReadTime", "5000");

            connection.setConnectTimeout(5000);      //JDK 1.5之前可以这么设置 链接层面

            connection.setReadTimeout(5000);

关于超时的设置,网上找到两种方法,我都引用了,万无一失,一个是JVM层面,一个是链接层面

//线程并发处理 
// 内部类
class HTTPThread implements Callable<IData> {
    private IData param  = null;
    private StringBuffer url = null;
    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public IData getParam() {
        return param;
    }

    public void setParam(IData instance) {
        this.param = instance;
        url = new StringBuffer("http://");
        url.append(instance.getString("HOST")+":");
        url.append(instance.getString("PORT"));
        url.append(instance.getString("CONTEXT",""));
        url.append(instance.getString("SERVLET"));
        url.append("?WadeSafeUpdate");
    }

    @Override
    public IData call() throws Exception {
        try {
            String state = HTTPClientAPI.doGet(url.toString());
            this.param.put("RESULT", state);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (countDownLatch != null) {
                //递减锁存器的计数
                countDownLatch.countDown();
            }
        }
        return this.param;
    }
}

  

// 调用方法
/**
     * 刷新所有机器
     * @param param
     * @return
     * @throws Exception
     */
    public static IDataset refreshALLListener(IData param) throws Exception {
        int success = 0;
        int faild = 0;
        IDataset instances = queryListener(param);
        ArrayList<Future<IData>> results = new ArrayList<Future<IData>>();//
        ExecutorService executorService =Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(instances.size());
        for(int i=0; i<instances.size() ; i++){
            IData instance = instances.getData(i);
            HTTPThread httpThread = new HTTPThread();
            httpThread.setParam(instance);
            httpThread.setCountDownLatch(countDownLatch);
            results.add(executorService.submit(httpThread));
        }
        countDownLatch.await();
        executorService.shutdown();
        IDataset result = new DatasetList();
        for(int i=0; i<results.size() ; i++){
            result.add(results.get(i).get());
            IData res = result.getData(i);
            String state = res.getString("RESULT");
            if("SUCCESS".equals(state)){
                success++;
                res.put("FRESHSTATE","刷新成功");
                res.put("REFRESHSTATE",state);

                SQLParser parser=new SQLParser(res);
                parser.addSQL(" update vest_server_instance set UPTIME = sysdate() where HOST = :HOST ");
                parser.addSQL(" AND PORT = :PORT ");
                parser.addSQL(" AND  CONTEXT = :CONTEXT ");
                parser.addSQL(" AND SERVLET = :SERVLET");
                BaseDAO dao = new BaseDAO();
                dao.initial("base");
                dao.executeUpdate(parser);
                res.put("UPTIME", TIME_FORMAT.format(new Date()));
            }else{
                faild++;
                res.put("FRESHSTATE","刷新失败");
                res.put("REFRESHSTATE",state);
            }
        }
        if(instances.size() > 0){
            instances.getData(0).put("SUCCESS",success);
            instances.getData(0).put("FAILD",faild);
        }
        return result;
    }

}

  

1:线程池

在之前 没接触过线程池,开启线程都是通过Thread.run方法开启。但是随着new 的Thread 越来越多,创建-销毁,创建-销毁,就会造成资源的严重浪费,效率会下降,快速崩溃。线程池就可以帮助我们解决这个问题,他使线程可以重复使用,就是执行完一个任务线程不会被销毁,而是可以继续执行其他任务

2:线程执行先后、如何同步、处理完成

CountDownLatch 线程同步工具类,相当于一个计数器,通过初始化设置一个数量(只能设置一次,不能更改)。当线程完成任务后,会进行减一。

3线程如何执行完成如何返回数据

之前都是通过Runnable 接口来创建一个线程,但是run方法没有返回值,所以应该使用Callable,Callable的call方法可以根据你传入的泛型参数返回对应类型的数据。callable的call方法返回的数据是通过Future来接受的,有两个方法鼻祖知道,首先,可以用isDone()方法来查询Future是否已经完成,任务完成后,可以调用get()方法来获取结果 如果不加判断直接调用get方法,此时如果线程未完成,get将阻塞,直至结果准备就绪

线程池:

JAVA通过Executors创建线程池

  1.  newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2.  newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3.  newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  4.  newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

线程池重要方法:

  1. execute()实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
  2. submit()是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
  3. shutdown() 关闭线程池
  4. shutdownNow() 关闭线程池

 

原文地址:https://www.cnblogs.com/Tonyzczc/p/10737047.html