select、epoll、twisted网络编程

select、poll和epoll的区别

select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
 
poll
 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
 
epoll
 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

epoll是调用linux系统底层的;

 网络操作、文件、终端操作都是IO操作,但是对于文件的操作python select检测不到;

1、select见监视终端输入

sys.stdin方法可以捕获终端输入的句柄,当终端有输入的时候,句柄会改变,从而被sys.stdin方法检测到;

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import threading
import select
import sys
while True:
    #监听用户输入,如果用户输入内容,select感知sys.stdin改变,将改变的文件句柄保存至列表,并将列表作为select的第一个参数返回,如果用户未输入内容,select第一个参数为空列表。
    readable,writeable,error = select.select([sys.stdin,sys.stdout,],[],[],1)    #列表中的内容可以为一个或者多个句柄
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()
select监视终端输入输出

注意:上述代码需要在linux上执行才会生效

2、select实现socket服务端

sk.setblocking(bool)    #如果bool为True,在遇到accept/resv时阻塞,如果为False则不阻塞。

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import socket
sk = socket.socket()
ip_port = ('localhost',8888)
sk.bind(ip_port)
sk.setblocking(False)

while True:
    try:
        conn,address = sk.accept()
        conn.close()
        print address
    except Exception,e:    #捕获错误
        print e
    import time
    time.sleep(2)
setbolcking(False)
#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import select
import socket
sk = socket.socket()
ip_port = ('localhost',8888)
sk.bind(ip_port)
sk.setblocking(False)

sk1 = socket.socket()
ip_port1 = ('localhost',9999)
sk1.bind(ip_port1)
sk1.setblocking(False)

while True:
    rlist,w,e = select.select([sk,sk1,],[],[],2)    #在有多个客户端连接的时候,不会断开
    for r in rlist:
        conn,address = r.accept()
        print address
监听多个句柄/端口
服务端
#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import select
import socket
sk = socket.socket()
ip_port = ('localhost',8888)
sk.bind(ip_port)
sk.setblocking(False)

sk1 = socket.socket()
ip_port1 = ('localhost',9999)
sk1.bind(ip_port1)
sk1.setblocking(False)

inputs = [sk,sk1,]
import time
time.sleep(2)
print 'input',inputs
while True:
    rlist,w,e = select.select([sk,sk1,],[],[],2)
    for r in rlist:
        if r == sk:     #判断如果是连接请求句柄,则监听
            conn,address = r.accept()
            inputs.append(conn)    
            print address
        else:                #如果是其他的,就返回数据
            client_data = r.resv(1024)
            r.sendall(client_data)

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import socket
obj = socket.socket()
ip_port = ('localhost',8888)
obj.connect(ip_port)
obj.settimeout(5)
while True:
    inp = raw_input('please input:')
    obj.sendall(inp)
    print obj.recv(1024)
obj.close()
客户端

 3、监视客户端额服务端句柄

 1 import select
 2 ip_port = ('127.0.0.1',8888)
 3 sk = socket.socket()
 4 sk.bind(ip_port)
 5 sk.setblocking(False)
 6 inputs = [sk,]
 7 output = []
 8 import time
 9 time.sleep(2)
10 print inputs
11 while True:
12     #第一个参数表示,只要有变化,就感知到
13     #第二个参数表示output存在内容就感知到
14     #第三个参数表示有异常就感知到
15     #第四个参数表示超时时间,可以不写,但是如果不写,select是阻塞的,在没有接收到数据的时候不会往下执行
16     rList,w,e = select.select(inputs,output,[],0.05)
17     for r in rList:
18         if r == sk:
19             conn,address = r.accept()
20             inputs.append(conn)
21             output.append(conn)
22             print address
23         else:
24             client_data = r.recv(1024)
25             if client_data:
26                 r.sendall(client_data)
27             else:
28                 inputs.remove(r)
服务端代码

4、select实现socket服务端(接受和返回数据)

select定义方法:
    句柄列表11,句柄列表22,句柄列表33=select.select(句柄序列1,句柄序列2,句柄序列3,超时时间)
