微信公众号开发

申请的免费二级域名用于测试

api 接口

微博移动端数据

这个有点东西

接入 微信公众号测试接口 (后台可以使用 java 或者 python 实现)

   虽然现在代码还写得稀烂,但是流程已经打通了,剩下的就是优化代码结构、实现更多功能了。
from app.app import create_app
from flask import (request, jsonify)
from app.libs.constants.token import TOKEN
import hashlib
import re
import xml.etree.cElementTree as et

# Build the Flask application object through the project's factory function.
app = create_app()

def _build_text_reply(to_user, from_user, content, msg_id):
    """Render a passive text reply as a single-line XML string.

    Args:
        to_user: Value for ``ToUserName`` (the recipient of the reply).
        from_user: Value for ``FromUserName`` (the sender of the reply).
        content: Reply text; it is wrapped in ``[...]`` inside the CDATA
            section, exactly as the original template did.
        msg_id: Value for ``MsgId``; ``None`` is rendered as an empty element.

    Returns:
        The XML document as a ``str`` containing no layout whitespace.
    """
    from time import time

    # A literal "]]>" would close the CDATA section early, so split it across
    # two sections before embedding text that may come from the user.
    safe_content = str(content).replace(']]>', ']]]]><![CDATA[>')
    # The template is written without layout whitespace, so no regex
    # post-processing (which would also strip spaces from the reply text)
    # is required.
    template = (
        '<xml>'
        '<ToUserName><![CDATA[{}]]></ToUserName>'
        '<FromUserName><![CDATA[{}]]></FromUserName>'
        '<CreateTime>{}</CreateTime>'
        '<MsgType><![CDATA[text]]></MsgType>'
        '<Content><![CDATA[[{}]]]></Content>'
        '<MsgId>{}</MsgId>'
        '</xml>'
    )
    return template.format(
        to_user,
        from_user,
        int(time()),  # whole seconds instead of a float timestamp
        safe_content,
        '' if msg_id is None else msg_id,
    )


@app.route('/', methods=['GET', 'POST'])
def index():
    """Handle the WeChat callback URL.

    GET: signature handshake.
        1) sort token, timestamp and nonce lexicographically,
        2) concatenate them and take the SHA-1 hex digest,
        3) compare the digest with ``signature``; on a match echo ``echostr``.
        Without any query arguments the plain string ``'home page'`` is
        returned; with missing or mismatching arguments ``'check failed!'``.

    POST: incoming message.
        Text messages are answered with an echo of their content, every other
        message type with a fixed "under development" text. The reply is
        addressed back to the sender (To/From swapped relative to the request).
        An empty body is acknowledged with ``'success'``.

    NOTE(review): POST requests are not signature-checked here; consider
    verifying the query-string signature for them as well.
    """
    print(request.args)

    if request.method == 'GET':
        if not request.args:
            return 'home page'
        signature = request.args.get('signature')
        timestamp = request.args.get('timestamp')
        nonce = request.args.get('nonce')
        # Sorting a list that contains None raises TypeError, so reject
        # incomplete handshakes up front.
        if signature is None or timestamp is None or nonce is None:
            return 'check failed!'
        my_signature = check_signature(''.join(sorted([TOKEN, timestamp, nonce])))
        if signature == my_signature:
            return request.args.get('echostr', '')
        return 'check failed!'

    # The route only accepts GET and POST, so this is the POST path.
    if not request.data:
        return 'success'

    msg_type = et.fromstring(request.data.decode('utf-8')).findtext('MsgType')
    if msg_type == 'text':
        received_data = parse_request(request)
        content = '你输入的是 {} 该回复还是固定回复'.format(received_data.get('Content'))
    else:
        received_data = parse_request_non_text(request)
        content = '该功能正在开发中'
    print(received_data)

    response_text = _build_text_reply(
        to_user=received_data.get('FromUserName'),
        from_user=received_data.get('ToUserName'),
        content=content,
        msg_id=received_data.get('MsgId'),
    )
    print(response_text)
    return response_text



def check_signature(sorted_str):
    """Return the hexadecimal SHA-1 digest of ``sorted_str`` encoded as UTF-8."""
    payload = sorted_str.encode('utf-8')
    return hashlib.sha1(payload).hexdigest()

