网络编程(7day-selector模块,队列)

 http://www.cnblogs.com/yuanchenqi/articles/6755717.html

 selector模块

@程序运行流程
   1,建议大家用selectors模块,此模块IO多路复用会自动选择最佳模型
   2,在客户端第一次连接的时候,完成一次注册sel.register(sock,selectors.EVENT_READ,accept),其中注册只是放到列表中
      一旦有变化(sock)就会触发accept函数, # print(key.fileobj)    #第一次时sock  
                                           # print(key.data)       #第一次时 accept
      accept函数完成对conn的注册,加入到注册表中监控起来,sel.register(conn,selectors.EVENT_READ,read)
      开始通信时,sock不变,   # print(key.fileobj)           # 通信时 conn
                               # print(key.data)              # 通信时 read
      然后read调用conn,如果客户端在win下突然断掉还会报错,所以就try,把注册去掉就行了,linux也是
      
      3,用的时候都不用动,只要写accept函数,和read函数及可

import selectors  # 基于select模块实现的IO多路复用,建议大家使用

import socket

sock=socket.socket()
sock.bind(("127.0.0.1",8800))

sock.listen(5)

sock.setblocking(False)

sel=selectors.DefaultSelector() #根据具体平台选择最佳IO多路机制,比如在linux,选择epoll

def read(conn,mask):

    try:
        data=conn.recv(1024)
        print(data.decode("UTF8"))
        data2=input(">>>")
        conn.send(data2.encode("utf8"))
    except Exception:
        sel.unregister(conn)

def accept(sock,mask):

    conn, addr = sock.accept()
    print("conn",conn)
    sel.register(conn,selectors.EVENT_READ,read) #监听谁就把谁写到注册函数里,read是自己写的, conn

sel.register(sock,selectors.EVENT_READ,accept)  # 注册事件,不是监听 不会卡到那,只是把sock,appcet绑定到这个对像去了,
                                                    #sock有变化直接触发accept,中间的参数没用

while 1:

    print("wating...")
    events=sel.select()   #  监听,会卡到那    [(key1,mask1),(key2,mask2)],mask没有用,每注册一个都会以元组形式加到这里面,
    for key,mask in events:

        # print(key.fileobj)    #第一次时sock          # 通信时 conn
        # print(key.data)       #第一次时 accept       # 通信时 read
        func=key.data
        obj=key.fileobj

        func(obj,mask)  # 1次 accept(sock,mask)    # 2次 read(conn,mask)


IO多路复用实现机制
win: 下只有select
linux: 下有select, poll, epoll

select就是反复的遍历,而epoll是绑定回调函数,实现信号触发,不再遍历

select是效率最低的 只有一个函数
    1,每次调用select都需要将所有的
       fd(文件描述符)从用户空间cp到内核空间,导致效率下降
    2,遍历所有的fd是否有数据访问,最重要的问题cpu数的时间多
    3,最大连接数1024
poll跟select完全一样,只是在最大连接数做了个改进,没有限制了,只有一个函数

epoll:通过三个函数来实现
    1  第一个函数来创建一个epoll句柄,fd(文件描述符)从用户空间cp到内核空间,只拷一次
    2,回调函数,某一个函数或者某一个动作成功完成后会触发的函数
        为所有的fd绑定一个回调函数,一旦有数据访问,触发回调函数
        回调函数将fd放到链表中
    3,连接数不限(端口,什么的也没那嘛多)

队列

import queue


#q=queue.Queue(3)  # 默认是  先进先出(FIFO)


# q.put(111)
# q.put("hello")
# q.put(222)
#
# q.put(223,False)
#
# print(q.get())
# # print(q.get())
# # print(q.get())
# #
# q.get(False)


# queue 优点: 线程安全的  我不想用锁来控制,就想用queue来控制



# join和task_done


# q=queue.Queue(5)

# q.put(111)
# q.put(222)
# q.put(22222)
#
#
# while not q.empty():
#         a=q.get()
#         print(a)
#q.task_done()


# b=q.get()
# print(b)
# q.task_done()

# q.join()
#
# print("ending")


#  先进后出模式

# q=queue.LifoQueue()  #  Lifo  last in first out
#
#
# q.put(111)
# q.put(222)
# q.put(333)
#
# print(q.get())



# 优先级

# q=queue.PriorityQueue()
#
# q.put([4,"hello4"])
# q.put([1,"hello"])
# q.put([2,"hello2"])
#
# print(q.get())
# print(q.get())



# import queue
#
#
# q=queue.Queue()
#
# q.put(111)
# q.put(2222)
# q.put(22333)
#
# print( )


#生产者消费者模型

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(2)
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))

    count +=1
    #q.task_done()
    #q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
    time.sleep(1)
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    else:
        print("-----no baozi anymore----")

    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))

# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()

 回顾

进程:是最小的资源管理单位,可以理解成一个容器,放线程的 还包括其他的
线程:是最小的执行单位
cpython因为有GIL锁,一个进程只能同时由一个线程出去执行
join,
关于setdaemon:程序直到不存在非守护线程时退出
同步锁  由于多线程处理公共数据 同步锁和递归锁都是互斥锁,一个拿到acquire其他的线程就拿不到
    有acquire release方法,
event 线程通信不是锁 创建 threading.Event  wait是设为True,这个进
    程等着,等到其他线程 set=false才往下进行
    
semaphore = threading.Semaphore(5)可以控制同时拿到acquire的线程数为5
用到数据的连接池上的概念
    
    
# # # import threading
# # # from time import ctime,sleep
# # # import time
# # #
# # # def Music(name):
# # #
# # #         print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
# # #         sleep(6)
# # #         print("end listening {time}".format(time=ctime()))
# # #
# # # def Blog(title):
# # #
# # #         print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
# # #         sleep(5)
# # #         print('end recording {time}'.format(time=ctime()))
# # #
# # #
# # # threads = []
# # #
# # #
# # # t1 = threading.Thread(target=Music,args=('FILL ME',),name="")
# # # t2 = threading.Thread(target=Blog,args=('',))
# # #
# # # threads.append(t1)
# # # threads.append(t2)
# # #
# # # if __name__ == '__main__':
# # #
# # #     t1.setDaemon(True)
# # #
# # #     for t in threads:
# # #
# # #         #t.setDaemon(True) #注意:一定在start之前设置
# # #         t.start()
# # #
# # #         #t.join()
# # #
# # #     # t1.join()
# # #     # t2.join()    #  考虑这三种join位置下的结果?
# # #
# # #     print ("all over %s" %ctime())
# #
# #
# #
# #
# #
# # # import time
# # # import threading
# # #
# # # def addNum():
# # #
# # #     LOCK.acquire()
# # #     global num #在每个线程中都获取这个全局变量
# # #     #num-=1
# # #
# # #     temp=num
# # #     time.sleep(0.1)
# # #     num =temp-1  # 对此公共变量进行-1操作
# # #
# # #     LOCK.release()
# # #
# # # num = 100  #设定一个共享变量
# # #
# # # thread_list = []
# # #
# # # LOCK=threading.Lock()
# # #
# # # for i in range(100):
# # #     t = threading.Thread(target=addNum)
# # #     t.start()
# # #     thread_list.append(t)
# # #
# # # for t in thread_list: #等待所有线程执行完毕
# # #     t.join()
# # #
# # # print('Result: ', num)
# #


# #同步锁会造成死锁
# # import threading
# # import time
# #
# # # mutexA = threading.Lock()
# # # mutexB = threading.Lock()
# # RLock = threading.RLock() 递归锁是内部维护一个计数器,只要大于0别的线程就无法抢,抢到就加1
# #
# # class MyThread(threading.Thread):
# #
# #     def __init__(self):
# #         threading.Thread.__init__(self)
# #
# #     def run(self): 使用继续方法使用线程,重写init,run方法是必须的
# #
# #         self.fun1()
# #
# #         self.fun2()
# #
# #     def fun1(self):
# #         RLock.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
# #
# #         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
# #
# #         RLock.acquire()
# #         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
# #         RLock.release()
# #         RLock.release()
# #
# #
# #     def fun2(self):
# #         RLock.acquire()
# #         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
# #         time.sleep(0.5)
# #         RLock.acquire()
# #
# #         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
# #         RLock.release()
# #
# #         RLock.release()
# #
# # if __name__ == "__main__":
# #
# #     print("start---------------------------%s"%time.time())
# #
# #     for i in range(0, 10):
# #
# #         my_thread = MyThread()
# #         my_thread.start()
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#threading.Event对像例子

# import threading
# import time
# import logging
#
# logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
#
# def worker(event):
#
#     logging.debug('Waiting for redis ready...')
#
#     event.wait()  # flag=True 继续执行
#
#     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
#     time.sleep(1)
#
# def main():
#
#     readis_ready = threading.Event()
#
#     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
#     t1.start()
#
#     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
#     t2.start()
#
#     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
#     time.sleep(3) # simulate the check progress
#     readis_ready.set()  # flag=True
#
# if __name__=="__main__":
#
#     main()







import threading
import time

semaphore = threading.Semaphore(5)

def func():

    semaphore.acquire()

    print (threading.currentThread().getName() + ' get semaphore')
    time.sleep(2)

    semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()
 
原文地址:https://www.cnblogs.com/wanchenxi/p/7887238.html