Python之mqtt接收异步消息

    由于系统上传图片有时候C端没有接收到消息,需要做一个同步功能。C端加载图片的时候不用请求远程图片库而是加载本地的图片,相当于做了个缓存,大大减少了C端加载图片的时间,提高了用户体验。

一、功能作用

   mqtt是rabbitmq服务器的一个插件,可以用它发布与订阅主题。

   这个同步功能,其实就是用了rabbitmq的应用场景之一异步处理。

二、流程步骤

  1、设置mqtt唯一ID,因客户端id不能重复,所以选当前时间为唯一ID

client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client = mqtt.Client(client_id)  # ClientId不能重复,所以使用当前时间

  2、设置rabbitmq服务器的用户名和密码

client.username_pw_set("dev", "YTc4Mj")

  3、订阅主题

client.subscribe("sync")

  4、接收消息

recvmsg = msg.payload.decode("utf-8")

  5、处理消息

三、demo源码

1、mqtt连接rabbitmq服务器

import paho.mqtt.client as mqtt
import time
import socketclient
import logger
import demjson
import common
import syncfile

log = logger.Logger("info")

HOST = "127.0.0.1"
PORT = 1883

def client_loop():
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id)  # ClientId不能重复,所以使用当前时间
    client.username_pw_set("dev", "YTc4Mj")  # 必须设置,否则会返回「Connected with result code 4」
    client.on_connect = on_connect
    client.on_message = on_message
    log.info('开始连接mqtt' + HOST + ':' + str(PORT))
    client.connect(HOST, PORT, 60)
    log.info('mqtt连接完成' + HOST + ':' + str(PORT))
    client.loop_forever()

def on_connect(client, userdata, flags, rc):
    log.info("Connected with result code " + str(rc))
    client.subscribe("projector-remote-control")
    log.info("订阅消息 projector-remote-control")
    client.subscribe("holo-file-sync")
    log.info("订阅消息 holo-file-sync")
    client.subscribe("sync")
    log.info("订阅消息 sync")
    client.subscribe("sync-single-work")
    log.info("订阅消息 sync-single-work")

def on_message(client, userdata, msg):
    recvmsg = msg.payload.decode("utf-8")
    log.info("收到消息" + recvmsg + ",开始执行命令")
    print(msg.topic + "" + recvmsg)

    if(msg.topic=='sync-single-work'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='sync'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='holo-file-sync'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='projector-remote-control'):
      text = demjson.decode(msg)
      command = text['command']
      projectors = text['projectors']
      #socketclient.sendSocket(recvmsg)
      socketclient.handlerMsg(command,projectors)

if __name__ == '__main__':
    syncfile.scheduletask()
    syncfile.scheduleuncompletedtask()
    client_loop() 

2、消息处理

import logger
from threading import Timer
import os
import requests
from io import BytesIO
from PIL import Image 
import common

log = logger.Logger("info")

def scheduletask():
    t = Timer(10,scheduletask)
    t.start()
    #print('定时任务已开启,等待接收参数中...')
    list = common.select_sql(3)
    if(list == None):
        return
    getimagesize(list[0],list[2])
def scheduleuncompletedtask():
    t = Timer(10,scheduleuncompletedtask)
    t.start()
    #print('定时任务已开启,等待接收参数中...')
    list = common.select_sql(2)
    if(list == None):
        return
    getimagesize(list[0],list[2])

def getimagesize(id,url):
    savepath = common.readJson()['holoImageUrl']
    try:
       response = requests.get(url)
    except:
       common.delete_sql(id)
       return
    tmpIm = BytesIO(response.content)
    im = Image.open(tmpIm)
    imgpath = url[url.index('.com/')+5:]
    dirpath = imgpath[:imgpath.rindex('/')].replace('/','\')
    filename= imgpath[imgpath.rfind('/')+1:]
    targetpath = savepath+'\'+dirpath+'\'
    filename1 = filename[:filename.find('!')]
    if (os.path.exists(targetpath)):pass
    else:os.makedirs(targetpath)
    im.save(targetpath+filename1)
    if(os.path.exists(targetpath + filename)):
        os.remove(targetpath + filename)
    os.rename(targetpath + filename1, targetpath + filename)
    #localfilesize = os.path.getsize(targetpath+filename)
    #remotefilesize = dict(response.headers).get('Content-Length', 0)
    # if(localfilesize == remotefilesize):  
    if(os.path.exists(targetpath+filename) and os.path.getsize(targetpath+filename) != 0): 
        common.delete_sql(id)
        log.info("增量图片下载完成...")
    else:
        common.update_sql(id)
        log.info("增量图片部分下载 ... ")

if __name__ == '__main__':
    scheduletask()

3、公共模块

import json
import sqlite3
import re
from threading import Timer
import logger

log = logger.Logger("info")

def readJson():
    with open('config.json', 'rt') as jsonFile:
        val = jsonFile.read()
        Config = json.loads(val)
        return Config

def create_table():
    conn = sqlite3.connect('imagesnyc.xs')
    curs=conn.cursor() # 获取游标
    try:
        create_tb_cmd='''
        CREATE TABLE IF NOT EXISTS image_sync(id INTEGER PRIMARY KEY AUTOINCREMENT,imgsize INTEGER,url TEXT,status INTEGER)
        '''
        #主要就是上面的语句
        curs.execute(create_tb_cmd)
        conn.commit()
    except:
        log.info('Create table failed')
        return False
    finally:
        return conn

def insert_sql(url):
    conn = create_table()
    curs=conn.cursor() # 获取游标
    imgurls = re.split(',',url)
    for imageurl in imgurls:
       curs.execute("INSERT INTO image_sync(imgsize,url,status) VALUES('{}','{}','{}');".format(0,imageurl,3))#添加记录
    conn.commit()
    log.info("插入完成")
             

def select_sql(status):
    conn = create_table()
    curs = conn.cursor() # 获取游标
    curs.execute("select * from image_sync where status ='%s' order by id ASC LIMIT 1"%status)#查询记录
    list = curs.fetchone()
    conn.commit()
    return list

def update_sql(id):
    conn = create_table()
    curs=conn.cursor() # 获取游标
    try:
      curs.execute("update image_sync set status = 2 where id = '%s'"%id)#更新记录
      conn.commit()
    except:
        log.info('根据主键id更新失败')
    finally:
        print('continue')

def delete_sql(id):
    conn = create_table()
    curs=conn.cursor() # 获取游标
    curs.execute("delete from image_sync where id = '%s'"%id)#删除记录
    conn.commit()
    #log.info('成功删除已下载完成的记录')

 四、技术难点

    1、同步一个作品,接收到的作品字符串中可能包含多个图片地址,则需要分割字符串然后存储到内存数据库或者sqllite免安装数据库,如果同时下载多张图片会造成线程阻塞,所以用了python的定时器功能,设置好图片预计需要下载的时间。

    2、由于接受到的消息字符串地址是预览图片格式,最后有!的,Image不能保存该图片格式的地址,所以先截取最后面一个!,然后保存,保存成功后再改变为预览地址,C端便可成功加载。

原文地址:https://www.cnblogs.com/772933011qq/p/11857998.html