def parse_request(request):
    """Extract the fields of an incoming text message.

    The request body is expected to be an XML document shaped like::

        <xml>
          <ToUserName><![CDATA[...]]></ToUserName>
          <FromUserName><![CDATA[...]]></FromUserName>
          <CreateTime>1566709896</CreateTime>
          <MsgType><![CDATA[text]]></MsgType>
          <Content><![CDATA[...]]></Content>
          <MsgId>22429894037448157</MsgId>
        </xml>

    Args:
        request: Object whose ``data`` attribute holds the UTF-8 encoded body.

    Returns:
        dict with the keys ``ToUserName``, ``FromUserName``, ``CreateTime``,
        ``Content``, ``MsgId`` and ``MsgType``. A child element that is absent
        from the document maps to ``None`` instead of raising.
    """
    root = et.fromstring(request.data.decode('utf-8'))
    field_names = ('ToUserName', 'FromUserName', 'CreateTime', 'Content', 'MsgId', 'MsgType')
    parsed = {}
    for name in field_names:
        node = root.find(name)
        # find() returns None for a missing child; guard before reading .text.
        parsed[name] = node.text if node is not None else None
    return parsed

def parse_request_non_text(request):
    """Extract the common envelope fields of a non-text message.

    Args:
        request: Object whose ``data`` attribute holds the UTF-8 encoded XML
            body.

    Returns:
        dict with the keys ``ToUserName``, ``FromUserName``, ``CreateTime``,
        ``MsgId`` and ``MsgType``. A child element that is absent from the
        document (event pushes, for example, carry no ``MsgId``) maps to
        ``None`` instead of raising ``AttributeError``.
    """
    root = et.fromstring(request.data.decode('utf-8'))
    field_names = ('ToUserName', 'FromUserName', 'CreateTime', 'MsgId', 'MsgType')
    parsed = {}
    for name in field_names:
        node = root.find(name)
        # find() returns None for a missing child; guard before reading .text.
        parsed[name] = node.text if node is not None else None
    return parsed


# Register the same view under /index/ as well.
app.add_url_rule('/index/', view_func=index)

if __name__ == '__main__':
    # Debug mode follows the application config; the development server
    # listens on all interfaces at port 8090.
    run_options = dict(debug=app.config['DEBUG'], host='0.0.0.0', port=8090)
    app.run(**run_options)

工程化 可能还是 java 好写一点 , 使用 springboot 进行后台编写

第一步, 申请注册好 微信公众号 测试接口,这一块儿可以网上找到

第二步, 编写 微信签名验证 接口

第三步, 代码编写

验证 微信服务器 签名

controller 层编写

package com.ghc.wechat.wechat.controller;

import com.ghc.wechat.wechat.service.WechatService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST controller exposing the endpoint used for the WeChat signature handshake.
 *
 * @author Frank Li
 * @date Created in 2019/8/26 9:32
 */
@RestController
public class WechatController {

    @Autowired
    private WechatService wechatService;

    /**
     * Answers a GET on {@code /wechat}.
     *
     * @param signature signature supplied by the caller
     * @param timestamp timestamp supplied by the caller
     * @param nonce     nonce supplied by the caller
     * @param echostr   string to echo back when the signature is accepted
     * @return {@code echostr} when the service accepts the signature, otherwise {@code null}
     */
    @GetMapping(value = "/wechat")
    public String validate(@RequestParam(value = "signature") String signature,
                           @RequestParam(value = "timestamp") String timestamp,
                           @RequestParam(value = "nonce") String nonce,
                           @RequestParam(value = "echostr") String echostr) {
        boolean accepted = wechatService.validateSignature(timestamp, nonce, signature);
        if (!accepted) {
            return null;
        }
        return echostr;
    }

}

service 层 , utils 等

package com.ghc.wechat.wechat.service;

import com.ghc.wechat.wechat.utils.WebChatUtils;
import org.springframework.stereotype.Service;

/**
 * Service-layer entry point for WeChat signature checks.
 *
 * @author Frank Li
 * @date Created in 2019/8/26 10:56
 */
@Service
public class WechatService {

