Python Socket Sever

1. Server code

 1 # !/usr/bin/env python
 2 # coding:utf-8
 3 import multiprocessing
 4 import socket
 5 import sys
 6 import json
 7 import time
 8 
 9 import app
10 
11 
12 class Server(object):
13     address_family = socket.AF_INET
14     socket_type = socket.SOCK_STREAM
15     request_queue_size = 5
16 
17     def __init__(self, server_address):
18         try:
19             self.listen_socket = socket.socket(self.address_family, self.socket_type)
20             self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
21             self.listen_socket.bind(server_address)
22             self.listen_socket.listen(self.request_queue_size)
23         except socket.error as e:
24             print(e)
25             exit()
26 
27         self.servr_name = "pythonserver"
28         self.server_port = server_address[1]
29         self.client_socket = None
30         self.application = None
31         self.recv_data = None
32         self.headers_set = None
33 
34     def serve_forever(self):
35         while True:
36             print(time.time())
37             self.client_socket, client_address = self.listen_socket.accept()
38             new_client_process = multiprocessing.Process(target=self.handle_request)
39             # new_client_process.daemon = True
40             new_client_process.start()
41 
42             # self.client_socket.close()
43             # print('close')
44 
45     def set_app(self, application):
46 
47         self.application = application
48 
49     def handle_request(self):
50         self.recv_data = self.client_socket.recv(2014)
51         recv_str = str(self.recv_data, encoding='utf-8')
52         recv_json = json.loads(recv_str)
53         env = recv_json
54         body_content = self.application(env, self.start_response)
55         self.finish_response(body_content)
56 
57     def start_response(self, status, response_headers):
58         self.headers_set = [status, response_headers]
59 
60     def finish_response(self, body_content):
61         try:
62             status, response_headers = self.headers_set
63             self.client_socket.send(bytes(response_headers, encoding="utf8"))
64         except Exception as e:
65             print(str(e))
66             self.client_socket.send(bytes(str(e), encoding="utf8"))
67         finally:
68             self.client_socket.close()
69 
70 
71 server_addr = (HOST, PORT) = '', 8888
72 python_root = './wsgiPy'
73 
74 
75 def make_server(server_addr, application):
76     server = Server(server_addr)
77     server.set_app(application)
78     return server
79 
80 
81 def main():
82     module, application = ('app', 'app')
83     sys.path.insert(0, python_root)
84     module = __import__(module)
85     application = getattr(module, application)
86     httpd = make_server(server_addr, application)
87     print('listening on port %d ...
' % PORT)
88     httpd.serve_forever()
89 
90 
91 if __name__ == '__main__':
92     multiprocessing.freeze_support()
93     main()

2. Code App(application process)

# !/usr/bin/env python
# -*- coding:utf-8 -*-
import pandas as pd
import os
import os.path
import configparser

CONFIGPATH = './config.ini'


def app(environ, start_response):
    try:
        method = environ['method']
        data = environ['data']
        print('entry app')
        if method == 'get_repository':
            ret = get_repository(data)
            print('get_repository')
        elif method == 'authority_check':
            ret = authority_check(data)
            print('authority_check')

        status = '200 OK'
        response_headers = ret
        start_response(status, response_headers)
    except Exception as e:
        print(str(e))
        start_response('400', str(e))


def get_repository(data=None):
    user_name = data['name']
    prj_name = data['project']
    conf = load_config()
    data_excel = pd.read_excel(conf.user_info, sheet_name=None)

    user = data_excel['user']
    project = data_excel['project']

    try:
        df_user = user[user['User name'] == user_name]
        group = df_user['Group name'].values.tolist()[0]
        df_proj = project[(project['Project name'] == prj_name) & (project['Group name'] == group)]
        url = df_proj['Gitlab path'].values.tolist()[0]
        return url
    except Exception as e:
        print(str(e))
        return ''


def authority_check(data=None):
    user_name = data['name']
    prj_name = data['project']
    conf = load_config()
    data_excel = pd.read_excel(conf.user_info, sheet_name=None)
    user = data_excel['user']
    project = data_excel['project']
    try:
        df_user = user[user['User name'] == user_name]
        group1 = df_user['Group name'].values.tolist()[0]
        df_proj = project[project['Project name'] == prj_name]
        if df_proj.empty():
            return 'N, no such a project:' + prj_name
        group2 = df_proj['Group name'].values.tolist()[0]
        if group1 == group2:
            return 'Y'
        else:
            return 'N,you do not have authority of ' + group2
    except Exception as e:
        print(str(e))
        return str(e)


def load_config(path='./config.ini'):
    conf = ConfigHandle()
    conf.load_config()
    return conf


class ConfigHandle(object):
    def __init__(self):
        self.conf_path = CONFIGPATH
        self.user_info = ''
        self.user = ''
        self.passw = ''

    @staticmethod
    def deal_path(path_name):
        """deal with windows file path"""
        if path_name:
            path_name = path_name.strip()

        if path_name:
            path_name = r'%s' % path_name
            path_name = path_name.replace('/', '\')
            path_name = os.path.abspath(path_name)
            if path_name.find(":\") == -1:
                path_name = os.path.join(os.getcwd(), path_name)
        return path_name

    def load_config(self):
        """load configuration from config.ini file
        :return:
        """
        file = self.deal_path(CONFIGPATH)
        if not os.path.isfile(file):
            print('Can not find the config.ini!')
            return False
        parser = configparser.ConfigParser()
        parser.read(file, encoding='UTF-8')

        self.user_info = parser.get('pathconfig', 'user_info').strip()

        return self

3. Client Code

# !/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import json

# 链接服务端ip和端口
ip_port = ('sinm1bappt55.ent.com', 8888)
# ip_port = ('localhost', 8888)
# 生成一个句柄
sk = socket.socket()
# 请求连接服务端
sk.connect(ip_port)

req_dic = {
    'method': 'get_repository',
    'data': {'name': 'WANGZ94',
             'project': 'TEST'}
}

send_json = json.dumps(req_dic)

# 发送数据
sk.sendall(bytes(send_json, 'utf8'))
# 接受数据
server_reply = sk.recv(1024)
# 打印接受的数据
print(str(server_reply, 'utf8'))
# 关闭连接
sk.close()

 4. user

# !/usr/bin/env python
# coding:utf-8

# *&----------------------------------------------------*
# *& Author       : Joe Wang                            *
# *& Creation Date: 2021.08.27                          *
# *& Description  : this script is a http client , for  *
#                   testing socket server               *
# *& Change Tags  : initial                             *
# *&----------------------------------------------------*
# *&----------------------------------------------------*
# *& Author       : Joe Wang                            *
# *& Creation Date: 2021.08.28                          *
# *& Description  : bug fix                             *
# *& Change Tags  : Begin Add:JoeWang-20210828          *
# *&                End   Add:JoeWang-20210828          *
# *&                Begin Del:JoeWang-20210828          *
# *&                End   Del:JoeWang-20210828          *
# *&----------------------------------------------------*

import socket
import json
import socket
import getpass


class AuthorityCheck(object):
    def __init__(self, project, url, port):
        self.project = project
        self.user = self.get_user()
        self.url = url
        self.port = port
        self.ret = self.authority_check()

    def authority_check(self):
        # 链接服务端ip和端口
        ip_port = (self.url, self.port)
        # 生成一个句柄
        sk = socket.socket()
        # 请求连接服务端
        sk.connect(ip_port)

        req_dic = {
            'method': 'authority_check',
            'data': {'name': self.user,
                     'project': self.project}
        }

        send_json = json.dumps(req_dic)

        # 发送数据
        sk.sendall(bytes(send_json, 'utf8'))
        # 接受数据
        server_reply = sk.recv(1024)
        # 关闭连接
        sk.close()
        # 打印接受的数据
        return str(server_reply, 'utf8')

    def get_user(self):
        """

        :return:
        """
        user_name = getpass.getuser()
        return user_name.upper()
原文地址:https://www.cnblogs.com/JackeyLove/p/15217376.html