9. 粘包struct用法hashlib校验socketserver并发模块引入进程join守护进程

1.粘包现象

总结 : 导致黏包现象的两种情况
hello,worl
d
(1) 在发送端,发送数据太快,频繁发送
(2) 在接收端,接收数据太慢,延迟截取
# ### 服务端
import socket 
sk = socket.socket()
sk.bind( ("127.0.0.1",9000) )
sk.listen()

# 三次握手
conn,addr = sk.accept()

# 收发数据逻辑
conn.send("hello,".encode("utf-8"))
conn.send("world".encode("utf-8"))

# 四次挥手
conn.close()
# 退还端口
sk.close()
# ### 客户端
import socket
import time
sk = socket.socket()
sk.connect( ("127.0.0.1",9000) )

time.sleep(0.1)
print(sk.recv(10))
print(sk.recv(10))
sk.close()

(1)粘包升级版1

定义好传输长度
# ### 服务端

import socket
sk = socket.socket()
sk.bind( ("127.0.0.1",9000) )
sk.listen()

conn,addr = sk.accept()
# 收发数据逻辑
# 告诉接收端,我要发送的数据长度是多少
conn.send("6".encode("utf-8"))
# 发送实际的数据
conn.send("hello,".encode("utf-8"))
conn.send("world".encode("utf-8"))


conn.close()
sk.close()

# "6" "66" "666" "6666" "66666"

# ### 客户端
import socket,time
sk = socket.socket()
sk.connect( ("127.0.0.1",9000) )

time.sleep(0.1)
# 收发数据逻辑
n = int(sk.recv(1).decode("utf-8"))
print(n,type(n))

print(sk.recv(n))
print(sk.recv(10))

sk.close()
(2)粘包升级版2

固定好字节长度
# ### 服务端

import socket
sk = socket.socket()
sk.bind( ("127.0.0.1",9000) )
sk.listen()

conn,addr = sk.accept()
# 收发数据逻辑
# 告诉接收端,我要发送的数据长度是多少
conn.send("00000120".encode("utf-8"))
# 发送实际的数据
msg = "hello," * 20
conn.send(msg.encode("utf-8"))
conn.send("world".encode("utf-8"))


conn.close()
sk.close()

# "6" "66" "666" "6666" "66666" "666666"
# ### 客户端
import socket,time
sk = socket.socket()
sk.connect( ("127.0.0.1",9000) )

time.sleep(0.1)
# 收发数据逻辑
n = int(sk.recv(8).decode("utf-8"))
print(n,type(n))

print(sk.recv(n))
print(sk.recv(10))


sk.close()
(3)粘包升级版3
import socket
import struct 
sk = socket.socket()
sk.bind( ("127.0.0.1" ,9000) )
sk.listen()

conn,addr = sk.accept()

# 收发数据逻辑
inp = input("请输入msg>>>:")
msg = inp.encode("utf-8")

# 把这个长度的数字转化成二进制字节流,然后发送给对面,按照这么大的长度进行截取
res = struct.pack("i",len(msg))
conn.send(res)
conn.send(msg)
conn.send("world".encode("utf-8"))

# 四次挥手
conn.close()
# 退还端口
sk.close()


"""
字节流是一个一个字节组成的,凑在一起就是字节流
总长度就是一共的字节数. 用len算字节流长度,告诉接收端,要截取多少个字节

"""
# ### 客户端
import socket
import struct
import time
sk = socket.socket()
sk.connect( ("127.0.0.1" ,9000) )

time.sleep(0.1)
# 先接收要截取的长度是多少
n = sk.recv(4)
n = struct.unpack("i",n)[0]

# print(n)
# 再去接收真实的数据,防止黏包
print(sk.recv(n))
print(sk.recv(10))


sk.close()
2. 粘包终极struct用法
import struct
# pack  打包
"""
# struct.pack 把任意长度的数字转化成具有固定长度的4个字节的值,组成字节流
pack("i",2200000000) 代表我要转换的这个数据类型是整型,这个整型一般放的是字节长度;
i => int
"""
# unpack 解包
"""
# struct.pack  把4个字节的值恢复成原有的数据,返回的是元组
"""

