Python 多线程下载

# -*- coding=utf-8 -*-
import http.client
import os
import os.path
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from threading import Thread
 
# Module-level proxy mapping used as the default `proxies` argument of
# down_load(); it is empty, so no proxy is configured here.
local_proxies = {}
 
 
class ThreadDownComplete:
    """Placeholder that stands in for a download worker that has finished.

    It carries nothing but the finished worker's byte count, exposed under
    the same ``down_loaded`` attribute name the worker itself uses.
    """

    def __init__(self, down_loaded: int) -> None:
        # Bytes the finished worker had stored for its part.
        self.down_loaded = down_loaded
 
 
class ThreadDown(Thread):
    """Worker thread that downloads one byte range of a URL into a part file.

    Data is appended to ``filename``. Whatever that file already contains is
    counted as the beginning of the range, so a worker created for an
    existing part file continues after the bytes already on disk.

    Requests are sent through a ``urllib.request`` opener built with
    ``build_opener`` rather than by inheriting from ``FancyURLopener``
    (deprecated since Python 3.3 and removed in 3.14). One consequence is
    that HTTP error statuses raise ``urllib.error.HTTPError`` instead of
    being handed back as if they were file data.

    Args:
        thread_name: Name given to the thread.
        url: Resource to download.
        filename: Path of the part file this worker appends to.
        ranges: ``(first_byte, last_byte)`` offsets, both inclusive. The
            default ``0`` is kept for signature compatibility only; it is
            not subscriptable and raises ``TypeError``.
        proxies: Mapping of scheme to proxy URL. ``None`` or an empty
            mapping means no proxy is used.
    """

    # Seconds to wait after a read that produced nothing, so a worker whose
    # connection has dried up does not spin at full CPU while it waits for
    # `stop` to be set from outside.
    idle_wait = 0.05

    def __init__(self, thread_name, url, filename, ranges=0, proxies=None):
        Thread.__init__(self, name=thread_name)
        # A fresh dict per instance replaces the old shared `{}` default.
        # Passing an (empty) mapping explicitly keeps ProxyHandler from
        # auto-detecting proxies, matching the previous `{}` behaviour.
        self.proxies = {} if proxies is None else proxies
        self.opener = urllib.request.build_opener(
            urllib.request.ProxyHandler(self.proxies))
        self.url = url
        self.filename = filename
        self.ranges = ranges
        self.url_handler = None  # open response while a transfer is running
        self.one_time_size = 16384  # bytes requested per read (16 KiB)
        self.data_start_point = self.ranges[0]
        self.data_end_point = self.ranges[1]
        self.down_start_time = int(time.time())
        # Time of the last successful read; compared against a timeout by
        # whoever supervises the worker.
        self.recent_down_time = int(time.time())
        self.stop = False
        self.complete = False
        # Last offset minus first offset, i.e. the range size minus one.
        self.download_loan = self.data_end_point - self.data_start_point
        try:
            # Bytes already in the part file count as already downloaded.
            self.down_loaded = os.path.getsize(self.filename)
        except OSError:
            self.down_loaded = 0
        self.start_point = self.data_start_point + self.down_loaded

        if not self.complete_check():
            self.data_check()

    def start_reset(self):
        """Discard the part file and rewind all progress to the range start."""
        try:
            os.remove(self.filename)
        except OSError:
            # Best effort: the file may simply not exist yet.
            pass

        self.down_loaded = 0
        self.complete = False
        self.stop = False
        self.start_point = self.data_start_point

    def data_check(self):
        """Reset the part when the resume offset lies beyond the range end."""
        if self.start_point > self.data_end_point:
            # NOTE(review): the escape sequence was lost in the source (it
            # appeared as a raw line break); a trailing "\n" is assumed.
            print("Part %s has been down_loaded over.\n" % self.filename)
            self.start_reset()

    def flow_check(self):
        """Return False when more bytes were stored than the range holds.

        ``stop`` is left False in that case so that ``run`` resets the part
        and downloads it again.
        """
        if self.down_loaded > self.download_loan + 1:
            self.stop = False
            return False
        return True

    def complete_check(self):
        """Mark the worker complete once exactly the whole range is stored.

        Returns:
            True when the part is complete, otherwise False.
        """
        if self.down_loaded == self.download_loan + 1:
            self.complete = True
            self.stop = True
            return True
        return False

    def down(self):
        """Read the next chunk from the open response.

        Returns:
            Up to ``one_time_size`` bytes, or ``b''`` when no response is
            open, the stream has ended, or the read failed.
        """
        if self.url_handler is None:
            return b''
        try:
            return self.url_handler.read(self.one_time_size)
        except (OSError, http.client.HTTPException):
            # A failed read is reported as "no data"; the caller decides
            # what to do about the stall.
            return b''

    def __run(self):
        """Request the remaining bytes and append them to the part file.

        Returns when the part is complete, when too many bytes arrived, or
        when ``stop`` has been set from outside. Errors raised while opening
        the URL propagate and end the thread.
        """
        print("task %s will down_load from %d to %d" % (self.name, self.start_point, self.data_end_point))
        # A new Request per attempt carries exactly one Range header;
        # accumulating headers on a shared opener would send a stale range
        # again on every retry.
        request = urllib.request.Request(
            self.url,
            headers={"Range": "bytes=%d-%d" % (self.start_point, self.data_end_point)},
        )
        self.url_handler = self.opener.open(request)
        try:
            data = self.down()
            while not self.stop and not self.complete:
                if data:
                    self.recent_down_time = int(time.time())
                    with open(self.filename, 'ab') as file_handler:
                        file_handler.write(data)
                    self.down_loaded += len(data)
                else:
                    # Nothing arrived: keep polling, but without burning CPU.
                    time.sleep(self.idle_wait)

                if self.complete_check():
                    break
                if not self.flow_check():
                    break

                data = self.down()
        finally:
            # Release the connection on every way out of the loop.
            self.url_handler.close()
            self.url_handler = None

    def run(self):
        """Thread body: download the part, starting over after an overflow."""
        if self.complete:
            return

        self.__run()
        self.complete_check()
        # Only an overflow leaves both flags False; start again from scratch.
        while not self.stop and not self.complete:
            self.start_reset()
            self.__run()
            self.complete_check()
 
 
