python多线程网络编程

背景

使用过flask框架后,我对request这个全局实例非常感兴趣。它在客户端发起请求后会保存着所有的客户端数据,例如用户上传的表单或者文件等。那么在很多客户端发起请求时,服务器是怎么去区分不同的request对象呢?当查看了大量的资料后,发现它使用了一种称为thread local的技术。关于thread local的实现原理其实很简单,就是声明一个全局的字典并且以线程的名字作为字典的键,然后其值就是该线程下的私有数据。具体可以参考这篇ThreadLocal文章。我们都知道http服务器相对socket服务器要更加上层的网络服务,所以研究flaskrequest实现原理前最好能够先理解scoket是怎么实现的。

运行平台与python版本要求

  • 1.使用的是python版本;
  • 2.在mac下测试通过(理论上在所有系统上都能够运行);
  • 3.可以github上获得所有源代码;
  • 4.使用nc命令模拟客户端;

研究目标

  • 1.我将会使用socket完成一个echo服务器的设计;
  • 2.这个服务器使用多线程实现非阻塞;
  • 3.我要使用thread local技术使数据独立安全;

socket的阻塞式实现

程序的设计应该是从实现最简单核心的任务开始的,所以下面我先实现一个简单的socket服务器。

import socket

class ThreadSocket(object):
	"""
	
	"""
	def __init__(self, host, port):
		self.host = host
		self.port = port
		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		self.sock.bind((self.host, self.port))

	def listen(self):
		self.sock.listen(5)
		while True:
			client, address = self.sock.accept()
			client.settimeout(60)
			try:
				data = client.recv(1024)
				if data:
					client.send(data)
				else:
					raise error("Client has disconnected")
			except:
				client.close()
			
if __name__ == '__main__':
	server=ThreadSocket('',9000)
	server.listen()

你也可以直接通过git checkout v0.1命令获得这个版本的代码,代码写完后可以通过下面的方式完成测试。

python simpleSocketServer.py

新打开一个终端,输入下面这个命令进行测试。

nc 127.0.0.1 9000

下面可以看看客户端运行后的输出结果。

可以发现在第二次输入的数据并没有得到服务器的反馈,这是因为服务器还没有切换到读取等待模式。可以通过下面的方式修改从而实现不间断的发送接收任务。

import socket

