python网络爬虫(2)回顾Python编程

文件写入

def storFile(data, fileName, method='a'):
    """Write *data* (a str) to *fileName*.

    method: file-open mode, default 'a' (append).
    newline='' disables newline translation (useful for csv-style output).
    """
    # Context manager guarantees the handle is closed even on error.
    # (Removed the redundant `pass` statements of the original.)
    with open(fileName, method, newline='') as f:
        f.write(data)


storFile('123', '1.txt')

文件读取

# Read the file back and print its contents.
with open('1.txt', 'r') as fh:
    content = fh.read()
print(content)

序列化操作

把内存中的数据变为可保存和共享,实现状态保存。cPickle使用C语言编写,效率高,优先使用。如果不存在则使用pickle。pickle使用dump和dumps实现序列化。

try:
    import cPickle as pickle  # Python 2: C implementation, faster
except ImportError:
    import pickle  # Python 3: pickle already uses the C accelerator

d = dict(url='index.html', title='1', content='2')
# Bug fix: use a context manager so the file is closed even if dump() raises.
with open('2.txt', 'wb') as f:
    pickle.dump(d, f)
# dumps() returns the serialized bytes instead of writing to a file.
print(pickle.dumps(d))

反序列化操作

使用load实现反序列化

try:
    import cPickle as pickle  # Python 2: C implementation, faster
except ImportError:
    import pickle  # Python 3

# Bug fix: use a context manager so the file is closed even if load() raises.
# NOTE(review): never unpickle untrusted data — pickle.load can execute code.
with open('2.txt', 'rb') as f:
    d = pickle.load(f)
print(d)

多进程创建

多进程使用os的fork复制完全相同的进程,并对子进程返回0,对父进程返回子进程的pid。只在linux/unix中使用。

import os

if __name__ == '__main__':
    # os.fork() is POSIX-only: it returns 0 in the child, the child's pid
    # in the parent, and a negative value on error.
    pid = os.fork()
    if pid < 0:
        print('error pid')
    elif pid == 0:
        print('child ,parent pid', os.getpid(), os.getppid())
    else:
        # Bug fix: the original printed the function object `os.getpid`
        # instead of calling it.
        print('parent pid,create child ', os.getpid(), pid)

使用multiprocessing模块创建进程,使用start启动进程,使用join同步。

import os
from multiprocessing import Process


def run_proc(name):
    """Worker: print its task name and its own pid."""
    print('name ,child pid   running', name, os.getpid())


if __name__ == '__main__':
    print('parent pid', os.getpid())
    procs = []
    for i in range(5):
        p = Process(target=run_proc, args=(str(i),))
        print('Process will start')
        p.start()
        procs.append(p)
    # Bug fix: the original joined only the *last* child started;
    # join every child so the parent really waits for all of them.
    for p in procs:
        p.join()
    print('end')

使用multiprocessing模块中的Pool限定进程数量

import os
from multiprocessing import Process, Pool
import random, time


def run_proc(name):
    """Simulated task: announce start, sleep up to 10 s, announce end."""
    print('name ,child pid   running ', name, os.getpid())
    time.sleep(random.random() * 10)
    print('name ,child pid   running end', name, os.getpid())


if __name__ == '__main__':
    print('parent pid', os.getpid())
    # A Pool caps the number of concurrently running workers at 3.
    pool = Pool(processes=3)
    for task_id in range(10):
        pool.apply_async(run_proc, args=(task_id,))
    print('wait')
    # close() stops new submissions; join() waits for all queued tasks.
    pool.close()
    pool.join()
    print('end')

进程间通信

Queue通信

适用于多进程间的通信,采用put和get方法收发数据。

import os
from multiprocessing import Process, Queue
import time, random


def write_proc(q, urls):
    """Producer: push each url onto the shared queue with a short pause."""
    print('w processing ', os.getpid(), 'is running')
    for url in urls:
        q.put(url)
        print('put :', url)
        time.sleep(random.random())


