网络爬虫:分离生产者和消费者来优化爬虫程序

问题描写叙述:

  基于前面的一些工作(可点击这里參见笔者前面的相关博客),我们取得了一些成果。只是存在的问题又总是会让人坐立不安。本文通过分离生产者、消费者以及引入连接池技术来优化爬虫程序。解决前面说到的数据库连接数过大、程序长时间执行OOM的情况。


思路分析:

1.结构设计图:

  

2.思路整理:

  首先。我来说明一下在之前的工作中遗留下的一些主要问题:

  (1)数据库Crash了(可能原因是unvisited_site表的数据过大引起的);

  (2)程序执行一段时间之后,出现OOM异常.

  对于第一个问题,虽不能全然保证是由于数据量过大引发的问题,可是这可能是当中的一个方面。关于Crash的日志,请点击这里查看。

对此我的优化策略就是过滤加入到DB中的数据,另外引入连接池来避免数据的连接数过大导致连接数超过最大连接数异常。

  而对于程序在长时间执行后OOM的情况,我表示这的确是困扰了我一些时间。

一開始我是以为这是一个内存泄露的Bug。但是在漫长寻找Bug的过程中,我发现这可能并非内存泄露,而是正常情况。事实也正是如此。

以下的内存执行情况中会展示我改动了一些逻辑之后的合理内存使用情况。


程序执行情况展示:

1.内存:

  

2.线程:

  


3.CPU:

  


关键代码:

1.连接池相关代码:

ConnectionPool.java

public class ConnectionPool {
    private String jdbcDriver = ""; // 数据库驱动
    private String dbUrl = ""; // 数据 URL
    private String dbUsername = ""; // 数据库username
    private String dbPassword = ""; // 数据库用户password
    private String testTable = ""; // 測试连接是否可用的測试表名,默认没有測试表
    private int initialConnections = 1; // 连接池的初始大小
    private int incrementalConnections = 10; // 连接池自己主动添加的大小
    private int maxConnections = 500; // 连接池最大的大小
    private Vector<PooledConnection> connections = null; // 存放连接池中数据库连接的向量 ,

