一、socket提升
1、熟悉socket.socket()中的省略部分
socket.socket(AF.INET,socket.SOCK_STREAM)
2、send与recv发送大文件时对于黏包的处理。
2.1 sendall的发送方式与for i in f(局部)
2.2 使用send、recv交替的方式做一个ack来解决黏包
3、socketserver多并发处理
3.1 5种不同的socket类
3.1.1 baseserver用于继承,不对外提供服务
3.1.2 tcpserver继承baseserver类用于处理tcp连接
3.1.3 unixstreamserver继承tcpserver使用UNIX域套接字实现面向数据流协议(sock_stream --->tcp)
3.1.4 udpserver继承tcpsever用于处理udp连接
3.1.5 unixdatagramserver继承tcpserver使用针对UNIX域套接字来处理数据报式协议(sock_dgram--->udp)
3.2 socketserver的实现
3.2.1 step1 定义一个类,存放handler方法
需要继承socketserver.BaseRequestHandler,之后每生成一个新连接,都会实例化一个类,并调用这个handler方法,和客户端所有的交互都是在这个handler里面定义的
class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok')
3.2.2 step2 定义一个实例,此实例关联上一步的handler、并绑定ip和及端口,监听端口处理新发起的连接,将新发起的连接实例化对像,并交给handler处理。
此处我们要处理tcp,所以是tcpserver。考虑到并发,此处使用threading多线程实现,或者可以使用ForkingServer来处理
s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass)
s2=socketserver.FuckingTCPServer(('localhost',8888),myclass)
3.2.3 step3 server.forever()
s1.serve_forever()
s2.serve_forever()
3.2.4 step4 客户端对接测试
import socketserver class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok') s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass) s2=socketserver.FockingTCPServer(('localhost',8888),myclass) s1.serve_forever() s2.serve_forever() ------------------------------- import socket c=socket.socket() c.connect(('localhost',8888)) print(c.recv(1024).decode())
4、断点续传的实现->seek
通过byte的方式读取文件结合使用ab的方式续写文件的方式来实现断点续传,其核心的思路就是,获得当前半成品文件的字节数,使用seek将被操作文件的句柄移动到此处再往后读取,以此实现断点续传
import socket c1=socket.socket() c1.connect(('localhost',6666)) ''' f=open(r'E:L.exe','rb') count = 0 for i in f: c1.send(i) count+=1 if count > 10: break ''' x=input('seek到哪个位置?') f=open(r'E:L.exe','rb') f.seek(int(x)) print('开始传输') count=0 for i in f: print(' %s'%count) count+=1 c1.send(i) print('ok') --------------------- import socket import os s1=socket.socket() s1.bind(('localhost',6666)) s1.listen() print('----begin----') conn,addr=s1.accept() print('连接建立') print(os.path.getsize(r'E:\xxxx.exe')) f=open(r'E:\xxxx.exe','ab') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break ''' f=open(r'E:\xxxx.exe','wb') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break '''
5、optparse模块的使用
5.1 定义对象
import optparse parse=optparse.OptionParser()
5.2 添加参数
parse.add_option('-u','--user',dest='user',action='store',type=str,metavar='user',help='Enter User Name!!') parse.add_option('-p','--port',dest='port',type=int,metavar='xxxxx',default=3306,help='Enter Mysql Port!!')
#-u,--user 表示一个是短选项 一个是长选项
#dest='user' 将该用户输入的参数保存到变量user中,可以通过options.user方式来获取该值
#type=str,表示这个参数值的类型必须是str字符型,如果是其他类型那么将强制转换为str(可能会报错)
#metavar='user',当用户查看帮助信息,如果metavar没有设值,那么显示的帮助信息的参数后面默认带上dest所定义的变量名
#help='Enter..',显示的帮助提示信息
#default=3306,表示如果参数后面没有跟值,那么将默认为变量default的值
#parse.set_defaults(v=1.2) #也可以这样设置默认值
5.3 监听
将监听结果赋值给options和args,一个结果为属性一个结果为列表。
options,args=parse.parse_args()
5.4 例子
import optparse class test(): def __init__(self): parse=optparse.OptionParser() parse.add_option('-s',dest='x',help='server binding host',metavar='HOST') parse.add_option('-p',dest='port',help='server binding port') (options,args)=parse.parse_args() print(options.x,options.port) for i in args: print(i) F:ftp服务器_sockserver版servercore>python main.py -s 0.0.0.0 -p xxx aaa bbb ccc 0.0.0.0 xxx aaa bbb ccc F:ftp服务器_sockserver版servercore>python main.py -h Usage: main.py [options] Options: -h, --help show this help message and exit -s HOST server binding host -p PORT server binding port
二、paramiko模块的使用
1、远程ssh并执行指令返回结果
import paramiko #step1 实例化ssh ssh = paramiko.SSHClient() #step2 #加上这句话不用担心选yes的问题,会自动选上(用ssh连接远程主机时,第一次连接时会提示是否继续进行远程连接,选择yes) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)#家目录/.ssh/known_hosts #step3 连接 ssh.connect(hostname='xxx.cm',port=22,username='xxxxx',password='xxxxx') #step4 执行 stdin,stdout,stderr=ssh.exec_command('df -h;pwd')#可以用;执行多个指令 x1=stdout.read() x2=stderr.read() result=x1 if x1 else x2#三元运算 print(result.decode()) #step5 关闭 ssh.close()
------------------------
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 7.9G 3.8G 3.8G 51% /
none 3.9G 4.0K 3.9G 1% /dev/shm
/dev/sda2 8.7G 298M 7.9G 4% /xxx/conf
/dev/sda3 893G 88G 761G 11% /xxx/data
/xxx/data/home/xxxxxx
2、transport文件远程scp文件
import paramiko #step1 写连接信息 linkit=paramiko.Transport('xxx.cm',22) linkit.connect(username='xxxxx',password='xxxx') #step2 创建连接对象 sftp_object=paramiko.SFTPClient.from_transport(linkit) #step3 上传下载文件 sftp_object.put('config.conf','tmpfromwin')#上传 # config.conf 为本地文件 tmpfromwin为本地文件上传到服务器上的文件名 sftp_object.get('xxx.zip',r'f:x.zip')#下载 #xxx.zip为服务器的文件名 r'f:x.zip'本地保存的位置及文件名
3、免密登录
3.1 免密登录的思路
PC-A 生成公钥和私钥
PC-A 将公钥发给PC B
PC-A 无密码登录PC-B
Python 调用pca的私钥即可完成对pcb的无密码登录
3.2 免密登录的key生成
pc-A
[root@localhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:cNTWlMyJXaOFTQMY3dFKCIywHfVzgmpXvjt0vVfuZao root@localhost.localdomain The key's randomart image is: +---[RSA 2048]----+ | ..o=+%o@Bo | | +..*o@o+o.| | o o.. *... | | o . o +. | | S . . . | | . . ... o| | ... .=| | .. ++| | Eo..o| +----[SHA256]-----+ [root@localhost ~]# [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 总用量 8 -rw-------. 1 root root 1675 4月 29 12:44 id_rsa -rw-r--r--. 1 root root 408 4月 29 12:44 id_rsa.pub [root@localhost .ssh]# ssh-copy-id -p 22 root@192.168.99.172 /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub" The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? yes /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys root@192.168.99.172's password: Number of key(s) added: 1 Now try logging into the machine, with: "ssh -p '22' 'root@192.168.99.172'" and check to make sure that only the key(s) you wanted were added. [root@localhost .ssh]# ssh 192.168.99.172 Last login: Sun Apr 29 12:39:28 2018 from 192.168.99.239 [root@localhost ~]# exit 登出 Connection to 192.168.99.172 closed. ----------------------------------------------------------
PC-B drwx------. 2 root root 29 4月 29 12:48 .ssh [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 总用量 4 -rw-------. 1 root root 408 4月 29 12:48 authorized_keys [root@localhost .ssh]# cat authorized_keys ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMeTAz8cAtlZON+sUMSYpTXjhW32IkAdE+336BwW9LotWGznIOoS7d6cdvY...
3.3 python的实现
import paramiko #step 1 获取私钥 pk=paramiko.RSAKey.from_private_key_file('F:id_rsa') #step 2 连接服务器,设置用户名并关联私钥 link1=paramiko.Transport('xxx.cm',22) link1.connect(username='xxxx',pkey=pk) #step 3 创建transport对象 收发文件 myobject=paramiko.SFTPClient.from_transport(link1) myobject.get('xxx.sh',r'f:xxx.sh')
3.4 一则网上的代码,此处先不作分析
import paramiko import time def verification_ssh(host,username,password,port,root_pwd,cmd): s=paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname = host,port=int(port),username=username, password=password) if username != 'root': ssh = s.invoke_shell() time.sleep(0.1) ssh.send('su - ') buff = '' while not buff.endswith('Password: '): resp = ssh.recv(9999) buff +=resp ssh.send(root_pwd) ssh.send(' ') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp ssh.send(cmd) ssh.send(' ') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp s.close() result = buff else: stdin, stdout, stderr = s.exec_command(cmd) result = stdout.read() s.close() return result
三、多线程 多进程的引入
1、进程、线程、协程的基本概念(两个最小,需要分清楚)
1.1 进程是系统进行资源分配和的基本单位
1.2 线程是CPU运算调度的最小单位
1.3 协程比线程还小的单位
2、进程和线程的关系
2.1 进程就是资源的集合,这些资源包括对内存、对硬盘、对光驱等各种资源的调用的集合。
2.2 线程包含在进程中(进程是线程的容器)是进程的实际运算单位。
2.3 一个进程可以包括一个或多个线程
2.4 进程需要操作cpu 必须通过创建线程来操作
2.5 一个线程,实际就是一个cpu的控制流,一个进程中可以并发多个线程
2.6 程序是指令、数据及其组织形式的描述,进程是程序的实体。
3、多线程、多进程、进程上下文
3.1 一个进程是有一系列的相关的资源的集合一个进程的资源包括了memory page (内存页,存储页,存储页面),文件句柄,socket连接,一些安全信息 如谁的id启动了这个进程
3.2 进程上下文,意思是可执行程序代码是进程的重要组成部分。进程上下文实际上是进程执行活动全过程的静态描述。
3.3 每个进程执行过的、执行时的以及待执行的指令和数据;在指令寄存器、堆栈、状态字寄存器等中的内容。此外, 还包括进程打开的文件描述符等.
3.4 多个线程共享进程上下文(即资源)
3.5 一个进程的所有线程 共享同一块内存空间
3.6 线程快还是进程快 没有可比性 一个是资源的集合 一个是cpu的控制流,进程需要执行 也必须通过创建线程来执行
3.7 线程创建快 还是进程创建快 线程创建快 它就是一段cpu的指令集,而进程需要去申请各种资源组成集合
3.8 线程共享内存空间 进程间资源互相独立
3.8.1一个父进程创建子进程,相当于克隆了一份独立的内容 多个子进程直接的资源是不能互相访问的
3.8.2 一个线程,创建多个线程都共享同一个进程的资源
3.9同一个进程的多个线程直接能直接联系交流交互,两个进程想通信交流 要找一个中间代理
3.10
3.10.1对于一个主线程的修改 可能或影响其他线程的运行(因为共享资源)
3.10.2对于父进程的修改,不会影响到子进程(资源独立)
四、线程语法详解
1、创建多线程
两步,一步关联函数和变量,一步start
import threading import time def PointIt(x): print('---->',x) time.sleep(5) print(x,'--->ok') PointIt('n1') PointIt('n2') #对比n1/n2/n3/n4的输出用时 #step 1 创建进程,target为函数的函数名,args内输入函数的参数 n3=threading.Thread(target=PointIt,args=('n3',))#逗号不能漏 n4=threading.Thread(target=PointIt,args=('n4',))#逗号不能漏
#step 2 start
n3.start() n4.start() print('done')
2、面向对象的方式创建多线程
import threading import time #step 1 建立一个类,需要继承线程类 class Mythread_class(threading.Thread): #step 2定义构造函数,接收需要输入的变量 def __init__(self,n): super(Mythread_class, self).__init__() self.n=n #step 3 定义run ->这里必须是run这个名字 def run(self): print(self.n) time.sleep(3) print(self.n,'done') #step 4 生成实例,每个实例运行start方法都会启动一个线程运行run方法 t1=Mythread_class('t1') t2=Mythread_class('t2') t1.run()#这样不行 这样还是单线程 t2.run()#如果直接是run方法,还是单线程 t1.start() t2.start()#多线程 for i in range(5): x=Mythread_class('x-%s'%i) x.start() print('主线程done')
程序运行的主线程在启动子线程之后不会等子线程执行完毕 而是继续运行主线程
同样,在主线程中也无法计算出子线程运行所花费的时间
3、join()方法的使用
如果run()运行结束则join()结束,如果run()没有完成,则卡住等待完成。通过这个方法来判断各个线程都运行结束所花费的时间
import threading import time class Mythread(threading.Thread): def __init__(self,n): super(Mythread, self).__init__() self.n=n def run(self): print('t-%s is running,当前线程为%s,当前活跃线程数为%s'%(self.n,threading.current_thread(),threading.active_count())) time.sleep(3) print('t-%s done'%self.n) time1=time.time() t_object=[]#使用这个列表来记录线程对象 for i in range(10): x=Mythread(i) x.start() t_object.append(x) print('main',threading.current_thread()) for i in t_object: i.join() time2=time.time() print(time2-time1) ------------------------------------------ t-0 is running,当前线程为<Mythread(Thread-1, started 2712)>,当前活跃线程数为2 t-1 is running,当前线程为<Mythread(Thread-2, started 9332)>,当前活跃线程数为3 t-2 is running,当前线程为<Mythread(Thread-3, started 2696)>,当前活跃线程数为4 t-3 is running,当前线程为<Mythread(Thread-4, started 9884)>,当前活跃线程数为5 t-4 is running,当前线程为<Mythread(Thread-5, started 3272)>,当前活跃线程数为6 t-5 is running,当前线程为<Mythread(Thread-6, started 9208)>,当前活跃线程数为7 t-6 is running,当前线程为<Mythread(Thread-7, started 1828)>,当前活跃线程数为8 t-7 is running,当前线程为<Mythread(Thread-8, started 9256)>,当前活跃线程数为9 t-8 is running,当前线程为<Mythread(Thread-9, started 1060)>,当前活跃线程数为10 t-9 is running,当前线程为<Mythread(Thread-10, started 9780)>,当前活跃线程数为11 我是主线程main <_MainThread(MainThread, started 2056)> t-0 done t-4 done t-5 done t-2 done t-3 done t-1 done t-8 done t-9 done t-7 done t-6 done
3.003652811050415
4、守护线程
当主线程执行完毕后,主线程结束,则所有守护线程全部结束,无论运行到什么状态 setDeamon
import threading import time class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') time.sleep(3) print(self.n,'------------->done','目前还活跃的线程数:',threading.active_count()) tmp_list=[] for i in range(20): j=Myclass(i) j.setDaemon(True)#setdaesmon来设置为守护进程 j.start() tmp_list.append(j) time.sleep(3) print('我要结束主进程了') ----------------------------------------------- 0 begin 1 begin 2 begin 3 begin 4 begin 5 begin 6 begin 7 begin 8 begin 9 begin 10 begin 11 begin 12 begin 13 begin 14 begin 15 begin 16 begin 17 begin 18 begin 19 begin 1 ------------->done 目前还活跃的线程数: 21 0 ------------->done 目前还活跃的线程数: 20 4 ------------->done 目前还活跃的线程数: 19 3 ------------->done 目前还活跃的线程数: 18 6 ------------->done 目前还活跃的线程数: 17 5 ------------->done 目前还活跃的线程数: 16 2 ------------->done 目前还活跃的线程数: 15 11 ------------->done 目前还活跃的线程数: 14 12 ------------->done 目前还活跃的线程数: 14 10 ------------->done 目前还活跃的线程数: 14 7 ------------->done 目前还活跃的线程数: 12 9 ------------->done 目前还活跃的线程数: 12 14 ------------->done 目前还活跃的线程数: 11 8 ------------->done 目前还活跃的线程数: 9 我要结束主进程了
5、GIL 全局解释器锁
5.1 In CPython,This lock is necessary mainly because CPython’s memory management is not thread-safe.
5.2 多个线程都打到多个cpu的核上 但是同一时刻只能有一个线程在真正的工作
5.3 全局解释器锁(Global Interpreter Lock)是计算机程序设计语言解释器用于同步线程的工具,使得任何时刻仅有一个线程在执行
5.4 一个python解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核CPU平台上,由于GIL的存在,所以禁止多线程的并行执行。
5.5 Python 3.2开始使用新的GIL。
5.6 可以创建独立的进程来实现并行化
6、互斥锁
一个全局变量,每个线程都要对其进行操作,为了防止一个线程还未对其操作完毕,即这个变量的值还未被原线程修改时,这个变量的值就被传到下一个线程进行操作,这样有可能照成最终的结果不准确。为了防止这种情况,使用互斥锁解决
step1 实例一个锁对象
step2 acqiure()方法->锁变量
step3 release()方法释放变量
import threading import time mylock=threading.Lock() num=0 class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') mylock.acquire() time.sleep(3) global num num += 1 mylock.release() time.sleep(1) #mylock.release()使用完后迅速释放 tmp_list=[] for i in range(5000): x=Myclass(i) x.start() tmp_list.append(x) for i in tmp_list: i.join() print(num)
7、递归锁
mylock=threading.RLock()#RLock 递归锁
import threading mylock=threading.RLock()#RLock 递归锁 number1=0 number2=1111 count=0 def run1(): mylock.acquire() global number1 number1+=1 mylock.release() def run2(): #mylock.acquire() 只能套两层锁,如果套三层锁,即把这个注释去掉,就卡死了 global number2 number2+=2 #mylock.acquire() def terminal_fun(): mylock.acquire() global count run1() print('between run1 with run2') run2() print('count:',count) count+=1 mylock.release() for i in range(5): x=threading.Thread(target=terminal_fun) x.start() while threading.active_count() != 1: print(threading.active_count()) else: print('ok')
---------------------------------
between run1 with run2
count: 0
between run1 with run2
count: 1
between run1 with run2
count: 2
between run1 with run2
count: 3
between run1 with run2
count: 4
ok
此代码说明:注意主程序结束的写法法,本程序使用的是:等待活跃线程数等于1结束的方法
8、信号量,
不能单纯理解为线程并发数
threading.BoundedSemaphore(3) 同时访问资源的线程数量,但不是活跃的线程数量
线程中,信号量主要是用来维持有限的资源,使得在一定时间使用该资源的线程只有指定的数量
是一个变量,控制着对公共资源或者临界区的访问。信号量维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。
每次有一个线程获得信号量时,计数器-1。若计数器为0,其他线程就停止访问信号量,直到另一个线程释放信号量。
以下代码,io为一个信号量,查看活跃的线程,实际所有的线程都跑起来了,但是,能输出的只有三个线程
import threading import time limit_num = threading.BoundedSemaphore(3) # 控制并发,不是三个一组,释放一个,就新增一个 def run(x): limit_num.acquire() print('i am %s' % x, threading.active_count()) time.sleep(3) print('i am %s' % x, threading.active_count()) limit_num.release() for i in range(20): x = threading.Thread(target=run, args=(i,)) x.start() while threading.active_count() != 1: pass else: print('done!') ------------------------------------------- i am 0 2 i am 1 3 i am 2 4 i am 1 21 i am 2 21 i am 4 20 i am 0 19 i am 3 18 i am 5 18 i am 3 18 i am 4 18 i am 6 16 i am 7 16 i am 5 16 i am 8 15 i am 7 15 i am 6 15 i am 9 13 i am 10 13 i am 8 13 i am 11 12 i am 10 12 i am 9 12 i am 11 11 i am 13 10 i am 14 9 i am 12 9 i am 14 9 i am 13 9 i am 12 9 i am 15 7 i am 17 7 i am 16 7 i am 15 6 i am 17 5 i am 18 5 i am 16 5 i am 19 3 i am 18 3 i am 19 2 done!
9、事件 event
可用于多个线程间同步信息 如同一个线程控制红绿灯,一个线程等待红绿灯
四个知识点
9.1 生成事件 threading.Event()
9.2 set事件 x.set()
9.3 clear事件 x.clear()
9.4 wait事件 x.wait()
import threading import time my_green_red_light=threading.Event() def light():#负责对light event 进行set或clear time_count=0 my_green_red_light.set() while True: if time_count >8 and time_count < 16: my_green_red_light.clear() print('