    /**
     * Delegates the check to {@code WebChatUtils.validateSignature}.
     *
     * @param timestamp timestamp received with the request
     * @param nonce     nonce received with the request
     * @param signature signature received with the request
     * @return the result reported by {@code WebChatUtils.validateSignature}
     */
    public boolean validateSignature(String timestamp, String nonce, String signature) {
        final boolean valid = WebChatUtils.validateSignature(timestamp, nonce, signature);
        return valid;
    }
}

package com.ghc.wechat.wechat.utils;

import com.ghc.wechat.wechat.constants.Token;
import lombok.extern.slf4j.Slf4j;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/**
 * @author :Frank Li
 * @date :Created in 2019/8/26 10:13
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
@Slf4j
public class WebChatUtils {


    public static boolean validateSignature(String timestamp,String nonce,String signature) {

//        将token、timestamp、nonce三个参数进行字典序排序
        String [] strArray = {Token.TOKEN, timestamp, nonce};
        Arrays.sort(strArray);
//        2)将三个参数字符串拼接成一个字符串进行sha1加密
        MessageDigest sha1Digest = null;
        try{
           sha1Digest = MessageDigest.getInstance("sha1");
        }catch(NoSuchAlgorithmException ne){
            log.error(ne.getMessage());
        }
        byte [] digest =  sha1Digest.digest((strArray[0]+strArray[1]+strArray[2]).getBytes());
        char[] hexArray = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
        StringBuilder sb = new StringBuilder(3);
        for(byte b:digest){
            char high8 = hexArray[(b>>4)&15];
            char low8 = hexArray[b&15];
            sb.append(high8)
                    .append(low8);
        }
//        3)开发者获得加密后的字符串可与signature对比,标识该请求来源于微信
        log.info(sb.toString()+"
"+signature);
        boolean flag = signature.equalsIgnoreCase(sb.toString());
        log.info(String.valueOf(flag));
        return flag;

    }
}


package com.ghc.wechat.wechat.constants;

/**
 * Holds the token constant that is combined with timestamp and nonce when a
 * signature is validated.
 *
 * @author Frank Li
 * @date Created in 2019/8/26 10:20
 */
public interface Token {
    // NOTE(review): "xxx" looks like a placeholder; presumably it must match
    // the token configured on the WeChat side. Confirm before deploying.
    String  TOKEN = "xxx";
}


修改 application.properties

server.port=8090

设置 IDEA 热部署 spring

spring为开发者提供了一个名为spring-boot-devtools的模块来使Spring Boot应用支持热部署,提高开发者的开发效率,无需手动重启Spring Boot应用。

devtools的原理

深层原理是使用了两个ClassLoader,一个Classloader加载那些不会改变的类(第三方Jar包),另一个ClassLoader加载会更改的类,称为restart ClassLoader,这样在有代码更改的时候,原来的restart ClassLoader 被丢弃,重新创建一个restart ClassLoader,由于需要加载的类相比较少,所以实现了较快的重启时间。

使用需要添加以下的配置:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <fork>true</fork>
            </configuration>
        </plugin>
    </plugins>
</build>
说明:

(1) devtools可以实现页面热部署(即页面修改后会立即生效,这个可以直接在application.properties文件中配置spring.thymeleaf.cache=false来实现),
实现类文件热部署(类文件修改后不会立即生效),实现对属性文件的热部署。
即devtools会监听classpath下的文件变动,并且会立即重启应用(发生在保存时机),注意:因为其采用的虚拟机机制,该项重启是很快的
(2)配置了后在修改java文件后也就支持了热启动,不过这种方式是属于项目重启(速度比较快的项目重启),会清空session中的值,也就是如果有用户登陆的话,项目重启后需要重新登陆。

默认情况下,/META-INF/maven,/META-INF/resources,/resources,/static,/templates,/public这些文件夹下的文件修改不会使应用重启,但是会重新加载(devtools内嵌了一个LiveReload server,当资源发生改变时,浏览器刷新)。

devtools的配置

在application.properties中配置spring.devtools.restart.enabled=false,此时restart类加载器还会初始化,但不会监视文件更新。
在SpringApplication.run之前调用System.setProperty("spring.devtools.restart.enabled", "false");可以完全关闭重启支持,配置内容:

#热部署生效
spring.devtools.restart.enabled: true
#设置重启的目录
#spring.devtools.restart.additional-paths: src/main/java
#classpath目录下的WEB-INF文件夹内容修改不重启
spring.devtools.restart.exclude: WEB-INF/**
IDEA配置

当我们修改了Java类后,IDEA默认是不自动编译的,而spring-boot-devtools又是监测classpath下的文件发生变化才会重启应用,所以需要设置IDEA的自动编译:

(1)File-Settings-Compiler-Build Project automatically



(2)ctrl + shift + alt + /,选择Registry,勾上 Compiler autoMake allow when app running



测试

修改类–>保存:应用会重启
修改配置文件–>保存:应用会重启
修改页面–>保存:应用不会重启,但会重新加载,页面会刷新(原理是将spring.thymeleaf.cache设为false,参考:Spring Boot配置模板引擎)

爬取 公众号文章

# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
import requests
import json
import time
import re
import random

__author__ = 'Frank Li'

def get_proxy_from_local():
    """Ask the local proxy-pool service for one proxy and return it.

    Returns:
        The value of the ``proxy`` key in the JSON response, or ``None`` when
        the key is absent.

    Raises:
        requests.RequestException: if the pool cannot be reached within the
            timeout.
    """
    # The context manager releases the connection even when the body is not
    # valid JSON; the timeout keeps an unreachable pool from hanging forever.
    with requests.get('http://127.0.0.1:5010/get', timeout=10) as res:  # local IP proxy pool
        proxy = res.json().get('proxy')
    print(proxy)
    return proxy
# Fetched once at import time; in this script the value is only referenced by
# the commented-out Chrome proxy option below.
proxy = get_proxy_from_local()
# Entry page of the official-account platform; get_token() requests it by default.
ORIGINAL_URL = r'https://mp.weixin.qq.com/?token=&lang=zh_CN'
# NOTE(review): DRIVER, ACCOUNT and PASSWORD are only defined in the
# commented-out lines below, yet auto_login() reads them. Re-enable and fill
# them in before calling auto_login().
# chromeOptions = webdriver.ChromeOptions()
# chromeOptions.add_argument("--proxy-server=http://{}".format(proxy))
# DRIVER = webdriver.Chrome() #chrome_options=chromeOptions
# ACCOUNT="5xxx@qq.com"
# PASSWORD="xxx"

# Endpoint that get_articles() posts its search form to.
ARTICLE_URL = r'https://mp.weixin.qq.com/cgi-bin/operate_appmsg?sub=check_appmsg_copyright_stat'
# Default output file of serilize_obj2json_file().
ARTICAL_JSON_FILE = r'ARTICAL_JSON_FILE.json'

def auto_login():
    """Drive the browser through the login form and return the session cookies.

    Reads the module-level ``DRIVER``, ``ACCOUNT`` and ``PASSWORD``.

    Returns:
        dict mapping cookie name to cookie value, taken from the driver after
        the form has been filled in. The result can be saved as
        ``wechatcookies.json`` for ``get_cookies()``.
    """
    form_xpath = '//*[@id="header"]/div[2]/div/div/form'

    def wait_for_first(xpath):
        # Index AFTER the wait: an empty result list is falsy, so until()
        # keeps polling. Indexing inside the lambda would raise IndexError on
        # the first poll and abort the wait immediately.
        matches = WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements_by_xpath(xpath))
        return matches[0]

    DRIVER.get(ORIGINAL_URL)

    time.sleep(2)
    wait_for_first(form_xpath + '/div[1]/div[1]/div/span/input').send_keys(ACCOUNT)
    time.sleep(2)
    wait_for_first(form_xpath + '/div[1]/div[2]/div/span/input').send_keys(PASSWORD)
    time.sleep(2)
    wait_for_first(form_xpath + '/div[3]/label').click()
    # Same total pause as before (2 s + 15 s); presumably this leaves time to
    # finish the login by hand, since the submit click (form/div[4]/a) is not
    # automated. TODO confirm.
    time.sleep(17)

    return {item.get('name'): item.get('value') for item in DRIVER.get_cookies()}

def get_cookies():
    """Load and return the cookies stored in ``wechatcookies.json``."""
    with open('wechatcookies.json', 'r') as cookie_file:
        raw_text = cookie_file.read()
    return json.loads(raw_text)

def get_token(url=ORIGINAL_URL):
    """Return the saved cookies together with the session token.

    The token is read from the final URL of a GET request sent with the saved
    cookies (``...token=<digits>...``).

    Args:
        url: Page to request.

    Returns:
        Tuple ``(cookies, token)`` where ``token`` is the digit string found in
        the final URL.

    Raises:
        IndexError: if the final URL carries no numeric token.
    """
    cookies = get_cookies()
    print('cookies:\n', cookies)
    res = requests.get(url, cookies=cookies)
    tokens = re.findall(r'token=(\d+)', str(res.url))
    if not tokens:
        # Same exception type as the bare [0] lookup, but with a usable message.
        raise IndexError('no numeric token in final URL {!r}'.format(str(res.url)))
    return cookies, tokens[0]


def get_articles(url=ARTICLE_URL, query_words='英雄'):
    """Search articles by keyword and return the first ten hits.

    Args:
        url: Endpoint the search form is posted to.
        query_words: Search keywords (sent in the form field named ``url``).

    Returns:
        list of dicts, one per entry of the response's ``list`` array, each
        with the keys ``title``, ``article_type``, ``cover_url``,
        ``head_img_url``, ``nickname`` and ``url``. Previously all entries were
        merged into one dict, so only the last article survived. The list is
        empty when the response has no ``list``.
    """
    cookies, token = get_token()

    # No hard-coded Content-Length: requests computes it from the encoded body.
    headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
               "Accept-Encoding": "gzip, deflate, br",
               "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
               "Cache-Control": "no-cache",
               "Connection": "keep-alive",
               "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
               "Host": "mp.weixin.qq.com",
               "Origin":"https://mp.weixin.qq.com",
               "Pragma": "no-cache",
               "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token="+token+"&lang=zh_CN",
               "Sec-Fetch-Mode": "cors",
               "Sec-Fetch-Site": "same-origin",
               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
               "X-Requested-With": "XMLHttpRequest"
               }
    data = {
        'token':token,
        'lang':'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'url': query_words,
        'allow_reprint': '0',
        'begin': '0',
        'count': '10'
    }
    res = requests.post(url,cookies=cookies,headers=headers,data=data)

    wanted_keys = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')
    # "or []" covers a response without a "list" entry.
    return [{key: item.get(key) for key in wanted_keys}
            for item in res.json().get('list') or []]

def serilize_obj2json_file(obj,target=ARTICAL_JSON_FILE):
    """Write ``obj`` to ``target`` as indented, non-ASCII-escaped UTF-8 JSON.

    An existing file is overwritten.
    """
    with open(target, mode='w', encoding='utf-8') as out_file:
        json_text = json.dumps(obj, ensure_ascii=False, indent=2)
        out_file.write(json_text)

if __name__ == '__main__':
    # Optional manual steps, left disabled:
    # print(auto_login())
    # get_token()
    # Search once and dump the result to the default JSON file.
    data = get_articles(query_words="问苍茫大地")
    serilize_obj2json_file(obj=data)

输出结果

增加 分页 爬取

# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
import requests
import json
import time
import re
import random

__author__ = 'Frank Li'

def get_proxy_from_local():
    """Ask the local proxy-pool service for one proxy and return it.

    Returns:
        The value of the ``proxy`` key in the JSON response, or ``None`` when
        the key is absent.

    Raises:
        requests.RequestException: if the pool cannot be reached within the
            timeout.
    """
    # The context manager releases the connection even when the body is not
    # valid JSON; the timeout keeps an unreachable pool from hanging forever.
    with requests.get('http://127.0.0.1:5010/get', timeout=10) as res:
        proxy = res.json().get('proxy')
    print(proxy)
    return proxy
# Fetched once at import time; in this script the value is only referenced by
# the commented-out Chrome proxy option below.
proxy = get_proxy_from_local()
# Entry page of the official-account platform; get_token() requests it by default.
ORIGINAL_URL = r'https://mp.weixin.qq.com/?token=&lang=zh_CN'
# NOTE(review): DRIVER, ACCOUNT and PASSWORD are only defined in the
# commented-out lines below, yet auto_login() reads them. Re-enable and fill
# them in before calling auto_login().
# chromeOptions = webdriver.ChromeOptions()
# chromeOptions.add_argument("--proxy-server=http://{}".format(proxy))
# DRIVER = webdriver.Chrome() #chrome_options=chromeOptions
# ACCOUNT="5xxx@qq.com"
# PASSWORD="xxx"

# Endpoint that get_articles() and iter_pages() post their search form to.
ARTICLE_URL = r'https://mp.weixin.qq.com/cgi-bin/operate_appmsg?sub=check_appmsg_copyright_stat'
# Default output file of serilize_obj2json_file().
ARTICAL_JSON_FILE = r'ARTICAL_JSON_FILE.json'

def auto_login():
    """Drive the browser through the login form and return the session cookies.

    Reads the module-level ``DRIVER``, ``ACCOUNT`` and ``PASSWORD``.

    Returns:
        dict mapping cookie name to cookie value, taken from the driver after
        the form has been filled in. The result can be saved as
        ``wechatcookies.json`` for ``get_cookies()``.
    """
    form_xpath = '//*[@id="header"]/div[2]/div/div/form'

    def wait_for_first(xpath):
        # Index AFTER the wait: an empty result list is falsy, so until()
        # keeps polling. Indexing inside the lambda would raise IndexError on
        # the first poll and abort the wait immediately.
        matches = WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements_by_xpath(xpath))
        return matches[0]

    DRIVER.get(ORIGINAL_URL)

    time.sleep(2)
    wait_for_first(form_xpath + '/div[1]/div[1]/div/span/input').send_keys(ACCOUNT)
    time.sleep(2)
    wait_for_first(form_xpath + '/div[1]/div[2]/div/span/input').send_keys(PASSWORD)
    time.sleep(2)
    wait_for_first(form_xpath + '/div[3]/label').click()
    # Same total pause as before (2 s + 15 s); presumably this leaves time to
    # finish the login by hand, since the submit click (form/div[4]/a) is not
    # automated. TODO confirm.
    time.sleep(17)

    return {item.get('name'): item.get('value') for item in DRIVER.get_cookies()}

def get_cookies():
    """Load and return the cookies stored in ``wechatcookies.json``."""
    with open('wechatcookies.json', 'r') as cookie_file:
        raw_text = cookie_file.read()
    return json.loads(raw_text)

def get_token(url=ORIGINAL_URL):
    """Return the saved cookies together with the session token.

    The token is read from the final URL of a GET request sent with the saved
    cookies (``...token=<digits>...``).

    Args:
        url: Page to request.

    Returns:
        Tuple ``(cookies, token)`` where ``token`` is the digit string found in
        the final URL.

    Raises:
        IndexError: if the final URL carries no numeric token.
    """
    cookies = get_cookies()
    # print('cookies:\n', cookies)
    res = requests.get(url, cookies=cookies)
    tokens = re.findall(r'token=(\d+)', str(res.url))
    if not tokens:
        # Same exception type as the bare [0] lookup, but with a usable message.
        raise IndexError('no numeric token in final URL {!r}'.format(str(res.url)))
    return cookies, tokens[0]


def get_articles(begin='0',count='10',url=ARTICLE_URL, query_words='英雄'):
    """Search articles by keyword and return one page of hits plus the total.

    Args:
        begin: Offset of the first hit to request.
        count: Number of hits to request.
        url: Endpoint the search form is posted to.
        query_words: Search keywords (sent in the form field named ``url``).

    Returns:
        Tuple ``(articles, total)``. ``articles`` is a list of dicts, one per
        entry of the response's ``list`` array, each with the keys ``title``,
        ``article_type``, ``cover_url``, ``head_img_url``, ``nickname`` and
        ``url``. Previously all entries were merged into one dict, so only the
        last article survived. ``total`` is the response's ``total`` value
        (``None`` when absent).
    """
    cookies, token = get_token()

    # No hard-coded Content-Length: requests computes it from the encoded body.
    headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
               "Accept-Encoding": "gzip, deflate, br",
               "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
               "Cache-Control": "no-cache",
               "Connection": "keep-alive",
               "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
               "Host": "mp.weixin.qq.com",
               "Origin":"https://mp.weixin.qq.com",
               "Pragma": "no-cache",
               "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token="+token+"&lang=zh_CN",
               "Sec-Fetch-Mode": "cors",
               "Sec-Fetch-Site": "same-origin",
               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
               "X-Requested-With": "XMLHttpRequest"
               }
    data = {
        'token':token,
        'lang':'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'url': query_words,
        'allow_reprint': '0',
        'begin': begin,
        'count': count
    }
    res = requests.post(url,cookies=cookies,headers=headers,data=data)
    payload = res.json()  # decode the body once and reuse it

    wanted_keys = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')
    # "or []" covers a response without a "list" entry.
    articles = [{key: item.get(key) for key in wanted_keys}
                for item in payload.get('list') or []]
    return articles, payload.get('total')

def iter_pages(total, query_words='问苍茫大地',begin=0,count=10):
    """Fetch every result page for ``query_words`` and append each to the JSON file.

    Args:
        total: Total number of hits reported by the search (``None`` counts as 0).
        query_words: Search keywords (sent in the form field named ``url``).
        begin: Offset of the first hit to fetch.
        count: Page size.

    Each non-empty page is passed to ``serilize_obj2json_file`` as a list of
    article dicts (keys ``title``, ``article_type``, ``cover_url``,
    ``head_img_url``, ``nickname``, ``url``). The function pauses two seconds
    after every request.
    """
    cookies, token = get_token()
    page_size = int(count)
    first_offset = int(begin)
    remaining = max(int(total or 0) - first_offset, 0)
    # Ceiling division: no extra empty request when the hit count is an exact
    # multiple of the page size.
    page_total = (remaining + page_size - 1) // page_size

    # Headers do not change between pages, so build them once. No hard-coded
    # Content-Length: requests computes it from the encoded body.
    headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
               "Accept-Encoding": "gzip, deflate, br",
               "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
               "Cache-Control": "no-cache",
               "Connection": "keep-alive",
               "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
               "Host": "mp.weixin.qq.com",
               "Origin": "https://mp.weixin.qq.com",
               "Pragma": "no-cache",
               "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token=" + token + "&lang=zh_CN",
               "Sec-Fetch-Mode": "cors",
               "Sec-Fetch-Site": "same-origin",
               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
               "X-Requested-With": "XMLHttpRequest"
               }
    wanted_keys = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')

    print('开始爬取文章......')
    fetched = 0
    for page in range(page_total):
        data = {
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'url': query_words,
            'allow_reprint': '0',
            # Offsets continue from the caller-supplied start.
            'begin': first_offset + page * page_size,
            'count': page_size
        }
        res = requests.post(ARTICLE_URL, cookies=cookies, headers=headers, data=data)
        # Collect every article of the page; "or []" covers a response
        # without a "list" entry.
        articles = [{key: item.get(key) for key in wanted_keys}
                    for item in res.json().get('list') or []]
        if articles:
            serilize_obj2json_file(articles)
        fetched += 1
        time.sleep(2)

    # Report the number of pages actually requested.
    print('完成爬取, 共 爬取 {} 页...'.format(fetched))





def serilize_obj2json_file(obj,target=ARTICAL_JSON_FILE):
    """Append ``obj`` to ``target`` as indented, non-ASCII-escaped UTF-8 JSON.

    Nothing is written between successive documents.
    """
    with open(target, mode='a', encoding='utf-8') as out_file:
        json_text = json.dumps(obj, ensure_ascii=False, indent=2)
        out_file.write(json_text)

if __name__ == '__main__':
    # Optional manual steps, left disabled:
    # print(auto_login())
    # get_token()
    # The first request is only used to learn the total hit count.
    data, total = get_articles(query_words="苍茫大地")
    # serilize_obj2json_file(data)
    iter_pages(total=total)

使用 开源工具包

pip install wechatsogou --upgrade
from time import perf_counter
from requests import get
# https://github.com/tesseract-ocr/tessdata.git
# https://codeload.github.com/tesseract-ocr/tessdata/zip/master


class ProcessBar:
    """Render a 100-cell text progress bar followed by percentage and elapsed time."""

    # perf_counter() has an undefined reference point; only differences are
    # meaningful. Remember when the class was created and report elapsed time
    # relative to that moment.
    _started_at = perf_counter()

    def __init__(self, started_char='[', ended_char=']', finished_char='-', next_finished_char='>',
                 unfinished_char=' '):
        """Store the characters used to draw the bar.

        Args:
            started_char: Left delimiter of the bar.
            ended_char: Right delimiter of the bar.
            finished_char: Cell character for completed percent.
            next_finished_char: Head marker drawn after the completed cells.
            unfinished_char: Cell character for remaining percent.
        """
        self.started_char = started_char
        self.ended_char = ended_char
        self.finished_char = finished_char
        self.next_finished_char = next_finished_char
        self.unfinished_char = unfinished_char

    def intermediate_state(self, finished):
        """Return the bar for ``finished`` percent.

        Below 100 the bar is ``finished`` completed cells, the head marker and
        ``99 - finished`` remaining cells; otherwise only completed cells. The
        bar is followed by ``" <finished>% <elapsed>"``.
        """
        if finished < 100:
            cells = (f'{self.finished_char*finished}{self.next_finished_char}'
                     f'{self.unfinished_char*(99-finished)}')
        else:
            cells = self.finished_char*finished
        return f'{self.started_char}{cells}{self.ended_char} {finished}% {self.time_format()}'

    @staticmethod
    def time_format():
        """Return the time elapsed since the class was defined as ``'<d>d HH:MM:SS'``."""
        elapsed = int(perf_counter() - ProcessBar._started_at)
        minute, second = divmod(elapsed, 60)
        hour, minute = divmod(minute, 60)
        day, hour = divmod(hour, 24)
        return f'{day}d {hour:02d}:{minute:02d}:{second:02d}'


class GithubDownloader:
    """Stream a URL to a local file while printing progress to stdout."""

    def __init__(self, url, file_name, headers=None, proxies=None, download_location=''):
        """Remember what to download and where to store it.

        Args:
            url: Address to download.
            file_name: Name of the output file.
            headers: Optional request headers.
            proxies: Optional proxies mapping passed to the request.
            download_location: Prefix prepended verbatim to ``file_name``
                (include a trailing separator when it is a directory).
        """
        self.url = url
        self.headers = headers
        self.proxies = proxies
        self.file_name = file_name
        self.download_location = download_location

    @staticmethod
    def format_unit(byte):
        """Format a byte count with a binary unit, e.g. ``'   1.50KB'``.

        The number is rendered with ``%7.2f`` and the largest unit in
        B/KB/MB/GB for which the value stays below 1024; anything larger is
        given in TB.
        """
        size = byte
        for unit in ('B', 'KB', 'MB', 'GB'):
            if size < 1024:
                return '%7.2f%s' % (size, unit)
            size = size / 1024
        return '%7.2fTB' % size

    def download(self):
        """Download ``self.url`` to ``download_location + file_name``.

        With a positive ``Content-Length`` a percentage bar is shown; without
        one the number of bytes received so far is shown instead.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        process_bar = ProcessBar()
        target_path = self.download_location + self.file_name
        with get(self.url, headers=self.headers, proxies=self.proxies, stream=True)as response:
            # Do not save an error page under the requested file name.
            response.raise_for_status()
            # A missing or zero Content-Length selects the byte-count display
            # and avoids dividing by zero.
            file_size = int(response.headers.get('Content-Length') or 0)
            if file_size:
                print('文件大小:', file_size)
            with open(target_path, 'wb')as file:
                current_size = 0
                print('开始下载……')
                for chunk in response.iter_content(chunk_size=1024):
                    if not chunk:
                        continue
                    file.write(chunk)
                    current_size += len(chunk)
                    if file_size:
                        # Decoded content can be larger than Content-Length
                        # when the transfer is compressed; cap at 100.
                        finished = min(int(current_size / file_size * 100), 100)
                        print('下载进度:', process_bar.intermediate_state(finished), end='\r', flush=True)
                    else:
                        print(f'已下载:{self.format_unit(current_size)} {ProcessBar.time_format()}', end='\r',
                              flush=True)
                print('下载完成!', process_bar.intermediate_state(100), end='\n', flush=True)


if __name__ == '__main__':
    # Browser-like request headers sent with the download.
    request_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
                  'application/signed-exchange;v=b3',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Host': 'codeload.github.com',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/75.0.3770.100 Safari/537.36',
    }
    # HTTPS traffic goes through a SOCKS5 proxy on localhost:1080.
    github_downloader = GithubDownloader(
        'https://codeload.github.com/tesseract-ocr/tessdata/zip/master',
        'master.zip',
        headers=request_headers,
        proxies={'https': 'socks5://localhost:1080'},
    )
    github_downloader.download()
原文地址:https://www.cnblogs.com/Frank99/p/11407237.html