# pb序列化和反序列化 (protobuf serialization and deserialization)

import grpc

import redisproxy_pb2_grpc
import argparse
import redisproxy_pb2 as redisproxy__pb2
from google.protobuf import json_format
'''
不支持binary模式,需自己修改命令函数,自己转化value为binary
'''


def SerializeToString(redis):
    """Serialize a protobuf message for transport.

    Calls ``redis.SerializeToString()``, prints the result for debugging
    and returns it unchanged.

    Args:
        redis: Any object exposing a ``SerializeToString()`` method
            (in this file, a ``RedisProxyRequest`` message).

    Returns:
        Whatever ``redis.SerializeToString()`` produced.
    """
    payload = redis.SerializeToString()
    print(payload)
    return payload

def ParseFromString(res):
    """Deserialize bytes into a ``RedisProxyRequest`` and render it as JSON.

    The JSON text is printed for debugging and then returned.

    Args:
        res: Serialized ``RedisProxyRequest`` data.

    Returns:
        The JSON string produced by ``json_format.MessageToJson``.
    """
    request = redisproxy__pb2.RedisProxyRequest()
    request.ParseFromString(res)
    # Convert the parsed message to its JSON representation.
    as_json = json_format.MessageToJson(request)
    print(as_json)
    return as_json

def encode(s):
    """Convert a string to space-separated binary code points.

    Each character becomes the base-2 digits of ``ord(char)`` (no ``0b``
    prefix, no zero padding); the groups are joined with single spaces.

    Args:
        s: Text to convert.

    Returns:
        The binary representation, e.g. ``'hi'`` -> ``'1101000 1101001'``.
        An empty input yields an empty string.
    """
    return ' '.join(format(ord(ch), 'b') for ch in s)

def decode(s):
    """Convert space-separated binary code points back into a string.

    Inverse of ``encode``: every whitespace-separated group is parsed as a
    base-2 integer and mapped to the character with that code point.

    Args:
        s: Binary groups separated by whitespace, e.g. ``'1101000 1101001'``.

    Returns:
        The decoded text. An empty or whitespace-only input yields ``''``.

    Raises:
        ValueError: If a group is not a valid base-2 number.
    """
    # split() with no argument drops empty groups, so '' (which is what
    # encode('') returns) and repeated spaces no longer reach int('', 2).
    return ''.join(chr(int(bits, 2)) for bits in s.split())