while True:
    rList,wList,e = select.select(inputs,output,[],0.05)
    #文件描述符可读,rList,一只有变化,感知
    #文件描述符可写,wList,二只要有,感知
    for r in rList:
        if r == sk:
            conn,address = r.accept()
            inputs.append(conn)
            output.append(conn)
        else:
            client_data = r.recv(1024)
            if client_data:
                output.append(r)
    for w in wList:
        w.sendall('123')
        output.remove(r)
接受和返回数据

5、select实现socket服务端(分离读写操作原理)

为什么使用读写分离:存在多个客户端请求时,返回数据会出现混乱

 使用队列实现

#队列是先进先出,栈是先进后出
from Queue import Queue
import select
import socket
ip_port = ('127.0.0.1',8888)
sk = socket.socket()
sk.bind(ip_port)
sk.setblocking(False)
inputs = [sk,]
output = []
message = {}
while True:
    rList,wList,e = select.select(inputs,output,inputs,1)
    for r in rList:
        if r == sk:
            conn,address = r.accept()
            inputs.append(conn)
            message[conn] = Queue()   #句柄加入队列
        else:
            #获取数据
            client_data = r.recv(1024)
            if client_data:
                output.append(r)
                message[r].put(client_data)    #在指定队列中插入数据
    for w in wList:
        try:
            data = message[w].get_nowait()
            w.sendall(data)
        except Queue.Empty:
            pass
        output.remove(w)
        del message[w]
读写分离
import Queue
obj = Queue.Queue()
obj.put(1)    
obj.put(2)
obj.put(3)
print obj.qsize()    #队列大小
print obj.get()    #如果没有内容,就会阻塞等待
print obj.get_nowait()    #不阻塞,如果有内容,获取,没有内容,则报错


try:
    print obj.get_nowait()
except Queue.Empty:
    print 'error'
print 'end'


obj = Queue.Queue(2)    #队列默认没有大小限制,这里限制为2
obj.put(1)
print '1'
obj.put(2)
print '2'
obj.put(3)
print '3'
obj.get_nowait(3)
Queue的基本使用

6、SocketServer源码剖析

SocketServer本质是创建线程,让客户端请求分别交给各个线程去处理

def process():
    pass
while True:
    r,w,e = select.select([sk,],[],[],1)
    print 'loop'
    if sk in r:
        print 'get request'
        request,client_address = sk.accept()
        t = threading.Thread(target=process,args = (request,client_address))    #创建线程
        t.daemon=False
        t.start()
    sk.close()
class MyServer():
    pass
创建线程
SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)


class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass    #ThreadingTCPServer类继承了Threading和TCPServer两个类

class TCPServer(BaseServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)    #经典类执行父类的构造方法,其中server_address为ip和端口,RequestHandlerClass为自定义的类MyClass
       #super(TCPServer,self).__init__(self, server_address, RequestHandlerClass)    #新式类提供的执行方法


class BaseServer:
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
SocketServer源码解析
SocketServer源码中实际上是调用了ThreadingMinIN、TCPServer和BaseServer三个类,没有多线程和多进程,只用到了select
ThreadingTcpServer内容为pass ,但是由于继承了其他的类,所以可以执行其他类的构造方法和普通方法;在执行方法的时候,按就近的类查找,没有的话去其他的类中查找;
说明

7、多进程ForkingTCPServer源码解析

SocketServer.ForkingTCPServer和SocketServer.ThreadingTCPServer,其中父类ThreadingMININ中的方法process_request调用系统底层,pid=os.fork()创建进程

from multiprocessing import process
p = process(target=xx)    #创建进程
p.start()
创建进程

8、twisted和事件驱动

twisted是一个事件驱动的网络框架

委托链:将类插入到列表中,委托web框架去执行类(的代码),如果要执行类,将类插入到列表中,即将类注册到事件列表中

委托和事件的区别:委托不能为空,事件可以为空

from twisted.internet import protocol
from twisted.internet import reactor

class Echo(protocol.Protocol):    #必须写这个类,继承指定的类,其他的都替你干了
    def dataReceived(self, data):
        self.transport.write(data)

factory = protocol.ServerFactory()
factory.protocol=Echo

