python grpc+jwt+s3 对用户token进行验证

写在前面:
用户信息是存储在aws的s3上,现在要做到用户登陆创建新token,访问api,验证token。

目录结构:


1.grpc部分:
第一步,序列化,规定api方法及参数等。。
syntax = "proto3";

package authsvc;
"""
In Python, the package directive is ignored, since Python modules are organized according to their location in the file system.
"""

# server + server端类名
# authenticate: 方法名,规定请求参数与返回参数
# validate: 同上
service Authenticate {
rpc authenticate (authRequest) returns (authResponse);
rpc validate (validRequest) returns (validResponse);
}
# auth的请求参数规定
message authRequest {
string id = 1;
string passwd = 2;
}
# auth返回参数规定
message authResponse {
string token = 1;
}

message validRequest {
string token = 1;
}
# 枚举型返回值,0无法序列化
enum validResult {
UNKNOWN = 0;
VALID = 1;
INVALID = 2;
}

message validResponse {
validResult result = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2.到terminal的当前目录下执行(固定格式,修改最后文件名即可):

$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./authsvc.proto
1
2. server端
考虑到这个功能做出来是作为模块的,要把主main函数跟项目设为同一目录级

打开参考看代码会更容易理解!
S3存储桶参考官方文档

jwt:用的是PyJWT==1.7.1
PyJWT参考这里

# -*- coding: utf-8 -*-
import datetime
import json
import grpc
import time
from concurrent import futures
from auth import authsvc_pb2, authsvc_pb2_grpc
import jwt
import boto3


class Authenticate(authsvc_pb2_grpc.AuthenticateServicer):

def __init__(self, bucket_name, **kwargs):
"""
:param bucket_name: s3存储桶名字
:param kwargs: key=timeout, token过期时间
"""
self.bucket_name = bucket_name
self.conn = boto3.resource('s3')
self.client = boto3.client('s3')

# 这里填写secret_key
self.secret = "******************************"
self.timeout = kwargs.get('timeout', 1)

# 验证token
def validate(self, request, context):

# 解码token,获取payload
try:
token = request.token
payload = jwt.decode(token, self.secret, algorithms=['HS256'],
leeway=datetime.timedelta(hours=self.timeout))
except Exception:
return authsvc_pb2.validResponse(result=authsvc_pb2.INVALID)
id = payload['id']

# 获取s3对应的账户信息
try:
response = self.client.get_object(
Bucket=self.bucket_name,
Key=id)
except Exception:
return authsvc_pb2.validResponse(result=authsvc_pb2.INVALID)

if response['Body'].read():
return authsvc_pb2.validResponse(result=authsvc_pb2.VALID)
return authsvc_pb2.validResponse(result=authsvc_pb2.INVALID)

# 创建token
def authenticate(self, request, context):
id = request.id
pwd = request.passwd

# 获取s3上对应id的用户信息
try:
response = self.client.get_object(
Bucket=self.bucket_name,
Key=id)
right_pwd_r = response['Body'].read()
right_pwd = json.loads(right_pwd_r)["passwd"]
except Exception:
# 出错把token设置为空
return authsvc_pb2.authResponse(token='')
# 使用jwt创建token
if pwd == right_pwd:
# exp是设置token的过期时间
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, hours=self.timeout),
'id': id
}
token = jwt.encode(payload, self.secret, algorithm='HS256')
return authsvc_pb2.authResponse(token=token)

return authsvc_pb2.authResponse(token='')


def serve(one_day_second, host, port, bucket_name, **kwargs):
"""
:param one_day_second: 服务器停止时间
:param host: ip
:param port: 端口
:param bucket_name: s3存储桶名字
:param kwargs: key=max_workers,设置最大连接数,类似于线程池的概念
:return:

"""
# 定义服务器并设置最大连接数,corcurrent.futures是一个并发库,类似于线程池的概念
# 创建一个服务器
max_workers = kwargs.get('max_workers', 4)
grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
# 在服务器中添加派生的接口服务(自己实现了处理函数)
authsvc_pb2_grpc.add_AuthenticateServicer_to_server(Authenticate(bucket_name), grpcServer)
# 添加监听端口
grpcServer.add_insecure_port('{0}:{1}'.format(host, port))
# 启动服务器
grpcServer.start()
try:
while True:
time.sleep(one_day_second)
except KeyboardInterrupt:
grpcServer.stop(0) # 关闭服务器


if __name__ == '__main__':
serve(one_day_second=60 * 60 * 24, host='localhost', port='8080', bucket_name='zzxc')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
3. client端
client相对简单:

# -*- coding: utf-8 -*-
import grpc
from auth import authsvc_pb2, authsvc_pb2_grpc


class RpcClient(object):
def __init__(self, host, port):
"""
:param host: ip
:param port: 端口
:param channel:创建连接
:param client :连接server
"""
self.host = host
self.port = port
self.channel = grpc.insecure_channel('{0}:{1}'.format(self.host, self.port))
self.client = authsvc_pb2_grpc.AuthenticateStub(channel=self.channel)

def authenticate(self, id, passwd):
response = self.client.authenticate(authsvc_pb2.authRequest(id=id, passwd=passwd))
return response.token

def validate(self, token):
response = self.client.validate(authsvc_pb2.validRequest(token=token))
return response


if __name__ == '__main__':
client = RpcClient(host='localhost', port='8080')
a = client.authenticate(id='user.txt', passwd='123456')
print 'a:{0}'.format(a)
# token = ''
# b = client.validate(token=token)
# print b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
4. 测试
先运行server_main.py
再运行client_main.py

以上

原文地址:https://www.cnblogs.com/ExMan/p/12156855.html