python入门第二十九天-socketserver

简单并发实例

服务端:

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
'''
Administrator 
2018/8/3 
'''
import socketserver

class MyServer(socketserver.BaseRequestHandler): #定义自己的类,继承固定的类 socketserver.BaseRequestHandler

    def handle(self):  #方法名 必须用handle  为了重新父类的方法
        print ("服务端启动...")
        while True:
            conn = self.request #相当于:conn, addr = s.accept()
            print (self.client_address)
            while True:
                client_data=conn.recv(1024)
                print (str(client_data,"utf8"))
                print ("waiting...")
                conn.sendall(client_data)
            conn.close()

if __name__ == '__main__':
    #调用socketserver 下的多线程TCPSERVer服务方法。需要两个参数,(ip port), 自己定义的类
    server = socketserver.ThreadingTCPServer(('127.0.0.1',8091),MyServer)
    #调用serve_forever()方法。里面封装了
    server.serve_forever()

客户端:

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
'''
Administrator 
2018/8/3 
'''
#-----------------------------------------------------client.py
#-----------------------------------------------------
import socket

ip_port = ('127.0.0.1',8091)
sk = socket.socket()
sk.connect(ip_port)
print ("客户端启动:")
while True:
    inp = input('>>>')
    sk.sendall(bytes(inp,"utf8"))
    if inp == 'exit':
        break
    server_response=sk.recv(1024)
    print (str(server_response,"utf8"))
sk.close()

聊天并发实例

 1 #!/usr/bin/env python3
 2 #-*- coding:utf-8 -*-
 3 '''
 4 Administrator 
 5 2018/8/3 
 6 '''
 7 import socketserver
 8 
 9 class MyServer(socketserver.BaseRequestHandler):
10 
11     def handle(self):
12         print ("服务端启动...")
13         while True:
14             conn = self.request
15             print (self.client_address)
16             while True:
17 
18                 client_data=conn.recv(1024)
19 
20                 print (str(client_data,"utf8"))
21                 print ("waiting...")
22                 server_response=input(">>>")
23                 conn.sendall(bytes(server_response,"utf8"))
24                 # conn.sendall(client_data)
25 
26             conn.close()
27             # print self.request,self.client_address,self.server
28 
29 
30 if __name__ == '__main__':
31     server = socketserver.ThreadingTCPServer(('127.0.0.1',8098),MyServer)
32     server.serve_forever()

kehuduan:

 1 ##########################################
 2 import socket
 3 
 4 
 5 ip_port = ('127.0.0.1',8098)
 6 sk = socket.socket()
 7 sk.connect(ip_port)
 8 print ("客户端启动:")
 9 while True:
10     inp = input('>>>')
11     sk.sendall(bytes(inp,"utf8"))
12     server_response=sk.recv(1024)
13     print (str(server_response,"utf8"))
14     if inp == 'exit':
15         break
16 sk.close()



其它应用

命令传送1:

#------------------------------------------------server
#------------------------------------------------
import socket
import subprocess
ip_port = ('127.0.0.1',8879)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)
print ("服务端启动...")
while True:
    conn,address = sk.accept()
    while True:
        try:

            client_data=conn.recv(1024)
        except Exception:
            break
        print (str(client_data,"utf8"))
        print ("waiting...")
        # server_response=input(">>>")
        # conn.sendall(bytes(server_response,"utf8"))
        cmd=str(client_data,"utf8").strip()
        cmd_call=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
        cmd_result=cmd_call.stdout.read()
        if len(cmd_result)==0:
            cmd_result=b"no output!"
        conn.sendall(cmd_result)
        print('send data size',len(cmd_result))
        print('******************')
        print('******************')
        print('******************')

    conn.close()
    
#------------------------------------------------client 
#------------------------------------------------
import socket
ip_port = ('127.0.0.1',8879)
sk = socket.socket()
sk.connect(ip_port)
print ("客户端启动:")
while True:
    inp = input('cdm:>>>').strip( )
    if len(inp)==0:
        continue
    if inp=="q":
        break
    sk.sendall(bytes(inp,"utf8"))
    server_response=sk.recv(1024)
    print (str(server_response,"gbk"))
    print('receive data size',len(server_response))
    if inp == 'exit':
        break
