MongoDB 增量同步之 MongoShake（3）：用 Python 处理 Kafka 中的 oplog（附 MongoUtils）

场景：使用 MongoShake 将数据同步到 Kafka，再用 Python 消费处理，做 ETL 或其他操作：

# 同步 sea_user、tom_user 等库（库名包含 _user）到 all_user 库（MongoDB），并为每条记录添加来源库名字段 db（即 customer）

 代码: 

MongoOplogHandler.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 9/27/21 10:48 AM
# @Author  : Sea
# @File    : MongoOplogHandler.py
# @Software: IntelliJ IDEA
# #pip install kafka-python
# ******************************
from kafka.consumer.group import KafkaConsumer
from backports.configparser.helpers import str
import json

from MongoUtils import MongoUtils

# Kafka source that MongoShake writes oplog messages to, plus this consumer's group id.
topic, bootstrap_servers, group_id = "mgtest1", "192.168.18.51:9092", "etl"
# Shared MongoDB helper used by the insert/update/delete handlers.
mongo_utils = MongoUtils()


def start_kafkaListener():
    """Consume oplog messages from Kafka and apply each one via etl().

    Auto-commit is disabled and the offset is committed only after etl()
    returns. A message whose processing raises is therefore not committed,
    and it is consumed again when the consumer group restarts.
    The consumer is always closed on exit (exception or KeyboardInterrupt),
    so its network resources are released and it leaves the group.
    """
    consumer = KafkaConsumer(topic, bootstrap_servers=[bootstrap_servers],
                             group_id=group_id,
                             auto_offset_reset='earliest',
                             enable_auto_commit=False)
    print("=============+++ start consumer +++=========== ")
    try:
        for msg in consumer:
            print("topic:%s  partition:%d    offset:%d" % (msg.topic, msg.partition, msg.offset))
            etl(msg.value)
            # Commit only after etl() succeeded so failed messages are retried.
            consumer.commit()
    finally:
        consumer.close()


def etl(value):
    """Parse one oplog message from Kafka and dispatch it to its handler.

    :param value: raw Kafka message value (JSON text/bytes). Empty or None
        values are ignored.
    Only namespaces whose database name contains ``_user`` and that name a
    collection are processed. The document in "o" is converted with
    json_json() and passed to handler_result().
    """
    if not value:
        # Nothing to apply (e.g. an empty message value).
        return
    value_dict = json.loads(value)
    if not value_dict:
        return
    # i: insert, u: update, d: delete (a delete's "o" only carries _id)
    op = value_dict["op"]
    # ns is "<db>.<collection>". Split on the first dot only: collection
    # names may contain dots, and some entries (e.g. no-ops) have no dot at
    # all, which previously raised IndexError and killed the consumer loop.
    ns = value_dict["ns"]
    db, _, tb = str(ns).partition('.')
    if '_user' in db and tb:
        tb_values = json_json(value_dict["o"])
        handler_result(op, ns, tb_values)


# swich case
def handler_result(op, ns, value):
    todo = {
        "i": insert,
        "u": update,
        "d": delete
    }
    method = todo.get(op, other)
    if method:
        method(ns, value)


def other(ns, value):
    """Log an oplog entry whose op code is not i/u/d; nothing is written."""
    message = "".join(["ns:", ns, " v:", str(value)])
    print(message)
    print("other")


def update(ns, value):
    """Upsert an updated document from ``ns`` into the ``all_user`` database.

    :param ns: oplog namespace "<db>.<collection>". The document is written to
        the collection of the same name in ``all_user``.
    :param value: the document as a dict (from json_json). It must contain
        "_id", which save_or_update_dict_by_id uses as the upsert key.
    Only databases whose name contains ``_user`` are copied. A ``db`` field
    holding the source database name is added to the stored document.
    """
    print("ns:" + ns + " v:" + str(value))
    print("update")
    # Split on the first dot only: collection names may contain dots, and an
    # ns without a dot must not raise IndexError.
    db, _, tb = str(ns).partition('.')
    if '_user' in db and tb:
        # NOTE(review): this replaces the whole target document with `value`.
        # That is only correct if "o" carries the full post-update document
        # rather than an update spec such as {"$set": ...} -- confirm.
        doc = dict(value)  # copy: don't mutate the caller's dict
        doc['db'] = db
        mongo_utils.save_or_update_dict_by_id(tb, doc, 'all_user')