class ThreadSocket(object):
        """
        
        """
        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        while True:
                                try:
                                        data = client.recv(1024)
                                        if data:
                                                client.send(data)
                                        else:
                                                raise error("Client has disconne
cted")
                                except:
                                        client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

现在再看到我们的客户端输出结果如下。

代码可以通过下面的的命令git checkout v0.2获得。但当你启动多个客户端时,你会发现这种实现方式的局限性。
下面启动两个客户端进行演示的输出结果。

你会发现当再使用nc启动一个进程访问服务器的时候是无法与服务器通讯的,因此这种阻塞的实现方式是非常低效的。那么如何才能编写出支持多个客户端的服务器呢?请继续看下去。

socket的多线程实现

为了使每一个客户端都能够得到服务器的响应我们的设计思路是让主线程等待客户端的连接,一旦连接成功就启动一个新的子线程,并且把读写相关的操作都扔给子线程处理。好有了思路下面可以编写代码了。

port socket
import threading

class ThreadSocket(object):
        """
        
        """
        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(
client, address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        client.send(data)
                                else:
                                        raise error("Client has disconnected")
                        except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

注意在顶部导入threading模块,然后把读写网络套接字部分放入线程回调函数里,你可以使用git checkout v0.3获取这个版本的代码。
下面看看运行的结果。

现在我们的服务器可以多人同时访问,并且能够同时提供服务了。

使用现有的socket实现一个ToDo服务器

在编写api时通常我们都会以实现一个ToDo服务功能作为演示示例,下面我们来简单设计一下。

  • 1.使用字典作为数据库保存用户提交的数据;
  • 2.使用GET方法实现获取数据功能;
  • 3.使用POST实现提交数据功能;

要实现上面这几个点也不难,首先我们先实现第一个目标。使用字典保存我们的数据,我们的服务器将不会再返回一个用户的输入数据,而是返回一个字典格式的数据。实现方式如下。

import socket
import threading

class ThreadSocket(object):
        """
        
        """
        todo_list = {
                'task_01':'see someone',
                'task_02':'read book',
                'task_03':'play basketball'

        }

        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(
client, address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        #client.send(data)
                                        response = str(self.todo_list)
                                        client.send(response)
                                else:
                                        raise error("Client has disconnected")
                        except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

通过字典保存了一些数据,当客户发送数据请求时就把这个字典反馈给客户。在把字典发送给客户之前需要使用str()函数强制转换数据为字符串,不然就会出现问题。这里的代码可以使用git checkout v0.4获得。
下面可以查看运行结果如下所示。

可以看到只要随便输入一些数据,它就会反馈服务器中的数据给客户端。这种请求的方式也太简单了,所以我们现在要使用一些方式进行限制,只有特定的命令才能请求到我们的数据。下面我们来实现一个GET方法。

if data:
        #client.send(data)
         if 'GET' in data:
                response = str(self.todo_list)
         else:
                response = 'data no found'
         client.send(response)
else:
       raise error("Client has disconnected")

现在只是简单的通过判断客户端的请求数据中是否包含有关键字GET字符,为了能够在客户端中输出结果完后换行,现在我们可以通过下面这种方式优化一下代码。

response = response+'
'
client.send(response)

运行后可以看到改进后的运行结果如下。

为了使GET方法能够起到真正的查询特定数据的作用,可以先构造一种查询方式。

GET/task_id/status

我们设计的查询语句非常简单,每一个域都是通过斜线分割,那么服务器应该如何解析这种查询语句呢?请看下面的实现方式。

if 'GET' in data:
      method,task_id,status = data.split('/')
      result = self.todo_list.get(task_id,'no key match')
      response = str(result)

可以发现,我使用字符串内置函数直接分割客户的请求数据,然后通过简单的字典查询得到用户指定的数据。
全部代码如下:

import socket
import threading

class ThreadSocket(object):
        """
        
        """
        todo_list = {
                'task_01':'see someone',
                'task_02':'read book',
                'task_03':'play basketball'

        }

        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(client,
address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        #client.send(data)
                                        if 'GET' in data:
                                                method,task_id,status = data.split('/')
                                                result = self.todo_list.get(task_id,'no 
key match')
                                                print 'result',result
                                                response = str(result)
                                        else:
                                                response = 'data no found'

                                        response = response+'
'
                                        client.send(response)
                                else:
                                        raise error("Client has disconnected")
                except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

当前版本的代码可以通过git checkout v0.5获得。下面来看运行的结果。

如何更新我们的数据呢?我们需要实现一个POST方法,下面我们可以设计一下我们的POST语句了。

POST/task_id=value/status

可以看到语句设计的非常简单,下面我们来实现它。

if 'GET' in data:
     method,task_id,status = data.split('/')
     result = self.todo_list.get(task_id,'no key match')
     print 'result',result
     response = str(result)
elif 'POST' in data:
     method,command,status = data.split('/')
     key,value = command.split('=')
     self.todo_list[key] = value
     response = 'submit success'
else:
     response = 'data no found'

  response = response+'
'
  client.send(response)

所有代码将不再粘贴到文章中,可以通过git checkout v0.6获得,下面是运行截图。

总结

目前我们已经实现了socket在多线程下的一个简单的todo应用,通过一点一点的实现整个框架能够深入理解网络编程的原理。那么现在我们还有什么方面可以改进,还有什么地方需要考虑呢?下面给大家列出下一章将会覆盖的话题。

  • 1.使用正则表达式解析用户输入;
  • 2.使用Thread Local技术;
  • 3.如何让浏览器也可以访问我们的服务器;

希望下一章能够见到你:)

原文地址:https://www.cnblogs.com/landpack/p/5674253.html