reactor.listenTCP(8000,factory)
reactor.run()

#注意:在python提供了进程池,没有提供线程池
twisted实现

twisted echo简单示例如下:

server side:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import protocol
from twisted.internet import reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data):#只要twisted一收到 数据 ,就会调用 此方法
        self.transport.write(data) # 把收到的数据 返回给客户端

def main():
    factory = protocol.ServerFactory() #定义基础工厂类
    factory.protocol = Echo #类似于socketserver 中handle

    reactor.listenTCP(9000,factory)
    reactor.run()

if __name__ == '__main__':
    main()

client side

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import reactor, protocol

# a client protocol

class EchoClient(protocol.Protocol):
    """Once connected, send a message, then print the result."""
    def connectionMade(self): #连接一建立成功,就会自动调用 此方法
        print("connection is build, sending data...")
        self.transport.write("hello charles!")

    def dataReceived(self, data):       #data表示connectionMade中的hello charles!""
        "As soon as any data is received, write it back."
        print "Server said:", data
        self.transport.loseConnection()
        # exit('exit')

    def connectionLost(self, reason):
        print "====connection lost==="

class EchoFactory(protocol.ClientFactory):
    protocol = EchoClient  #handle

    def clientConnectionFailed(self, connector, reason):
        print "Connection failed - goodbye!"
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        print "Connection lost - goodbye!"
        reactor.stop()


# this connects the protocol to a server running on port 8000
def main():
    f = EchoFactory()
    reactor.connectTCP("localhost", 9000, f)
    reactor.run()

# this only runs if the module was *not* imported
if __name__ == '__main__':
    main()

8、select监听多个客户端连接完整代码

select会监听三个通信列表,第一个是所有接受到的数据,第二个是所有要发送出去的数据,第三个是异常时的信息;

如果接受到的数据是客户端请求的连接,需要为每一个新创建的连接新创建一个队列,用于将接下来接受到存储到队列中;在检测到第二个通信列表发生变化时,将数据send到客户端;

server side:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'Charles Chang'

import select
import socket
import sys
import Queue

#创建一个TCP/IP socket
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)    #设置socket为非阻塞模式

#绑定 socket 端口
server_address = ('localhost',10000)
print(sys.stderr,'starting up on %s port %s' %server_address)
server.bind(server_address)

#监听输入的连接
server.listen(5)
print(server)

#希望监听的socket
inputs = [server]

#希望写的socket
outputs = []

#存放消息队列,用于将writable的数据发送出去
message_queue = {}
while inputs:
    #等待至少有一个socket连接发送进来
    print('
waiting for the next event')
    readable,writable,exceptional = select.select(inputs,outputs,inputs)

    #处理输入的socket
    for s in readable:
        if s is server:
            print(s,'s.......')
            connection,client_address = s.accept()
            print(connection,'connection')
            print('new connecion from',client_address)
            connection.setblocking(False)
            inputs.append(connection)
            #为每一个连接创建数据队列
            message_queue[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print(sys.stderr,'reveived %s from %s'%(data,s.getpeername()))
                message_queue[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('closing',client_address,'after reading no data')
                #停止监听输入连接
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()

                del message_queue[s]
    for s in writable:
        try:
            #从队列中获取数据
            next_msg = message_queue[s].get_nowait()
        except Queue.Empty:
            print('out queue for',s.getpeername(),'is empty')
            outputs.remove(s)
        else:
            #将数据发送回客户端
            print('sending %s to %s' %(next_msg,s.getpeername()))
            s.send(next_msg)
    for s in exceptional:
        print('handle exceptional condition for',s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queue[s]

client side:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'Charles Chang'

import socket
import sys

messages = [
    'this is the message',
    'It will be sent',
    'in parts',
]
server_address = ('localhost',10000)
socks = [
    socket.socket(socket.AF_INET,socket.SOCK_STREAM),
    socket.socket(socket.AF_INET,socket.SOCK_STREAM),
]

print(sys.stderr,'connection to %s port %s' %server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print(sys.stderr, '%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print(sys.stderr, '%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname())
            s.close()
原文地址:https://www.cnblogs.com/cqq-20151202/p/5246172.html