    public ConnectionPool(String jdbcDriver, String dbUrl, String dbUsername, String dbPassword) {
        this.jdbcDriver = jdbcDriver;
        this.dbUrl = dbUrl;
        this.dbUsername = dbUsername;
        this.dbPassword = dbPassword;

        try {
            createPool();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int getInitialConnections() {
        return this.initialConnections;
    }

    public void setInitialConnections(int initialConnections) {
        this.initialConnections = initialConnections;
    }

    public int getIncrementalConnections() {
        return this.incrementalConnections;
    }

    public void setIncrementalConnections(int incrementalConnections) {
        this.incrementalConnections = incrementalConnections;
    }

    public int getMaxConnections() {
        return this.maxConnections;
    }

    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    public String getTestTable() {
        return this.testTable;
    }

    public void setTestTable(String testTable) {
        this.testTable = testTable;
    }

    public synchronized void createPool() throws Exception {

        // 假设连接池己经创建了,保存连接的向量 connections 不会为空
        if (connections != null) {
            return; // 假设己经创建。则返回
        }

        // 实例化 JDBC Driver 中指定的驱动类实例

        Driver driver = (Driver) (Class.forName(this.jdbcDriver).newInstance());

        DriverManager.registerDriver(driver); // 注冊 JDBC 驱动程序

        // 创建保存连接的向量 , 初始时有 0 个元素
        connections = new Vector<PooledConnection>();

        // 依据 initialConnections 中设置的值,创建连接。

createConnections(this.initialConnections); System.out.println("create pool"); } private void createConnections(int numConnections) throws SQLException { // 循环创建指定数目的数据库连接 for (int x = 0; x < numConnections; x++) { // 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections // 指出。假设 maxConnections 为 0 或负数。表示连接数量没有限制。 // 假设连接数己经达到最大。即退出。 System.out.println(this.connections.size() + ", " + this.maxConnections); if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) { System.out.println("连接数己经达到最大"); break; } try { connections.addElement(new PooledConnection(newConnection())); } catch (SQLException e) { System.out.println(" 创建数据库连接失败!

" + e.getMessage()); throw new SQLException(); } System.out.println(" 数据库连接己创建 ......"); } } private Connection newConnection() throws SQLException { Connection conn = DriverManager.getConnection(dbUrl, dbUsername, dbPassword); if (connections.size() == 0) { DatabaseMetaData metaData = conn.getMetaData(); int driverMaxConnections = metaData.getMaxConnections(); if (driverMaxConnections > 0 && this.maxConnections > driverMaxConnections) { this.maxConnections = driverMaxConnections; } } return conn; // 返回创建的新的数据库连接 } public synchronized PooledConnection getConnection() throws SQLException { // 确保连接池己被创建 if (connections == null) { return null; // 连接池还没创建。则返回 null } PooledConnection conn = getFreeConnection(); // 获得一个可用的数据库连接 // 假设眼下没有能够使用的连接,即全部的连接都在使用中 while (conn == null) { // 等一会再试 wait(250); conn = getFreeConnection(); // 又一次再试,直到获得可用的连接。假设 // getFreeConnection() 返回的为 null // 则表明创建一批连接后也不可获得可用连接 } return conn; // 返回获得的可用的连接 } public void print() { System.out.println("total connection:" + connections.size()); int i = 1; for (PooledConnection conn : connections) { System.out.println("---" + i + ":" + conn.isBusy()); } } private PooledConnection getFreeConnection() throws SQLException { // 从连接池中获得一个可用的数据库连接 PooledConnection conn = findFreeConnection(); if (conn == null) { // 假设眼下连接池中没有可用的连接 // 创建一些连接 System.out.println("眼下连接池中没有可用的连接,创建一些连接 "); createConnections(incrementalConnections); // 又一次从池中查找是否有可用连接 conn = findFreeConnection(); if (conn == null) { // 假设创建连接后仍获得不到可用的连接,则返回 null return null; } } return conn; } private PooledConnection findFreeConnection() throws SQLException { // 获得连接池向量中全部的对象 for (int i = 0; i < connections.size(); i++) { PooledConnection pc = connections.elementAt(i); // System.out.println("pConn.isBusy():"+pConn.isBusy()); if (!pc.isBusy()) { // 假设此对象不忙。则获得它的数据库连接并把它设为忙 Connection conn = pc.getConnection(); pc.setBusy(true); // 測试此连接是否可用 if (!isValid(conn)) { // 假设此连接不可再用了,则创建一个新的连接, // 并替换此不可用的连接对象,假设创建失败。删除该无效连接,遍历下一个不忙连接 try { conn = newConnection(); pc.setConnection(conn); } catch (SQLException e) { e.printStackTrace(); connections.remove(i--); continue; } } return pc; // 己经找到一个可用的连接,退出 } } return null; // 返回找到到的可用连接 } private boolean isValid(Connection conn) { try { return conn.isValid(3000); } catch (SQLException e) { e.printStackTrace(); return false; } } public void returnConnection(Connection conn) { // 确保连接池存在,假设连接没有创建(不存在),直接返回 if (connections == null) { System.out.println(" 连接池不存在,无法返回此连接到连接池中 !"); return; } PooledConnection pConn = null; Enumeration<PooledConnection> enumerate = connections.elements(); // 遍历连接池中的全部连接。找到这个要返回的连接对象 while (enumerate.hasMoreElements()) { pConn = (PooledConnection) enumerate.nextElement(); // 先找到连接池中的要返回的连接对象 if (conn == pConn.getConnection()) { // 找到了 , 设置此连接为空暇状态 pConn.setBusy(false); break; } } } public synchronized void refreshConnections() throws SQLException { // 确保连接池己创新存在 if (connections == null) { System.out.println(" 连接池不存在,无法刷新 !"); return; } PooledConnection pConn = null; Enumeration<PooledConnection> enumerate = connections.elements(); while (enumerate.hasMoreElements()) { // 获得一个连接对象 pConn = (PooledConnection) enumerate.nextElement(); // 假设对象忙则等 5 秒 ,5 秒后直接刷新 if (pConn.isBusy()) { wait(5000); // 等 5 秒 } // 关闭此连接,用一个新的连接取代它。 closeConnection(pConn.getConnection()); pConn.setConnection(newConnection()); pConn.setBusy(false); } } public synchronized void closeConnectionPool() throws SQLException { // 确保连接池存在,假设不存在,返回 if (connections == null) { System.out.println("连接池不存在,无法关闭 !"); return; } PooledConnection pConn = null; Enumeration<PooledConnection> enumerate = connections.elements(); while (enumerate.hasMoreElements()) { pConn = (PooledConnection) enumerate.nextElement(); // 假设忙,等 5 秒 if (pConn.isBusy()) { wait(5000); // 等 5 秒 } // 5 秒后直接关闭它 closeConnection(pConn.getConnection()); // 从连接池向量中删除它 connections.removeElement(pConn); } // 置连接池为空 connections = null; } private void closeConnection(Connection conn) { try { conn.close(); } catch (SQLException e) { System.out.println(" 关闭数据库连接出错: " + e.getMessage()); } } private void wait(int mSeconds) { try { Thread.sleep(mSeconds); } catch (InterruptedException e) { } } public class PooledConnection { private Connection connection = null;// 数据库连接 private boolean busy ; // 此连接是否正在使用的标志,默认没有正在使用 // 构造函数,依据一个 Connection 构告一个 PooledConnection 对象 private PooledConnection(Connection connection) { this.connection = connection; } public ResultSet executeQuery(String sql) throws SQLException { return connection.createStatement().executeQuery(sql); } public int executeUpdate(String sql) throws SQLException { return connection.createStatement().executeUpdate(sql); } // 返回此对象中的连接 private Connection getConnection() { return connection; } // 设置此对象的,连接 private void setConnection(Connection connection) { this.connection = connection; } // 获得对象连接是否忙 private boolean isBusy() { return busy; } // 设置对象的连接正在忙 private void setBusy(boolean busy) { this.busy = busy; } public void close() { busy = false; } } }


DBManager.java

public class DBManager {

    private static PooledConnection conn;
    private static ConnectionPool connectionPool;
    private static DBManager inst;
    
    private String mUrl = DBModel.getMysqlUrl();
    private String mUser = DBModel.getMysqlUesr();
    private String mPassword = DBModel.getMysqlPassword();
    private String mDriver = DBModel.getMysqlDerver();

    public void close() {
        try {
            connectionPool.closeConnectionPool();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public DBManager() {
        if (inst != null)
            return;

        connectionPool = new ConnectionPool(mDriver, mUrl, mUser, mPassword);
        try {
            connectionPool.createPool();
            inst = this;
            

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static PooledConnection getConnection() {
        if (inst == null) {
            new DBManager();
        }
        try {
            conn = connectionPool.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return conn;
    }
}

连接池使用过程:

public static void insert(WebInfoModel model) {
		if (model == null) {
			return;
		}
		
		if (BEEStringTools.isEmptyString(model.getName()) || BEEStringTools.isEmptyString(model.getAddress())) {
            return;
        }
		
		String sql = "INSERT INTO visited_site(name, address, hash_address, date, level) VALUES('" + model.getName() + "', '" + model.getAddress() + "', " + model.getAddress().hashCode() + ", " + System.currentTimeMillis() + ", " + model.getLevel() + ")";
		PooledConnection conn = null;
		try {
		    conn = DBManager.getConnection();
		    conn.executeUpdate(sql);
		} catch (Exception e) {
		    System.out.println("your sql is: " + sql + "
Error: " + e);
		} finally {
		    conn.close();
		    model = null;
		    sql = null;
		}
	}

2.批量插入数据相关代码:

/**
	 * 将set中的数据批量insert到数据库中
	 * DBBLL
	 * @param set
	 */
	public static void insertSet2UnvisitedBatch(SpiderSet set, Map<Integer, Integer> map) {
	    if (set == null || set.size() == 0) {
            return;
        }
	    
	    String sql = "INSERT INTO unvisited_site(name,address,hash_address,date,visited,level) VALUES(?

,?,?,?

,?

,?);"; Connection conn = null; PreparedStatement ps = null; WebInfoModel model = null; try { conn = DriverManager.getConnection(DBModel.getMysqlUrl(), DBModel.getMysqlUesr(), DBModel.getMysqlPassword()); conn.setAutoCommit(false); ps = conn.prepareStatement(sql); final int batchSize = 1000; int count = 0; while (!set.isEmpty()) { model = set.next(); ps.setString(1, model.getName()); ps.setString(2, model.getAddress()); ps.setInt(3, model.getAddress().hashCode()); ps.setLong(4, System.currentTimeMillis()); ps.setInt(5, 0); ps.setInt(6, model.getLevel()); ps.addBatch(); if (++count % batchSize == 0) { ps.executeBatch(); conn.commit(); } } ps.executeBatch(); conn.commit(); } catch (Exception e) { System.out.println("Batch insert error:" + e); } finally { model = null; sql = null; try { ps.close(); conn.close(); } catch (SQLException e) { System.err.println("Close conn/ps error:" + e); } } }


3.分离消费者与生产者相关代码 :

生产者ProduceToWaittingVisitedRunner:

从unvisited_site数据表中获得数据,填充到待訪问的队列中

public class ProduceToWaittingVisitedRunner implements Runnable {

    private SpiderQueue mQueue;
    private List<WebInfoModel> mModelList;
    private boolean mStop = false;
    
    public ProduceToWaittingVisitedRunner(SpiderQueue queue) {
        mQueue = queue;
        initEvent();
    }
    
    private void initEvent() {
        mModelList = new ArrayList<WebInfoModel>();
    }
    
    @Override
    public void run() {
        while (!mStop) {
            mModelList = DBBLL.getUnvisitedInfoModels(mQueue.getMaxSize());
            if (mModelList == null || mModelList.size() == 0) {
                sleep(100);
                continue;
            }
            
            sleep(100);
            
            for (WebInfoModel model : mModelList) {
                mQueue.offer(model);
                DBBLL.updateUnvisited(model);
            }
        }
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
            mStop = true;
        }
    }
}

生产者ParserRunner:

从待訪问队列中消费一个model。调用Python生产链接的列表Queue,将生成的列表Queue offer到结果Set中

public class ParserRunner implements Runnable {

    private SpiderSet mResultSet = null;
    private WebInfoModel mInfoModel = null;
    private int mIndex;
    private final boolean DEBUG = true;
    private Map<Integer, Integer> mResultMap = null;
    
    public ParserRunner(SpiderSet set, WebInfoModel model, int index, Map<Integer, Integer> resultMap) {
        mResultSet = set;
        mInfoModel = model;
        mIndex = index;
        mResultMap = resultMap;
    }
    
    
    @Override
    public void run() {
        SpiderQueue tmpQueue = new SpiderQueue();
        PythonUtils.fillAddressQueueByPython(tmpQueue, mInfoModel.getAddress(), mInfoModel.getLevel()); // 记录訪问某一个网页中解析出的网址
        WebInfoModel model = null;
        while (!tmpQueue.isQueueEmpty()) {
            model = tmpQueue.poll();
            if (model == null || mResultMap.containsKey(model.getAddress().hashCode())) {
                continue;
            }
            
            mResultSet.add(model);
            putMap(model.getAddress().hashCode());
        }
        
        if (DEBUG) {
            System.out.println("[index:" + mIndex + ", size:" + mResultSet.size() + "]: " + mInfoModel.getAddress());
        }
        
        tmpQueue = null;
        model = null;
    }

    private void putMap(int hashUrl) {
        if (mResultMap.containsKey(hashUrl)) {
            mResultMap.put(hashUrl, 1 + mResultMap.get(hashUrl));
        } else {
            mResultMap.put(hashUrl, 1);
        }
    }
    
    @SuppressWarnings("unused")
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者PushModelsToDBRunner:

从辅助Queue中消费产品(利用暂时Queue一次全额消费)。此辅助Queue是从Python中解析出来的Url列表信息

public class PushModelsToDBRunner implements Runnable {

    private SpiderSet mResultSet = null;
    private SpiderSet tmpSet = null;
    private boolean mStop = false;
    private Map<Integer, Integer> mResultMap = null;
    
    public PushModelsToDBRunner(SpiderSet queue, Map<Integer, Integer> resultMap) {
        mResultSet = queue;
        mResultMap = resultMap;
        
        initEvent();
    }
    
    private void initEvent() {
        tmpSet = new SpiderSet();
        
        tmpSet.setMaxSize(1000);
    }
    
    @Override
    public void run() {

        WebInfoModel model = null;
        while (!mStop) {
            if (mResultSet.isEmpty()) {
                sleep(100);
                continue;
            }
            
            tmpSet.setMaxSize(Math.max(mResultSet.size(), 1000)); // TODO
            while(!mResultSet.isEmpty()) {
                model = mResultSet.next();
                if (model == null) {
                    continue;
                }
                
                tmpSet.add(model);
                
                if (tmpSet.isFull()) {
                    break;
                }
            }
            
            DBBLL.insertSet2UnvisitedBatch(tmpSet, mResultMap);
        }
        
        model = null;
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
            mStop = true;
        }
    }
}


代码优化点:

1.引入HashSet

  引入HashSet的目的有两个唯一和无序

2.每次消费HashSet为全额消费

  全额消费的目的在于,避免HashSet中的元素占用过多内存(关于这一点,在上文中能够体现出我的确痛苦过...)。

3.引入mResultMap參数

  此參数的类型是Map<Integer, Integer>,当中的key的类型Integer是表示address的hash值,第二个Integer是该address出现的次数。

  可能你会问我,为什么要引入这个參数。引入此Map的目的是为了在入库之前就进行一步去重(由于是对内存操作,所以效率会比較高)。

只是说实话,引入这个參数,我也是忧虑过。由于这会引入一些内存的开销,只是在我做过一些单元測试之后。发现它引入的内存开销是非常小的。所以就引入了此參数。

  好了。引入此參数是能够解决一些问题。只是。你可能又会问我,没有其它的方式既能去重又能够避免这些小量开销吗?优化此步操作有两点须要非常清楚:一是我的内存中存储Url的Set长度非常有限;二是我须要对Set中的数据进行批量insert。假设这个时候我们要每增一条记录到数据库都要进行check的话,这将是一个耗时的操作。假设引入此Map參数,那么问题就引刃而解了。

  引入此參数后内存的使用情况例如以下图:

  


存在的问题:

  1.假设要算上效率的话。这是一个问题(眼下的情况是15000左右/小时)。

  2.线程池中的解析HTML线程常常出现堵塞的情况。

原文地址:https://www.cnblogs.com/mthoutai/p/7347612.html