def get_file_size(url, proxies=None):
    """Return the size in bytes that the server reports for ``url``.

    The URL is opened and the ``Content-Length`` response header is read;
    the response is closed again without reading the body.

    Args:
        url: Resource to query.
        proxies: Optional mapping of scheme to proxy URL. When it is
            ``None`` or empty, ``urllib.request.urlopen`` is used unchanged
            (the behaviour this function always had, since the argument
            used to be ignored). A non-empty mapping is applied through a
            ``ProxyHandler``.

    Returns:
        The ``Content-Length`` value as an int.

    Raises:
        ValueError: If the response has no ``Content-Length`` header.
        urllib.error.URLError: If the URL cannot be opened.
    """
    if proxies:
        open_url = urllib.request.build_opener(
            urllib.request.ProxyHandler(proxies)).open
    else:
        open_url = urllib.request.urlopen

    # The context manager closes the connection as soon as the header is read.
    with open_url(url) as response:
        length = response.info()['Content-Length']

    if length is None:
        # Fail with a clear message instead of the TypeError from int(None).
        raise ValueError("no Content-Length reported for %s" % url)
    return int(length)
 
 
def split_blocks(total_size, block_number):
    """Split ``total_size`` bytes into ``block_number`` inclusive byte ranges.

    Every block except the last covers ``total_size // block_number`` bytes;
    the last block also takes the remainder and always ends at
    ``total_size - 1``.

    Args:
        total_size: Total number of bytes to cover.
        block_number: Number of ranges to produce; must be at least 1.

    Returns:
        A list of ``(first_byte, last_byte)`` tuples in ascending order.

    Raises:
        ValueError: If ``block_number`` is smaller than 1.
    """
    if block_number < 1:
        raise ValueError("block_number must be at least 1, got %r" % (block_number,))

    # Floor division stays exact for any int; int(a / b) goes through float
    # and can be off once the values exceed 2**53.
    block_size = total_size // block_number
    ranges = [
        (i * block_size, (i + 1) * block_size - 1)
        for i in range(block_number - 1)
    ]
    ranges.append(((block_number - 1) * block_size, total_size - 1))
    return ranges
 
 
