xpath + requests + peewee: an all-around crawler for the CSDN forums

This week I was given the task of crawling all kinds of information from the CSDN forums. Working out the URLs took quite a while, and I fixed a pile of messy bugs along the way. CSDN's own pages are buggy too: for example, you can follow a topic's "next page" link and land on an empty page, and some user pages return 404. After patching all of that up, the code is below.
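
Most of that URL wrangling comes down to two tricks that show up again and again below: joining the relative paths from left_menu.js onto the forum domain, and pulling IDs out of the last path segment. A quick illustration (the example paths and the ?page=2 query here are made up for demonstration, not taken from real pages):

from urllib import parse

domain = "https://bbs.csdn.net"
# list pages: relative path from the menu js, joined onto the domain
print(parse.urljoin(domain, "/forums/example"))   # https://bbs.csdn.net/forums/example
# topic pages: the id is the last path segment, minus any "?page=N" query
url = "https://bbs.csdn.net/topics/398243351?page=2"
print(url.split("/")[-1].split("?")[0])           # 398243351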

Some code that I commented out while debugging is still in there; just skim past it, I was too lazy to clean it up, haha.

First, set up the tables with peewee.

"""
***************************
@Project :csdn

@Author :majun
@Date : 2020/11/16 9:47

*******************************
"""
from peewee import *

db = MySQLDatabase("training", host="localhost", port=3306, user="username", password="password")

class BaseModel(Model):
    class Meta:
        database = db

class topic(BaseModel):
    topic_id = CharField(primary_key=True, max_length=50)  # topic id
    title = TextField(default="")  # topic title
    topic_status = CharField(max_length=50)  # topic status: solved / unsolved
    score = IntegerField()  # bounty points offered on the topic
    author_id = CharField(max_length=100)  # author id
    create_time = DateTimeField()  # topic creation time
    answer_nums = IntegerField()  # number of replies
    click_nums = IntegerField()  # number of clicks/views
    last_action_time = DateTimeField()  # time of the last activity on the topic
    content = TextField()  # topic content
    jtl = FloatField(default=0.0)  # the author's close rate (结帖率)
    like_nums = IntegerField(default=0)  # number of likes
    add_time = DateTimeField(index=True, null=True)  # time the row was inserted
    update_time = DateTimeField(index=True, null=True)  # time the row was last updated

class answer(BaseModel):
    answer_id = CharField(primary_key=True, max_length=50)  # reply id
    topic_id = CharField(max_length=50)  # id of the topic being replied to
    user_id = CharField(max_length=50)  # id of the replying user
    create_time = DateTimeField()  # reply creation time
    content = TextField()  # reply content
    like_nums = IntegerField(default=0)  # number of likes
    add_time = DateTimeField(index=True, null=True)  # time the row was inserted
    update_time = DateTimeField(index=True, null=True)  # time the row was last updated

class user(BaseModel):
    user_id = CharField(max_length=50, primary_key=True)  # user id
    name = CharField(max_length=200)  # user name
    title = TextField()  # blog title
    desc = TextField()  # profile description
    code_year = IntegerField(default=0)  # "code age" in years (码龄)
    original_num = IntegerField(default=0)  # number of original posts
    week_rank = IntegerField(default=0)  # weekly rank
    total_rank = IntegerField(default=0)  # overall rank
    views = IntegerField(default=0)  # profile/blog views
    level = IntegerField(default=0)  # level
    points = IntegerField(default=0)  # points
    fans_nums = IntegerField(default=0)  # number of followers
    like_nums = IntegerField(default=0)  # number of likes received
    review_nums = IntegerField(default=0)  # number of comments received
    favorites = IntegerField(default=0)  # number of bookmarks
    add_time = DateTimeField(index=True, null=True)  # time the row was inserted
    update_time = DateTimeField(index=True, null=True)  # time the row was last updated


if __name__ == "__main__":
    db.create_tables([topic, answer, user])
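
One thing worth calling out: all three tables use a primary key that we assign ourselves instead of an auto-increment id, so a bare save() in peewee issues an UPDATE and inserts nothing for a brand-new row. That is why the crawler below always checks for an existing row first and only then calls save(force_insert=True). A minimal sketch of that insert-or-update pattern with the topic model above (the upsert_topic name is mine, the crawler just inlines this logic):

def upsert_topic(topics):
    # topics is a freshly filled topic() instance
    is_exist = False
    for info in topic.select(topic.add_time).where(topic.topic_id == topics.topic_id):
        is_exist = True
        topics.add_time = info.add_time       # keep the time the row was first inserted
        topics.save()                         # row already exists, so plain save() runs an UPDATE
    if not is_exist:
        topics.save(force_insert=True)        # new row: force an INSERT despite the preset primary key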

The crawler itself

CSDN has quite a few buggy pages; I only handle the problems I actually ran into while collecting data. If you hit other pages that break the crawler, feel free to leave a comment and we can work through them together.
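
Those problem pages usually show up as a non-200 status code or an empty body, so one way to centralize the checks would be a tiny helper like the one below, called in place of the bare requests.get(url).text scattered through the code. This is only a sketch of the idea, not something the original crawler has; safe_get is a name I made up.

import requests

def safe_get(url, headers=None, timeout=10):
    """Return the page HTML, or None for broken pages (404 user pages, empty next-page responses, ...)."""
    try:
        resp = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException:
        return None
    if resp.status_code != 200:   # e.g. deleted users come back as 404
        return None
    if not resp.text.strip():     # e.g. the empty "next page" of some topics
        return None
    return resp.text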

 

"""
***************************
@Project :csdn

@Author :majun
@Date : 2020/11/11 11:14

*******************************
"""
import re

import requests
import time
from model import *
from urllib import parse
from scrapy import Selector
from datetime import datetime
import pymysql

domain = "https://bbs.csdn.net"

url_list = []

# first-level urls filtered out of the left-hand menu js
level1_url = []

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'
}

# fetch the left-hand menu js
def get_nodes_json():
    left_menu_text = requests.get("https://bbs.csdn.net/dynamic_js/left_menu.js?csdn").text
    nodes_str_match = re.search("forumNodes:(.*])", left_menu_text)
    if nodes_str_match:
        nodes_str = nodes_str_match.group(1).replace("null", "None")
        # nodes_list = ast.literal_eval(nodes_str)
        nodes_list = eval(nodes_str)
        return nodes_list
    return []


# collect every url (level 1, 2 and 3) from the left-hand menu js
def process_nodes_list(nodes_list):
    for item in nodes_list:
        if "url" in item:
            if item["url"]:
                url_list.append(item["url"])
        if "children" in item:
            process_nodes_list(item["children"])


def get_level1_list(nodes_list):
    for item in nodes_list:
        if "url" in item and item["url"]:
            level1_url.append(item["url"])


# build the final list of urls to crawl
def get_last_urls():
    nodes_list = get_nodes_json()
    # print(nodes_list)
    process_nodes_list(nodes_list)
    # print(url_list)
    get_level1_list(nodes_list)
    # print(level1_url)
    last_urls = []
    for url in url_list:
        if url not in level1_url:
            last_urls.append(url)
    all_urls = []
    for url in last_urls:
        # each forum has a default, a "recommend" and a "closed" listing
        all_urls.append(parse.urljoin(domain, url))
        all_urls.append(parse.urljoin(domain, url + "/recommend"))
        all_urls.append(parse.urljoin(domain, url + "/closed"))
    return all_urls


def parse_topic(url):
    # fetch a topic's details and all of its replies
    # url = "https://bbs.csdn.net/topics/398243351"
    if re.search("page", url):
        topic_id = url.split("/")[-1].split("?")[0]
    else:
        topic_id = url.split("/")[-1]
    response_text = requests.get(url).text
    time.sleep(0.1)
    sel = Selector(text=response_text)
    print(url)
    try:
        flag_alldiv = sel.xpath("//div[starts-with(@id,'post-')]")
        topics_item = flag_alldiv[0]
        flag = 1
    except:
        flag = 0
    if flag == 1:
        all_divs = sel.xpath("//div[starts-with(@id,'post-')]")
        topics_item = all_divs[0]  # the first post-* div is the topic itself, the rest are replies
        content = topics_item.xpath(".//div[@class='post_body post_body_min_h']/div").extract()[0]
        # print(content)
        like_nums = topics_item.xpath(".//label[@class='red_praise digg d_hide']//em/text()").extract()[0]
        like_nums = like_nums.split(' ')[1]
        try:
            jtl_flag = topics_item.xpath(".//div[@class='close_topic']/text()").extract()[0]
            flag = 1
        except:
            flag = 0
        if flag == 1:
            jtl = topics_item.xpath(".//div[@class='close_topic']/text()").extract()[0]
            jtl = re.findall(r'\d+\.?\d*', jtl)  # pull the close-rate number out of the text
            if jtl:
                jtl = jtl[0]
                # print(jtl)
            else:
                jtl = 0
        else:
            pass
        # print(jtl)

        # existed_topics = topic.select().where(topic.topic_id == topic_id)
        for info in topic.select(topic.add_time).where(topic.topic_id == topic_id):
            info.topic_id = topic_id
            info.content = content
            info.like_nums = like_nums
            if flag == 1:
                info.jtl = jtl
            else:
                pass
            info.update_time = datetime.now()
            info.save()

        for answers_item in all_divs[1:]:
            answers = answer()
            answers.topic_id = topic_id
            # author_info = answers_item.xpath(".//div[@class='nick_name']//a[1]/@href").extract()[0]
            # topic_id = author_info.split("/")[-1]
            # answers.topic_id = topic_id
            # print(topic_id)
            answer_id = answers_item.xpath("./@id").extract()[0]  # this reply's own div id, e.g. "post-123456"
            answer_id = int(answer_id.split('-')[1])
            answers.answer_id = answer_id
            # print(answer_id)
            create_time = answers_item.xpath(".//label[@class='date_time']/text()").extract()[0]
            create_time = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
            # add_time = datetime.now()
            # update_time = datetime.now()
            # print(update_time)
            # print(add_time)
            # answers.update_time = update_time
            answers.create_time = create_time
            like_nums = answers_item.xpath(".//label[@class='red_praise digg d_hide']//em/text()").extract()[0]
            # like_nums = re.findall(r'\d+\.?\d', like_nums)
            like_nums = like_nums.split(' ')[1]
            if like_nums:
                like_nums = like_nums
                # print(like_nums)
            else:
                like_nums = 0
            content = answers_item.xpath(".//div[@class='post_body post_body_min_h']").extract()[0]
            answers.content = content
            # print(content)
            user_id = answers_item.xpath(".//div[@class='nick_name']//a/@href").extract()[0]
            user_id = user_id.split('/')[3]
            # print(user_id)
            answers.user_id = user_id
            answers.like_nums = like_nums
            answers.update_time = datetime.now()
            # print(answers.add_time)
            answers.add_time = datetime.now()
            # print(answers.add_time)

            is_exist = False
            for info in answer.select(answer.add_time).where(answer.answer_id == answers.answer_id):
                is_exist = True
                answers.add_time = info.add_time
                answers.save()
            if not is_exist:
                answers.save(force_insert=True)
    else:
        pass

    # follow the "next page" link of the reply list, if there is one
    next_page_str = sel.xpath("//a[@class='pageliststy next_page']/text()").extract()
    # print(next_page_str)
    next_page = sel.xpath("//a[@class='pageliststy next_page']/@href").extract()
    # print(next_page)
    if next_page:
        if next_page_str[0] == " ":
            next_url = parse.urljoin(domain, next_page[0])
            parse_topic(next_url)
        # elif next_page_str[1] is None:


def parse_author(url):
    # fetch a user's profile details
    # url = "https://blog.csdn.net/stpeace"
    author_id = url.split("/")[-1]
    print(url)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
    }
    response_text = requests.get(url, headers=headers).text
    sel = Selector(text=response_text)
    author = user()
    author.user_id = author_id
    # the 404 page carries an image at this position
    flag_404 = sel.xpath('//*[@id="main"]/div[1]/div[1]/div[1]/img[1]')
    if flag_404:
        flag = 1
    else:
        flag = 0
    if flag == 0:
        original_nums = sel.xpath("//*[@id='asideProfile']/div[2]/dl[1]/@title").extract()[0]
        if original_nums:
            original_nums = original_nums
        else:
            original_nums = 0
        # print(original_nums)
        # resources_nums = sel.xpath(
        #     "//ul[@class='me_chanel_list clearfix']/li[2]/a/label/span[@class='count']/text()").extract()
        # if resources_nums:
        #     resources_nums = resources_nums[0]
        # else:
        #     resources_nums = 0
        # forum_nums = sel.xpath("//ul[@class='me_chanel_list clearfix']/li[3]/a/label/span[@class='count']/text()").extract()
        # if forum_nums:
        #     forum_nums = forum_nums[0]
        # else:
        #     forum_nums = 0
        # blink_nums = sel.xpath("//ul[@class='me_chanel_list clearfix']/li[4]/a/label/span[@class='count']/text()").extract()
        # if blink_nums:
        #     blink_nums = blink_nums[0]
        # else:
        #     blink_nums = 0
        # ask_nums = sel.xpath("//ul[@class='me_chanel_list clearfix']/li[5]/a/label/span[@class='count']/text()").extract()
        # if ask_nums:
        #     ask_nums = ask_nums[0]
        # else:
        #     ask_nums = 0
        favorites = sel.xpath("//*[@id='asideProfile']/div[4]/dl[5]/@title").extract()[0]
        if favorites:
            favorites = favorites
        else:
            favorites = 0
        # print(favorites)
        # special_column_nums = sel.xpath(
        #     "//ul[@class='me_chanel_list clearfix']/li[7]/a/label/span[@class='count']/text()").extract()
        # if special_column_nums:
        #     special_column_nums = special_column_nums[0]
        # else:
        #     special_column_nums = 0
        desc = sel.xpath("/html/body/header/div/div/div[1]/p/text()").extract()
        if desc:
            desc = desc[0].strip()
        else:
            desc = ""
        # print(desc)
        # name = sel.xpath("//*[@id='uid']/span").extract()[2]
        # name = name.strip()
        try:
            title = sel.xpath("/html/body/header/div/div/div[1]/h1/a/text()").extract()[0]
        except:
            title = author.user_id + '的博客'  # fall back to CSDN's default blog title
        if title:
            title = title
        else:
            title = 0
        # print(title)
        # following_nums = sel.xpath('//*[@id="asideProfile"]/div[4]/dl[3]/dt/span').extract()[0]
        # if following_nums:
        #     following_nums = following_nums[0]
        # else:
        #     following_nums = 0

        fans_nums = sel.xpath("//*[@id='fan']/text()").extract()[0]
        # print(fans_nums)
        if not fans_nums:  # extract()[0] already gives the text, keep it whole
            fans_nums = 0
        like_nums = sel.xpath("//*[@id='asideProfile']/div[4]/dl[3]/dt/span/text()").extract()[0]
        if like_nums:
            like_nums = like_nums
        else:
            like_nums = 0
        # print(like_nums)
        review_nums = sel.xpath("//*[@id='asideProfile']/div[4]/dl[4]/dt/span/text()").extract()[0]
        if review_nums:
            review_nums = review_nums
        else:
            review_nums = 0
        # print(review_nums)
        try:
            code_year = sel.xpath("//*[@id='asideProfile']/div[1]/div[2]/div[2]/span[1]/text()").extract()[0]
            print(code_year)
            flag = 1
        except:
            flag = 0
        if flag == 1:
            # code_year = sel.xpath("//*[@id='asideProfile']/div[1]/div[2]/div[2]/span[1]/text()").extract()[0]
            if code_year:
                code_year = re.findall(r'\d?\d', code_year)  # e.g. "码龄12年" -> ["12"]
                print(code_year)
                code_year = code_year[0]
            else:
                code_year = 0
        else:
            code_year = 1
        week_rank = sel.xpath("//*[@id='asideProfile']/div[2]/dl[2]/@title").extract()[0]
        if week_rank:
            if week_rank == '暂无排名':  # "no ranking yet"
                week_rank = 0
        else:
            week_rank = 0
        # print(week_rank)
        total_rank = sel.xpath("//*[@id='asideProfile']/div[2]/dl[3]/@title").extract()[0]
        if total_rank:
            if total_rank == '暂无排名':  # "no ranking yet"
                total_rank = 0
        else:
            total_rank = 0
        # print(total_rank)
        level = sel.xpath('//*[@id="asideProfile"]/div[2]/dl[5]/@title').extract()[0]
        if level:
            level = re.findall(r"\d+\.?\d*", level)
            level = level[0]
        else:
            level = 1
        # print(level)
        views = sel.xpath('//*[@id="asideProfile"]/div[2]/dl[4]/@title').extract()[0]
        if views:
            views = views
        else:
            views = 0

        points = sel.xpath('//*[@id="asideProfile"]/div[4]/dl[1]/@title').extract()[0]
        if points:
            points = points
        else:
            points = 0
        # add_time = datetime.now()
        # update_time = datetime.now()
        # author.name = name
        author.title = title
        author.original_num = int(original_nums)
        # author.resources_nums = int(resources_nums)
        # author.forum_nums = int(forum_nums)
        # author.blink_nums = int(blink_nums)
        # author.ask_nums = int(ask_nums)
        author.week_rank = int(week_rank)
        author.code_year = int(code_year)
        author.favorites = int(favorites)
        # author.special_column_nums = int(special_column_nums)
        author.desc = desc
        # author.following_nums = int(following_nums)
        author.fans_nums = int(fans_nums)
        author.like_nums = int(like_nums)
        author.review_nums = int(review_nums)
        author.total_rank = int(total_rank)
        author.level = int(level)
        author.views = int(views)
        author.points = int(points)
        author.add_time = datetime.now()
        author.update_time = datetime.now()
        # if author.add_time != 'NULL':
        #     pass
        # else:
        #     author.add_time = add_time
        #     author.update_time = update_time
        # existed_author = user.select().where(user.user_id == author_id)
        is_exist = False
        for info in user.select(user.add_time).where(user.user_id == author.user_id):
            is_exist = True
            author.add_time = info.add_time
            author.save()
        if not is_exist:
            author.save(force_insert=True)
    else:
        pass

def parse_list(url):
    print("Fetching topic list page, url: {}".format(url))
    response_text = requests.get(url).text
    sel = Selector(text=response_text)
    all_trs = sel.xpath("//table[@class='forums_tab_table']//tbody//tr")
    try:
        topic_flag = all_trs.xpath('.//td/span/text()').extract()[0]
    except:
        topic_flag = 0
    if topic_flag != 0:
        for tr in all_trs:
            status = tr.xpath(".//td[1]/span/text()").extract()[0]
            score = tr.xpath(".//td[2]/em/text()").extract()[0]
            if tr.xpath(".//td[3]/span/text()").extract():
                top_flag = tr.xpath(".//td[3]/span/text()").extract()[0]
                top_flag = re.findall(u'[\u4e00-\u9fa5]+', top_flag)  # keep only the Chinese label
                top_flag = top_flag[0]
                print(top_flag)
                if top_flag == '置顶':  # pinned topics have an extra badge link, so the title is the second <a>
                    topic_url = parse.urljoin(domain, tr.xpath(".//td[3]/a[2]/@href").extract()[0])
                    topic_title = tr.xpath(".//td[3]/a[2]/text()").extract()[0]
                else:
                    topic_url = parse.urljoin(domain, tr.xpath(".//td[3]/a/@href").extract()[0])
                    topic_title = tr.xpath(".//td[3]/a/text()").extract()[0]
            else:
                topic_url = parse.urljoin(domain, tr.xpath(".//td[3]/a/@href").extract()[0])
                topic_title = tr.xpath(".//td[3]/a/text()").extract()[0]
            author_url = parse.urljoin(domain, tr.xpath(".//td[4]/a/@href").extract()[0])
            author_id = author_url.split("/")[-1]
            create_time_str = tr.xpath(".//td[4]/em/text()").extract()[0]
            create_time = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M")
            answers_info = tr.xpath(".//td[5]/span/text()").extract()[0]  # "replies/clicks"
            answers_nums = answers_info.split("/")[0]
            click_nums = answers_info.split("/")[1]
            last_time_str = tr.xpath(".//td[6]/em/text()").extract()[0]
            last_time = datetime.strptime(last_time_str, "%Y-%m-%d %H:%M")
            topic_id_str = topic_url.split("/")[-1]
            # add_time = datetime.now()
            # print(add_time)
            # update_time = datetime.now()
            # print(update_time)

            topics = topic()
            topics.topic_id = int(topic_id_str)
            topics.title = topic_title
            topics.score = int(score)
            topics.author_id = author_id
            topics.click_nums = int(click_nums)
            topics.answer_nums = int(answers_nums)
            topics.create_time = create_time
            topics.last_action_time = last_time
            topics.topic_status = status
            topics.update_time = datetime.now()
            topics.add_time = datetime.now()
            is_exist = False
            for info in topic.select(topic.add_time).where(topic.topic_id == topics.topic_id):
                is_exist = True
                topics.add_time = info.add_time
                topics.save()
            if not is_exist:
                topics.save(force_insert=True)
            parse_topic(topic_url)
            # print(topics.topic_id)
            parse_author(author_url)

    # follow the "next page" link of the topic list, if there is one
    next_page_str = sel.xpath("//a[@class='pageliststy next_page']/text()").extract()
    next_page = sel.xpath("//a[@class='pageliststy next_page']/@href").extract()
    if next_page:
        if next_page_str[1] == " ":
            next_url = parse.urljoin(domain, next_page[0])
            parse_list(next_url)


if __name__ == "__main__":
    all_urls = get_last_urls()
    for url in all_urls:
        parse_list(url)
        # parse_author(url)
        # parse_topic(url)
    # parse_author("")

 

Original post: https://www.cnblogs.com/Mangnolia/p/14011850.html