使用metaweblog API实现通用博客发布 之 API测试

使用metaweblog API实现通用博客发布 之 API测试

使用博客比较少,一则是文笔有限,怕写出的东西狗屁不通,有碍观瞻, 二则是懒,很讨厌要登录到网站上写东西,也没有那么多时间(借口)。个人最喜欢用于记录的工具是Zim https://zim-wiki.org/ ,记录东西超级方便,可惜只支持PC版本, 记录的东西可以到处为MarkDown 格式,非常方便(你现在看到的这篇就是用Zim写的)。

无意间看到Vs Code上有博客园的插件,作为程序员,顺手google/百度了一下,原来通用博客都支持使用metaweblog API来访问,还支持直接发布markdown 格式,简直不要太好。 找了找2年前注册的博客源账号,用来测试一下。

发挥典型中国程序员的拿来主义精神,经过goolgle/百度一番搜索,参考以下文档进行API测试,在此表示感谢!!
https://www.cnblogs.com/caipeiyu/p/5475761.html
https://github.com/1024th/cnblogs_githook

1 在哪里找API说明

在博客设置最的最末端,有MetaWeblog 的访问地址链接
点击进入页面,有metaweblog API 的详细说明
具体内容不赘述了。

2 测试API

使用python3 进行API测试,直接上代码:

	#encoding = utf-8
	#!/bin/sh python3
	
	import xmlrpc.client as xmlrpclib
	import json
	
	'''
	配置字典:
	type | description(example)
	str  | metaWeblog url, 博客设置中有('https://rpc.cnblogs.com/metaweblog/1024th')
	str  | appkey, Blog地址名('1024th')
	str  | blogid, 这个无需手动输入,通过getUsersBlogs得到
	str  | usr, 登录用户名
	str  | passwd, 登录密码
	str  | rootpath, 博文存放根路径(添加git管理)
	'''
	
	'''
	POST:
	dateTime	dateCreated - Required when posting.
	string	description - Required when posting.
	string	title - Required when posting.
	array of string	categories (optional)
	struct Enclosure	enclosure (optional)
	string	link (optional)
	string	permalink (optional)
	any	postid (optional)
	struct Source	source (optional)
	string	userid (optional)
	any	mt_allow_comments (optional)
	any	mt_allow_pings (optional)
	any	mt_convert_breaks (optional)
	string	mt_text_more (optional)
	string	mt_excerpt (optional)
	string	mt_keywords (optional)
	string	wp_slug (optional)
	'''
	
	class MetablogClient():
	  def __init__(self, configpath):
	    '''
	    @configpath: 指定配置文件路径
	    '''
	    self._configpath = configpath
	    self._config = None
	    self._server = None
	    self._mwb = None
	  
	  def createConfig(self):
	    '''
	    创建配置
	    '''
	    while True:
	        cfg = {}
	        for item in [("url", "metaWeblog url, 博客设置中有
	            ('https://rpc.cnblogs.com/metaweblog/blogaddress')"),
	                     ("appkey", "Blog地址名('blogaddress')"),
	                     ("usr", "登录用户名"),
	                     ("passwd", "登录密码"),
	                     ("rootpath", "博文本地存储根路径")]:
	            cfg[item[0]] = input("输入"+item[1])
	        try:
	            server = xmlrpclib.ServerProxy(cfg["url"])
	            userInfo = server.blogger.getUsersBlogs(
	                cfg["appkey"], cfg["usr"], cfg["passwd"])
	            print(userInfo[0])
	            # {'blogid': 'xxx', 'url': 'xxx', 'blogName': 'xxx'}
	            cfg["blogid"] = userInfo[0]["blogid"]
	            break
	        except:
	            print("发生错误!")
	    with open(self._configpath, "w", encoding="utf-8") as f:
	        json.dump(cfg, f, indent=4, ensure_ascii=False)
	    
	  def existConfig(self):
	    '''
	    返回配置是否存在
	    '''
	    try:
	        with open(self._configpath, "r", encoding="utf-8") as f:
	            try:
	                cfg = json.load(f)
	                if cfg == {}:
	                    return False
	                else:
	                    return True
	            except json.decoder.JSONDecodeError:  # 文件为空
	                return False
	    except:
	        with open(self._configpath, "w", encoding="utf-8") as f:
	            json.dump({}, f)
	            return False
	
	  def readConfig(self):
	    '''
	    读取配置
	    '''
	    if not self.existConfig():
	      self.createConfig()
	
	    with open(self._configpath, "r", encoding="utf-8") as f:
	      self._config = json.load(f)
	      self._server = xmlrpclib.ServerProxy(self._config["url"])
	      self._mwb = self._server.metaWeblog
	
	  def getUsersBlogs(self):
	    '''
	    获取博客信息
	    @return: {
	      string  blogid
	      string	url
	      string	blogName
	    }
	    '''
	    userInfo = self._server.blogger.getUsersBlogs(self._config["appkey"], self._config["usr"], self._config["passwd"])
	    return userInfo
	
	  def getRecentPosts(self, num):
	    '''
	    读取最近的博文信息
	    '''
	    return self._mwb.getRecentPosts(self._config["blogid"], self._config["usr"], self._config["passwd"], num)
	
	  def newPost(self, post, publish):
	    '''
	    发布新博文
	    @post: 发布内容
	    @publish: 是否公开
	    '''
	    while True:
	      try:
	          postid = self._mwb.newPost(self._config['blogid'], self._config['usr'], self._config['passwd'], post, publish)
	          break
	      except:
	          time.sleep(5)
	    return postid
	
	  def editPost(self, postid, post, publish):
	    '''
	    更新已存在的博文
	    @postid: 已存在博文ID
	    @post: 发布内容
	    @publish: 是否公开发布
	    '''
	    self._mwb.editPost(postid, self._config['usr'], self._config['passwd'], post, publish)
	
	  def deletePost(self, postid, publish):
	    '''
	    删除博文
	    '''
	    self._mwb.deletePost(self._config['appkey'], postid, self._config['usr'], self._config['passwd'], post, publish)
	
	  def getCategories(self):
	    '''
	    获取博文分类
	    '''
	    return self._mwb.getCategories(self._config['blogid'], self._config['usr'], self._config['passwd'])
	
	  def getPost(self, postid):
	    '''
	    读取博文信息
	    @postid: 博文ID
	    @return: POST
	    '''
	    return self._mwb.getPost(postid, self._config['usr'], self._config['passwd'])
	
	  def newMediaObject(self, file):
	    '''
	    资源文件(图片,音频,视频...)上传
	    @file: {
	      base64	bits
	      string	name
	      string	type
	    }
	    @return: URL
	    '''
	    return self._mwb.newMediaObject(self._config['blogid'], self._config['usr'], self._config['passwd'], file)
	  
	  def newCategory(self, categoray):
	    '''
	    新建分类
	    @categoray: {
	      string	name
	      string	slug (optional)
	      integer	parent_id
	      string	description (optional)
	    }
	    @return : categorayid
	    '''
	    return self._server.wp.newCategory(self._config['blogid'], self._config['usr'], self._config['passwd'], categoray)
	```

#以上是对API的简单封装,万事具备,开始测试

### 2.1 获取分类
```python
	import core.metablogclient as blogclient
	
	client = blogclient.MetablogClient('blog_config.json')
	client.readConfig()
	catLst = client.getCategories()
	print(catLst)
	[{'description': '[发布至博客园首页]', 'htmlUrl': '', 'rssUrl': '', 'title': '[发布至博客园首页]', 'categoryid': '0'}, 
	{'description': '[Markdown]', 'htmlUrl': '', 'rssUrl': '', 'title': '[Markdown]', 'categoryid': '-5'}...]

获取了所有的分类信息,其中我在网站上自建了一个随笔分类,也可以获取到

2.2 新建分类

	import core.metablogclient as blogclient
	
	client = blogclient.MetablogClient('blog_config.json')
	client.readConfig()
	catid = client.newCategory({
	"name": "[随笔分类]测试分类",
	"slug": "",
	"parent_id": 0,
	"description": "测试建立一个随笔子分类"
	})
	print("新建分类:", catid)
	新建分类: 1536823

但是在博客园网站上无法看到这个分类,使用获取分类再次测试,也无法获取到该分类,使用该分类发布博客,也是无
效的,所以我想__根据年月自动分类__的想法就泡汤啦

2.3 拉取现有博文

	import core.metablogclient as blogclient
	
	client = blogclient.MetablogClient('blog_config.json')
	client.readConfig()
	posts = client.getRecentPosts(9999)
	print(posts)
	[{'dateCreated': <DateTime '20190829T11:21:00' at 0x2a80990>, 'description': '<p>测试</p>', 'title': '测试', 'enclosure': {'length': 0}, 
	'link': 'https://www.cnblogs.com/robert-9/p/11428668.html', 'permalink': 'https://www.cnblogs.com/robert-9/p/11428668.html', 
	'postid': '11428668', 'source': {}, 'userid': '-2'}]

正确拉取现有博文,通过API文档,发现无法获取博文是否处于发布状态,这是一个遗憾

2.4 发布博文

	import core.metablogclient as blogclient
	import datetime
	
	client = blogclient.MetablogClient('blog_config.json')
	client.readConfig()
	postid = client.newPost({
	  "time": datetime.datetime.now(),
	  "title": "metaweblog API随笔发布",
	  "description": "##metaweblog API随笔发布
测试
",
	  "categories": ["[Markdown]"],
	  "mt_keywords": "metaweblog;python"
	}, False)
	print('发布随笔:', postid)

测试发布成功,并能在网站上看到该随笔, 如果想发布为文章,日志或新闻,加入必要的分类即可。

2.5 上传图片

	import datetime
	import base64
	import core.metablogclient as blogclient
	
	client = blogclient.MetablogClient('blog_config.json')
	client.readConfig()
	with open('abc.png', 'rb') as f:
	  bs64_str = base64.b64encode(f.read())
	  url = client.newMediaObject({
	      "bits": bs64_str,
	      "name": "abc.png",
	      "type": "image/png"
	  })
	  print(url)
	{'url': 'https://img2018.cnblogs.com/blog/1211514/201908/1211514-20190829114435333-814710358.png'}

测试成功, 这样就可以在上传Markdown 格式之前,自动将本地的图片上传到服务器上了。

出处:https://www.cnblogs.com/robert-9/p/11428982.html

======================================================

上面的github中的  cnblogs_githook  我也一起贴到这里,方便查看:

cnblogs_githook

基于rpcxml协议,利用githook,在commit时自动发布本地markdown文章到博客园。

使用说明

本脚本用python3编写,请配置好运行环境。

  1. 第一次使用前先把./hooks/commit-msg文件复制到./.git/hooks/中。
  2. 运行cnblogs.py
    1. 程序有一个可选参数。
      • config 设置博客信息。
      • download 下载文章。
    2. 第一次运行cnblogs.py时默认选择config参数,设置博客信息。
    3. 此后每次运行程序时,./articles/*.md将被上传到博客并发布;./unpublished/*.md将被上传到博客,但不发布(并标注分类“unpublished”)。文章均以文件名为题,且不发布的文章。如果博客中已经存在同名文章,将替换其内容!
  3. 编辑./articles/./unpublished/中markdown文件,在本地git仓库commit更改,自动运行./cnblogs.py(需要使用终端命令才能查看返回信息)。

注意事项/已知Bug

  1. 本程序不保证稳定性,为防止数据丢失,建议使用前预先备份博客。
  2. clone仓库不能下载.git文件夹,因此需要手动复制调用cnblogs.py的脚本./hooks/commit-msg.git
  3. 由于metaWeBlog本身没有提供查看文章是否已发布的接口,所有使用“unpublished”分类标注未发布文章。也就是说,当执行python cnblogs.py download命令时,博客中没有发布也没有“unpublished”分类的文章也会存到./articles/,下次运行时将被自动发布。
  4. 由于接口不允许将已经发布的文章设置为未发布,所以若./unpublished/内的文章在博客内有同名文章时不会被上传。

下面是  cnblog.py  文件

#! /usr/bin/env python
# coding=utf-8

# 使用python xmlrpc 发送内容到博客园
# http://rpc.cnblogs.com/metaweblog/WeyneChen 从链接可以看到支持的metaweblog API
import xmlrpc.client as xmlrpclib
import glob
import os
import sys
import json
import time
import datetime

# 发布文章路径(article path)
art_path = "./articles/"
# 不发布文章路径(unpublished article path)
unp_path = "./unpublished/"
# 博客配置路径(config path)
cfg_path = "blog_config.json"
# 备份路径(backup path)
bak_path = "./backup/"
# 获取文章篇数
recentnum = 99999

# 创建路径
for path in [art_path, unp_path, bak_path]:
    if not os.path.exists(path):
        os.makedirs(path)

# -----配置读写操作-----
'''
配置字典:
type | description(example)
str  | metaWeblog url, 博客设置中有('https://rpc.cnblogs.com/metaweblog/1024th')
str  | appkey, Blog地址名('1024th')
str  | blogid, 这个无需手动输入,通过getUsersBlogs得到
str  | usr, 登录用户名
str  | passwd, 登录密码
'''


def exist_cfg():
    '''
    返回配置是否存在
    '''
    try:
        with open(cfg_path, "r", encoding="utf-8") as f:
            try:
                cfg = json.load(f)
                if cfg == {}:
                    return False
                else:
                    return True
            except json.decoder.JSONDecodeError:  # 文件为空
                return False
    except:
        with open(cfg_path, "w", encoding="utf-8") as f:
            json.dump({}, f)
            return False


def create_cfg():
    '''
    创建配置
    '''
    while True:
        cfg = {}
        for item in [("url", "metaWeblog url, 博客设置中有
            ('https://rpc.cnblogs.com/metaweblog/blogaddress')"),
                     ("appkey", "Blog地址名('blogaddress')"),
                     ("usr", "登录用户名"),
                     ("passwd", "登录密码")]:
            cfg[item[0]] = input("输入"+item[1])
        try:
            server = xmlrpclib.ServerProxy(cfg["url"])
            userInfo = server.blogger.getUsersBlogs(
                cfg["appkey"], cfg["usr"], cfg["passwd"])
            print(userInfo[0])
            # {'blogid': 'xxx', 'url': 'xxx', 'blogName': 'xxx'}
            cfg["blogid"] = userInfo[0]["blogid"]
            break
        except:
            print("发生错误!")
    with open(cfg_path, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=4, ensure_ascii=False)


url = appkey = blogid = usr = passwd = ""
server = None
mwb = None
title2id = {}


def get_cfg():
    global url, appkey, blogid, usr, passwd, server, mwb, title2id
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = json.load(f)
        url = cfg["url"]
        appkey = cfg["appkey"]
        blogid = cfg["blogid"]
        usr = cfg["usr"]
        passwd = cfg["passwd"]
        server = xmlrpclib.ServerProxy(cfg["url"])
        mwb = server.metaWeblog
        # title2id[title]=postid  储存博客中文章标题对应的postid
        recentPost = mwb.getRecentPosts(
            cfg["blogid"], cfg["usr"], cfg["passwd"], recentnum)
        for post in recentPost:
            # 1.把datetime转成字符串
            dt = post["dateCreated"]
            # post["dateCreated"] = dt.strftime("%Y%m%dT%H:%M:%S")
            post["dateCreated"] = dt.__str__()
            # 2.把字符串转成datetime
            # datetime.datetime.strptime(st, "%Y%m%dT%H:%M:%S")
            # datetime.datetime.fromisoformat(str)
            title2id[post["title"]] = post["postid"]
        # 格式化成20160320-114539形式
        filename = time.strftime("%Y%m%d-%H%M%S", time.localtime())
        with open(bak_path+filename+".json", "w", encoding="utf-8") as f:
            json.dump(recentPost, f, indent=4)


# server = xmlrpclib.ServerProxy(url)
# userInfo = server.blogger.getUsersBlogs(appkey, usr, passwd)
# recentPost = mwb.getRecentPosts(blogid, usr, passwd, 9)
def newPost(blogid, usr, passwd, post, publish):
    while True:
        try:
            postid = mwb.newPost(blogid, usr, passwd, post, publish)
            break
        except:
            time.sleep(5)
    return postid


def post_art(path, publish=True):
    title = os.path.basename(path)  # 获取文件名做博客文章标题
    [title, fename] = os.path.splitext(title)  # 去除扩展名
    with open(mdfile, "r", encoding="utf-8") as f:
        post = dict(description=f.read(), title=title)
        post["categories"] = ["[Markdown]"]
        # 不发布
        if not publish:
            # 对于已经发布的文章,直接修改为未发布会报错:
            # xmlrpc.client.Fault: <'published post can not be saved as draft'>
            # 所以先删除这个文章
            # if title in title2id.keys():
            #     server.blogger.deletePost(
            #         appkey, title2id[title], usr, passwd, True)
            if title not in title2id.keys():
                post["categories"].append('[随笔分类]unpublished')  # 标记未发布
                # post["postid"] = title2id[title]
                postid = newPost(blogid, usr, passwd, post, publish)
                print("New:[title=%s][postid=%s][publish=%r]" %
                      (title, postid, publish))
        # 发布
        else:
            if title in title2id.keys():  # 博客里已经存在这篇文章
                mwb.editPost(title2id[title], usr, passwd, post, publish)
                print("Update:[title=%s][postid=%s][publish=%r]" %
                      (title, title2id[title], publish))
            else:  # 博客里还不存在这篇文章
                postid = newPost(blogid, usr, passwd, post, publish)
                print("New:[title=%s][postid=%s][publish=%r]" %
                      (title, postid, publish))


def download_art():
    recentPost = mwb.getRecentPosts(blogid, usr, passwd, recentnum)
    for post in recentPost:
        if "categories" in post.keys():
            if '[随笔分类]unpublished' in post["categories"]:
                with open(unp_path+post["title"]+".md",
                          "w", encoding="utf-8") as f:
                    f.write(post["description"])
        else:
            with open(art_path+post["title"]+".md",
                      "w", encoding="utf-8") as f:
                f.write(post["description"])


if __name__ == "__main__":
    if not exist_cfg():
        create_cfg()
    get_cfg()
    if len(sys.argv) > 1:
        if sys.argv[1] == "download":
            download_art()
        elif sys.argv[1] == "config":
            create_cfg()
            get_cfg()
    for mdfile in glob.glob(art_path+"*.md"):
        post_art(mdfile, True)
    for mdfile in glob.glob(unp_path+"*.md"):
        post_art(mdfile, False)

刚刚还在网上看到一个  metaWeblog.py  文件,具体如下:

#!/usr/local/bin/python
#filename:metaWeblog
'''
    MetaWeblog API wrapper for python
    Copyright 2013. All right reserved to sooop.
'''



import xmlrpclib

class Post:
    def __init__(self):
        self.keys = ["username", "permaLink", "guid", 
        "description", "pubDate", "author", 
        "title", "dateCreated", "tags", 
        "link", "postid", "categories"]

        self.categories = []
        self.postid = ""
        self.link = ""
        self.tags = []
        self.dateCreated = ""
        self.title = ""
        self.author = ""
        self.pubDate = ""
        self.description = ""
        self.guid = ""
        self.permaLink = ""
        self.username = ""

        self.publish = True

    def struct(self):
        struct = {}
        for k in self.keys:
            struct[k] = getattr(self, k)
        return struct

    def addCategory(self, cats):
        if type(cats) == list:
            self.categories.extend(cats)
        else:
            self.categories.append(cats)

    def addTags(self, *args, **kwargs):
        if type(self.tags) == str and self.tags.strip() == "":
            self.tags = []
        for i in args:
            print i
            self.tags.append(i)

    @staticmethod
    def postFromStructs(structs):
        if type(structs) == list:
            result = []
            for elem in structs:
                a = Post()
                for key in a.keys:
                    setattr(a, key, elem[key])
                result.append(a)
            if len(result) > 1:
                return result
            else:
                return result[0]
        else:
            result = Post()
            for key in structs:
                setattr(result, key, structs[key])
            return result





class Weblog:
    def __init__(self, service_url, blog_id=None, user_id=None, password=None):
        self.blog_id = blog_id
        self.user_id = blog_id
        self.password = password
        self.server_url = service_url
        self.server = xmlrpclib.ServerProxy(self.server_url)
        self.categories = []
        self.lastPostID = None;

        self.getCategories()


    def getCategories(self,refresh=False):
        cats = self.server.metaWeblog.getCategories(self.blog_id,self.user_id,self.password)
        if refresh or self.categories == []:
            for elem in cats:
                self.categories.append(elem['title'])
        return cats

    def getPost(self,post_id):
        result = self.server.metaWeblog.getPost(post_id, self.user_id, self.password)
        if result:
            pass
        return result

    def getRecentPosts(self, count=1):
        result = self.server.metaWeblog.getRecentPosts(self.blog_id, self.user_id, self.password, count)
        if result:
            self.lastPostID = result[len(result)-1]['postid']
        return result

    def newPost(self, aPost):
        newPostID = self.server.metaWeblog.newPost(self.blog_id, self.user_id, self.password, aPost.struct(), aPost.publish)
        self.lastPostID = newPostID
        return newPostID

    def editPost(self, aPost, post_id=None):
        if post_id == None:
            post_id = aPost.postid
        result = self.server.metaWeblog.editPost(post_id, self.user_id, self.password, aPost.struct(), aPost.publish)
        if result:
            self.lastPostID = post_id
        return result

出处:https://gist.github.com/sooop/4986757

原文地址:https://www.cnblogs.com/mq0036/p/12782116.html