Python_Example_Thread 线程 学习/经验/示例

 Author: 楚格

2018-11-17     17:34:58

IDE: Pycharm2018.02   Python 3.7   

KeyWord :  线程 threading Thread

Explain:  

-----------------------------------------------------

--

   1 # coding=utf-8
   2 #---------------------------------
   3 '''
   4 # Author  : chu ge 
   5 # Function: 线程 thread
   6 #
   7 '''
   8 #---------------------------------
   9 '''
  10 # --------------------------------
  11 # 导入模块 
  12 # 1.系统库
  13 # 2.第三方库
  14 # 3.相关定义库
  15 # --------------------------------
  16 '''
  17 # 1.系统库
  18 import sys
  19 import os
  20 import time
  21 import random
  22 
  23 #2.第三方库
  24 
  25 #进程导入模块
  26 # from multiprocessing import Process
  27 from multiprocessing import Pool
  28 # from multiprocessing import Queue
  29 # from multiprocessing import Manager
  30 
  31 #进程导入模块
  32 import threading
  33 from threading import Thread
  34 from threading import Lock
  35 from queue import Queue
  36 
  37 
  38 
  39 
  40 
  41 
  42 #
  43 '''
  44 ============================================================================
  45 #》》》》》》》》》》
  46 
  47 线程
  48 
  49 -----------------------
  50 python的thread模块比较底层,
  51 threading模块做了些包装,可以更加方便的被使用
  52 
  53 进程 VS 线程
  54 功能的不同
  55 进程,能够完成多任务,如一台电脑上可以同时运行多个微信
  56 线程,能够完成多任务,如一个微信界面中可以运行多个窗口
  57 定义的不同
  58 进程,是系统进行资源分配和调度的一个独立单位。
  59 线程,是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的
  60 能独立运行的基本单位。
  61     线程自己基本不拥有系统资源,只拥有一点在运行中必不可少的资源,
  62     如:程序计数器,一组寄存器和栈,但是它可与同属一个进程的
  63     其他线程共享进程所拥有的全部的资源。
  64 
  65 区别
  66 一个程序至少有一个进程,一个进程至少有一个线程。
  67 线程的划分尺度小于进程,资源比进程少,使得多线程程序的并发性高。
  68 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大提高程序的运行效率
  69 线程不能够独立执行,必须依存在进程中
  70 优缺点
  71 线程和进程在使用上各有优缺点:线程执行开销小但不利于资源管理和保护
  72 而进程正相反。
  73 
  74 ----------------------------------------------
  75 1. 线程 使用threading 模块
  76 
  77 -----------------------
  78 1.1 单 / 多线程执行
  79 e.g:
  80 #1如果多个线程执行同一函数,各是各的,互不影响
  81 def Function_Say_Sorry():
  82     print("A say: Sorry !")
  83     time.sleep(1)
  84 if __name__ == "__main__":
  85     # 单线程
  86     for var in range(10):
  87         Function_Say_Sorry()
  88 
  89     # 多线程执行
  90     for var in range(10):
  91         # 创建线程
  92         thread_name = threading.Thread(target = Function_Say_Sorry)
  93         thread_name.start()
  94 result:
  95 1)可以看出使用了多线程并发操作,花费时间要短很多
  96 2)创建好的线程,需要调用start()方法来启动
  97 
  98 -----------------------
  99 e.g:
 100 
 101 if __name__ == "__main__":
 102 
 103 result:
 104 
 105 -----------------------
 106 1.2 主线程会等待所有的子线程结束后才结束
 107 e.g:
 108 def Function_Sing():
 109     for var in range(10):
 110         print("正在唱歌 %d" % var)
 111         time.sleep(1)
 112 
 113 def Function_Dance():
 114     for var in range(5):
 115         print("正在跳舞 %d" % var)
 116         time.sleep(1)
 117         
 118 if __name__ == "__main__":
 119         # 主线程等待子线程结束而结束
 120     print("--- start ---:
 %s" % (time.time()))
 121 
 122     time_A = threading.Thread(target = Function_Sing)
 123     time_B = threading.Thread(target = Function_Dance)
 124     time_A.start()
 125     time_B.start()
 126     time.sleep(5) #
 127     print("--- finish ---:
%s" % (time.time()))
 128     # 查看线程数量
 129     while True:
 130         length = len(threading.enumerate())
 131         print("当前运行的线程数为: [%d] " % (length))
 132         if length <= 1 :
 133             break
 134         time.sleep(0.5)
 135 result:
 136 
 137 -----------------------
 138 1.3 threading 
 139 1.3.1 线程执行代码的封装
 140 通过使用threading模块能完成多任务的程序开发,为了每个线程更完美
 141 所有使用threading模块时,往往定义新的子类class,
 142 只要继承threading.Thread就可以了,然后重写run方法
 143 
 144 e.g:
 145 #线程执行代码的封装
 146 class Class_MyThread(threading.Thread):
 147     def run(self):
 148         for var in range(10):
 149             time.sleep(1)
 150             message = "I am " + self.name + " @ " + str(var)
 151             print("message: %s" % (message))
 152 
 153 
 154 if __name__ == "__main__":
 155     thread_name = Class_MyThread()
 156     thread_name.start()
 157     
 158 result:
 159 python的threading.Thread类有一个run方法,
 160 用于定义线程的功能函数,可以在自己的线程类中覆盖还方法。
 161 而创建自己线程实例后,通过thread类start方法,可以启动该线程,
 162 交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。
 163 
 164 -----------------------
 165 1.3.2 线程的执行顺序
 166 e.g:
 167 # 线程的执行顺序
 168   #重复封装类
 169 def Function_Test_A():
 170     for var in range(10):
 171         thread_name_local = Class_MyThread()
 172         thread_name_local.start()
 173 if __name__ == "__main__":
 174     Function_Test_A()
 175     
 176 result:
 177 从执行结果可以看出,多线程程序的执行顺序是不确定的
 178 当执行到sleep语句时,线程将会被阻塞(Blocked),到sleep结束后,
 179 线程进入就绪(Runnable)状态,等待调度,而线程调度就讲自行选择一个线程执行。
 180 上面代码可以保证运行完成的run,但线程启动顺序和run函数中每次循环的执行顺序都不确定。
 181 
 182 总结
 183 1) 每个线程一定会有一个name,尽管示例中没有指定线程对象的name,
 184 但是python会自动为线程指定一个名字。
 185 2)当线程的run函数结束时,该线程完成
 186 3)无法控制线程调度程序,但是可以通过别的方式来影响调度的方式
 187 4)线程的几种状态
 188 
 189      启动         调度        结束
 190 新建 -----> 就绪 <-----> 运行 -----> 死亡
 191                         /
 192          满足条件       等待条件
 193                      /
 194                等待(阻塞) 
 195 
 196 
 197 -----------------------
 198 1.4.1 多线程-共享全局变量
 199 
 200 e.g:
 201 def Function_Work_A():
 202     global global_var_number_A
 203 
 204     for var in range(10):
 205         global_var_number_A += 1
 206     print("work A ,number is %d" % (global_var_number_A))
 207 
 208 
 209 def Function_Work_B():
 210     global global_var_number_A
 211 
 212     print("work B ,number is %d" % (global_var_number_A))
 213 
 214 if __name__ == "__main__":
 215     global_var_number_A = 100
 216 
 217     print("创建线程之前,number is %d" %(global_var_number_A))
 218     thread_name_A = threading.Thread(target = Function_Work_A )
 219     thread_name_A.start()
 220     time.sleep(1)
 221 
 222     thread_name_B = threading.Thread(target = Function_Work_B )
 223     thread_name_B.start()
 224 
 225 result:
 226     创建线程之前,number is 100
 227     work A ,number is 110
 228     work B ,number is 110
 229 
 230 -----------------------
 231 1.4.2 列表当做实参传递到线程中
 232 e.g:
 233 # 列表当做实参传递到线程中
 234 def Function_Work_C(nums):
 235     local_var_number = nums
 236 
 237     local_var_number.append("CC")
 238     print("work C ,number is %s" % (local_var_number))
 239 
 240 
 241 def Function_Work_D(nums):
 242     local_var_number = nums
 243     time.sleep(1)
 244     print("work D ,number is %s" % (local_var_number))
 245 
 246 if __name__ == "__main__":
 247     global_nums = [11,22,33,44,55]
 248     thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,))
 249     thread_name_C.start()
 250     thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,))
 251     thread_name_D.start()
 252 
 253 result:
 254     work C ,number is [11, 22, 33, 44, 55, 'CC']
 255     work D ,number is [11, 22, 33, 44, 55, 'CC']
 256 总结:
 257 在一线程内的所有线程共享全局变量,能够在不适用其他方式的前提下,
 258 完成多线程之间的数据共享,这点要比多进程要好。
 259 缺点就是,线程是对全局变量随意遂改可能造成多线程之间,
 260 对全局变量的混乱,即线程非安全。
 261 
 262 -----------------------
 263 1.5 线程不安全  同步
 264 
 265 e.g:
 266 # 同步
 267 def Function_Test_B():
 268     global global_var_number
 269     for var in range(1000000):
 270         global_var_number += 1
 271     print("Test B ,number is %d" % (global_var_number))
 272 
 273 def Function_Test_C():
 274     global global_var_number
 275     for var in range(1000000):
 276         global_var_number += 1
 277     print("Test C ,number is %d" % (global_var_number))
 278 
 279 if __name__ == "__main__":
 280     global_var_number = 0
 281     thread_name_E = threading.Thread(target = Function_Test_B)
 282     thread_name_E.start()
 283     time.sleep(3)
 284     thread_name_F = threading.Thread(target = Function_Test_C)
 285     thread_name_F.start()
 286 
 287     print("number: %s" % (global_var_number))
 288 
 289 result:
 290     Test B ,number is 1000000
 291     number: 1059644
 292     Test C ,number is 2000000
 293 
 294 同步:就是协同步调,按照预定的先后次序进行运行。
 295 同指协同,协助,相互配合
 296 如,进程/线程同步,可以理解为进程/线程A和B一块配合,A执行到一定程度时
 297 要依靠B的某个结果,于是停下来,示意B运行,在将结果给A,A继续执行操作。
 298 
 299 解决问题:线程同步
 300 1.系统调用thread_name_E,然后获取到num的值为0,
 301 此时上一把锁,即不允许其他现在操作num
 302 2.对num的值进行+1
 303 3.解锁,此时num的值为1,其他的线程就可以使用num了,
 304 而且num的值是0而不是1
 305 4.同理,其他线程在对num进行修改时,都要先上锁,处理完成后再解锁。
 306 在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性。
 307 
 308 -----------------------
 309 1.6 互斥锁
 310 
 311 当多个线程几乎同时修改某个共享数据的时候,需要进行同步控制。
 312 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
 313 
 314 互斥锁为资源引入一个状态: 锁定/非锁定
 315 
 316 某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定,
 317 其他线程不能更改,直到该线程释放资源,将资源的状态变成非锁定,
 318 其他的线程才能再次锁定该资源。
 319 互斥锁保证每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
 320 
 321 
 322 threading模块定义Lock类,方便处理锁定:
 323 #创建锁
 324 mutex = threading.Lock()
 325 #锁定
 326 mutex.acquire([blocking])
 327 #释放
 328 mutex.release()
 329 
 330 在锁定方法中,acquire可以有个blocking参数
 331 如果设定blocking为True,则当前线程会阻塞,直到获得这个锁为止,如果没有指定,那么默认为True
 332 如果设定blocking为Fasle,则当前线程不会阻塞。
 333 
 334 
 335 上锁解锁过程:
 336 当一个线程调用锁的acpuire()方法获得解锁时,锁就进入Locked状态。
 337 每次只有一个线程可以获得锁。如果此时另外一个线程试图获得这个锁,
 338 该线程就会变为blocked状态,称为阻塞,直到拥有锁的线程调用锁release()方法释放后,锁进入unlocked状态
 339 线程调度程序从处于同步阻塞状态的线程中选择一个获得锁,并使得该线程进入运行(running)
 340 
 341 总结:
 342 锁的好处:
 343 确保了某段关键代码只能由一个线程从头到尾完整的执行
 344 锁的坏处:
 345 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率降低了。
 346 由于可以存在多个锁,不同的线程持有不同的锁,并试图获得对方持有的锁,可能造成死锁。
 347 
 348 
 349 e.g:
 350 # 互斥锁
 351 def Function_Test_D():
 352     global global_var_number
 353     # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功,
 354     # 导致另一方会堵塞(一直等待)到这个锁被解锁为止
 355     mutexFlag = mutex.acquire(True)
 356     for var in range(1000000):
 357         # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在
 358         #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续
 359         if mutexFlag:
 360             global_var_number += 1
 361     # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有
 362     # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁
 363     mutex.release()
 364 
 365     print("Test D ,number is %d" % (global_var_number))
 366 
 367 def Function_Test_E():
 368     global global_var_number
 369 
 370     for var in range(1000000):
 371         mutexFlag = mutex.acquire(True)
 372         if mutexFlag:
 373             global_var_number += 1
 374         mutex.release()
 375 
 376     print("Test E ,number is %d" % (global_var_number))
 377 
 378 def Function_Test_F():
 379     global global_var_number
 380     mutexFlag = mutex.acquire(True)
 381     for var in range(1000000):
 382         if mutexFlag:
 383             global_var_number += 1
 384     mutex.release()
 385 
 386     print("Test F ,number is %d" % (global_var_number))
 387 
 388 if __name__ == "__main__":
 389     global_var_number = 0
 390     # 创建锁
 391     mutex = threading.Lock()
 392 
 393     thread_name_D = threading.Thread(target=Function_Test_D)
 394     thread_name_D.start()
 395 
 396     thread_name_E = threading.Thread(target = Function_Test_E)
 397     thread_name_E.start()
 398 
 399     thread_name_F = threading.Thread(target = Function_Test_F)
 400     thread_name_F.start()
 401 
 402     print("number: %s" % (global_var_number))
 403     
 404 result:
 405 
 406 -----------------------
 407 1.7 多线程-非共享数据
 408 
 409 对于全局变量,在多线程中要格外小心,否则容易造成数据错乱的情况发生
 410 在多线程开发中,全局变量是多个线程都是共享的数据,而局部变量等是各自线程,是非共享的。
 411 
 412 e.g:
 413 class Class_My_Thread(threading.Thread):
 414     # 重写 构造方法
 415     def __init__(self,number,SleepTime):
 416         threading.Thread.__init__(self)
 417         self.num       = number
 418         self.SleepTime = SleepTime
 419 
 420     def run(self):
 421         self.num += 1  # 局部变量
 422         time.sleep(self.SleepTime)
 423         print("线程 (%s), number = (%s)" % (self.name,self.num))
 424 
 425 if __name__ == "__main__":
 426     # 创建锁
 427     mutex = threading.Lock()
 428     thread_name_G = Class_My_Thread(100, 3)
 429     thread_name_G.start()
 430 
 431     thread_name_H = Class_My_Thread(200, 1)
 432     thread_name_H.start()
 433 
 434     thread_name_I = Class_My_Thread(300, 2)
 435     thread_name_I.start()
 436     
 437 result:
 438     线程 (Thread-2), number = (201)
 439     线程 (Thread-3), number = (301)
 440     线程 (Thread-1), number = (101)
 441 局部变量是不共享数据的。
 442 
 443 -----------------------
 444 1.8  死锁
 445 
 446 在线程共享多个资源的时候,如果二个线程分别占有一部分资源
 447 并且同时等待对方的资源,就会造成死锁。
 448 尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
 449 
 450 e.g:
 451 # 死锁
 452 class Class_My_Thread_A(threading.Thread):
 453     def run(self):
 454         if mutexA.acquire():
 455             print(self.name + "--do 1 up --")
 456             time.sleep(1)
 457 
 458             if mutexB.acquire():
 459                 print(self.name + "--do 1 down --")
 460                 mutexB.release()
 461             mutexA.release()
 462 
 463 
 464 class Class_My_Thread_B(threading.Thread):
 465     def run(self):
 466         if mutexB.acquire():
 467             print(self.name + "--do 2 up --")
 468             time.sleep(1)
 469 
 470             if mutexA.acquire():
 471                 print(self.name + "--do 2 down --")
 472                 mutexA.release()
 473             mutexB.release()
 474 
 475 if __name__ == "__main__":
 476     # 死锁  不能执行的效果
 477     mutexA = threading.Lock()
 478     mutexB = threading.Lock()
 479     thread_name_J = Class_My_Thread_A()
 480     thread_name_J.start()
 481     thread_name_K = Class_My_Thread_B()
 482     thread_name_K.start()
 483 
 484     print("...")
 485     
 486 result:
 487     #避免死锁
 488     # 程序设计时,要尽量避免,银行家算法
 489     # 添加超时时间等 acquire(timeout)很重要!!!
 490     
 491     
 492 -----------------------
 493 1.9.1 同步应用
 494 多线程有序执行
 495 
 496 可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步
 497 
 498 e.g:
 499 # 多线程有序执行
 500 class Class_Task_C(threading.Thread):
 501     def run(self):
 502         while True:
 503             if mutexC.acquire():
 504                 print("--- Task C ---[%s]" % (self.name))
 505                 time.sleep(1)
 506                 mutexD.release()
 507 
 508 class Class_Task_D(threading.Thread):
 509     def run(self):
 510         while True:
 511             if mutexD.acquire():
 512                 print("--- Task D ---[%s]" % (self.name))
 513                 time.sleep(1)
 514                 mutexE.release()
 515 
 516 class Class_Task_E(threading.Thread):
 517     def run(self):
 518         while True:
 519             if mutexE.acquire():
 520                 print("--- Task F ---[%s]" % (self.name))
 521                 time.sleep(1)
 522                 mutexC.release()
 523                 
 524 if __name__ == "__main__":
 525     # 同步应用
 526     # 创建锁
 527     mutexC = threading.Lock()
 528 
 529     mutexD = threading.Lock()
 530     mutexD.acquire()
 531     mutexE = threading.Lock()
 532     mutexE.acquire()
 533 
 534     thread_name_L = Class_Task_C()
 535     thread_name_L.start()
 536 
 537     thread_name_M = Class_Task_D()
 538     thread_name_M.start()
 539 
 540     thread_name_N = Class_Task_E()
 541     thread_name_N.start()
 542 result:
 543     --- Task C ---[Thread-1]
 544     --- Task D ---[Thread-2]
 545     --- Task F ---[Thread-3]
 546     --- Task C ---[Thread-1]
 547     --- Task D ---[Thread-2]
 548     --- Task F ---[Thread-3]
 549     --- Task C ---[Thread-1]
 550     --- Task D ---[Thread-2]
 551 
 552 -----------------------
 553 2.0 Queue 队列
 554 
 555 Queue使用说明
 556 1) 对于Queue,在多线程通信之间扮演重要的角色
 557 2) 添加数据队列中,使用put()方法
 558 3) 从队列中获取数据,使用get()方法
 559 4) 判断队列中是否还有数据,使用qsize()方法
 560 
 561 生产者消费者模式说明:
 562 在线程世界里。生成者就是生产数据的线程,消费者就是消费数据的线程。
 563 在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,
 564 那么生成者就必须等待消费者处理完,才能继续生产数据。同样道理
 565 如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
 566 为了解决这个问题引入了生产者和消费者模式。
 567 
 568 生产者消费者模式,是通过一个容器来解决生产者和消费者的强耦合问题。
 569 生产者和消费者之间彼此之间不直接通讯,而通过阻塞队列来进行通讯,
 570 所有生产者生产数据之后不用等待消费者处理,直接扔给阻塞队列,
 571 消费者不找生产者要数据,而是直接从阻塞队列里取出数据,
 572 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
 573 这个阻塞队列就是用来给生产者和消费者解耦的。
 574 纵观大多数设计模式,都会找到一个第三者出来进行解耦。
 575 
 576 
 577 e.g:
 578 class Class_Producer_F(threading.Thread):
 579     def run(self):
 580         # global queue
 581         var_count = 0
 582 
 583         while True:
 584             if queue_name.qsize() < 1000 :
 585                 # 每次就生产100个商品,二个线程做个事
 586                 for var in range(100):
 587                     var_count += 1
 588                     message = "生成产品" + str(var_count)
 589                     queue_name.put(message)
 590                     # print("-- Producer F --[%s]" % (self.name))
 591             time.sleep(0.5)
 592 
 593 class Class_Consumer_G(threading.Thread):
 594     def run(self):
 595         # global queue
 596 
 597         while True:
 598             if queue_name.qsize() > 100:
 599                 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事
 600                 for var in range(3):
 601                     message = self.name + "消费了" + queue_name.get()
 602                     print("message: (%s)",message)
 603                     # print("-- Consumer G --[%s]" % (self.name))
 604             time.sleep(1)
 605 
 606 if __name__ == "__main__":
 607         # #-----------------------
 608     # Queue  提供了同步、线程安全的队列类
 609     # 队列原则: 先进先出  FIFO
 610     # 栈的原则: 先进后出  LIFO
 611     # 这些队列都实现了锁原语,原子操作
 612     #创建队列 ,这个队列只能用于线程,而进程中不能使用
 613     queue_name = Queue() # 队列
 614     # 队列初创
 615     for var in range(500):
 616         queue_name.put("》》初始产品"+ str(var))
 617     # 创建二次线程 producer
 618     for var in range(2):
 619         producer_name = Class_Producer_F()
 620         producer_name.start()
 621     # 创建二次线程 consumer
 622     for var in range(5):
 623         producer_name = Class_Consumer_G()
 624         producer_name.start()
 625     # 2 + 5 +1(主线程) = 8 (总线程)
 626 result:
 627 
 628 -----------------------
 629 2.1 ThreadLocal
 630 
 631 ThreadLocal不用查找dict,ThreadLocal帮你自动的做这个事:
 632 全局变量Local_school就是一个ThreadLocal对象,
 633 每个Thread对他都可以读写student属性,但互不影响。
 634 你可以把ThreadLocal看出全局变量,但每个属性,
 635 如,Local_school.student都是线程的局部变量,
 636 可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
 637 
 638 可以理解为全局变量local_school是dict,不但可以用Local_school.student,
 639 还可以绑定其他变量,如Local_school.teacher等。
 640 
 641 ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,
 642 HTTP请求,用户身份信息,这样一个线程的所有调用到的处理函数
 643 都可以非常方便的访问这些资源。
 644 
 645 总结:
 646 ThreadLocal变量虽然是全局变量,但是每个线程都只能读写自己线程的独立副本,
 647 互不干扰,ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
 648 
 649 
 650 e.g:
 651 # ThreadLocal
 652 def Function_Process_student():
 653     # 获取当前线程关联的student
 654     student_name = local_school.student
 655     print("%s: (in %s)"% (student_name,threading.current_thread()))
 656 
 657 def Function_Process_thread(name):
 658     local_var_name =name
 659 
 660     # 绑定ThreadLocal的student
 661     local_school.student = local_var_name
 662     Function_Process_student()
 663 
 664 if __name__ == "__main__":
 665     # ThreadLocal
 666     # 创建全局ThreadLocal对象
 667     local_school = threading.local()
 668 
 669     thread_name_O = threading.Thread(target=Function_Process_thread, args=('图形分析',), name='Thread-A')
 670     thread_name_O.start()
 671     thread_name_O.join()
 672 
 673     thread_name_P = threading.Thread(target = Function_Process_thread, args=('数据处理',), name='Thread-B')
 674     thread_name_P.start()
 675     thread_name_P.join()
 676 
 677     thread_name_Q = threading.Thread(target = Function_Process_thread, args=('信号处理',), name='Thread-C')
 678     thread_name_Q.start()
 679     thread_name_Q.join()
 680 result:
 681 
 682 -----------------------
 683 2.2 异步
 684 
 685 e.g:
 686 def Function_Test_G():
 687     print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid()))
 688     for var in range(3):
 689         print("-- [%s]" % (var))
 690         time.sleep(1)
 691     return "Function_Test_G"
 692 
 693 def Function_Test_H(args):
 694     print("callback func PID =[%s]" % (os.getpid()))
 695     print("callback func args=[%s]" % (args))
 696 
 697 if __name__ == "__main__":
 698     # 异步
 699     #创建进程池
 700     pool_name = Pool(3)
 701     # callback 回调函数 子线程结束后,立刻执行回调函数
 702     pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H )
 703 
 704     # 异步的理解:主进程正在做某件事情,
 705     # 突然来了一件更需要立刻去做的事情,
 706     # 那么这种,在父进程去做某件事情的时候 
 707     # 并不知道是什么时候去做,的模式 就称为异步
 708     while True:
 709         time.sleep(1)
 710         print("主进程 PID = [%s]" % (os.getpid()))
 711 result:
 712     进程池中的进程PID=[69792],PPID=[71624]
 713     -- [0]
 714     主进程 PID = [71624]
 715     -- [1]
 716     主进程 PID = [71624]
 717     -- [2]
 718     主进程 PID = [71624]
 719     callback func PID =[71624]
 720     callback func args=[Function_Test_G]
 721     主进程 PID = [71624]
 722     主进程 PID = [71624]
 723     主进程 PID = [71624]
 724     
 725 -----------------------
 726 
 727 ----------------------------------------------
 728 
 729 ============================================================================
 730 '''
 731 
 732 #
 733 '''
 734 # ============================================================================
 735 # Function:  
 736 # Explain :  输入参数   
 737 #         :  输出参数  
 738 # ============================================================================
 739 '''
 740 # #-----------------------
 741 # 单 / 多 线程执行
 742 
 743 def Function_Say_Sorry():
 744     print("A say: Sorry !")
 745     time.sleep(1)
 746 
 747 # #-----------------------
 748 def Function_Sing():
 749     for var in range(10):
 750         print("正在唱歌 %d" % var)
 751         time.sleep(1)
 752 
 753 def Function_Dance():
 754     for var in range(5):
 755         print("正在跳舞 %d" % var)
 756         time.sleep(1)
 757 
 758 # #-----------------------
 759 #线程执行代码的封装
 760 class Class_MyThread(threading.Thread):
 761     def run(self):
 762         for var in range(10):
 763             time.sleep(1)
 764             message = "I am " + self.name + " @ " + str(var)
 765             print("message: %s" % (message))
 766 
 767 # #-----------------------
 768 # 线程的执行顺序
 769   #重复封装类
 770 def Function_Test_A():
 771     for var in range(10):
 772         thread_name_local = Class_MyThread()
 773         thread_name_local.start()
 774 
 775 # #-----------------------
 776 # 多线程-共享全局变量
 777 
 778 def Function_Work_A():
 779     global global_var_number_A
 780 
 781     for var in range(10):
 782         global_var_number_A += 1
 783     print("work A ,number is %d" % (global_var_number_A))
 784 
 785 
 786 def Function_Work_B():
 787     global global_var_number_A
 788 
 789     print("work A ,number is %d" % (global_var_number_A))
 790 
 791 # #-----------------------
 792 # 列表当做实参传递到线程中
 793 def Function_Work_C(nums):
 794     local_var_number = nums
 795 
 796     local_var_number.append("CC")
 797     print("work C ,number is %s" % (local_var_number))
 798 
 799 
 800 def Function_Work_D(nums):
 801     local_var_number = nums
 802     time.sleep(1)
 803     print("work D ,number is %s" % (local_var_number))
 804 
 805 
 806 # #-----------------------
 807 # 同步
 808 def Function_Test_B():
 809     global global_var_number
 810     for var in range(1000000):
 811         global_var_number += 1
 812     print("Test B ,number is %d" % (global_var_number))
 813 
 814 def Function_Test_C():
 815     global global_var_number
 816     for var in range(1000000):
 817         global_var_number += 1
 818     print("Test C ,number is %d" % (global_var_number))
 819 
 820 # #-----------------------
 821 # 互斥锁
 822 def Function_Test_D():
 823     global global_var_number
 824     # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功,
 825     # 导致另一方会堵塞(一直等待)到这个锁被解锁为止
 826     mutexFlag = mutex.acquire(True)
 827     for var in range(1000000):
 828         # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在
 829         #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续
 830         if mutexFlag:
 831             global_var_number += 1
 832     # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有
 833     # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁
 834     mutex.release()
 835 
 836     print("Test D ,number is %d" % (global_var_number))
 837 
 838 def Function_Test_E():
 839     global global_var_number
 840 
 841     for var in range(1000000):
 842         mutexFlag = mutex.acquire(True)
 843         if mutexFlag:
 844             global_var_number += 1
 845         mutex.release()
 846 
 847     print("Test E ,number is %d" % (global_var_number))
 848 
 849 def Function_Test_F():
 850     global global_var_number
 851     mutexFlag = mutex.acquire(True)
 852     for var in range(1000000):
 853         if mutexFlag:
 854             global_var_number += 1
 855     mutex.release()
 856 
 857     print("Test F ,number is %d" % (global_var_number))
 858 
 859 # #-----------------------
 860 # 多线程-非共享数据
 861 class Class_My_Thread(threading.Thread):
 862     # 重写 构造方法
 863     #   1. 全局变量在多个线程中 共享,为了保证正确运行需要锁
 864     #   2. 非全局变量在每个线程中 各有一份,不会共享,当然了不需要加锁
 865     def __init__(self,number,SleepTime):
 866         threading.Thread.__init__(self)
 867         self.num       = number
 868         self.SleepTime = SleepTime
 869 
 870     def run(self):
 871         self.num += 1  # 局部变量
 872         time.sleep(self.SleepTime)
 873         print("线程 (%s), number = (%s)" % (self.name,self.num))
 874 
 875 # #-----------------------
 876 # 死锁
 877 class Class_My_Thread_A(threading.Thread):
 878 
 879     def run(self):
 880         if mutexA.acquire():
 881             print(self.name + "--do 1 up --")
 882             time.sleep(1)
 883 
 884             if mutexB.acquire(True,3):
 885                 print(self.name + "--do 1 down --")
 886                 mutexB.release()
 887             mutexA.release()
 888 
 889 class Class_My_Thread_B(threading.Thread):
 890 
 891     def run(self):
 892         if mutexB.acquire(True,4):
 893             print(self.name + "--do 2 up --")
 894             time.sleep(1)
 895 
 896             if mutexA.acquire():
 897                 print(self.name + "--do 2 down --")
 898                 mutexA.release()
 899             mutexB.release()
 900 
 901 # #-----------------------
 902 # 多线程有序执行
 903 class Class_Task_C(threading.Thread):
 904     def run(self):
 905         while True:
 906             if mutexC.acquire():
 907                 print("--- Task C ---[%s]" % (self.name))
 908                 time.sleep(1)
 909                 mutexD.release()
 910 
 911 class Class_Task_D(threading.Thread):
 912     def run(self):
 913         while True:
 914             if mutexD.acquire():
 915                 print("--- Task D ---[%s]" % (self.name))
 916                 time.sleep(1)
 917                 mutexE.release()
 918 
 919 class Class_Task_E(threading.Thread):
 920     def run(self):
 921         while True:
 922             if mutexE.acquire():
 923                 print("--- Task E ---[%s]" % (self.name))
 924                 time.sleep(1)
 925                 mutexC.release()
 926 # #-----------------------
 927 # FIFO 队列  生产者消费模式
 928 class Class_Producer_F(threading.Thread):
 929     def run(self):
 930         # global queue
 931         var_count = 0
 932 
 933         while True:
 934             if queue_name.qsize() < 1000 :
 935                 # 每次就生产100个商品,二个线程做个事
 936                 for var in range(100):
 937                     var_count += 1
 938                     message = "生成产品" + str(var_count)
 939                     queue_name.put(message)
 940                     # print("-- Producer F --[%s]" % (self.name))
 941             time.sleep(0.5)
 942 
 943 class Class_Consumer_G(threading.Thread):
 944     def run(self):
 945         # global queue
 946 
 947         while True:
 948             if queue_name.qsize() > 100:
 949                 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事
 950                 for var in range(3):
 951                     message = self.name + "消费了" + queue_name.get()
 952                     print("message: (%s)",message)
 953                     # print("-- Consumer G --[%s]" % (self.name))
 954             time.sleep(1)
 955 
 956 # #-----------------------
 957 # ThreadLocal
 958 def Function_Process_student():
 959     # 获取当前线程关联的student
 960     student_name = local_school.student
 961     print("%s: (in %s)"% (student_name,threading.current_thread()))
 962 
 963 def Function_Process_thread(name):
 964     local_var_name =name
 965 
 966     # 绑定ThreadLocal的student
 967     local_school.student = local_var_name
 968     Function_Process_student()
 969 
 970 # #-----------------------
 971 # 异步
 972 def Function_Test_G():
 973     print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid()))
 974     for var in range(3):
 975         print("-- [%s]" % (var))
 976         time.sleep(1)
 977     return "Function_Test_G"
 978 
 979 def Function_Test_H(args):
 980     print("callback func PID =[%s]" % (os.getpid()))
 981     print("callback func args=[%s]" % (args))
 982 
 983 # #-----------------------
 984 
 985 # #-----------------------
 986 
 987 # ============================================================================
 988 '''
 989 # ============================================================================
 990 #   测试专用
 991 # ============================================================================
 992 '''
 993 if __name__ == "__main__":
 994     # #-----------------------
 995     # # 单线程执行
 996     # for var in range(10):
 997     #     Function_Say_Sorry()
 998 
 999 