class RedisProxyClient():
    """gRPC test client that sends Redis commands through the Redis proxy.

    ``mode`` selects how commands are packed into the request:
    2 sends the whole command as one string, 3 splits it into one request
    element per whitespace-separated token (vector mode).

    The target endpoint is taken from the ``ip`` / ``port`` constructor
    arguments when given; otherwise from module-level ``ip`` / ``port``
    globals (set by the script entry point); otherwise from
    ``DEFAULT_IP`` / ``DEFAULT_PORT``.
    """

    # Last-resort endpoint, used only when nothing else supplies one.
    DEFAULT_IP = '172.16.155.239'
    DEFAULT_PORT = '50051'

    def __init__(self, mode, ip=None, port=None):
        """Store the packing mode and the optional target endpoint.

        Args:
            mode: 2 (string mode) or 3 (vector mode); an int or a numeric
                string is accepted.
            ip: Optional proxy host; ``None`` defers to the fallbacks
                described on the class.
            port: Optional proxy port; ``None`` defers to the fallbacks
                described on the class.
        """
        self.mode = mode
        self.ip = ip
        self.port = port

    def test_command(self, command,
                     sid='redis_test',
                     pw='redis_teset',
                     sAppName='aiad',
                     sTableName='userprofile',
                     version=2,
                     binary=False):
        """Build a ``RedisProxyRequest`` for ``command``, send it, print the reply.

        Args:
            command: Redis command line, e.g. ``'get key'``.
            sid: Value for the request's ``sAppId``.
            pw: Value for the request's ``sAppPw``.
            sAppName: Value for the request's ``sAppName``.
            sTableName: Value for the request's ``sTableName``.
            version: Request body version. 2 puts the whole command in
                ``sCmd``; 3 sends one request element per token. Any other
                value sends a body with only ``nVer`` set.
            binary: Accepted for compatibility but currently unused; binary
                values are not supported by this client.

        Returns:
            The response returned by ``send``.
        """
        redis = redisproxy__pb2.RedisProxyRequest()
        redis.sAppId = sid
        redis.sAppPw = pw
        redis.sAppName = sAppName
        redis.sTableName = sTableName

        redis.oRequestHead.nCallMode = redisproxy__pb2.CALLMODE_PIPELINE
        redis.oRequestHead.nCallType = redisproxy__pb2.CALLTYPE_SYNC
        body = redis.oRequestBodyList.add()
        body.nVer = version

        print('test:', command)
        print('nver:', version)

        if version == 2:
            # String mode: the raw command line travels in sCmd.
            body.sCmd = command
            element = body.oRequestElementList.add()
            element.nCharType = redisproxy__pb2.CHARTYPE_STRING
        elif version == 3:
            # Vector mode: one element per token. split() (rather than
            # split(' ')) keeps stray or trailing spaces from producing
            # empty arguments.
            for token in command.split():
                element = body.oRequestElementList.add()
                element.nCharType = redisproxy__pb2.CHARTYPE_STRING
                element.sRequestElement = token

        response = self.send(redis)
        print(response)
        return response

    def send(self, redis):
        """Send ``redis`` to the proxy over an insecure gRPC channel.

        Args:
            redis: The ``RedisProxyRequest`` to send.

        Returns:
            The response of the ``RedisProxyCmd`` RPC.
        """
        host = getattr(self, 'ip', None)
        if host is None:
            # Backward compatible with the script entry point, which
            # publishes the endpoint as module-level globals.
            host = globals().get('ip', self.DEFAULT_IP)
        target_port = getattr(self, 'port', None)
        if target_port is None:
            target_port = globals().get('port', self.DEFAULT_PORT)

        # The context manager closes the channel once the call finishes,
        # so repeated commands do not leak connections.
        with grpc.insecure_channel(f"{host}:{target_port}") as channel:
            stub = redisproxy_pb2_grpc.RedisProxyServiceStub(channel=channel)
            return stub.RedisProxyCmd(redis)

    def select_mode(self, command='ping'):
        """Send ``command`` using the body version selected by ``self.mode``.

        Args:
            command: Redis command line to send.

        Returns:
            The response returned by ``test_command``.

        Raises:
            ValueError: If ``self.mode`` is not 2 or 3 (as int or numeric
                string). Previously such modes were skipped silently.
        """
        try:
            mode = int(self.mode)
        except (TypeError, ValueError):
            mode = None
        if mode not in (2, 3):
            raise ValueError(
                f"unsupported mode {self.mode!r}; expected 2 or 3")
        return self.test_command(command, version=mode)

    def ping(self):
        """Send ``ping`` with the default (version 2) request body."""
        return self.test_command('ping')

    def invalid_param(self):
        """Send ``set add`` with body version 9.

        Version 9 matches neither packing branch, so only ``nVer`` is set;
        presumably this provokes an invalid-parameter reply.
        """
        return self.test_command('set add', version=9)

    def invalid_userpasswd(self):
        """Send ``set haha`` with the password ``'buzhidao'`` instead of the default."""
        return self.test_command('set haha', pw='buzhidao', version=2)

    def unknow_command(self):
        """Send the non-existent command ``met haha`` in string mode."""
        return self.test_command('met haha', version=2)

    def data_notfound(self):
        """Send ``get hadads`` in string mode."""
        return self.test_command('get hadads', version=2)

    def many_requests(self):
        """Placeholder; not implemented."""
        pass

    def invalid_table(self):
        """Placeholder; not implemented."""
        pass

    def quotaexcee(self):
        """Placeholder; not implemented."""
        pass

    def invalid_ip(self):
        """Placeholder; not implemented."""
        pass

    def system_error(self):
        """Placeholder; not implemented."""
        pass

    def set(self):
        """Send ``set 我的 1`` in the configured mode."""
        return self.select_mode('set 我的 1')

    def get(self):
        """Send ``get 我的`` in the configured mode."""
        return self.select_mode('get 我的')

    def exists(self):
        """Send ``exists 我的`` in the configured mode."""
        return self.select_mode('exists 我的')

    def zadd(self):
        """Add four scored members to ``myzset``."""
        # Fixed: the literal used to contain an unbalanced quote ("bar"")
        # and a trailing space.
        return self.select_mode(
            'zadd myzset 2 "world" 3 "bar" 4 "four" 5 "five"')

    def zcount(self):
        """Send ``zcount myzset 1 4`` in the configured mode."""
        return self.select_mode('zcount myzset 1 4')

    def zrangeByScore(self):
        """Send ``zrangeByScore myzset 1 4`` in the configured mode."""
        return self.select_mode('zrangeByScore myzset 1 4')

    def zremrangeByScore(self):
        """Send ``zremrangeByScore myzset 0 2`` in the configured mode."""
        return self.select_mode('zremrangeByScore myzset 0 2')

    def zremrangeByRank(self):
        """Send ``zremrangeByRank myzset 2 3`` in the configured mode."""
        return self.select_mode('zremrangeByRank myzset 2 3')

    def hset_site(self):
        """Send ``HSET site redis redis.com`` in the configured mode.

        This command used to live in a first ``hset`` definition that was
        silently overridden by the later one; it is kept under its own
        name so it can actually be called.
        """
        return self.select_mode('HSET site redis redis.com')

    def hget(self):
        """Send ``HGET site redis`` in the configured mode."""
        # Fixed: the command name and key were fused ('HGETsite').
        return self.select_mode('HGET site redis')

    def hmset(self):
        """Set three fields on ``myhash`` with a single ``hmset``."""
        return self.select_mode(
            'hmset myhash field1 "hello" field3 5 field2 6.02')

    def hmget(self):
        """Send ``hmget myhash field1 field2`` in the configured mode."""
        return self.select_mode('hmget myhash field1 field2')

    def hgetAll(self):
        """Send ``hgetAll myhash`` in the configured mode."""
        return self.select_mode('hgetAll myhash')

    def hdel(self):
        """Send ``hdel myhash field1`` in the configured mode."""
        return self.select_mode('hdel myhash field1')

    def hset(self):
        """Send ``hset myhash field1 2.1`` in the configured mode."""
        return self.select_mode('hset myhash field1 2.1')

    def hincrBy(self):
        """Send ``hincrBy myhash field3 5`` in the configured mode."""
        return self.select_mode('hincrBy myhash field3 5')

    def hincrByFloat(self):
        """Send ``hincrByFloat myhash field2 2.08`` in the configured mode."""
        return self.select_mode('hincrByFloat myhash field2 2.08')

    def incrBy(self):
        """Send ``incrBy field3 5`` in the configured mode."""
        return self.select_mode('incrBy field3 5')

    def setex(self):
        """Send ``setex mykey 60 redis`` in the configured mode."""
        return self.select_mode('setex mykey 60 redis')

    def getTTL(self):
        """Send ``ttl mykey`` in the configured mode."""
        return self.select_mode('ttl mykey')

    def zrevrangebyscore_withscores(self):
        """Send ``zrevrangebyscore myzset +inf -inf`` in the configured mode."""
        return self.select_mode('zrevrangebyscore myzset +inf -inf')


