pika的阻塞式使用

[root@cloudplatform ELK]# cat startIncHouTai.py

import os

# 杀掉内存中的进程 
cmd='pgrep -f  PutDataToKafkaInc.py  | xargs kill -9 '
os.system(cmd)

# 后台启动进程
cmd='nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaInc.py >nohup.log 2>&1 &'
os.system(cmd)

print('增量脚本启动成功')

 增量.py

import os
import sys

# pip install kafka-python
sys.path.append("/usr/local/software/ELK")
from Util.DateEncoder import *
from kafka import KafkaProducer
from Util.TextUtil import *
from Util.MySQLHelper import *
from Util.GetAreaPartionIdUtil import *
from Util.RabbitMqUtil import *

# 区域码
AreaCode = mysql_AreaCode
MemoryDict = []


# 处理某张表中ID的数据变更
def doAction(sql):
    while True:
        try:
            dt = db.query(sql)
            if len(dt) > 0:
                # 将字段大写转为小写
                for row in dt:
                    new_dics = {}
                    for k, v in row.items():
                        new_dics[k.lower()] = v
                jstr = json.dumps(new_dics, cls=DateEncoder)
                logInfo('准备上报kafka,数据:'+jstr)
                # 声明Kafka生产者
                producer = KafkaProducer(bootstrap_servers=kafka_servers)
                producer.send(topic=topicName, partition=partitionId, value=jstr.encode('utf-8'))
                # 提交一下
                producer.flush()
                # 关闭kafkaProducer
                producer.close()
                logInfo('成功完成上报工作!')
                break
        except Exception as err:
            logInfo('是不是没有连接上kafka??将休息3秒...'+str(err))
            time.sleep(3)


# 通过表名,找到对应的配置文件,读取这个表的PK是什么
def GetPk(table):
    file = os.getcwd().replace('\', '/') + '/Sql/' + table + '.json'
    if os.path.exists(file):
        jsonStr = ReadContent(file)
        obj = json.loads(jsonStr)
        # 主键
        pk = obj['pk']
        return pk
    else:
        logInfo('文件:' + file + "不存在,程序无法继续!")
        sys.exit()


# 通过表名和PK的真实值,拼接出SQL语句
def GetSql(table, id):
    file = os.getcwd().replace('\', '/') + '/Sql/' + table + '.json'
    if os.path.exists(file):
        jsonStr = ReadContent(file)
        obj = json.loads(jsonStr)
        # 主键
        pk = obj['pk']
        sql = str(obj['sql']).replace('>', '=')
        sql = sql.replace("#area_code#", AreaCode).replace("order by t1.#pk#", "").replace("order by t2.#pk#",
                                                                                           "").replace(
            "order by t3.#pk#", "").replace("order by t4.#pk#", "").replace("order by #pk#", "").replace("#pk#",
                                                                                                         pk).replace(
            "#id#", str(id)).replace("#limit#", "")
        return sql
    else:
        logInfo('文件:' + file + "不存在,程序无法继续!")
        sys.exit()


# 解析rabbitmq中的json数据,知道当前变化的是哪个ID
def RabbitMqId(dataJson, pk):
    if dataJson['event'] == 'insert':
        return int(dataJson['columns'][pk]['value'])
    elif dataJson['event'] == 'update':
        return int(dataJson['before']['columns'][pk]['value'])
    else:
        logInfo('不是insert也不是update,这是不行的,程序无法执行!')
        return 0


# 黄海定义的输出信息的办法,带当前时间
def logInfo(msg):
    i = datetime.datetime.now()
    print(" %s            %s" % (i, msg))


if __name__ == '__main__':
    logInfo('开始获取主机对应的PartitionId...')
    partitionId = GetAreaPartitionId()
    logInfo('成功获取主机的对应PartitionId=' + str(partitionId))
    # 队列名
    queue_Name = 'kafka_queue'
    # 交换机名
    switch_Name = 'elk_switch'
    # 声明mysql数据库
    db = MySQLHelper()

    # 统一的topic名称
    topicName = 'dsideal_db'

    while True:
        try:
            # 准备连接到Rabbitmq
            logInfo("正在连接到RabbitMQ...")
            credentials = pika.PlainCredentials(RabbitMq_User, RabbitMq_Password)
            connection = pika.BlockingConnection(pika.ConnectionParameters(RabbitMq_IP, int(RabbitMq_Port), '/', credentials))
            channel = connection.channel()
            logInfo("成功连接到RabbitMQ!正在阻塞等待消息..")
            for method_frame, properties, body in channel.consume(queue_Name):
                # 阻塞式获取消息
                # 将body转为json
                logInfo("发现RabbiMQ中的消息!正在处理...")
                dataJson = json.loads(body.decode(encoding='utf-8').lower())
                logInfo('dataJson='+str(dataJson))

                tableName = str(dataJson['table'])
                event = str(dataJson['event'])
                # 1、获取pk是什么字段名称
                pk = GetPk(tableName)
                logInfo("成功获取PK="+pk)

                # 2、解析rabbitmq中的json数据,知道这个主键的对应真实值是什么?
                id = RabbitMqId(dataJson, pk)
                logInfo("成功获取ID="+str(id))

                # 3、组装查询的sql
                sql = GetSql(tableName, id)
                # 4、处理这个表中ID的数据变更
                logInfo("成功获取SQL="+sql)
                # 成功处理
                doAction(sql)
                logInfo('成功处理' + tableName + '' + event + "事件一条!id=" + str(id))

                # 确认收到这条消息
                channel.basic_ack(method_frame.delivery_tag)
                logInfo("成功设置ACK标识!")
                break
            requeued_messages = channel.cancel()
            connection.close()
        except Exception as err:
            logInfo("发生了异常:"+str(err)+",将休息10秒...")
            time.sleep(10000)
原文地址:https://www.cnblogs.com/littlehb/p/9188847.html