1000 
1001     # # -----------------------
1002     # # 多线程执行
1003     # for var in range(10):
1004     #     # 创建线程
1005     #     thread_name = threading.Thread(target = Function_Say_Sorry)
1006     #     thread_name.start()
1007 
1008 
1009 
1010     # # # -----------------------
1011     # # 主线程等待子线程结束而结束
1012     # print("--- start ---:
 %s" % (time.time()))
1013     #
1014     # time_A = threading.Thread(target = Function_Sing)
1015     # time_B = threading.Thread(target = Function_Dance)
1016     # time_A.start()
1017     # time_B.start()
1018     # # time.sleep(5) #
1019     # print("--- finish ---:
%s" % (time.time()))
1020     #
1021     # # 查看线程数量
1022     # while True:
1023     #     length = len(threading.enumerate())
1024     #     print("当前运行的线程数为: [%d] " % (length))
1025     #     if length <= 1 :
1026     #         break
1027     #     time.sleep(0.5)
1028 
1029 
1030 
1031     # # # -----------------------
1032     # #线程执行代码的封装
1033     # thread_name = Class_MyThread()
1034     # thread_name.start()
1035 
1036 
1037 
1038     # # # -----------------------
1039     # #线程执行顺序
1040     # Function_Test_A()
1041 
1042 
1043 
1044     # # -----------------------
1045     # 多线程-共享全局变量
1046     # global_var_number_A = 100
1047     #
1048     # print("创建线程之前,number is %d" %(global_var_number_A))
1049     # thread_name_A = threading.Thread(target = Function_Work_A )
1050     # thread_name_A.start()
1051     # time.sleep(1)
1052     #
1053     # thread_name_B = threading.Thread(target = Function_Work_B )
1054     # thread_name_B.start()
1055 
1056 
1057 
1058     # # -----------------------
1059     # 列表当做实参传递到线程中
1060     # global_nums = [11,22,33,44,55]
1061     # thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,))
1062     # thread_name_C.start()
1063     # thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,))
1064     # thread_name_D.start()
1065 
1066 
1067 
1068     # # # -----------------------
1069     # # 同步
1070     # global_var_number = 0
1071     # thread_name_E = threading.Thread(target = Function_Test_B)
1072     # thread_name_E.start()
1073     # time.sleep(3)
1074     # thread_name_F = threading.Thread(target = Function_Test_C)
1075     # thread_name_F.start()
1076     #
1077     # print("number: %s" % (global_var_number))
1078 
1079 
1080 
1081     # # -----------------------
1082     # 互斥锁
1083     # global_var_number = 0
1084     # # 创建锁
1085     # mutex = threading.Lock()
1086     #
1087     # thread_name_D = threading.Thread(target=Function_Test_D)
1088     # thread_name_D.start()
1089     #
1090     # thread_name_E = threading.Thread(target = Function_Test_E)
1091     # thread_name_E.start()
1092     #
1093     # thread_name_F = threading.Thread(target = Function_Test_F)
1094     # thread_name_F.start()
1095     #
1096     # print("number: %s" % (global_var_number))
1097 
1098 
1099     # # # -----------------------
1100     # # 多线程-非共享数据
1101     # # 创建锁
1102     # mutex = threading.Lock()
1103     # thread_name_G = Class_My_Thread(100, 3)
1104     # thread_name_G.start()
1105     #
1106     # thread_name_H = Class_My_Thread(200, 1)
1107     # thread_name_H.start()
1108     #
1109     # thread_name_I = Class_My_Thread(300, 2)
1110     # thread_name_I.start()
1111 
1112 
1113 
1114     # # # -----------------------
1115     # # 死锁  不能执行的效果
1116     # mutexA = threading.Lock()
1117     # mutexB = threading.Lock()
1118     # thread_name_J = Class_My_Thread_A()
1119     # thread_name_J.start()
1120     # thread_name_K = Class_My_Thread_B()
1121     # thread_name_K.start()
1122     #
1123     # print("...")
1124     # #避免死锁
1125     # # 程序设计时,要尽量避免,银行家算法
1126     # # 添加超时时间等 acquire(timeout)很重要!!!
1127 
1128 
1129     # # #-----------------------
1130     # # 同步应用
1131     # # 创建锁
1132     # mutexC = threading.Lock()
1133     #
1134     # mutexD = threading.Lock()
1135     # mutexD.acquire()
1136     # mutexE = threading.Lock()
1137     # mutexE.acquire()
1138     #
1139     # thread_name_L = Class_Task_C()
1140     # thread_name_L.start()
1141     #
1142     # thread_name_M = Class_Task_D()
1143     # thread_name_M.start()
1144     #
1145     # thread_name_N = Class_Task_E()
1146     # thread_name_N.start()
1147 
1148 
1149     # # #-----------------------
1150     # # Queue  提供了同步、线程安全的队列类
1151     # # 队列原则: 先进先出  FIFO
1152     # # 栈的原则: 先进后出  LIFO
1153     # # 这些队列都实现了锁原语,原子操作
1154     # #创建队列 ,这个队列只能用于线程,而进程中不能使用
1155     # queue_name = Queue() # 队列
1156     # # 队列初创
1157     # for var in range(500):
1158     #     queue_name.put("》》初始产品"+ str(var))
1159     # # 创建二次线程 producer
1160     # for var in range(2):
1161     #     producer_name = Class_Producer_F()
1162     #     producer_name.start()
1163     # # 创建二次线程 consumer
1164     # for var in range(5):
1165     #     producer_name = Class_Consumer_G()
1166     #     producer_name.start()
1167     # # 2 + 5 +1(主线程) = 8 (总线程)
1168 
1169 
1170 
1171     # # #-----------------------
1172     # # ThreadLocal
1173     # # 创建全局ThreadLocal对象
1174     # local_school = threading.local()
1175     # thread_name_O = threading.Thread(target=Function_Process_thread, args=('图形分析',), name='Thread-A')
1176     # thread_name_O.start()
1177     # thread_name_O.join()
1178     # thread_name_P = threading.Thread(target = Function_Process_thread, args=('数据处理',), name='Thread-B')
1179     # thread_name_P.start()
1180     # thread_name_P.join()
1181     #
1182     # thread_name_Q = threading.Thread(target = Function_Process_thread, args=('信号处理',), name='Thread-C')
1183     # thread_name_Q.start()
1184     # thread_name_Q.join()
1185 
1186 
1187     # # #-----------------------
1188     # # 异步
1189     # #创建进程池
1190     # pool_name = Pool(3)
1191     # # callback 回调函数 子线程结束后,立刻执行回调函数
1192     # pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H )
1193     #
1194     # # 异步的理解:主进程正在做某件事情,
1195     # # 突然来了一件更需要立刻去做的事情,
1196     # # 那么这种,在父进程去做某件事情的时候 
1197     # # 并不知道是什么时候去做,的模式 就称为异步
1198     # while True:
1199     #     time.sleep(1)
1200     #     print("主进程 PID = [%s]" % (os.getpid()))
1201 
1202     print("learn finish")
1203     

--

-----------------------------------------------------

原文地址:https://www.cnblogs.com/caochucheng/p/9974843.html