python redis模块消息发布和订阅

一、需求:

公司CICD平台前后端是分离的,后端用的是Python,前端主要用的是Vue;其中有一个环节是将后端应用发布的结果信息通知给前端,经处理后展示在前端页面。早期的做法是以Redis为媒介:发布方将发布结果以广播的形式推送到Redis的对应频道,订阅方监听该频道上的消息,接收到消息并处理后再发送给前端。
git地址:https://github.com/f1017746640/redisapi.git

二、代码逻辑部分:

2.1 代码结构:

2.2 config.ini为redis连接信息:

[DEFAULT]
redis_host=10.0.0.201
redis_port=30213
redis_db=6
redis_password=None

[LOCAL]

[DEV]
redis_host=10.0.0.201
redis_port=30213
redis_db=6
redis_password=
redis_channel=--deploy--

[QA]

[PROD]

2.3 rds.py 基于redis模块包装的消息发布和订阅的类:

#!/usr/bin/env python
# encoding: utf-8
"""
   > FileName: rds.py
   > Author: FZH
   > Mail: fengzhihai@ilarge.cn
   > CreatedTime: 2020-03-22 16:49
"""
import redis

class RedisHelper(object):
    """Publish/subscribe helper bound to a single Redis channel.

    Args:
        host: Redis server address.
        port: Redis server port.
        db: Redis database index.
        channel: Channel used both for publishing and for subscribing.
        password: Redis password; None when no password is given.
    """

    def __init__(self, host, port, db, channel, password=None):
        self.host = host
        self.port = port
        self.db = db
        self.channel = channel
        self.password = password
        # decode_responses=True asks redis-py to hand back str, not bytes.
        self.__conn = redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True,
        )

    def ping(self):
        """Return True if PING succeeds; print the error and return False otherwise."""
        try:
            self.__conn.ping()
        except Exception as err:
            print(err)
            return False
        return True

    def public(self, msg):
        """Publish ``msg`` on ``self.channel``.

        Returns:
            True once the message has been published, False when ``ping()``
            fails (nothing is published in that case).
        """
        if not self.ping():
            return False
        self.__conn.publish(self.channel, msg)
        return True

    def subscribe(self):
        """Subscribe to ``self.channel``.

        Returns:
            The pubsub object after subscribing, or False when ``ping()``
            fails.
        """
        if not self.ping():
            return False
        listener = self.__conn.pubsub()
        listener.subscribe(self.channel)
        # Read one response right away (presumably the subscription
        # confirmation) before handing the listener to the caller.
        listener.parse_response()
        return listener

2.4 public_info.py 发送消息:

#!/usr/bin/env python
# encoding: utf-8
"""
   > FileName: public_info.py
   > Author: FZH
   > Mail: fengzhihai@ilarge.cn
   > CreatedTime: 2020-03-22 17:06
"""
import json
import configparser

from rds import RedisHelper

# Load the Redis connection settings from the [DEV] section of config.ini.
config_file = "config.ini"
config = configparser.ConfigParser()
config.read(config_file)
sec = config["DEV"]

redis_host, redis_port, redis_db, redis_channel = (
    sec[key]
    for key in ("redis_host", "redis_port", "redis_db", "redis_channel")
)
# An empty password entry means "no password": use None instead of "".
redis_password = sec["redis_password"] or None

def pub_info(app_name, app_version, app_owner, app_desc, deploy_res):
    """Publish one deployment result as a JSON message.

    The five arguments are packed into a dict under keys of the same name,
    serialized with ``json.dumps`` and published through a ``RedisHelper``
    built from the module-level ``redis_*`` settings. Prints whether the
    publish succeeded.
    """
    helper = RedisHelper(host=redis_host,
                         port=redis_port,
                         db=redis_db,
                         channel=redis_channel,
                         password=redis_password)
    payload = json.dumps({
        "app_name": app_name,
        "app_version": app_version,
        "app_owner": app_owner,
        "app_desc": app_desc,
        "deploy_res": deploy_res,
    })
    published = helper.public(msg=payload)
    print('public successful.' if published else 'public fail.')

if __name__ == '__main__':
    # Demo run: publish one sample deployment result.
    pub_info(
        app_name='inf-demo',
        app_version='1.1',
        app_owner='fengzhihai',
        app_desc='inf group demo application',
        deploy_res='SUCCESS',
    )

2.5 subscribe_info.py 订阅消息:

#!/usr/bin/env python
# encoding: utf-8
"""
   > FileName: subscribe_info.py
   > Author: FZH
   > Mail: fengzhihai@ilarge.cn
   > CreatedTime: 2020-03-22 17:07
"""
import time
import configparser

from rds import RedisHelper

# Load the Redis connection settings from the [DEV] section of config.ini.
config_file = "config.ini"
config = configparser.ConfigParser()
config.read(config_file)
sec = config["DEV"]

redis_host, redis_port, redis_db, redis_channel = (
    sec[key]
    for key in ("redis_host", "redis_port", "redis_db", "redis_channel")
)
# An empty password entry means "no password": use None instead of "".
redis_password = sec["redis_password"] or None

def sub_info():
    """Subscribe to the configured channel and print every response received.

    Builds a ``RedisHelper`` from the module-level ``redis_*`` settings.
    Returns immediately when ``subscribe()`` yields a falsy value (the
    helper could not reach Redis); otherwise loops forever, printing each
    response together with the local time at which it was received.
    """
    helper = RedisHelper(host=redis_host,
                         port=redis_port,
                         db=redis_db,
                         channel=redis_channel,
                         password=redis_password)
    listener = helper.subscribe()
    # The subscription result never changes, so check it once up front.
    if not listener:
        return
    while True:
        # parse_response() waits for the next response (redis-py's default
        # blocking mode), so the timestamp must be taken after it returns;
        # taking it before would print when the wait started instead of
        # when the message arrived.
        msg = listener.parse_response()
        # strftime without a time tuple formats the current local time.
        now_time = time.strftime('%Y-%m-%d %H:%M:%S')
        print('receive:', now_time, msg)

if __name__ == '__main__':
    # Script entry point: start the subscriber loop.
    sub_info()

三、测试:

3.1 开启循环 监听redis channel消息:

3.2 发送消息:

python public_info.py

修改app_name再次发送消息。

3.3 监听结果:

拿到发布结果后,我们就可以对结果进行分析再处理,在前端展示了。

原文地址:https://www.cnblogs.com/fengzhihai/p/12548462.html