简单的生产消费者模型

案例场景:使用 HttpClient 4.3.5 抓取网页,并用其自带的连接池(PoolingHttpClientConnectionManager)进行多线程测试。

HttpClient 4.3.5 简单介绍:连接池按路由(目标主机)管理持久连接,对同一主机的后续请求可以复用已建立的连接,省去了重新建立连接的时间。

类介绍:数据结构用的阻塞队列结构;监控线程、生产线程、消费线程

代码如下:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;


public class AQSTest {

    /**
     * Runs the producer/consumer demo: starts a queue-size monitor, three URL
     * producers and the crawler, lets them work for two minutes, then stops
     * every producer and the monitor so the JVM can exit.
     *
     * @param args ignored
     */
    public static void main(String[] args){

        BlockQueue queue = new BlockQueue();
        monitor motor = new monitor(queue) ;
        motor.start() ;// periodically reports the queue size
        Producer prod1 = new Producer(queue) ;
        Producer prod2 = new Producer(queue) ;
        Producer prod3 = new Producer(queue) ;

        prod1.start();
        prod2.start();
        prod3.start();

        Crawel crawel = new Crawel(queue);
        // The crawler's workers block on the queue indefinitely. Marking the
        // crawler as a daemon (its workers are created from its run() method
        // and therefore inherit the daemon flag) lets the JVM exit once the
        // producers and the monitor have stopped.
        crawel.setDaemon(true);
        crawel.start() ;

        try {
            Thread.sleep(60*1000*2);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this method.
            Thread.currentThread().interrupt();
        } finally {
            // Stop every thread we started, even when the sleep was interrupted.
            prod1.shutdown();
            prod2.shutdown();
            prod3.shutdown();
            motor.shutdown();
        }
    }
    
    /**
     * Consumer coordinator: builds one pooled HTTP client, starts a fixed set
     * of {@link GetThread} workers that share the client and the URL queue,
     * waits for all of them to finish and then closes the client.
     */
    public static class Crawel extends Thread{

        private BlockQueue queue ;

        /**
         * @param _queue queue the workers take URLs from
         */
        public Crawel(BlockQueue _queue){
            queue = _queue ;
        }