sk.close()
1
2
#试一试
netstat -an

conclusion:

              sendall会把数据直接全部发送到客户端,客户端将所有的数据都放到缓冲区,每次recv多少字节取决于recv内的参数,理论不应该超过8k。

所以,并不能一次recv()无限大数据,所以这里我们应该通过循环去接收。

1
sk.recv(4096)  

命令传送2:解决大数据传送和粘包问题

import socketserver
import subprocess

class Myserver(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            conn=self.request
            conn.sendall(bytes("欢迎登录","utf8"))
            while True:
                client_bytes=conn.recv(1024)
                if not client_bytes:break
                client_str=str(client_bytes,"utf8")
                print(client_str)
                command=client_str

                result_str=subprocess.getoutput(command)
                result_bytes = bytes(result_str,encoding='utf8')
                info_str="info|%d"%len(result_bytes)
                conn.sendall(bytes(info_str,"utf8"))
                # conn.recv(1024)
                conn.sendall(result_bytes)
            conn.close()

if __name__=="__main__":
    server=socketserver.ThreadingTCPServer(("127.0.0.1",9998),Myserver)
    server.serve_forever()

#####################################client


import socket
ip_port=("127.0.0.1",9998)

sk=socket.socket()
sk.connect(ip_port)
print("客户端启动...")

print(str(sk.recv(1024),"utf8"))

while True:
    inp=input("please input:").strip()


    sk.sendall(bytes(inp,"utf8"))
    basic_info_bytes=sk.recv(1024)
    print(str(basic_info_bytes,"utf8"))
    # sk.send(bytes('ok','utf8'))
    result_length=int(str(basic_info_bytes,"utf8").split("|")[1])

    print(result_length)
    has_received=0
    content_bytes=bytes()
    while has_received<result_length:
        fetch_bytes=sk.recv(1024)
        has_received+=len(fetch_bytes)
        content_bytes+=fetch_bytes
    cmd_result=str(content_bytes,"utf8")
    print(cmd_result)

sk.close()
View Code
import socket,os
ip_port=("127.0.0.1",8898)
sk=socket.socket()
sk.bind(ip_port)
sk.listen(5)
BASE_DIR=os.path.dirname(os.path.abspath(__file__))

while True:
    print("waiting connect")
    conn,addr=sk.accept()
    flag = True
    while flag:

            client_bytes=conn.recv(1024)
            client_str=str(client_bytes,"utf8")
            func,file_byte_size,filename=client_str.split("|",2)

            path=os.path.join(BASE_DIR,'yuan',filename)
            has_received=0
            file_byte_size=int(file_byte_size)

            f=open(path,"wb")
            while has_received<file_byte_size:
                data=conn.recv(1024)
                f.write(data)
                has_received+=len(data)
            print("ending")
            f.close()

#----------------------------------------------client
#----------------------------------------------
import socket
import re,os,sys
ip_port=("127.0.0.1",8898)
sk=socket.socket()
sk.connect(ip_port)
BASE_DIR=os.path.dirname(os.path.abspath(__file__))
print("客户端启动....")

while True:
    inp=input("please input:")

    if inp.startswith("post"):
        method,local_path=inp.split("|",1)
        local_path=os.path.join(BASE_DIR,local_path)
        file_byte_size=os.stat(local_path).st_size
        file_name=os.path.basename(local_path)
        post_info="post|%s|%s"%(file_byte_size,file_name)
        sk.sendall(bytes(post_info,"utf8"))
        has_sent=0
        file_obj=open(local_path,"rb")
        while has_sent<file_byte_size:
            data=file_obj.read(1024)
            sk.sendall(data)
            has_sent+=len(data)
        file_obj.close()
        print("上传成功")
文件上传

注意:

       1  纸条就是conn

       2  一收一发

       3   client_data=conn.recv(1024)        

           if  那边send一个空数据  这边recv为空,则recv继续阻塞,等待其他的数据。所以聊天的时候好好聊,别发空数据。 




socketserver

虽说用Python编写简单的网络程序很方便,但复杂一点的网络程序还是用现成的框架比较好。这样就可以专心事务逻辑,而不是套接字的各种细节。SocketServer模块简化了编写网络服务程序的任务。同时SocketServer模块也是Python标准库中很多服务器框架的基础。

socketserver模块可以简化网络服务器的编写,Python把网络服务抽象成两个主要的类,一个是Server类,用于处理连接相关的网络操作,另外一个则是RequestHandler类,用于处理数据相关的操作。并且提供两个MixIn 类,用于扩展 Server,实现多进程或多线程。

Server类

它包含了种五种server类,BaseServer(不直接对外服务)。TCPServer使用TCP协议,UDPServer使用UDP协议,还有两个不常使用的,即UnixStreamServer和UnixDatagramServer,这两个类仅仅在unix环境下有用(AF_unix)。

Base class for server classes.

1
class BaseServer

This uses the Internet TCP protocol, which provides for continuous streams of data between the client and server. 

1
class socketserver.TCPServer(server_address, RequestHandlerClass, bind_and_activate=True)

This uses datagrams, which are discrete packets of information that may arrive out of order or be lost while in transit. The parameters are the same as for TCPServer

1
class socketserver.UDPServer(server_address, RequestHandlerClass, bind_and_activate=True)

These more infrequently used classes are similar to the TCP and UDP classes, but use Unix domain sockets; they’re not available on non-Unix platforms. The parameters are the same as for TCPServer. 

1
2
class socketserver.UnixStreamServer(server_address, RequestHandlerClass, bind_and_activate=True)
class socketserver.UnixDatagramServer(server_address, RequestHandlerClass,bind_and_activate=True)<br><br>#---------
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX

class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX

BaseServer的源码:

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - service_actions()
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()

                self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        """Called by the serve_forever() loop.

        May be overridden by a subclass / Mixin to implement any code that
        needs to be run during the loop.
        """
        pass

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40)
        print('Exception happened during processing of request from', end=' ')
        print(client_address)
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print('-'*40)
View Code