def read_proc(q):
    """Consumer: block on the queue forever, printing every item received."""
    print('r processing ', os.getpid(), 'is running')
    while True:
        item = q.get(True)
        print('get:', item)


if __name__ == '__main__':
    q = Queue()
    writers = [
        Process(target=write_proc, args=(q, ['u1', 'u2', 'u3'])),
        Process(target=write_proc, args=(q, ['u4', 'u5', 'u6'])),
    ]
    reader = Process(target=read_proc, args=(q,))
    for w in writers:
        w.start()
    reader.start()
    for w in writers:
        w.join()
    # The reader never exits its loop, so it must be terminated explicitly.
    reader.terminate()

Pipe通信

Pipe方法返回conn1和conn2,全双工模式下均可收发(Pipe方法中duplex参数控制),通过send和recv控制。

import os
from multiprocessing import Process, Pipe
import time, random


def send_proc(p, urls):
    """Sender: push each url through its end of the pipe with a short pause."""
    print('s processing ', os.getpid(), 'is running')
    for url in urls:
        p.send(url)
        print('send :', url)
        time.sleep(random.random())


def receive_proc(p):
    """Receiver: block on the pipe forever, printing every message."""
    print('r processing ', os.getpid(), 'is running')
    while True:
        msg = p.recv()
        print('receive:', msg)


if __name__ == '__main__':
    # Pipe() returns two connection ends; by default the pipe is duplex,
    # so either end can both send and receive.
    conn_a, conn_b = Pipe()
    sender = Process(target=send_proc, args=(conn_a, ['u1', 'u2', 'u3']))
    receiver = Process(target=receive_proc, args=(conn_b,))
    sender.start()
    receiver.start()

    sender.join()
    # The receiver loops forever, so terminate it once the sender is done.
    receiver.terminate()

多线程

关于多线程的一点理解:使用threading模块创建多线程。

import time,random,threading


def run_proc(url):
    """Print each item of *url* from the current worker thread."""
    print('threading name', threading.current_thread().name)
    for item in url:
        print(threading.current_thread().name, '----->', item)
        time.sleep(random.random())
    print('end ', threading.current_thread().name)


if __name__ == '__main__':
    print('running :', threading.current_thread().name)
    workers = [
        threading.Thread(target=run_proc, name='T1', args=(['u1', 'u2', 'u3'],)),
        threading.Thread(target=run_proc, name='T2', args=(['u4', 'u5', 'u6'],)),
    ]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print('end')

使用threading.Thread继承创建线程类:代码源:https://github.com/qiyeboy/SpiderBook

import random
import threading
import time
class myThread(threading.Thread):
    """Thread that prints each of its urls with a random pause in between."""

    def __init__(self, name, urls):
        threading.Thread.__init__(self, name=name)
        self.urls = urls

    def run(self):
        me = threading.current_thread().name
        print('Current %s is running...' % me)
        for url in self.urls:
            print('%s ---->>> %s' % (me, url))
            time.sleep(random.random())
        print('%s ended.' % me)
        