        /**
         * Starts 50 workers and blocks until every one of them has terminated.
         * An interrupt is treated as a request to stop: all workers are asked
         * to shut down, the wait continues (the client must not be closed
         * while workers still use it) and the interrupt status is restored
         * before returning.
         */
        @Override
        public void run() {

            PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
            // Raise the total connection limit to 200.
            cm.setMaxTotal(200);
            // Raise the default per-route connection limit to 20.
            cm.setDefaultMaxPerRoute(20);
            // Raise the connection limit for this specific host to 50.
            HttpHost localhost = new HttpHost("www.yeetrack.com", 80);
            cm.setMaxPerRoute(new HttpRoute(localhost), 50);

            CloseableHttpClient httpClient = HttpClients.custom()
                    .setConnectionManager(cm)
                    .build();

            boolean interrupted = false;
            try {
                // A fixed set of workers; each one pulls URLs from the shared queue.
                GetThread[] threads = new GetThread[50];
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new GetThread(httpClient,queue );
                }

                // Start the workers.
                for (int j = 0; j < threads.length; j++) {
                    threads[j].start();
                }

                // Wait for every worker to terminate.
                for (int j = 0; j < threads.length; j++) {
                    boolean joined = false;
                    while (!joined) {
                        try {
                            threads[j].join();
                            joined = true;
                        } catch (InterruptedException e) {
                            if (!interrupted) {
                                // First interrupt: ask all workers to stop.
                                for (int k = 0; k < threads.length; k++) {
                                    threads[k].shutdown();
                                }
                            }
                            interrupted = true;
                        }
                    }
                }
            } finally {
                try {
                    // Release the client's resources once no worker uses it anymore.
                    httpClient.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    /**
     * Worker that repeatedly takes a URL from the shared queue, fetches it
     * with the shared HTTP client and reads the response body.
     */
    public static class GetThread extends Thread {

        private final CloseableHttpClient httpClient;
        private final HttpContext context;
        private volatile boolean stop = false;
        private final BlockQueue queue ;

        /**
         * @param httpClient client shared by all workers
         * @param _queue     queue to take URLs from
         */
        public GetThread(CloseableHttpClient httpClient,BlockQueue _queue) {
            this.httpClient = httpClient;
            this.context = HttpClientContext.create();
            this.queue = _queue ;
        }

        /**
         * Asks the worker to stop. The flag ends the loop after the current
         * request; the interrupt wakes the worker if it is blocked waiting
         * for a URL.
         */
        public void shutdown(){
            stop = true ;
            interrupt();
        }

        /** Fetches URLs until {@link #shutdown()} is called or the wait for a URL is interrupted. */
        @Override
        public void run() {
            while(!stop){
                String url = queue.take() ;
                if (url == null) {
                    // The queue wrapper returns null only when its wait was
                    // interrupted; treat that as a request to stop.
                    break;
                }
                fetch(url);
            }
        }

        /**
         * Executes a GET for {@code url} and reads the body; failures are
         * reported on stderr so one bad request does not end the worker.
         */
        private void fetch(String url) {
            HttpGet httpget = new HttpGet(url);
            try {
                CloseableHttpResponse response = httpClient.execute(
                        httpget, context);
                try {
                    HttpEntity entity = response.getEntity();
                    // A response may carry no entity; there is nothing to read then.
                    if (entity != null) {
                        EntityUtils.toString(entity);
                        System.out.println("成功获取源码");
                        EntityUtils.consume(entity);
                    }
                } finally {
                    response.close();
                }
            } catch (ClientProtocolException ex) {
                System.err.println("Protocol error fetching " + url + ": " + ex);
            } catch (IOException ex) {
                System.err.println("I/O error fetching " + url + ": " + ex);
            }
        }

    }
    
    /**
     * Prints the size of the shared queue every 500 ms until it is shut down
     * or interrupted.
     */
    public static class monitor extends  Thread{

        private BlockQueue queue ;
        private volatile boolean shutdown = false;

        /**
         * @param _queue queue whose size is reported
         */
        public monitor(BlockQueue _queue){
            queue = _queue ;
        }

        @Override
        public void run() {
            while(!shutdown){
                System.out.println("Queue Size: "+queue.size());
                try {
                    sleep(500) ;
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: keep the status set and leave.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the monitor to stop; it exits after its current sleep. */
        public void shutdown(){
            // The flag is volatile, so no lock is needed for visibility.
            shutdown = true;
        }
    }
    
    /** Fixed list of sample URLs and a helper that picks one at random. */
    public static class Const {

        private static final String[] urls = {"http://www.baidu.com","http://www.bing.com","http://www.hao123.com","http://www.163.com"
                ,"http://www.csdn.net"};

        private static final Lock lock = new ReentrantLock() ;

        // One shared generator instead of a new Random per call.
        private static final Random RANDOM = new Random();

        /**
         * Returns one of the sample URLs, chosen uniformly at random.
         *
         * @return a URL from the fixed list, never {@code null}
         */
        public static String get(){
            lock.lock() ;
            try {
                // nextInt(bound) yields an index in [0, urls.length) directly.
                return urls[RANDOM.nextInt(urls.length)] ;
            } finally {
                // Always release the lock, even if the lookup throws.
                lock.unlock() ;
            }
        }
    }
    
    /**
     * Puts one randomly chosen URL into the shared queue every 500 ms until
     * it is shut down or interrupted.
     */
    public static class Producer extends Thread {

        private volatile boolean stop = false;
        private BlockQueue queue ;

        /**
         * @param _queue queue the URLs are put into
         */
        public Producer(BlockQueue _queue){
            queue = _queue ;
        }

        @Override
        public void run() {
            while(!stop){
                queue.put(Const.get()) ;
                try {
                    sleep(500);
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: keep the status set and leave.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the producer to stop; it exits after its current iteration. */
        public void shutdown(){
            // The flag is volatile, so no lock is needed for visibility.
            stop = true;
        }

    }
    
    /**
     * Thin wrapper around a {@link LinkedBlockingQueue} of URLs with a
     * capacity of 1000 that hides {@link InterruptedException} from callers.
     * When a blocking call is interrupted, the thread's interrupt status is
     * restored so the caller can still detect it.
     */
    public static class BlockQueue {
        public  LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);

        /**
         * Adds {@code url}, blocking while the queue is full. If the calling
         * thread is interrupted, the URL is not added, the elements already
         * queued are left untouched and the interrupt status is set again.
         *
         * @param url URL to enqueue
         */
        public void put(String url){
            try {
                queue.put(url) ;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Removes and returns the head of the queue, blocking while it is empty.
         *
         * @return the next URL, or {@code null} if the calling thread was
         *         interrupted (its interrupt status is set again in that case)
         */
        public String take(){
            try {
                return queue.take() ;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null ;
            }
        }

        /**
         * @return the number of URLs currently queued
         */
        public int size(){
            return queue.size() ;
        }
    }
    
    /**
     * Background thread that wakes up at most every five seconds and asks the
     * connection manager to close expired connections and connections that
     * have been idle for 30 seconds, until {@link #shutdown()} is called or
     * the thread is interrupted.
     */
    public static class IdleConnectionMonitorThread extends Thread {

        private final HttpClientConnectionManager connMgr;
        private volatile boolean shutdown;

        /**
         * @param connMgr connection manager whose connections are swept
         */
        public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
            super();
            this.connMgr = connMgr;
        }

        @Override
        public void run() {
            try {
                sweepUntilShutdown();
            } catch (InterruptedException ignored) {
                // terminate
            }
        }

        /** Waits and sweeps in a loop until the shutdown flag is observed. */
        private void sweepUntilShutdown() throws InterruptedException {
            while (!shutdown) {
                synchronized (this) {
                    wait(5000);
                    // Close connections that have expired.
                    connMgr.closeExpiredConnections();
                    // Optionally, close connections idle for 30 seconds.
                    connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }
        }

        /** Sets the shutdown flag and wakes the thread if it is waiting. */
        public void shutdown() {
            shutdown = true;
            synchronized (this) {
                notifyAll();
            }
        }

    }
    
}
原文地址:https://www.cnblogs.com/xinzhuangzi/p/4172317.html