Python daily exercise: a web crawler for Baidu Baike

from bs4 import BeautifulSoup
import re, csv, urllib.request, urllib.parse

class url_manager(object):
    def __init__(self):
        self.new_urls=[]  # the book uses set() here mainly for de-duplication, but a set is unordered, which makes new_urls hard to inspect; add_new_url() already checks for duplicates, so set() is unnecessary
        self.old_urls=[]

    def add_new_url(self,url):
        if url  not in self.new_urls and url not in self.old_urls:
            self.new_urls.append(url)

    def add_new_urls(self, urls):
        if urls is None:
            return
        for url in urls:
            self.add_new_url(url)

    def have_new_url(self):
        return  len(self.new_urls)!=0

    def get_new_url(self):
        data=self.new_urls.pop(0)  # pop from the front so URLs are crawled one by one in order

        self.old_urls.append(data)
        return data

class url_download(object):
    def download(self,url):
        response=urllib.request.urlopen(url)
        data=response.read()
        if not data:  # guard against an empty response
            print("no web")
            return False
        return data


class url_scrapy(object):
    def get_data(self,source_url,source_data):
        url_list=[]
        soup=BeautifulSoup(source_data,'lxml')
        title=soup.find('dd',class_="lemmaWgt-lemmaTitle-title").h1.string  # extract the entry title
        contents=soup.find('div',attrs={'class':'lemma-summary',"label-module":"lemmaSummary"})
        summary=None
        if contents is not None:  # when contents is None it has no get_text() and would raise an error
            summary = contents.get_text()  # extract the summary text
        scrapy_data = [(source_url, title, summary)]

        if contents is not None:
            urls=contents.find_all('a',href=re.compile(r'/item/.+'))
            for i in urls:
                a=i['href']
                k=urllib.parse.urljoin(source_url,a)
                url_list.append(k)  # collect the URLs of related entries
            return (scrapy_data,url_list)
        return (scrapy_data,None)  # return None when no URLs are found

class output_data(object):
    def data_save(self,data):
        with open('pachong.csv',"a+",encoding='utf-8') as f:
            f1 = csv.writer(f, lineterminator='\n')
            f1.writerows(data)


class controllers(object):
    def __init__(self):
        self.manager=url_manager()
        self.download=url_download()
        self.scrapy=url_scrapy()
        self.output=output_data()
    def control(self,url):
        self.manager.add_new_url(url)
        num=1
        data1=0
        while True:
            if num > 20:
                break
            elif self.manager.have_new_url():
                url_down=self.manager.get_new_url()
                info=self.download.download(url_down)
                print(num,"is scrapying:",url_down)
                data1,url1=self.scrapy.get_data(url_down,info)
                if data1 is not None:
                    self.output.data_save(data1)
                    print(num,"is finished:",url_down)
                    num += 1
                self.manager.add_new_urls(url1)
            else:
                print('has no url')
                break

if __name__=="__main__":
    url=r'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB'
    a=controllers()
    a.control(url)
The previous program has been optimized into a distributed crawler; the code follows below.
A few issues remain and are left for later optimization:
2. The results are currently saved as a plain txt file, which is inconvenient to browse; switch to MongoDB for storage and set a password (a sketch follows this list).
4. Some URLs in the scraped data are redirect links and need deeper crawling.
5. After switching the URL collection to set(), crawling is no longer in order; to be verified.
6. Every run starts crawling from the beginning, which wastes time; consider saving a checkpoint and resuming from it (see the checkpoint sketch below the next list).
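
Item 2 above is not implemented anywhere in the programs below; a minimal sketch of what it could look like with pymongo is given here. The connection string, user name and password, and the database and collection names are all illustrative assumptions, not part of the original program:

from pymongo import MongoClient

# assumes MongoDB runs locally with authentication enabled for a user 'crawler' (illustrative values)
client = MongoClient('mongodb://crawler:secret@127.0.0.1:27017/')
collection = client['baike']['entries']

def save_entry(source_url, title, summary):
    # upsert by URL so re-crawling the same entry does not create duplicates
    collection.update_one(
        {'url': source_url},
        {'$set': {'title': title, 'summary': summary}},
        upsert=True,
    )
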
The following optimizations have already been made in the program:
1. The apparent freezes during crawling are caused by a slow network or a busy machine; they only happen occasionally, and an except clause can abort the current URL and move on to the next one.
2. When calling urlopen, always set a timeout and wrap the call in try...except so the crawler keeps running after an exception.
3. Anti-crawler measures can be avoided by sleeping between requests and closing each response once it has been read.
4. Pending URLs are stored in a temporary file to reduce memory usage.
5. Crawled URLs are saved to a file, and each URL is checked against that file before it is crawled again; this works for large-scale crawling but is time-consuming.
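
Item 6 of the to-do list and item 5 above both revolve around persisting crawl progress. As a minimal sketch of the checkpoint idea, separate from the save_process/load_process code used below, the set of crawled URLs can be pickled to a file and reloaded on start-up so the crawler resumes instead of starting over; the file name checkpoint.pkl is only illustrative:

import os
import pickle

CHECKPOINT = 'checkpoint.pkl'  # illustrative file name, not used by the programs below

def load_old_urls():
    # resume from the previous run if a checkpoint exists
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT, 'rb') as f:
            return pickle.load(f)
    return set()

def save_old_urls(old_urls):
    # overwrite the checkpoint with the full set of crawled URLs
    with open(CHECKPOINT, 'wb') as f:
        pickle.dump(old_urls, f)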


# !Master (control) node program
'''The master sends the URLs to crawl; the worker nodes fetch the pages and return the results to the master.'''
# This round of optimization: because URLs are sent much faster than they are crawled, the outgoing URL queue grows large and takes up memory.
# New scheme: when the outgoing URL queue holds more than 200 items, pause sending new URL tasks;
# when new_urls holds more than 200 items, spill them in batches to a temporary file to reduce memory usage;
# when old_urls holds more than 200 items, save them to a local file.
import pickle, hashlib, sys, time, tempfile, os
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager


class url_manager(object):
    def __init__(self):
        self.new_urls = self.load_process('newurls.txt')
        self.old_urls = self.load_process('oldurls.txt')

    def add_new_url(self, url):  # here it is enough to check whether the url is already in old_urls
        if url not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        if urls is not None:
            for i in urls:
                self.add_new_url(i)


    def has_new_url(self):
        return len(self.new_urls) != 0

    def get_new_url(self):
        a = self.new_urls.pop()
        self.old_urls.add(a)
        return a

    # def md_url(self, url):
    #     a = hashlib.md5()
    #     a.update(bytes(url, encoding='utf-8'))
    #     return a.hexdigest()

    def save_process(self, path, data):
        # append the current progress (a set of URLs) to the file with pickle
        with open(path, 'ab+') as f:
            pickle.dump(data, f)

    def load_process(self, path):
        print('loading progress from file: %s' % path)
        try:
            with open(path, 'rb') as f:
                return pickle.load(f)
        except (OSError, EOFError, pickle.UnpicklingError):
            print('is not created yet:', path)
        return set()


class data_save(object):
    def __init__(self):
        self.date = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        self.filepath = 'baike%s.txt' % (self.date)
        self.urlpath = 'url%s.txt' % (self.date)
        self.data = []

    def data_saving(self, path, datas):
        self.data.append(datas)
        if len(self.data) > 5 or datas == 'end':  # flush to disk in small batches
            with open(path, 'a+', encoding='utf-8') as f:
                for i in self.data:
                    f.write(i)
                    f.write('\n')
            self.data = []


class controller(object):  # sets up the network queues

    def __init__(self):
        self.url_manag = url_manager()
        self.dataing = data_save()

    def multi_processmanager(self, url_q, result_q):
        BaseManager.register('get_task_queue', callable=url_q)
        BaseManager.register('get_result_queue', callable=result_q)
        manager = BaseManager(address=('127.0.0.1', 8100), authkey='baike'.encode())
        manager.start()
        return manager

    def send_url1(self, url_q, send_url_q, root_url):  # store newly received URLs in url_manager and dispatch them to the worker nodes
        self.url_manag.add_new_url(root_url)
        num1 = 0
        while True:
            if not send_url_q.empty():  # newly received urls are all moved into new_urls for crawling
                urls = send_url_q.get()
                if urls == 'end':
                    self.url_manag.save_process(self.dataing.urlpath, self.url_manag.old_urls)  # save the crawled URLs
                    break
                self.url_manag.add_new_urls(urls)
            if self.url_manag.has_new_url():
                old_url = self.url_manag.get_new_url()
                url_q.put(old_url)  # push onto the network queue for the worker nodes
                num1 += 1
                print(num1, 'is running:', old_url)

    def data_manager(self, result_q, send_data_q, send_url_q):
        # split the results returned by the worker nodes into the data queue (for saving to disk) and the URL queue (for passing on to url_manager)
        while True:
            if not result_q.empty():
                data = result_q.get()  # each crawled result holds two parts: the page data and the extracted urls
                if data[0] == 'end' or data[1] == 'end':
                    send_data_q.put('end')  # forward the end marker to the saving process
                    send_url_q.put('end')  # forward the end marker to the send_url process
                    break
                send_data_q.put(data[0])  # send the page data to the saving process
                if data[1] != 'Null':
                    send_url_q.put(data[1])  # send the urls to the send_url process

    def data_saves(self, data_q):  # process that saves the crawled data
        while True:
            if not data_q.empty():
                data1 = data_q.get()
                if data1 == 'end':
                    break
                self.dataing.data_saving(self.dataing.filepath, data1)

    def send_url(self, url_q, send_url_q, root_url):  # spills new urls to a temporary file and old urls to a local file
        self.url_manag.add_new_url(root_url)
        num1, num2, num3 = 0, 0, 0
        temp = tempfile.TemporaryFile()  # temporary file that holds pending new urls
        filename = temp.name
        urls = []
        while True:
            if self.url_manag.has_new_url():
                old_url = self.url_manag.get_new_url()
                url_q.put(old_url)  # push onto the network queue for the worker nodes
                num1 += 1
                print(num1, 'is sending:', old_url)
            if not send_url_q.empty():  # newly received urls are all moved into new_urls for crawling
                urls = send_url_q.get()
                if urls == 'end':  # the crawl is finished, save progress to disk
                    self.url_manag.save_process(self.dataing.urlpath, self.url_manag.old_urls)
                    self.url_manag.old_urls = set()
                    break
                elif urls != []:
                    if num2 < 10:  # at the start of the crawl, add the urls to the queue directly
                        self.url_manag.add_new_urls(urls)
                        num2 += 1
                        continue
                    else:
                        if len(urls) > 8:  # when urls is large, pickle.loads can fail with "run out of input", so dump the items one by one
                            # self.url_manag.add_new_urls(urls)
                            for i in urls:
                                data1 = pickle.dumps(i)
                                temp.write(data1)  # all new urls go into the temporary file and are read back from it later
                                temp.write(b'\n')
                        else:
                            data1 = pickle.dumps(urls)
                            temp.write(data1)  # all new urls go into the temporary file and are read back from it later
                            temp.write(b'\n')

            if url_q.qsize() < 100:  # when the outgoing task queue runs low, refill it from the temporary file
                temp.seek(0)
                lines = temp.readlines()
                if num3 < len(lines):
                    urldata = lines[num3]
                    num3 += 1
                    url1 = pickle.loads(urldata)
                    if isinstance(url1, list):
                        self.url_manag.add_new_urls(url1)
                    else:
                        url0 = []
                        url0.append(url1)
                        self.url_manag.add_new_urls(url0)
            if len(self.url_manag.old_urls) > 100:  # old_urls has grown large, save it to the local file
                self.url_manag.save_process(self.dataing.urlpath, self.url_manag.old_urls)
                self.url_manag.old_urls = set()


url_q = Queue()  # queue the control node uses to send urls to the crawler nodes
result_q = Queue()  # queue holding the page data sent back by the crawler nodes


def url_q1():
    return url_q


def result_q1():
    return result_q


if __name__ == '__main__':
    sys.setrecursionlimit(1000000)  # without this the crawler is prone to recursion errors
    data_q = Queue()  # queue for page data such as the title and referring url, used by the saving process
    urlmanager_q = Queue()  # queue that feeds extracted urls back to url_manager
    url = r'https://baike.baidu.com/item/%E5%8C%96%E5%AD%A6/127240'
    url1=r'https://baike.baidu.com/item/%E8%87%AA%E7%84%B6%E7%A7%91%E5%AD%A6/260539'
    a = controller()
    manag = a.multi_processmanager(url_q1, result_q1)
    url_queue = manag.get_task_queue()
    result_queue = manag.get_result_queue()  # 获取网络队列

    p1 = Process(target=a.send_url, args=(url_queue, urlmanager_q, url,))
    p2 = Process(target=a.data_manager, args=(result_queue, data_q, urlmanager_q,))
    p3 = Process(target=a.data_saves, args=(data_q,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
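
The throttling scheme described in the comments at the top of this program (pause handing out URL tasks while the outgoing queue already holds more than 200 items) is implemented indirectly in send_url above, which spills new urls to the temporary file and only reads them back while the outgoing queue holds fewer than 100 items. A minimal sketch of the pause-based variant described in those comments, with the 200 threshold taken from them and a function name chosen here only for illustration, could look like this:

def send_with_backpressure(url_manager, url_q, max_pending=200):
    # hand a url to the workers only while the outgoing queue is short enough
    while url_manager.has_new_url():
        if url_q.qsize() >= max_pending:
            break  # pause sending; the surrounding loop retries on its next pass
        url_q.put(url_manager.get_new_url())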

  


Crawler worker (slave) program:

 
#!Worker node: crawls the page data and returns it to the master for saving

from bs4 import BeautifulSoup
import time, random, re, csv, urllib.request, urllib.parse, sys, socket
from multiprocessing.managers import BaseManager


class url_download(object):
    def download(self, url):
        data = 'nodata'  # the first version initialised this to None, but BeautifulSoup then complained "None has no len"; avoid sentinel values such as None or False here
        # some Baidu servers block crawlers; when the page cannot be fetched, 'nodata' is returned
        try:
            response = urllib.request.urlopen(url, timeout=5)  # without a timeout the program easily appears to freeze
            data = response.read().decode()
            response.close()
        except urllib.error.URLError as e:  # handle common errors so the crawler keeps running
            print(e.reason)
        except socket.timeout:
            print('timeout')
        time.sleep(random.randint(0, 4))  # pause between requests so Baidu's servers are less likely to detect the crawler and cut it off
        return data


class url_scrapy(object):
    def get_data(self, source_url, source_data):

        url_list = []
        soup = BeautifulSoup(source_data, 'lxml')

        print(source_url)
        title = None
        title0 = soup.find('dd', class_="lemmaWgt-lemmaTitle-title")  # locate the title block
        if title0 is not None:  # when title0 is None it has no .h1 and would raise an error
            title = title0.h1.string  # extract the entry title
        contents = soup.find('div', attrs={'class': 'lemma-summary', "label-module": "lemmaSummary"})
        summary = None
        if contents is not None:  # when contents is None it has no get_text() and would raise an error
            summary = contents.get_text()  # extract the summary text
        scrapy_data = [(source_url, title, summary)]
        if contents is not None:
            urls = contents.find_all('a', href=re.compile(r'/item/.+'))
            for i in urls:
                a = i['href']
                k = urllib.parse.urljoin(source_url, a)
                url_list.append(k)  # collect the URLs of related entries
            return scrapy_data, url_list
        return scrapy_data, 'Null'  # return 'Null' when no URLs are found


class controner(object):
    def __init__(self):
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        manager = BaseManager(address=('127.0.0.1', 8100), authkey='baike'.encode())
        manager.connect()
        self.task = manager.get_task_queue()
        self.result = manager.get_result_queue()
        self.download = url_download()
        self.scrapy = url_scrapy()
        self.num = 0

    def get_web(self):

        while True:
            if not self.task.empty():
                url = self.task.get()
                data = self.download.download(url)
                data2, url2 = self.scrapy.get_data(url, data)
                print(data2)
                self.num += 1

                if self.num > 2000:
                    self.result.put(['end', 'end'])
                    break
                else:
                    self.result.put([str(data2), url2])
                    print(self.num)
            else:
                print('no task url ')
                time.sleep(2)

if __name__ == '__main__':
    sys.setrecursionlimit(1000000)  # without this the crawler is prone to recursion errors
    con = controner()
    con.get_web()

  

 