莫烦网-爬虫学习-代码记录

from urllib.request import urlopen,urljoin
import re
from bs4 import BeautifulSoup
import random
import requests
import webbrowser
import os
from urllib.request import urlretrieve
import multiprocessing as mp
import time
import asyncio
import aiohttp
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import scrapy
def url():
    base_url = "https://baike.baidu.com"
    his = ["/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711"]
    url=base_url+his[-1]
    html=urlopen(url).read().decode('utf-8')
    #html=urlopen("https://morvanzhou.github.io/static/scraping/basic-structure.html").read().decode("utf-8")
    #html = urlopen("https://morvanzhou.github.io/static/scraping/list.html").read().decode('utf-8')
    #html = urlopen("https://morvanzhou.github.io/static/scraping/table.html").read().decode('utf-8')
    return html

def findobject():
    html=url()
    res=re.findall(r"<title>(.+?)</title>",html)
    rese=re.findall(r"<p>(.*?)</p>",html,flags=re.DOTALL)
    reses=re.findall(r'href="(.*?)"', html)
    print("
Page title is: ",res[0])
    print("
Page paragraph is: ",rese[0])
    print("
All links: ",reses)

def usesoup():
    html=url()
    soup=BeautifulSoup(html,features='lxml')
    print(soup.h1)
    print('
',soup.p)
    all_href=soup.find_all('a')
    all_href=[l['href'] for l in all_href]
    print('
',all_href) 
    month=soup.find_all('li',{"class":"month"})
    for m in month:
        print(m.get_text())
    jan=soup.find('ul',{"class":"jan"})
    d_jan=jan.find_all('li')
    for d in d_jan:
        print(d.get_text())

def Rexsoup():
    html=url()
    soup=BeautifulSoup(html,features='lxml')
    img_links=soup.find_all("img",{"src":re.compile('.*?.jpg')})
    for link in img_links:
        print(link['src'])
    course_links=soup.find_all('a',{"href":re.compile('https://morvan.*')})
    for link in course_links:
        print(link['href'])

def baike():
    base_url = "https://baike.baidu.com"
    his = ["/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711"]
    for i in range(20):
        url=base_url+his[-1]
        html=urlopen(url).read().decode('utf-8')
        soup=BeautifulSoup(html,features='lxml')
        print(i,soup.find('h1').get_text(),' url:',his[-1])

        sub_urls=soup.find_all("a",{"target":"_blank","href":re.compile("/item/(%.{2})+$")})
        if len(sub_urls)!=0:
            his.append(random.sample(sub_urls,1)[0]['href'])
        else:
            his.pop()
    #print(his)
def getbaidus():
    param = {"wd": "莫烦Python"}
    r=requests.get("http://www.baidu.com/s",params=param)
    print(r.url)
    webbrowser.open(r.url)

def postbaidu():#problem
    data = {'firstname': '莫烦', 'lastname': ''}
    r = requests.post('http://pythonscraping.com/files/processing.php', data=data)
    print(r.text)

def postfiile():#problem
    file = {'uploadFile': open('C:/Users/LX/Pictures/TLP.jpg', 'rb')}
    r = requests.post('http://pythonscraping.com/files/processing2.php', files=file)
    print(r.text)

def cookiepage():#problem
    payload={'username':'dsfdsfs','password':'password'}
    r = requests.post('http://pythonscraping.com/pages/cookies/welcome.php',data=payload)
    print(r.cookies.get_dict())
    a = requests.get('http://pythonscraping.com/pages/cookies/profile.php', cookies=r.cookies)
    print(a.text)

def sessioncookies():
    session=requests.Session()
    payload={'username':'dsfdsfs','password':'password'}
    r=session.post('http://pythonscraping.com/pages/cookies/welcome.php',data=payload)
    print(r.cookies.get_dict())

    r=session.get("http://pythonscraping.com/pages/cookies/profile.php")
    print(r.text)

def uploadfile():
    os.makedirs('d:yanglele',exist_ok=True)
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    urlretrieve(IMAGE_URL,'d:yangleleimage1.png')#下载功能

def requestfile():
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    r=requests.get(IMAGE_URL)#下载功能
    with open('d:yangleleimage2.png','wb') as f:
        f.write(r.content)

def requestf():
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    r=requests.get(IMAGE_URL,stream=True)
    with open('d:yangleleimage3.png','wb') as f:
        for chunk in r.iter_content(chunk_size=32):#下载功能
            f.write(chunk)

def downloadimg():
    URL = "http://www.nationalgeographic.com.cn/animals/"
    html=requests.get(URL).text
    soup=BeautifulSoup(html,'lxml')
    img_url=soup.find_all('ul',{'class':'img_list'})
    for ul in img_url:
        imgs=ul.find_all('img')
        for img in imgs:
            url=img['src']
            r=requests.get(url,stream=True)
            image_name=url.split('/')[-1]
            with open('d:yanglele\%s' % image_name,'wb') as f:
                for chunk in r.iter_content(chunk_size=128):
                    f.write(chunk)
            print('Saved %s' % image_name)

base_url ='https://morvanzhou.github.io/'
if base_url !='https://morvanzhou.github.io/':
    restricted_crawl = True
else:
    restricted_crawl = False
def crawl(url):
    response=urlopen(url)
    #time.sleep(0.1)
    return response.read().decode()

def parse(html):
    soup = BeautifulSoup(html,'lxml')
    urls = soup.find_all('a',{'href':re.compile('^/.+?/$')})
    title = soup.find('h1').get_text().strip()
    page_urls=set([urljoin(base_url,url['href']) for url in urls])#去重
    url = soup.find('meta',{'property':'og:url'})['content']
    return title,page_urls,url

def singleuse():    
    unseen=set([base_url,])
    seen=set()
    if base_url !='https://morvanzhou.github.io/':
        restricted_crawl = True
    else:
        restricted_crawl = False
    count,t1=1,time.time()
    while len(unseen) != 0:
        if restricted_crawl and len(seen) >= 20:
            break
        print('
Distributed Crawling...')
        htmls=[crawl(url) for url in unseen]
        print('
Distributed Parsing...')
        results=[parse(html) for html in htmls]
        print('
Analysing...')
        seen.update(unseen)
        unseen.clear()
        for title,page_urls,url in results:
            print(count,title,url)
            count+=1
            unseen.update(page_urls - seen)
    print('Total time: %.1f s' % (time.time()-t1,))

def multiuse():#需要if __name__=='__main__':才能正常运行
    unseen=set([base_url,])
    seen=set()
    pool=mp.Pool(4)
    count,t1=1,time.time()
    while len(unseen)!=0:
        if restricted_crawl and len(seen)>20:
            break
        print('
Distributed Crawling...')
        crawl_jobs=[pool.apply_async(crawl,args=(url,)) for url in unseen]
        htmls=[j.get() for j in crawl_jobs]
        print('
Distributed Parsing...')
        parse_jobs=[pool.apply_async(parse,args=(html,)) for html in htmls]
        results=[j.get() for j in parse_jobs]
        print('
Analysing...')
        seen.update(unseen)
        unseen.clear()
        for title,page_urls,url in results:
            print(count,title,url)
            count+=1
            unseen.update(page_urls - seen)
    print('Total time: %.1f s' % (time.time()-t1,))

def job(x):
    return x*x

def pooltest():
    pool = mp.Pool()
    res=pool.map(job,range(10))
    print(res)
    res=pool.apply_async(job,(2,))
    nulti_res=[pool.apply_async(job,(i,)) for i in range(10)]
    print(res.get())
    print([mures.get() for mures in multi_res])

def job1(t):
    print('Start job',t)
    time.sleep(t)
    print('Job',t,'takes',t,' s')

def main():
    [job1(t) for t in range(1,3)]

async def job2(t):  # async 形式的功能
    print('Start job',t)
    await asyncio.sleep(t)  # 等待 "t" 秒, 期间切换其他任务
    print('Job',t,'takes',t,' s')

async def main1(loop):
    tasks = [
        loop.create_task(job2(t)) for t in range(1,3)    # 创建任务, 但是不执行
    ]
    await asyncio.wait(tasks)   # 执行并等待所有任务完成

def normal():
    for i in range(2):
        r=requests.get(base_url)
        url=r.url
        print(url)

async def job3(session):
    response = await session.get(base_url)   # 等待并切换
    return str(response.url)

async def main2(loop):
    async with aiohttp.ClientSession() as session:
        tasks = [loop.create_task(job3(session)) for _ in range(2)]
        finished,unfinished = await asyncio.wait(tasks)
        all_results = [r.result() for r in finished]
        print(all_results)

def asyncdo():
    t1=time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main2(loop))
    loop.close()
    print("Async total time:",time.time()-t1)

def seleniumweb():
    #chrome_options=Options()#不弹出浏览器窗口,但是还是弹出窗口
    #chrome_options.add_argument("--headless")  
    #driver = webdriver.Chrome(chrome_options=chrome_options)
    driver = webdriver.Chrome(executable_path="C:Program Files (x86)GoogleChromeApplicationchromedriver")
    driver.get("https://morvanzhou.github.io/")
    driver.find_element_by_xpath(u"//img[@alt='强化学习 (Reinforcement Learning)']").click()
    driver.find_element_by_link_text("About").click()
    driver.find_element_by_link_text(u"赞助").click()
    driver.find_element_by_link_text(u"教程 ▾").click()
    driver.find_element_by_link_text(u"数据处理 ▾").click()
    driver.find_element_by_link_text(u"网页爬虫").click()

    html = driver.page_source
    driver.get_screenshot_as_file("D:yanglelejietu2.png")
    driver.close()


if __name__=='__main__':
    seleniumweb()

上面有些代码执行不成功,姑且全记下

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = [
        'http://quotes.toscrape.com/tag/humor/',
    ]

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                'text': quote.css('span.text::text').extract_first(),
                'author': quote.xpath('span/small/text()').extract_first(),
            }

        next_page = response.css('li.next a::attr("href")').extract_first()
        if next_page is not None:
            yield response.follow(next_page, self.parse)

https://docs.scrapy.org/en/latest/intro/overview.html

原文地址:https://www.cnblogs.com/lely/p/9990589.html