def insert(ns, value):
    """Copy an inserted document from ``ns`` into the ``all_user`` database.

    :param ns: oplog namespace "<db>.<collection>". The document is written to
        the collection of the same name in ``all_user``.
    :param value: the document as a dict (from json_json). It must contain
        "_id", which save_or_update_dict_by_id uses as the upsert key.
    Only databases whose name contains ``_user`` are copied. A ``db`` field
    holding the source database name is added to the stored document.
    """
    print("ns:" + ns + " v:" + str(value))
    print("insert")
    # Split on the first dot only: collection names may contain dots, and an
    # ns without a dot must not raise IndexError.
    db, _, tb = str(ns).partition('.')
    if '_user' in db and tb:
        doc = dict(value)  # copy: don't mutate the caller's dict
        doc['db'] = db  # record which source database the document came from
        mongo_utils.save_or_update_dict_by_id(tb, doc, 'all_user')


def delete(ns, value):
    """Delete the document with ``value['_id']`` from the mirrored collection.

    :param ns: oplog namespace "<db>.<collection>". The delete targets the
        collection of the same name in the default ``all_user`` database.
    :param value: dict from json_json; for deletes it is expected to carry
        only "_id". Entries without "_id" are logged and skipped instead of
        raising KeyError, which would kill the consumer loop.
    """
    print("ns:" + ns + " v:" + str(value))
    print("delete")
    # Split on the first dot only: collection names may contain dots, and an
    # ns without a dot must not raise IndexError.
    db, _, tb = str(ns).partition('.')
    if '_user' in db and tb:
        if '_id' not in value:
            print("delete skipped, no _id: ns:" + ns)
            return
        mongo_utils.delById(tb, value['_id'])


'''
k:节点name
v:节点value
cn:当前节点
'''


def setNode(k, v, cn):
    pass
    if type(v) == list:  # 无论是map还是list,都是list封装
        # get type
        if v:
            v_type = type(v[0])  # 子节点的type
            if v_type == dict:  # dict
                c_value = {}
                for v1 in v:
                    k2 = v1["Name"]
                    v2 = v1["Value"]
                    # v_value[v1["Name"]] = v1["Value"]
                    setNode(k2, v2, c_value)
                cn[k] = c_value
            if v_type == list:  # list
                c_value_dict = {}  # list_dict
                c_value_list = []  # list list
                for v2 in v:  # list 子节点可能为list,和 dict
                    if type(v2) == dict:
                        # v2 is dict
                        k2 = v2["Name"]
                        v2 = v2["Value"]
                        setNode(k2, v2, c_value_dict)
                    if type(v2) == list:
                        # # TODO if list in dict,here just add it
                        # c_value_list.append(v2)
                        pass
                        set_list_list(v2, c_value_list)
                if c_value_list:
                    cn[k] = c_value_list
                if c_value_dict:
                    cn[k] = c_value_dict
            if (not v_type == list) and (not v_type == dict):  # 子节点的节点为普通类型
                cn[k] = v
    else:  # 普通节点
        cn[k] = v


'''
nodelist: list 节点
cnode: 当前节点_list
'''


def set_list_list(nodelist, cnode):
    c_value_dict = {}  # list_dict
    c_value_list = []  # list list
    c_value_other = []  # list other
    for v2 in nodelist:  # list 子节点可能为list,和 dict
        if type(v2) == dict:  # list dict
            # v2 is dict
            k2 = v2["Name"]
            v2 = v2["Value"]
            setNode(k2, v2, c_value_dict)
        if type(v2) == list:  # list list
            set_list_list(v2, c_value_list)
        if (not type(v2) == list) and (not type(v2) == dict):
            c_value_other.append(v2)
    if c_value_list:
        cnode.append(c_value_list)
    if c_value_dict:
        cnode.append(c_value_dict)
    if (not c_value_dict) or (not c_value_list):
        cnode.append(c_value_other)


def json_json(tb_value_json_dict):
    """Convert one document from Name/Value-pair form into a plain dict.

    Despite its name, ``tb_value_json_dict`` is an iterable of
    {"Name": ..., "Value": ...} objects (etl() passes the oplog "o" field).
    Each pair is stored into the returned dict via setNode().
    """
    converted = {}
    for name, node_value in ((pair["Name"], pair["Value"]) for pair in tb_value_json_dict):
        setNode(name, node_value, converted)
    return converted