res = struct.pack("i",10000)
# 小于22亿的长度范围
res = struct.pack("i",2100000000)
print(res)
print(len(res))

# "i" 把二进制字节流转换成整型 ,unpack返回的是元组,通过下标0直接拿到数据
res = struct.unpack("i",res)[0]
print(res)

 3.hashlib模块

3.1 hashlib 模块

import hashlib import random # 基本用法 # (1) 创建一个md5算法的对象 hs = hashlib.md5() # (2) 把想要加密的字符串通过update更新到hs对象中进行处理 hs.update("abc123".encode("utf-8")) # (3) 返回32位16进制的字符串 res = hs.hexdigest() print(res,len(res)) # 加盐 (key 只有自己知道的关键字 ,目的就是增加密码的复杂度) hs = hashlib.md5("XBoy_".encode("utf-8")) hs.update("abc123".encode("utf-8")) res = hs.hexdigest() print(res) # 动态加盐 res = str(random.randrange(10000,100000)) hs = hashlib.md5(res.encode("utf-8")) hs.update("abc123".encode("utf-8")) res = hs.hexdigest() print(res) # f7b924091eef62c6cc9f399f6e3a4c19 """ # md5 加密效率快 , 安全性不是太高, 位数32位的16进制的字符串 # sha1 加密效率慢 , 安全性稍高, 更加精度 , 位数是40位的16进制字符串 # sha512 加密效率慢 , 安全性稍高, 更加精确 , 位数是128位的16进制字符串 """ # sha算法系列 # hs = hashlib.sha1() # sha1 hs = hashlib.sha512() # sha512 hs.update("abc123".encode()) res = hs.hexdigest() print(res,len(res)) # ### hmac '''hmac 加密的字符串强度更高,不容易破解''' import hmac key = b"xboyww" msg = b"abc123" hm = hmac.new(key,msg) res = hm.hexdigest() print(res,len(res)) # 8f566e64d6fe1b9a3342835609f64c3f # 随机返回长度为32位的二进制字节流 import os key = os.urandom(32) print(key,len(key)) hm = hmac.new(key,msg) res = hm.hexdigest() print(res) # 90ca20568e0b8d1029544ac1178ab993

3.2 文件校验
import hashlib
"""
read  在mode = "r"  读取的单位是字符
read  在mode = "rb" 读取的单位是字节;
"""
"""
with open("ceshi1.py",mode="r",encoding="utf-8") as fp:
    res = fp.read(3)
print(res)

with open("ceshi1.py",mode="rb") as fp:
    res = fp.read(3)
print(res)
print(res.decode())
"""

# (1) 针对于小文件的内容校验
def check_md5(file):
    with open(file,mode="rb") as fp:    
        hs = hashlib.md5()
        hs.update( fp.read() )
    return hs.hexdigest()  # 01c57787ae7f28a01b63bc78dac7fe2f

# 如果两个文件加密的32位字符串相同,就可以说明两个文件的内容时一样的
res1 = check_md5("ceshi1.py")
res2 = check_md5("ceshi2.py")
print(res1)
print(res2)