def is_live(tasks, timeout=8):
    """Report whether any download worker is still running.

    The list is scanned in order and updated in place:

    * a worker flagged ``complete`` is replaced by a ``ThreadDownComplete``
      holding its byte count;
    * an unfinished worker whose last successful read is more than
      ``timeout`` seconds old is told to stop and replaced by a new worker
      for the same part.

    The scan ends at the first worker found running or restarted, so later
    entries are examined on a following call.

    Args:
        tasks: Mutable list of workers and ``ThreadDownComplete`` markers.
        timeout: Seconds without received data after which an unfinished
            worker is considered stalled. Defaults to 8.

    Returns:
        True if a worker is running or was just restarted, else False.
    """
    for index, task in enumerate(tasks):
        if isinstance(task, ThreadDownComplete):
            continue

        if task.complete:
            # Completion is tested before staleness: a finished worker stops
            # receiving data and must not be mistaken for a stalled one.
            tasks[index] = ThreadDownComplete(task.down_loaded)
            if task.is_alive():
                return True
            continue

        if int(time.time()) - task.recent_down_time > timeout:
            # Stalled: ask the old worker to quit and start a replacement.
            task.stop = True
            tasks[index] = start_down_thread(
                task.name, task.url, task.filename, task.ranges)
            return True

        if task.is_alive():
            return True
    return False
 
 
def start_down_thread(thread_name, url, filename, ranges):
    """Create a ``ThreadDown`` worker for one part, start it and return it.

    Args:
        thread_name: Name for the worker thread.
        url: Resource to download.
        filename: Part file the worker writes to.
        ranges: ``(first_byte, last_byte)`` range assigned to the worker.

    Returns:
        The started worker.
    """
    task = ThreadDown(thread_name, url, filename, ranges)
    # Daemon threads do not keep the interpreter alive at exit. The `daemon`
    # attribute replaces setDaemon(), which is deprecated since Python 3.10.
    task.daemon = True
    task.start()
    return task
 
 
def log(msg):
    """Emit ``msg`` on standard output exactly as given and flush at once."""
    out = sys.stdout
    out.write(msg)
    out.flush()
 
 
def down_load(url, output, blocks=6, proxies=local_proxies):
    """Download ``url`` into ``output`` using ``blocks`` parallel workers.

    The size is queried first, the byte range is split into ``blocks``
    parts, one worker per part writes to ``<output>_<index>``, and once no
    worker is live the parts are concatenated into ``output`` and removed.

    Args:
        url: Resource to download.
        output: Path of the final file; part files use it as a prefix.
        blocks: Number of parts / worker threads.
        proxies: Proxy mapping passed to the size lookup. It is not
            forwarded to the workers, which are started without one.

    Raises:
        SystemExit: Always. The exit status is 0 when the merged file has
            the expected size and 1 otherwise.
    """
    down_file_size = get_file_size(url, proxies)
    ranges = split_blocks(down_file_size, blocks)

    filename = ["%s_%d" % (output, i) for i in range(blocks)]

    tasks = []
    for i, part_name in enumerate(filename):
        tasks.append(start_down_thread("thread_%d" % i, url, part_name, ranges[i]))

    while is_live(tasks):
        down_loaded = sum(task.down_loaded for task in tasks)
        # A zero-length download has nothing left to fetch; avoid 0 / 0.
        if down_file_size:
            process = down_loaded / float(down_file_size) * 100
        else:
            process = 100.0
        # NOTE(review): the escape at the start of this message was lost in
        # the source (it appeared as a raw line break); "\r" is assumed so
        # that the progress line is redrawn in place. Confirm.
        log("\rfilesize:%d down_loaded:%d Completed:%.2f%%" % (down_file_size, down_loaded, process))
        time.sleep(0.01)

    chunk_size = 1024 * 1024  # copy parts in 1 MiB pieces, not all at once
    with open(output, 'wb') as file_handler:
        for part_name in filename:
            try:
                part = open(part_name, 'rb')
            except FileNotFoundError:
                # A part file can be absent (a zero-length range completes
                # without writing anything). The size check below still
                # catches a result that is too short.
                continue
            with part:
                while True:
                    chunk = part.read(chunk_size)
                    if not chunk:
                        break
                    file_handler.write(chunk)
            try:
                os.remove(part_name)
            except OSError:
                # Best effort: a leftover part file does not affect the result.
                pass

    # NOTE(review): the escapes in the two messages below were likewise lost
    # in the source; "\n" before and after is assumed.
    if os.path.getsize(output) == down_file_size:
        log("\nCompleted\n")
        sys.exit(0)

    log("\nError\n")
    # A failed download must not report success to the calling process.
    sys.exit(1)
 
 
if __name__ == '__main__':
    # Script entry point: download a fixed sample URL in 30 parts with an
    # empty proxy mapping, naming the output file after the last path
    # component of the URL.
    _url = "http://dldir1.qq.com/qqfile/qq/QQ7.9Light/14308/QQ7.9Light.exe"
    down_load(_url, os.path.basename(_url), blocks=30, proxies={})
原文地址: https://www.cnblogs.com/hltswd/p/5678319.html