def json_to_json(tb_value_json_str):
    """Parse a JSON string of Name/Value-pair documents into Python objects.

    - Top-level list: treated as a list of documents. Each document (an
      iterable of Name/Value objects) is converted with setNode(), and the
      non-empty results are returned as a list.
    - Top-level dict: its iteration items are treated as Name/Value objects,
      and a dict is returned.
    - Anything else: returns None.

    NOTE(review): iterating a dict yields its keys (strings), so in the dict
    branch ``pair["Name"]`` raises TypeError for any non-empty dict. Likewise,
    a list element that is a non-empty dict raises. The intended input shape
    for these cases is unclear -- confirm before relying on them.
    """
    parsed = json.loads(tb_value_json_str)
    if isinstance(parsed, list):
        documents = []
        for document in parsed:
            converted = {}
            for pair in document:
                setNode(pair["Name"], pair["Value"], converted)
            if converted:
                documents.append(converted)
        return documents
    if isinstance(parsed, dict):
        converted = {}
        for pair in parsed:
            setNode(pair["Name"], pair["Value"], converted)
        return converted
    return None


if __name__ == "__main__":
    # Running this module as a script starts the Kafka consumer loop.
    start_kafkaListener()
MongoUtils.py:
import pymongo
from bson import ObjectId
class MongoUtils:
    """Small helper around a pymongo ``MongoClient`` for per-collection CRUD.

    Every method takes a collection name ``table`` and an optional database
    name ``db`` (default ``'all_user'``). Use ``close()`` or a ``with`` block
    to release the client deterministically.
    """

    # NOTE(review): credentials are hard-coded in source. Prefer passing
    # ``uri`` (e.g. from config or the environment) instead of this default.
    DEFAULT_URI = ("mongodb://root:root@192.168.18.176:27019/?authSource=admin"
                   "&replicaSet=rs&readPreference=primary"
                   "&appname=MongoDB%20Compass&ssl=false")

    def __init__(self, uri=None):
        """Create the MongoDB client.

        :param uri: MongoDB connection string; ``DEFAULT_URI`` when omitted.
        """
        self.client = pymongo.MongoClient(self.DEFAULT_URI if uri is None else uri)

    def _collection(self, table, db='all_user'):
        """Return the collection ``table`` in database ``db``."""
        return self.client[str(db)][str(table)]

    def insert_json_str(self, table, str_json_data, db='all_user'):
        """Insert one document given as a JSON string or as a dict / key-value pairs."""
        import json  # only this method needs json
        if isinstance(str_json_data, (str, bytes, bytearray)):
            str_json_data = json.loads(str_json_data)
        # Collection.insert() was removed in PyMongo 4; insert_one is the supported call.
        self._collection(table, db).insert_one(dict(str_json_data))

    def insert_dict(self, table, dicts, db='all_user'):
        """Insert the document ``dicts``."""
        self._collection(table, db).insert_one(dicts)

    def save_or_update_dict_by_id(self, table, dicts, db='all_user'):
        """Upsert ``dicts`` by its ``_id``: replace the matching document or insert it."""
        # replace_one(upsert=True) performs the same write as
        # find_one_and_replace(upsert=True) without fetching the document back.
        self._collection(table, db).replace_one({"_id": dicts['_id']}, dicts, upsert=True)

    def drop_table(self, table, db='all_user'):
        """Drop the collection ``table``."""
        self._collection(table, db).drop()

    def find(self, table, query=None, db='all_user'):
        """Return a cursor over documents matching ``query`` (e.g. {'name': 'sea'}); all documents if omitted."""
        return self._collection(table, db).find({} if query is None else query)

    def findone(self, table, query=None, db='all_user'):
        """Return the first document matching ``query`` (e.g. {'name': 'sea'}), or None."""
        return self._collection(table, db).find_one({} if query is None else query)

    def delById(self, table, id, db='all_user'):
        """Delete the document whose ``_id`` equals ``id``."""
        self._collection(table, db).delete_one({"_id": id})

    def close(self):
        """Close the client; safe to call when __init__ did not finish."""
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def __del__(self):
        self.close()



if __name__ == "__main__":
    mongo_utils = MongoUtils()
    # Smoke test: upsert a fixed document into all_user.user by _id
    # (inserted if missing, replaced if present).
    user_collection = mongo_utils.client["all_user"]["user"]
    user_collection.find_one_and_replace(
        {"_id": "1232131"},
        {"_id": "1232131", "s1sss": "1"},
        upsert=True,
    )
原文地址:https://www.cnblogs.com/lshan/p/15344028.html