There are five classes in an inheritance diagram, four of which represent synchronous servers of four types:

There are five classes in an inheritance diagram, four of which represent
synchronous servers of four types:

        +------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+

RequestHandler类

所有requestHandler都继承BaseRequestHandler基类。

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

源码
View Code

创建一个socketserver 至少分以下几步

  1. First, you must create a request handler class by subclassing the BaseRequestHandlerclass and overriding its handle() method; this method will process incoming requests.   
  2. Second, you must instantiate one of the server classes, passing it the server’s address and the request handler class.
  3. Then call the handle_request() orserve_forever() method of the server object to process one or many requests.
  4. Finally, call server_close() to close the socket.
    import socketserver
    
    class MyTCPHandler(socketserver.BaseRequestHandler):
        """
        The request handler class for our server.
    
        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """
    
        def handle(self):
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            print("{} wrote:".format(self.client_address[0]))
            print(self.data)
            # just send back the same data, but upper-cased
            self.request.sendall(self.data.upper())
    
    if __name__ == "__main__":
        HOST, PORT = "localhost", 9999
    
        # Create the server, binding to localhost on port 9999
        server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
    
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
    View Code

    让你的socketserver并发起来, 必须选择使用以下一个多并发的类

    1
    2
    3
    4
    5
    6
    7
    class socketserver.ForkingTCPServer
     
    class socketserver.ForkingUDPServer
     
    class socketserver.ThreadingTCPServer
     
    class socketserver.ThreadingUDPServer

    所以:

    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
    #替换为
    server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)

    思考:

         1  tcp与udp的区别(三次握手后,建立连接,双向通道,一个收,一个发,tcp每次接到数据后都会有一个应答,有了应答,新的数据就会被覆盖掉)

         2  粘包  

原文地址:https://www.cnblogs.com/Mengchangxin/p/9413284.html