A Summary of Scrapy Item Pipelines in Python

Preliminaries

The Item Pipeline is Scrapy's component for post-processing scraped data. After the items defined in items.py are extracted by a spider, they are handed to the item pipelines for the final steps, such as writing to a file or saving to a database. The concrete code is written in pipelines.py, and the pipelines to use are then enabled in settings.py.
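
For reference, pipelines are enabled in settings.py via the ITEM_PIPELINES dict, where lower numbers (0-1000) run first. A minimal sketch, assuming the project package is named myproject (a placeholder):

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.CsvWriterPipeline': 300,
    'myproject.pipelines.MongoPipeline': 400,
}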

The core method of a pipeline class is process_item(self, item, spider). It performs the actual write, then either returns the item so the next pipeline in the chain can continue processing it, or raises a DropItem exception to discard the item.

The other hook methods are listed below, followed by a minimal skeleton that ties them together:

  • open_spider(self, spider)
    • called when the spider is opened; do set-up work here, such as opening a file or connecting to a database
  • close_spider(self, spider)
    • called when the spider is closed; do tear-down work here, such as closing a file or closing a database connection
  • from_crawler(cls, crawler)
    • a class method marked with @classmethod; through the crawler argument you can reach all of Scrapy's components, including the global settings defined in settings.py
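
As a rough skeleton, a pipeline using all of these hooks might look like this (SkeletonPipeline and its empty-item check are purely illustrative):

from scrapy.exceptions import DropItem


class SkeletonPipeline:

    @classmethod
    def from_crawler(cls, crawler):
        # crawler.settings exposes everything defined in settings.py
        return cls()

    def open_spider(self, spider):
        # set-up work: open files, connect to databases
        pass

    def process_item(self, item, spider):
        # either return the item for the next pipeline, or raise DropItem to discard it
        if not dict(item):
            raise DropItem('empty item')
        return item

    def close_spider(self, spider):
        # tear-down work: close files and connections
        pass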

Below is a summary of the pipelines most commonly used with Scrapy.

CsvWriterPipeline

Writes items to a CSV file by hand with the csv module.

import csv


class CsvWriterPipeline:

    def __init__(self):
        # newline='' stops the csv module from inserting blank lines between rows on Windows
        self.file = open('storage/national_bus_stations.csv', 'w', encoding='utf-8', newline='')
        self.my_writer = csv.writer(self.file)

    def open_spider(self, spider):
        pass

    def process_item(self, item, spider):
        # write the item's field values as one CSV row
        self.my_writer.writerow(list(dict(item).values()))
        return item

    def close_spider(self, spider):
        self.file.close()

CsvItemPipeline

Uses Scrapy's built-in CsvItemExporter to save items to a CSV file.

from scrapy.exporters import CsvItemExporter


class CsvItemPipeline:

    def __init__(self):
        # the exporter writes bytes, so the file must be opened in binary mode
        self.file = open('data.csv', 'wb')
        self.exporter = CsvItemExporter(self.file)

    def open_spider(self, spider):
        self.exporter.start_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

JsonWriterPipeline

Saves items to a JSON Lines (.jl) file, one JSON object per line.

import json

from itemadapter import ItemAdapter


class JsonWriterPipeline:

    def __init__(self):
        self.file = open('items.jl', 'w', encoding='utf-8')

    def open_spider(self, spider):
        pass

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        # ensure_ascii=False keeps non-ASCII characters readable in the output file
        line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

JsonItemLinePipeline

Uses Scrapy's built-in JsonLinesItemExporter to write the extracted items to a JSON Lines file one by one.

from scrapy.exporters import JsonLinesItemExporter


class JsonItemLinePipeline:

    def __init__(self):
        self.file = open('storage/national_bus_stations.jl', 'wb')
        self.exporter = JsonLinesItemExporter(self.file, encoding='utf-8')

    def open_spider(self, spider):
        pass

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        self.file.close()

JsonItemPipeline

Uses Scrapy's built-in JsonItemExporter to write all extracted items into a single JSON array in one file.

from scrapy.exporters import JsonItemExporter


class JsonItemPipeline:

    def __init__(self):
        # the exporter writes bytes, so the file must be opened in binary mode
        self.file = open('data.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding='utf-8')

    def open_spider(self, spider):
        self.exporter.start_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

MongoPipeline

Writes items to MongoDB.

import pymongo
from itemadapter import ItemAdapter


class MongoPipeline:

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        self.db[item.collection].insert_one(ItemAdapter(item).asdict())
        return item

    def close_spider(self, spider):
        self.client.close()

The collection attribute can be defined on the item in items.py, for example:

from scrapy import Item, Field


class BusStationItem(Item):

    # used as the MongoDB collection name and the MySQL table name
    table = collection = 'national_bus_stations'
    province_name = Field()
    city_name = Field()
    area_name = Field()
    line_name = Field()
    line = Field()
    station = Field()
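
MongoPipeline reads MONGO_URI and MONGO_DATABASE through from_crawler, so, by analogy with the MySQL settings shown later, they could be configured in settings.py roughly like this (the values are placeholders):

# MongoDB settings
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'station'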

MysqlPipeline

Writes items to MySQL.

import pymysql


class MysqlPipeline:

    def __init__(self, host, user, port, database, password):
        self.host = host
        self.user = user
        self.port = port
        self.database = database
        self.password = password
        self.db = None
        self.cursor = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            host=crawler.settings.get('MYSQL_HOST'),
            user=crawler.settings.get('MYSQL_USER'),
            port=crawler.settings.get('MYSQL_PORT'),
            database=crawler.settings.get('MYSQL_DATABASE'),
            password=crawler.settings.get('MYSQL_PASSWORD')
        )

    def open_spider(self, spider):
        # recent versions of pymysql only accept keyword arguments here
        self.db = pymysql.connect(host=self.host, user=self.user, password=self.password,
                                  database=self.database, port=self.port, charset='utf8')
        self.cursor = self.db.cursor()

    def process_item(self, item, spider):
        # build a parameterised INSERT from the item's fields
        data = dict(item)
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = f'INSERT INTO {item.table} ({keys}) VALUES ({values})'
        self.cursor.execute(sql, tuple(data.values()))
        self.db.commit()
        return item

    def close_spider(self, spider):
        self.cursor.close()
        self.db.close()

Like the MongoDB collection, the table attribute is defined in items.py.

AsyncMysqlPipeline

Writes items to MySQL asynchronously.

import logging

import pymysql
from twisted.enterprise import adbapi


class AsyncMysqlPipeline:

    def __init__(self, db_params):
        self.db_params = db_params
        self.db_pool = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            db_params=dict(
                host=crawler.settings.get('MYSQL_HOST'),
                user=crawler.settings.get('MYSQL_USER'),
                port=crawler.settings.get('MYSQL_PORT'),
                database=crawler.settings.get('MYSQL_DATABASE'),
                password=crawler.settings.get('MYSQL_PASSWORD'),
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor,
                use_unicode=True
            )
        )

    def open_spider(self, spider):
        # create a Twisted connection pool backed by pymysql
        self.db_pool = adbapi.ConnectionPool('pymysql', **self.db_params)

    def process_item(self, item, spider):
        """
        使用Twisted 异步插入Mysql
        """
        query = self.db_pool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)
        return item

    @staticmethod
    def handle_error(failure):
        """
        记录异常
        """
        logging.debug(failure)

    @staticmethod
    def do_insert(cursor, item):
        """
        执行具体的插入操作, 和同步MysqlPipeline一样
        """
        data = dict(item)
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = f'INSERT INTO {item.table} ({keys}) VALUES ({values})'
        cursor.execute(sql, tuple(data.values()))

    def close_spider(self, spider):
        self.db_pool.close()

The MySQL settings are configured centrally in settings.py:

# MySQL settings
MYSQL_HOST = 'localhost'
MYSQL_USER = 'root'
MYSQL_PORT = 3306
MYSQL_DATABASE = 'station'
MYSQL_PASSWORD = 'Password123$'

To feed a big-data cluster, there are also two home-grown pipelines: RemoteJsonPipeline, which writes data to a file on a remote server, and RemoteFlumePipeline, which forwards data to a remote Flume agent.

RemoteJsonPipeline

Uses the paramiko library to establish an SSH connection to the remote server and writes the data to a remote JSON file over SFTP.

import paramiko
from scrapy.exporters import JsonLinesItemExporter


class RemoteJsonPipeline:

    def __init__(self, host, port, user, password, file_path):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.file_path = file_path
        self.client = paramiko.SSHClient()
        self.sftp = None
        self.file = None
        self.exporter = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            host=crawler.settings.get('SSH_HOST'),
            port=crawler.settings.get('SSH_PORT'),
            user=crawler.settings.get('SSH_USER'),
            password=crawler.settings.get('SSH_PASSWORD'),
            file_path=crawler.settings.get('FILE_PATH')
        )

    def open_spider(self, spider):
        self.client.load_system_host_keys()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(self.host, self.port, self.user, self.password)
        self.sftp = self.client.open_sftp()
        self.file = self.sftp.open(self.file_path + 'data.json', 'wb')
        self.exporter = JsonLinesItemExporter(self.file, encoding='utf-8')

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        self.file.close()
        self.client.close()

The SSH settings are configured centrally in settings.py:

# SSH settings
SSH_HOST = '172.16.1.2'
SSH_PORT = 22
SSH_USER = 'root'
SSH_PASSWORD = 'passwd'
FILE_PATH = '/opt/'

RemoteFlumePipeline

Uses the telnetlib module to establish a telnet connection to the remote server and send data to the port that Flume is listening on; Flume then writes the data it receives to HDFS. (Note that telnetlib has since been deprecated and was removed from the standard library in Python 3.13.)

from telnetlib import Telnet


class RemoteFlumePipeline:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.tn = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            host=crawler.settings.get('FLUME_HOST'),
            port=crawler.settings.get('FLUME_PORT')
        )

    def open_spider(self, spider):
        self.tn = Telnet(self.host, self.port)

    def process_item(self, item, spider):
        # strip embedded newlines, then terminate the record with a newline for Flume
        text = str(item).replace('\n', '').encode('utf-8') + b'\n'
        self.tn.write(text)
        return item

    def close_spider(self, spider):
        self.tn.close()

The Flume settings are configured centrally in settings.py:

# Flume settings
FLUME_HOST = '172.16.1.2'
FLUME_PORT = 10000
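
On the receiving side, a minimal Flume agent with a netcat source listening on FLUME_PORT and an HDFS sink might look roughly like the sketch below (the agent name, channel type, and HDFS path are placeholder assumptions):

# flume-agent.conf (sketch with placeholder names and paths)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 10000

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode:8020/flume/stations
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
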
Original article: https://www.cnblogs.com/pineapple-py/p/14252463.html