if __name__ == '__main__':
    # Script entry point: parse the target endpoint and packing mode, then
    # run every implemented test command once against the proxy.
    parser = argparse.ArgumentParser()
    # type=int with an int default keeps the mode comparable with the
    # integers 2 / 3 that RedisProxyClient.select_mode checks; the old
    # string default '2' never matched, so most commands were skipped.
    parser.add_argument("-m", "--mode", type=int, default=2,
                        help="选择测试模式, 版本2为字符串模式,3为向量模式")
    parser.add_argument("-i", "--ip", help="ip address")
    parser.add_argument("-p", "--port", help="port")
    args = parser.parse_args()

    # ``ip`` and ``port`` must remain module-level names because
    # RedisProxyClient.send looks them up as globals.
    ip = args.ip or '172.16.155.239'
    if args.port:
        port = args.port
    else:
        port = '50051'
        print("默认ip:172.16.155.239, port:50051")

    # Use a separate name for the instance instead of rebinding the
    # RedisProxyClient class itself.
    client = RedisProxyClient(mode=args.mode)

    client.invalid_param()
    client.invalid_userpasswd()
    client.unknow_command()
    client.data_notfound()
    client.set()
    client.get()
    client.exists()
    client.zadd()
    client.zcount()
    client.zrangeByScore()
    client.zremrangeByScore()
    client.zremrangeByRank()
    client.hget()
    client.hmget()
    client.hgetAll()
    client.hdel()
    client.hset()
    client.hincrBy()
    client.hincrByFloat()
    client.incrBy()
    client.setex()
    client.getTTL()
    client.zrevrangebyscore_withscores()

# 原文地址 (original article): https://www.cnblogs.com/zhaobobo10/p/12750431.html