python multiprocess不能完全关闭socket的验证

近日项目有原来的多线程升级成为多进程模型后,但出现了个问题,在持续运行一天左右系统处理能力开始变慢,并不时打印以下信息:

too many opened files

修改ulimit中open files为10240之后,运行时间稍微变长,但还是会出现该问题。

使用iostat查看统计信息没发现异常,使用netstat 发现系统连接信息如下

#netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
TIME_WAIT 59 CLOSE_WAIT 28199 ESTABLISHED 170

  出现了大量的CLOSE_WAIT,应该是此处导致了fd泄露,抓包分析,结果如下

1 11:22:11.613144 IP 127.0.0.1.55313 > 127.0.0.1.9000: Flags [S], seq 1383947720, win 32792, options [mss 16396,sackOK,TS val 2912656137 ecr 0,nop,wscale 7], length 0
2 11:22:11.613175 IP 127.0.0.1.9000 > 127.0.0.1.55313: Flags [S.], seq 3913709618, ack 1383947721, win 32768, options [mss 16396,sackOK,TS val 2912656137 ecr 2912656137,nop,wscale 7], length 0
3 11:22:11.613192 IP 127.0.0.1.55313 > 127.0.0.1.9000: Flags [.], ack 1, win 257, options [nop,nop,TS val 2912656137 ecr 2912656137], length 0
4 11:22:11.613435 IP 127.0.0.1.55313 > 127.0.0.1.9000: Flags [P.], seq 1:10, ack 1, win 257, options [nop,nop,TS val 2912656137 ecr 2912656137], length 9
5 11:22:11.613446 IP 127.0.0.1.9000 > 127.0.0.1.55313: Flags [.], ack 10, win 256, options [nop,nop,TS val 2912656137 ecr 2912656137], length 0
6 11:22:11.632594 IP 127.0.0.1.9000 > 127.0.0.1.55313: Flags [P.], seq 1:10, ack 10, win 256, options [nop,nop,TS val 2912656156 ecr 2912656137], length 9
7 11:22:11.632650 IP 127.0.0.1.55313 > 127.0.0.1.9000: Flags [.], ack 10, win 257, options [nop,nop,TS val 2912656156 ecr 2912656156], length 0
8 11:22:11.632732 IP 127.0.0.1.55313 > 127.0.0.1.9000: Flags [F.], seq 10, ack 10, win 257, options [nop,nop,TS val 2912656156 ecr 2912656156], length 0
9 11:22:11.672056 IP 127.0.0.1.9000 > 127.0.0.1.55313: Flags [.], ack 11, win 256, options [nop,nop,TS val 2912656196 ecr 2912656156], length 0

可以看出,在该次第通信中,前期数据包交互一切正常,但在连接结束之后client端发送了fin报文,而server没有发送Fin包,导致大量的tcp连接停留在close_wai状态,close_wait的分析参考这里http://blog.chinaunix.net/uid-9688646-id-3469570.html

追查代码是很确定调用了close函数关闭socket

因为之前多线程时候没有类似的问题,所以考虑是不是引入多进程之后导致的问题

为此编写多进程版本的socket模型,查看其数据包交互模型,参考网上一哥们的代码,如下

 1 #multiprocessserver.py 
 2 import multiprocessing
 3 import socket
 4 
 5 def handle(connection, address):
 6     import logging
 7     logging.basicConfig(level=logging.DEBUG)
 8     logger = logging.getLogger("process-%r" % (address,))
 9     try:
10         logger.debug("Connected %r at %r", connection, address)
11         while True:
12             data = connection.recv(1024)
13             if data == "":
14                 logger.debug("Socket closed remotely")
15                 break
16             logger.debug("Received data %r", data)
17             connection.sendall(data)
18             logger.debug("Sent data")
19     except:
20         logger.exception("Problem handling request")
21     finally:
22         logger.debug("Closing socket")
23         connection.close()
24 
25 class Server(object):
26     def __init__(self, hostname, port):
27         import logging
28         self.logger = logging.getLogger("server")
29         self.hostname = hostname
30         self.port = port
31 
32     def start(self):
33         self.logger.debug("listening")
34         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
35         self.socket.bind((self.hostname, self.port))
36         self.socket.listen(1)
37 
38         while True:
39             conn, address = self.socket.accept()
40             self.logger.debug("Got connection")
41             process = multiprocessing.Process(target=handle, args=(conn, address))
42             process.daemon = True
43             process.start()
44             self.logger.debug("Started process %r", process)
45 
46 if __name__ == "__main__":
47     import logging
48     logging.basicConfig(level=logging.DEBUG)
49     server = Server("0.0.0.0", 9000)
50     try:
51         logging.info("Listening")
52         server.start()
53     except:
54         logging.exception("Unexpected exception")
55     finally:
56         logging.info("Shutting down")
57         for process in multiprocessing.active_children():
58             logging.info("Shutting down process %r", process)
59             process.terminate()
60             process.join()
61     logging.info("All done")
 1 #mulclient.py 
 2 import socket
 3 
 4 if __name__ == "__main__":
 5     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6     sock.connect(("localhost", 9000))
 7     data = "some data"
 8     sock.sendall(data)
 9     result = sock.recv(1024)
10     print result
11     sock.close()

验证了上面的假设

结论:

python2.6.6版本的多进程multiprocessing在关闭socket时是有问题的,会导致close函数无效从而引发大量close_wait状态的tcp链接

原文地址:https://www.cnblogs.com/lovemyspring/p/3792790.html