# (2) 针对于大文件的内容校验
hs = hashlib.md5()
hs.update("昨天晚上
拉肚子了".encode())
res = hs.hexdigest()
print(res) # 601c7672e146286ecd24d6afc323d322

# 可以利用update , 分次更新内容.
# 利用update 这个特性,可以把较大的内容分次进行加密
hs = hashlib.md5()
hs.update("昨天晚上
".encode())
hs.update("拉肚子了".encode())
res = hs.hexdigest()
print(res)


# 方法一 不停的读字节,直到为空的时候,终止循环
def check_md5(file):
    # 创建对象
    hs = hashlib.md5()
    with open(file,mode="rb") as fp:
        while True:
            # 按照每次读取一个字节
            content = fp.read(1)
            # 如果读取的是空字节,那么直接break
            if content:
                hs.update(content)
            else:
                break
        return hs.hexdigest()
print("<==>")
print(check_md5("ceshi1.py"))
print(check_md5("ceshi2.py"))


# 方法二 不停的减去响应的字节数,直到减到0,循环终止;
import os
# 计算文件大小 os.path.getsize(文件名)
def check_md5(file):
    file_size = os.path.getsize(file)
    # print(file_size)
    hs = hashlib.md5()
    with open(file,mode="rb") as fp:
        while file_size:
            # fp.read(100) 最多读取100个字节
            content = fp.read(1)
            hs.update(content)
            # 按照实际=读取的字节数进行相减
            file_size -= len(content)
        return hs.hexdigest()
print(check_md5("ceshi1.py"))  # b78b34cc398ca4110c43792fdde5a55d
print(check_md5("ceshi2.py"))

4.socketserver并发

# ### socketserver 实现tcp连接的并发操作
'''
文件 <=>   模块
文件夹 <=> 包
'''
import socketserver
class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        print("---> 执行该操作")

# 创建一个对象 , 通过 ThreadingTCPServer创建 ThreadingTCPServer( ip端口 , 自定义类 )
server = socketserver.ThreadingTCPServer( ("127.0.0.1" , 9000) , MyServer  )
# 循环调用
server.serve_forever()
# ### 客户端
import socket
sk = socket.socket()
sk.connect( ("127.0.0.1",9000) )
# 收发数据逻辑
sk.close()

 

# ### socketserver 实现tcp连接的并发操作
'''
文件 <=>   模块
文件夹 <=> 包
'''
import socketserver
class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        # print(self.request) # conn = "<socket.socket fd=456, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9001), raddr=('127.0.0.1', 63623)>"
        # self.request 相当于conn ,在socketserver底层已经给你封装好了,直接拿来用就可以;
        conn = self.request
        
        # ('127.0.0.1', 63692) request和client_address  就是sk.accept() 三次握手的返回值,只不过用两个不同的变量接受了.
        # print(self.client_address)
        '''
        while True:
            msg = conn.recv(1024).decode("utf-8")
            print(msg)
            conn.send(msg.upper().encode("utf-8"))'''
# 创建一个对象 , 通过 ThreadingTCPServer创建 ThreadingTCPServer( ip端口 , 自定义类 )
server = socketserver.ThreadingTCPServer( ("127.0.0.1" , 9001) , MyServer  )
# 循环调用
server.serve_forever()
# ### 客户端
import socket
sk = socket.socket()
sk.connect( ("127.0.0.1",9001) )

# 
    
# 收发数据逻辑
sk.close()

 5.模块引入

"""
如果当前模块是作为主文件调用的,
__name__ 返回的是 __main__  主进程

如果当前模块是通过import,被别人导入的
__name__ 返回的是当前模块名 子进程

直接运行的是主进程
被导入的是  子进程.

用来做测试的,导入模块时,不需要把测试代码一并导入,
节省空间和效率,所以加一个判断 if __name__ == "__main__":
防止浪费时间,浪费空间,所以把不需要导入的东西排除在外;
"""

# 直接打印__name__
print(__name__) # __main__


# 自定义模块
boy = "神秘男孩"

def skill():
    print("可以预言未来")
    
class b_boy_famliy():
    father = "王健林"
    def hobby(self):
        print("我交朋友从来不在乎他有没有钱,反正都没有我有钱")
        
if __name__ == "__main__":
    skill()
# 调用方式1
import mymodule # mymodule
# print(mymodule.boy)

# 调用方式2
# from mymodule import boy
# print(boy)

# from mymodule import  skill
# skill()

# 引入多个,用as 可以给b_boy_famliy起别名叫做bbf
'''
from mymodule import skill,b_boy_famliy as bbf

# 实例化类产生对象obj
obj = bbf()
obj.hobby()
'''

6.进程

进程的状态

# ### 进程
# 获取进程号 => 相当于人的身份证,是唯一值
import os
# 获取当前进行id [当前子进程]
# res = os.getpid()
# print(res)
# 获取父进程id
# res = os.getppid()
# print(res)

# (1) 进程的基本用法
from multiprocessing import Process
import time
"""
def func():
    print(">>222>>当前子进程id>>:%s,它的父进程id>>:%s" % (os.getpid(),os.getppid()) ) 

if __name__ == "__main__":
    print(">>111>>1.子进程%s , 父进程%s" % (os.getpid(),os.getppid())  )
    # 创建子进程
    '''target = 函数 单独用一个进程去执行谁,去完成哪个任务'''
    p = Process(target=func)
    # 调用子进程
    p.start()
"""
# (2) 带有参数的函数
'''
异步程序:不等每一行代码执行结束,就往下执行其他代码是异步程序

创建进程时候,需要从创建 -> 就绪,cpu才能过来执行就绪态的程序;
创建进程时,需要分配空间.分配空间会出现阻塞现象;
'''
'''
def func():
    for i in range(1,5):
        print(">>222>>当前子进程id>>:%s,它的父进程id>>:%s" % (os.getpid(),os.getppid()) ) 

if __name__ == "__main__":
    print(">>111>>1.子进程%s , 父进程%s" % (os.getpid(),os.getppid())  )
    # 创建进程,返回进程对象
    p = Process(target= func)
    p.start()
    
    n = 5
    for i in range(1,n+1):
        print("*" * i )
'''

"""
def func(n):
    for i in range(1,n+1):
        time.sleep(0.1)
        print(">>222>>当前子进程id>>:%s,它的父进程id>>:%s" % (os.getpid(),os.getppid()) ) 

if __name__ == "__main__":
    print(">>111>>1.子进程%s , 父进程%s" % (os.getpid(),os.getppid())  )
    n = 5
    '''args = (参数1,参数2,...) args类型是元组'''
    p = Process(target=func,args= (n,))
    p.start()

    for i in range(1,n+1):
        time.sleep(0.1)
        print("*" * i )
"""

# (3) 进程之间的数据,彼此是隔离的
"""
count = 99
def func():
    global count
    count += 1
    print("当前子进程id号%s" % (os.getpid()) , count)

if __name__ == "__main__":
    # 创建进程
    p = Process(target=func)
    # 调用进程
    p.start()
    # 为了先让子进程跑完,在执行主进程中的count ,看看是否通过子进程进行了修改
    time.sleep(1)
    print("我是主进程",count)
"""

# (4) 多进程之间的并发
'''
在程序并发时,因为cpu的调度策略问题,不一定谁先执行,谁后执行,
但是如果遇到阻塞一定会进行切换,任务的执行是互相抢占cpu资源的过程
以目前程序来看,主进程执行的稍快,子进程执行稍慢;
主进程和子进程齐头并进往前跑,谁在前后说不准,依赖cpu的调度策略
'''

'''
def func(args):
    print(">>222>>当前子进程id>>:%s,它的父进程id>>:%s" % (os.getpid(),os.getppid()) ) 
    print("end",args)


if __name__ == "__main__":
    for i in range(10):
        Process(target=func,args= (i,) ).start()

    print("主进程执行结束...")
'''
# (5) 主进程和父进程之间的关系
"""
主进程执行完所有代码之后,开始等待,
等待所有子进程全部结束之后
在彻底终止程序.

如果不等待,主进程终止了,子进程就会变成僵尸程序
在后台不停的运行,占用内存和cpu
因为进程数太多,不容易找到,也不容易管理,
所以主进程跑完后,在彻底结束程序;

"""
def func(args):
    print(">>222>>当前子进程id>>:%s,它的父进程id>>:%s" % (os.getpid(),os.getppid()) ) 
    time.sleep(0.1)
    print("end",args)

if __name__ == "__main__":
    for i in range(10):
        Process(target=func,args= (i,) ).start()

    print("主进程执行结束...")

7.join

# ### join  功能:等待子进程执行完毕之后,主进程在向下执行
from multiprocessing import Process
import time,os

# (1)join 基本用法
"""
def func():
    print("发送第一封邮件")

if __name__ == "__main__":
    p = Process(target=func)
    p.start()
    
    # time.sleep(1)
    '''针对于p进程对象来说,必须等待p这个进程任务执行完毕之后,主进程的代码在向下执行'''
    p.join()
    print("发送第二封邮件")
"""
# (2)多个子进程通过join加阻塞,可以实现同步控制
'''
def func(index):
    # time.sleep(0.5)
    print("第%s封邮件已经发送 ... " % (index))
    # time.sleep(5)
    
if __name__ == "__main__":
    lst = []
    for i in range(10):
        # 创建进程  1个主进程  + 10个子进程 = 11个进程 创建的是异步程序,加上join是同步程序;
        p = Process(target=func,args=(i,))
        # 调用进程
        p.start()
        lst.append(p)
        # 如果把join加到循环里,当前这个进程对象.join必须执行结束,下一个进程对象才能创建,变成了同步程序,而进程的提示是为了提升执行的速度;
        # p.join()
    # 循环列表中的每一个进程对象,都加上一个join,可以让所有的进程对象都执行完毕,就释放阻塞往下执行;保证子进程和主进程之间的同步性;
    for i in lst:
        i.join()
    print("发送最后一封邮件...")
'''

# ### 使用第二种方法创建进程
"""用自定义类的方式创建进程"""
# (1) 基本使用
# 必须继承父类 Process 类
'''
class MyProcess(Process):
    # 类似于handle ,必须写成run方法
    def run(self):
        print("子进程%s , 父进程%s" % (os.getpid(), os.getppid() ))
    
if __name__ == "__main__":
    p = MyProcess()
    p.start()
    print("主进程:{}".format(os.getpid()))
'''
# (2) 带参数的子进程函数
class MyProcess(Process):

    def __init__(self,arg):
        # 必须调用一下父类的构造方法
        super().__init__()
        # 把参数通过arg来进行保存
        self.arg  = arg

    # 类似于handle ,必须写成run方法
    def run(self):
        print("子进程%s , 父进程%s" % (os.getpid(), os.getppid() ))
        print(self.arg)


if __name__ == "__main__":
    lst = []
    # 进程的并发是异步程序
    for i in range(10):
        p = MyProcess( "参数:%s" % (i) )
        p.start()
        lst.append(p)

    # 等待所有子进程结束在执行主进程代码是同步程序;
    for i in lst:
        i.join()
    print("最后打印子进程id" , os.getpid())

8.守护进程

# ### 守护进程
from multiprocessing import  Process
import time
"""
# 守护进程语法:
进程对象.daemon = True
设置该进程对象为守护进程
守护进程需要在start()方法之前设置

守护进程为主进程守护,主进程如果代码执行完毕了,该守护进程自动终止
但其他子进程全部执行完毕之后,主进程彻底终止程序
"""

# (1) 基本语法
"""
def func():
    print("子进程start")
    time.sleep(0.1)
    print("子进程end")

if __name__ == "__main__":
    p = Process(target=func)
    # 在start开始之前设置该进程时守护进程
    p.daemon = True
    p.start()
    
    print("主进程执行结束")
"""
# (2) 多个子进程的情况
"""
2个子进程 + 1个主进程
当主进程里面的代码全部执行完毕之后,守护进程自动终止,
因为func2这个任务进程没有执行完毕,所有主进程不能立刻终止程序


代码执行完毕
和程序执行完毕是两回事

代码执行完毕 意味着 守护进程立刻终止
只有非守护进程func2也都执行完毕之后,主进程才会真正的终止程序.

func1 是守护进程
func2 是非守护进程 就是一个普通进程而已
默认主进程会等待所有进程执行完毕之后,才会最终终止程序;
子进程和主进程彼此独立,数据也不共享,为了防止僵尸程序,才是等待的意义;
"""

'''
def func1():
    count = 1
    while True:
        print("*" * count)
        time.sleep(0.5)    
        count += 1

def func2():    
    print("func2 start")
    time.sleep(3)
    print("func2 end")

if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.daemon = True
    p2 = Process(target=func2)
    p1.start()
    p2.start()
    
    time.sleep(1)
    print("主进程代码执行完毕")
'''

# (3) 守护进程的实际用途 : 报活 
def alive():
    while True:
        print("1号服务主机... i am ok ~")
        # 相隔0.5秒开始报活
        time.sleep(0.5)

def func():
    print("1号服务器主要负责统计mysql日志")
    time.sleep(3)

if __name__ == "__main__":
    p1 = Process(target=alive)
    p1.daemon = True
    p1.start()
    p2 = Process(target=func)
    p2.start()
    
    # 用join 添加一下阻塞,如果join执行结束了,就代表服务器统计日志的功能失效了
    # 或者服务器崩溃,机器也会终止程序,终止报活
    p2.join()
    print(" ... ")
善战者,求之于势,不责于人,故能择人而任势
原文地址:https://www.cnblogs.com/NGU-PX/p/11404392.html