Item pipeline

一个Item Pipeline 不需要继承特定基类,只需要实现某些特定方法,面向接口。

class MyPipeline(object):
    def __init__(self):
        """
        可选实现,做参数初始化等
        """

    def process_item(self, item, spider):
        """
        该方法必须实现,每个item pipeline组件都需要调用该方法,
        该方法必须返回一个 Item 对象,被丢弃的item将不会被之后的pipeline组件所处理。
        :param item: 被爬取的item
        :param spider: 爬取该item的spider debug查看类属性
        :return:
        """
        return item

    def open_spider(self, spider):
        """
        可选实现,当spider被开启时,这个方法被调用。
        :param spider: 被开启的spider
        :return:
        """

    def close_spider(self, spider):
        """
        可选实现,当spider被关闭时,这个方法被调用
        :param spider: 被关闭的spider
        :return:
        """

  

采用同步的机制写入数据:

class MysqlPipeline(object):
    def __init__(self):
        pass

    def process_item(self, item, spider):
        if isinstance(item,InstanceItem):
            save(item)
        if spider.name == "spider_name":
            save(item)

  

采用异步的机制写入代码

class MysqlTwistedPipeline(object):
    # 采用异步的机制写入mysql
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """
        from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置
        :param settings:
        :return:
        """
        dbparms = dict(
            host="127.0.0.1",  # settings["MYSQL_HOST"],
            db="spider",  # settings["MYSQL_DBNAME"],
            user="root",  # settings["MYSQL_USER"]
            password="root",  # settings["MYSQL_PASSWORD"],
            charset="utf8",
            use_unicode=True,  # 不然没办法保存中文
            cursorclass=cursors.DictCursor
        )
        db_pool = adbapi.ConnectionPool('pymysql', **dbparms)
        return cls(db_pool)

    def process_item(self, item, spider):
        ##使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # 处理异常

    def handle_error(self, failure, item, spider):
        # 处理异步插入的异常
        print(failure)

    def do_insert(self, cursor, item):
        # 执行具体的插入
        # 根据不同的的item构建不同的sql语句插入到mysql中
        insert_sql, params = item.get_insert_sql()

        cursor.execute(insert_sql, params)
        # 自动commit

数据库连接异常

pymysql.err.InterfaceError: (0, '')

原因:数据库操作对象实例未注销,但持有的数据库连接已失效,导致后续数据库操作无法进行。

解决:在每次插入数据之前检测连接是否可用Connection.ping()。

其实sqlalchemy就有这个处理,原生pymysql则需要自行处理。

下面是代码

from twisted.enterprise import adbapi
from pymysql import cursors


def get_db_pool():
    dbparms = dict(
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        db=MYSQL_DBNAME,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        charset="utf8",
        use_unicode=True,  # 不然没办法保存中文
        cursorclass=cursors.DictCursor
    )
    db_pool = adbapi.ConnectionPool('pymysql', **dbparms)
    return db_pool


class MysqlTwistedPipeline(object):
    # 采用异步的机制写入mysql
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """
        from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置
        :param settings:
        :return:
        """
        db_pool = get_db_pool()
        return cls(db_pool)

    def process_item(self, item, spider):
        ##使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # 处理异常

    def handle_error(self, failure, item, spider):
        # 处理异步插入的异常
        print(failure)

    def do_insert(self, cursor, item):
        # 执行具体的插入
        # 根据不同的的item构建不同的sql语句插入到mysql中
        conn = cursor.connection
        try:
            conn.ping()
        except:
            self.dbpool.close()
            self.dbpool = get_db_pool()

        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)
原文地址:https://www.cnblogs.com/zenan/p/8288485.html