网络爬虫:使用多线程爬取网页链接

前言:

  经过前面两篇文章,你想大家应该已经知道网络爬虫是怎么一回事了。这篇文章会在之前做过的事情上做一些改进,以及说明之前的做法的不足之处。


思路分析:

1.逻辑结构图

  

  上图中展示的就是我们网络爬虫中的整个逻辑思路(调用Python解析URL,这里只作了简略的展示)。


2.思路说明:

  首先,我们来把之前思路梳理一下。之前我们采用的两个队列Queue来保存已经访问过和待访问的链接列表,并采用广度优先搜索进行递归访问这些待访问的链接地址。而且这里使用的是单线程操作。在对数据库的操作中,我们添加了一个辅助字段cipher_address来进行“唯一”性保证,因为我们担心MySQL在对过长的url链接操作时会有一些不尽如人意。

  我不知道上面这一段能否让你对之前我们处理Spider的做法有一个大概的了解,如果你还没有太明白这是怎么一回事。你可以访问《网络爬虫初步:从访问网页到数据解析》和《网络爬虫初步:从一个入口链接开始不断抓取页面中的网址并入库》这两篇文章进行了解。

  下面我就来说明一下,之前的做法存在的问题:

  1.单线程:采用单线程的做法,可以说相当不科学,尤其是对付这样一个大数据的问题。所以,我们需要采用多线程来处理问题,这里会用到多线程中的线程池。


  2.数据存储方式:如果我们采用内存去保存数据,这样会有一个问题,因为数据量非常大,所以程序在运行的过种中必然会内存溢出。而事实也正是如此:

  


  3.Url去重的方式:如果我们对Url进行MD5或是SHA1进行加密的方式进行哈希的话,这样会有一个效率的隐患。不过的确这个问题并不那么复杂。对效率的影响也很小。不过,还好Java自身就已经对String型的数据有哈希的函数可以直接调用:hashCode()


代码及说明:

LinkSpider.java

public class LinkSpider {
	
    private SpiderQueue queue = null;
    
	/**
	 *  遍历从某一节点开始的所有网络链接
	 * LinkSpider
	 * @param startAddress
	 * 			 开始的链接节点
	 */
	public void ErgodicNetworkLink(String startAddress) {
	    if (startAddress == null) {
            return;
        }
	    
	    SpiderBLL.insertEntry2DB(startAddress);
	    
	    List<WebInfoModel> modelList = new ArrayList<WebInfoModel>();
		queue = SpiderBLL.getAddressQueue(startAddress, 0);
		if (queue.isQueueEmpty()) {
            System.out.println("Your address cannot get more address.");
            return;
        }
		
		ThreadPoolExecutor threadPool = getThreadPool();
		int index = 0;
        boolean breakFlag = false;
        
		while (!breakFlag) {
		    
		    // 待访问队列为空时的处理
		    if (queue.isQueueEmpty()) {
		        System.out.println("queue is null...");
		        modelList = DBBLL.getUnvisitedInfoModels(queue.MAX_SIZE);
		        if (modelList == null || modelList.size() == 0) {
                    breakFlag = true;
                } else {
                    for (WebInfoModel webInfoModel : modelList) {
                        queue.offer(webInfoModel);
                        DBBLL.updateUnvisited(webInfoModel);
                    }
                }
		    }
		    
			WebInfoModel model = queue.poll();
			
			if (model == null) {
                continue;
            }
			
			// 判断此网站是否已经访问过
			if (DBBLL.isWebInfoModelExist(model)) {
			    // 如果已经被访问,进入下一次循环
			    System.out.println("已存在此网站(" + model.getName() + ")");
				continue;
			}
			
			poolQueueFull(threadPool);
			
			System.out.println("LEVEL: [" + model.getLevel() + "] NAME: " + model.getName());
			SpiderRunner runner = new SpiderRunner(model.getAddress(), model.getLevel(), index++);
			threadPool.execute(runner);
			
			SystemBLL.cleanSystem(index);
			
			// 对已访问的address进行入库
			DBBLL.insert(model);
		}
		
		threadPool.shutdown();
	}
	
	/**
	 * 创建一个线程池的对象
	 * LinkSpider
	 * @return
	 */
	private ThreadPoolExecutor getThreadPool() {
	    final int MAXIMUM_POOL_SIZE = 520;
        final int CORE_POOL_SIZE = 500;
        return new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(MAXIMUM_POOL_SIZE), new ThreadPoolExecutor.DiscardOldestPolicy());
	}
	
	/**
	 * 线程池中的线程队列已经满了
	 * LinkSpider
	 * @param threadPool
	 *         线程池对象
	 */
	private void poolQueueFull(ThreadPoolExecutor threadPool) {
	    while (getQueueSize(threadPool.getQueue()) >= threadPool.getMaximumPoolSize()) {
            System.out.println("线程池队列已满,等3秒再添加任务");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
	}
	
	/**
	 * 获得线程池中的活动线程数
	 * LinkSpider
	 * @param queue
	 *         线程池中承载线程的队列
	 * @return
	 */
	private synchronized int getQueueSize(Queue queue) {
        return queue.size();
    }
	
	/**
	 * 接收一个链接地址,并调用Python获取该链接下的关联的所有链接list
	 * 将list入库
	 */
	class SpiderRunner implements Runnable {
	    private String address;
	    private SpiderQueue auxiliaryQueue; // 记录访问某一个网页中解析出的网址
	    
	    private int index;
	    private int parentLevel;
	    
	    public SpiderRunner(String address, int parentLevel, int index) {
	        this.index = index;
	        this.address = address;
	        this.parentLevel = parentLevel;
        }
	    
        public void run() {
            auxiliaryQueue = SpiderBLL.getAddressQueue(address, parentLevel);
            System.out.println("[" + index + "]: " + address);
            DBBLL.insert2Unvisited(auxiliaryQueue, index);
            auxiliaryQueue = null;
        }
    }
}

  在上面的ErgodicNetworkLink方法代码中,大家可以看到我们已经把使用Queue保存数据的方式改为使用数据库存储。这样做的好处就是我们不用再为OOM而烦恼了。而且,上面的代码也使用了线程池。使用多线程来执行在调用Python获得链接列表的操作。

  而对于哈希Url的做法,可以参考如下关键代码:

/**
     * 添加单个model到等待访问的数据库中
     * DBBLL
     * @param model
     */
	public static void insert2Unvisited(WebInfoModel model) {
	    if (model == null) {
            return;
        }
	    
        String sql = "INSERT INTO unvisited_site(name, address, hash_address, date, visited, level) VALUES('" + model.getName() + "', '" + model.getAddress() + "', " + model.getAddress().hashCode() + ", " + System.currentTimeMillis() + ", 0, " + model.getLevel() + ");";
        DBServer db = null;
        try {
            db = new DBServer();
            db.insert(sql);
            
            db.close();
        } catch (Exception e) {
            System.out.println("your sql is: " + sql);
            e.printStackTrace();
        } finally {
            db.close();
        }
	}


  PythonUtils.java

  这个类是与Python进行交互操作的类。代码如下:

public class PythonUtils {

	// Python文件的所在路径
	private static final String PY_PATH = "/root/python/WebLinkSpider/html_parser.py";
		
	/**
	 * 获得传递给Python的执行参数
	 * PythonUtils
	 * @param address
	 * 			网络链接
	 * @return
	 */
	private static String[] getShellArgs(String address) {
		String[] shellParas = new String[3];
    	shellParas[0] = "python";
    	shellParas[1] = PY_PATH;
    	shellParas[2] = address.replace(""", "\"");
    	
    	return shellParas;
	}
	
	private static WebInfoModel parserWebInfoModel(String info, int parentLevel) {
		if (BEEStringTools.isEmptyString(info)) {
			return null;
		}
		
		String[] infos = info.split("\$#\$");
		if (infos.length != 2) {
			return null;
		}
		
		if (BEEStringTools.isEmptyString(infos[0].trim())) {
            return null;
        }
		
		if (BEEStringTools.isEmptyString(infos[1].trim()) || infos[1].trim().equals("http://") || infos[1].trim().equals("https://")) {
            return null;
        }
		
		WebInfoModel model = new WebInfoModel();
		
		model.setName(infos[0].trim());
		model.setAddress(infos[1]);
		model.setLevel(parentLevel + 1);
		
		return model;
	}
	
	/**
	 * 调用Python获得某一链接下的所有合法链接
	 * PythonUtils
	 * @param shellParas
	 * 			传递给Python的执行参数
	 * @return
	 */
	private static SpiderQueue getAddressQueueByPython(String[] shellParas, int parentLevel) {
		if (shellParas == null) {
			return null;
		}
		
		Runtime r = Runtime.getRuntime();
		SpiderQueue queue = null;
		
    	try {
			Process p = r.exec(shellParas);
			
			BufferedReader bfr = new BufferedReader(new InputStreamReader(p.getInputStream()));
			
			queue = new SpiderQueue();
			String line = "";
			WebInfoModel model = null;
			while((line = bfr.readLine()) != null) {
//			    System.out.println("----------> from python: " + line);
			    
			    if (BEEStringTools.isEmptyString(line.trim())) {
                    continue;
                }
			    
			    if (HttpBLL.isErrorStateCode(line)) {
                    break;
                }
			    
			    model = parserWebInfoModel(line, parentLevel);
			    if (model == null) {
                    continue;
                }
			    
				queue.offer(model);
			}
			
			model = null;
            line = null;
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
            r = null;
		}
    	
    	return queue;
	}
	
	/**
	 * 调用Python获得某一链接下的所有合法链接
	 * PythonUtils
	 * @param address
	 * 			网络链接
	 * @return
	 */
	public static SpiderQueue getAddressQueueByPython(String address, int parentLevel) {
		return getAddressQueueByPython(getShellArgs(address), parentLevel);
	}
}

遇到的问题:

1.请使用Python2.7

  因为Python2.6中HTMLParser还是有一些缺陷的,例如下图中展示的。不过在Python2.7中,这个问题就不再是问题了。

  


2.数据库崩溃了

  数据库崩溃的原因可能是待访问的数据表中的数据过大引起的。

  


3.对数据库的同步操作

  上面的做法是对数据库操作进行同步时出现的问题,如果不进行同步,我们会得到数据库连接数超过最大连接数的异常信息。对于这个问题有望在下篇文章中进行解决。

  不知道大家对上面的做法有没有什么疑问。当然,我希望你有一个疑问就是在于,我们去同步数据库的操作。当我们开始进行同步的时候就已经说明我们此时的同步只是做了单线程的无用功。因为我开始以为对数据库的操作是需要同步的,数据库是一个共享资源,需要互斥访问(如果你学习过“操作系统”,对这些概念应该不会陌生)。实际上还是单线程,解决的方法就是不要对数据库的操作进行同步操作。而这些引发的数据库连接数过大的问题,会在下篇文章中进行解决。

原文地址:https://www.cnblogs.com/fengju/p/6336051.html