# Start two subclassed threads and wait for both to finish.
print('%s is running...' % threading.current_thread().name)
threads = [
    myThread(name='Thread_1', urls=['url_1', 'url_2', 'url_3']),
    myThread(name='Thread_2', urls=['url_4', 'url_5', 'url_6']),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('%s ended.' % threading.current_thread().name)

线程同步

线程同步以保护数据,主要有Lock和RLock两种方案,可参阅官方threading文档。另外,由于全局解释器锁(GIL)的存在,线程无法真正并行地执行Python字节码:在CPU密集场合倾向使用多进程,对于IO密集型场合则使用多线程。

import threading

# Reentrant lock guarding the shared counter below.
mylock = threading.RLock()
num = 0


class myThread(threading.Thread):
    """Thread that increments the shared counter `num` under `mylock`."""

    def __init__(self, name):
        threading.Thread.__init__(self, name=name)

    def run(self):
        global num
        while True:
            mylock.acquire()
            print('%s locked, Number: %d' % (threading.current_thread().name, num))
            if num >= 100:
                # Release before breaking so the lock is never held on exit.
                mylock.release()
                print('%s released, Number: %d' % (threading.current_thread().name, num))
                break
            num += 1
            print('%s released, Number: %d' % (threading.current_thread().name, num))
            mylock.release()


if __name__ == '__main__':
    for label in ('Thread_1', 'Thread_2'):
        myThread(label).start()

协程 

from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request as urllib2


def run_task(url):
    """Fetch *url* and report how many bytes were received."""
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Bug fix: the original printed the Exception *class* itself;
        # print the caught exception instance instead.
        print(e)


if __name__ == '__main__':
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    # Spawn one greenlet per url and wait for all of them.
    greenlets = [gevent.spawn(run_task, url) for url in urls]
    gevent.joinall(greenlets)

gevent支持的池(Pool),可限制并发协程的数量

from gevent import monkey
monkey.patch_all()
import urllib.request as urllib2
from gevent.pool import Pool


def run_task(url):
    """Fetch *url*, report the byte count, and return a status string."""
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Bug fix: the original printed the Exception *class* itself;
        # print the caught exception instance instead.
        print(e)
    return 'url:%s --->finish' % url


if __name__ == '__main__':
    # Pool(2) limits the number of concurrently running greenlets to 2.
    pool = Pool(2)
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    results = pool.map(run_task, urls)
    print(results)

分布式进程

创建服务进程(Windows)(代码源:https://github.com/qiyeboy/SpiderBook)

import queue as Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# Number of tasks to dispatch (also caps each queue's size).
task_number = 10
# Queues for outgoing tasks and incoming results.
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)


def get_task():
    """Return the shared task queue (a plain function: Windows cannot pickle lambdas)."""
    return task_queue


def get_result():
    """Return the shared result queue."""
    return result_queue
# Manager subclass; the queue getters are registered on it in win_run().
class QueueManager(BaseManager):
    """BaseManager subclass used to expose the task/result queues over the network."""
    pass
def win_run():
    """Start a manager server, enqueue 10 tasks, and collect 10 results."""
    # On Windows the registered callables cannot be lambdas (they must be
    # picklable), so plain module-level functions are bound here.
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the auth token. Windows needs an explicit IP;
    # on Linux an empty address defaults to the local host.
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    # Start the manager server (runs in a subprocess).
    manager.start()
    try:
        # Obtain the network proxies for the task and result queues.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Enqueue the work items.
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('put task %s ...' % url)
            task.put(url)
        print('try get result...')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    except Exception as e:
        # Bug fix: the original bare `except:` swallowed *every* error
        # (even KeyboardInterrupt) and gave no detail about what failed.
        print('Manager error', e)
    finally:
        # Always shut down, otherwise an unclosed-pipe error is raised.
        manager.shutdown()

if __name__ == '__main__':
    # freeze_support() mitigates multiprocessing issues on Windows
    # (needed when the script is frozen into an executable).
    freeze_support()
    win_run()

创建任务进程:针对 Python 3 及后续版本做了修正

#coding:utf-8
import time
from multiprocessing.managers import BaseManager


# Manager subclass mirroring the one used by the server process.
class QueueManager(BaseManager):
    pass


# Step 1: register the queue getters by name only — the server owns them.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# Step 2: connect to the server.
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# Port and auth token must match the server process exactly.
# Bug fix: on Python 3 authkey must be bytes (the server uses b'qiye');
# passing a str raises TypeError when connecting.
m = QueueManager(address=(server_addr, 8001), authkey=b'qiye')
# Connect over the network.
m.connect()
# Step 3: obtain the queue proxies.
task = m.get_task_queue()
result = m.get_result_queue()
# Step 4: pull tasks from the task queue and push results.
# NOTE(review): empty()+get() is racy with multiple workers; the 5 s
# timeout on get() bounds the wait if another worker drains the queue.
while not task.empty():
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success' % image_url)

# Done.
print('worker exit.')

创建Linux版本的服务进程:(未测试)

服务进程(taskManager.py)(linux版)

import random, time
import queue as Queue  # Bug fix: Python 2's Queue module is `queue` on Python 3
from multiprocessing.managers import BaseManager

# Step 1: the real queues holding tasks and results.
task_queue = Queue.Queue()
result_queue = Queue.Queue()


class Queuemanager(BaseManager):
    pass


# Step 2: expose the two queues on the network via register();
# the `callable` argument links each registered name to a Queue object.
# (Lambdas are fine here — this variant targets Linux, not Windows.)
Queuemanager.register('get_task_queue', callable=lambda: task_queue)
Queuemanager.register('get_result_queue', callable=lambda: result_queue)

# Step 3: bind port 8001 with the auth token 'qiye'.
# Bug fix: authkey must be bytes on Python 3.
manager = Queuemanager(address=('', 8001), authkey=b'qiye')

# Step 4: start the manager and listen for connections.
manager.start()

# Step 5: get the network-accessible queue proxies.
task = manager.get_task_queue()
result = manager.get_result_queue()

# Step 6: enqueue the tasks.
# Bug fix: converted the Python 2 `print` statements to Python 3 calls,
# consistent with the rest of this file.
for url in ["ImageUrl_" + str(i) for i in range(10)]:
    print('put task %s ...' % url)
    task.put(url)
# Collect the results.
print('try get result...')
for i in range(10):
    print('result is %s' % result.get(timeout=10))
# Shut the manager down.
manager.shutdown()

TCP编程

创建服务端

#coding:utf-8
import socket
import threading
import time
def dealClient(sock, addr):
    """Serve one client connection: greet it, then echo messages until
    the peer disconnects or sends 'exit'. Runs in its own thread.
    """
    # Step 4: receive data from the peer and echo it back.
    print('Accept new connection from %s:%s...' % addr)
    sock.send(b'Hello,I am server!')
    while True:
        raw = sock.recv(1024)
        time.sleep(1)
        text = raw.decode('utf-8') if raw else ''
        # An empty read means the peer closed; 'exit' ends the session.
        if not raw or text == 'exit':
            break
        print('-->>%s!' % text)
        sock.send(('Loop_Msg: %s!' % text).encode('utf-8'))
    # Step 5: close this connection's socket.
    sock.close()
    print('Connection from %s:%s closed.' % addr)
if __name__=="__main__":
    # Step 1: create an IPv4 / TCP socket and bind it to
    # 127.0.0.1 (localhost) on port 999.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(('127.0.0.1', 999))
    # Step 2: listen with a backlog of 5 pending connections.
    srv.listen(5)
    print('Waiting for connection...')
    while True:
        # Step 3: accept the next connection and serve it on its own thread.
        conn, peer = srv.accept()
        worker = threading.Thread(target=dealClient, args=(conn, peer))
        worker.start()

创建客户端

#coding:utf-8
import socket

# Create the client socket (IPv4 / TCP).
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect to the server's ip and port.
client.connect(('127.0.0.1', 999))
# Print the greeting sent by the server.
print('-->>' + client.recv(1024).decode('utf-8'))
# Send a message and print the echoed reply.
client.send(b'Hello,I am a client')
print('-->>' + client.recv(1024).decode('utf-8'))
# Ask the server to end this session, then close our end.
client.send(b'exit')
client.close()

UDP编程

服务端与客户端

import socket

# SOCK_DGRAM makes this a UDP socket; binding works just like TCP.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('127.0.0.1', 9999))
print('Bind UDP on 9999...')
while True:
    # UDP needs no connection: receive a datagram, reply to its sender.
    data, addr = server.recvfrom(1024)
    print('Received from %s:%s.' % addr)
    server.sendto(b'Hello, %s!' % data, addr)

import socket

# UDP client: no connect() needed, just address each datagram.
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for payload in [b'Hello', b'World']:
    # Send the datagram to the server...
    client.sendto(payload, ('127.0.0.1', 9999))
    # ...and print the decoded reply.
    print(client.recv(1024).decode('utf-8'))
client.close()

 

原文地址:https://www.cnblogs.com/bai2018/p/10959955.html