python_day08 接口与归一化设计 多态与多态性 封装 面向对象高级 异常处理 网络编程

本节课内容:

接口与归一化设计
多态与多态性
封装
面向对象高级
异常处理
网络编程

接口与归一化设计

class Interface:#定义接口Interface类来模仿接口的概念,python中压根就没有interface关键字来定义一个接口。
    def read(self): #定接口函数read
        pass
    def write(self): #定义接口函数write
        pass
class Txt(Interface): #文本,具体实现read和write
    def read(self):
        print('文本数据的读取方法')
    def write(self):
        print('文本数据的写方法')
class Sata(Interface): #磁盘,具体实现read和write
    def du(self):
        print('硬盘数据的读取方法')
    def write(self):
        print('硬盘数据的写方法')
class Process(Interface):
    def read(self):
        print('进程数据的读取方法')
    def xie(self):
        print('进程数据的写方法')
t=Txt()
s=Sata()
p=Process()

t.read()
s.read()
p.read()
Interfase
import abc
class Interface(metaclass=abc.ABCMeta):#定义接口Interface类来模仿接口的概念,python中压根就没有interface关键字来定义一个接口。
    all_type='file'
    @abc.abstractmethod
    def read(self): #定接口函数read
        pass
    @abc.abstractmethod
    def write(self): #定义接口函数write
        pass
class Txt(Interface): #文本,具体实现read和write #必须要定义read和write,不能为其他
    def read(self):
        pass
    def write(self):
        pass
t=Txt()
print(t.all_type)

多态与多态性

多态:同一种事物的多种形态
多态性:可以在不考虑实例类型的前提下使用实例
多态性的好处
1、增加了灵活性
2、增加了可扩展性
import abc
# 多态:同一种事物的多种形态
class Animal: #同一类事物:动物
    def talk(self):
        pass
class People(Animal): #动物的形态之一:人
    def talk(self):
        print('say hello')
class Dog(Animal): #动物的形态之二:狗
    def talk(self):
        print('say wangwang')
class Pig(Animal): #动物的形态之三:猪
    def talk(self):
        print('say aoao')
class Cat(Animal):
    def talk(self):
        print('say miaomiao')
class Bird:
    def talk(self):
        print('jijiji')
# 多态性:可以在不考虑实例类型的前提下使用实例
p1=People()
d=Dog()
p2=Pig()
c=Cat()
b=Bird()
p1.talk()
d.talk()
p2.talk()
c.talk()
b.talk()

def Talk(animal):
    animal.talk() #p1.talk()
Talk(p1)
Talk(d)
Talk(p2)
Talk(c)
Talk(b)

# 多态性的好处
# 1、增加了灵活性
# 2、增加了可扩展性

#list,str,tuple
l=list([1,2,3])
t=tuple((1,2))
s=str('hello')
l.__len__()
t.__len__()
s.__len__()

def my_len(obj):
    return obj.__len__()
print(my_len(l))
print(my_len(t))
print(my_len(s))

封装

#先看如何隐藏
class Foo:
    __N=111111 #_Foo__N
    def __init__(self,name):
        self.__Name=name #self._Foo__Name=name
    def __f1(self): #_Foo__f1
        print('f1')
    def f2(self):
        self.__f1() #self._Foo__f1()

f=Foo('egon')
# print(f.__N)
# f.__f1()
# f.__Name
# f.f2()    #f1
# print(f._Foo__N)    #_Foo_N   #11111
#这种隐藏需要注意的问题:
#1:这种隐藏只是一种语法上变形操作,并不会将属性真正隐藏起来
print(Foo.__dict__)
print(f.__dict__)
print(f._Foo__Name)
print(f._Foo__N)
#2:这种语法级别的变形,是在类定义阶段发生的,并且只在类定义阶段发生
Foo.__x=123123123123123123123123123123123123123123
print(Foo.__dict__)
print(Foo.__x)
f.__x=123123123
print(f.__dict__)
print(f.__x)
#3:在子类定义的__x不会覆盖在父类定义的__x,因为子类中变形成了:_子类名__x,而父类中变形成了:_父类名__x,即双下滑线开头的属性在继承给子类时,子类是无法覆盖的。
class Foo:
    def __f1(self): #_Foo__f1
        print('Foo.f1')
    def f2(self):
        self.__f1() #self._Foo_f1
class Bar(Foo):
    def __f1(self): #_Bar__f1
        print('Bar.f1')
b=Bar()
b.f2()
#封装不是单纯意义的隐藏
#1:封装数据属性:将属性隐藏起来,然后对外提供访问属性的接口,关键是我们在接口内定制一些控制逻辑
# 从而严格控制使用对数据属性的使用
class People:
    def __init__(self,name,age):
        if not isinstance(name,str):
            raise TypeError('%s must be str' %name)
        if not isinstance(age,int):
            raise TypeError('%s must be int' %age)
        self.__Name=name
        self.__Age=age
    def tell_info(self):
        print('<名字:%s 年龄:%s>' %(self.__Name,self.__Age))

    def set_info(self,x,y):
        if not isinstance(x,str):
            raise TypeError('%s must be str' %x)
        if not isinstance(y,int):
            raise TypeError('%s must be int' %y)
        self.__Name=x
        self.__Age=y

p=People('egon',18)
p.tell_info()

p.set_info('Egon',19)
p.tell_info()

#2:封装函数属性:为了隔离复杂度
#取款是功能,而这个功能有很多功能组成:插卡、密码认证、输入金额、打印账单、取钱
#对使用者来说,只需要知道取款这个功能即可,其余功能我们都可以隐藏起来,很明显这么做
#隔离了复杂度,同时也提升了安全性
class ATM:
    def __card(self):
        print('插卡')
    def __auth(self):
        print('用户认证')
    def __input(self):
        print('输入取款金额')
    def __print_bill(self):
        print('打印账单')
    def __take_money(self):
        print('取款')

    def withdraw(self):
        self.__card()
        self.__auth()
        self.__input()
        self.__print_bill()
        self.__take_money()
a=ATM()
a.withdraw()

面向对象高级

#静态属性
class Foo:
    @property
    def f1(self):
         print('f1')
f=Foo()
# f.f1()
f.f1    #property是f1函数不需要加括号也能运行

'''
例:BMI指数(bmi是计算而来的,但很明显它听起来像是一个属性而非方法,如果我们将其做成一个属性,更便于理解)
成人的BMI数值:
过轻:低于18.5
正常:18.5-23.9
过重:24-27
肥胖:28-32
非常肥胖, 高于32
  体质指数(BMI)=体重(kg)÷身高^2(m)
  EX:70kg÷(1.75×1.75)=22.86
'''
class People:
    def __init__(self,name,weight,height):
        self.name=name
        self.weight=weight
        self.height=height
    @property
    def bmi(self):
        return self.weight / (self.height**2)
p=People('egon',75,1.80)
p.height=1.86
# print(p.bmi())
print(p.bmi)

# 访问,设置,删除(了解)
class Foo:
    def __init__(self,x):
        self.__Name=x
    @property
    def name(self):
        return self.__Name
    @name.setter
    def name(self,val):
        if not isinstance(val,str):
            raise TypeError
        self.__Name=val
    @name.deleter
    def name(self):
        print('=-====>')
        del self.__Name
        raise PermissionError
f=Foo('egon')
print(f.name)
f.name='Egon'
# f.name=123123123213
print(f.name)
del f.name
print(f.name)

面向对象高级用法

class Foo:
    x=1
    def __init__(self,name):
        self.name=name
    def f1(self):
        print('from f1')
print(Foo.x) #Foo.__dict__['x']
f=Foo('egon')
print(f.__dict__)
# 1:
print(f.name)
# 2:
print(f.__dict__['name'])

#hasattr
print(hasattr(f,'name')) #f.name
print(hasattr(f,'f1')) #f.f1
print(hasattr(f,'x')) #f.x

#setattr
setattr(f,'age',18)#f.age=18

#getattr
print(getattr(f,'name'))#f.name
print(getattr(f,'abc',None))#f.abc
print(getattr(f,'name',None))#f.abc

func=getattr(f,'f1')#f.f1
print(func)
func()

#delattr
delattr(f,'name')# del f.name
print(f.__dict__)

#根据用户输入的命令直接运行函数
class Ftpserver:
    def __init__(self,host,port):
        self.host=host
        self.port=port
    def run(self):
        while True:
            cmd=input('>>: ').strip()
            if not cmd:continue
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func()
    def get(self):
        print('get func')
    def put(self):
        print('put func')
f=Ftpserver('192.168.1.2',21)
f.run()
#item系列
class
Foo: def __getitem__(self, item): print('=====>get') return self.__dict__[item] def __setitem__(self, key, value): self.__dict__[key]=value # setattr(self,key,value) def __delitem__(self, key): self.__dict__.pop(key) f=Foo() f.x=1 print(f.x) print(f.__dict__) f['x']=123123123123 # del f['x'] print(f['x'])
#打印对象信息__str__
class People:
    def __init__(self,name,age,sex):
        self.name=name
        self.age=age
        self.sex=sex
    def __str__(self): #在对象被打印时触发执行
        return '<name:%s age:%s sex:%s>' %(self.name,self.age,self.sex)
p1=People('egon',18,'male')
p2=People('alex',38,'male')
print(p1)
print(p2)
#析构方法__del__
class Foo:
    def __init__(self,x):
        self.x=x
    def __del__(self): #在对象资源被释放时触发
        print('-----del------')
        print(self)
f=Foo(100000)
del f
print('=======================>')

异常处理

# aaaaaaaaa
# print('===>')
#语法错误
# if :pass
# def func:pass
#逻辑错误
# TypeError
# for i in 3:
#     pass

# NameError
# aaaaa

# ValueError
# int('asdfsadf')

#IndexError
# l=[1,2]
# l[1000]

#KeyError
# d={'a':1}
# d['b']

# AttributeError
# class Foo:pass
# Foo.x

try:
    f=open('a.txt')
    next(f)
    next(f)
    next(f)
    next(f)
    next(f)
    next(f)
    next(f)
except StopIteration as e:
    pass
print('====>')

try:
    # aaaa
    print('==-==>1')
    l=[]
    l[3]
    print('==-==>2')
    d={}
    d['x']
    print('==-==>3')
except NameError as e:
    print(e)
except IndexError as e:
    print(e)
except KeyError as e:
    print(e)

try:
    # aaaa
    print('==-==>1')
    l=[]
    l[3]
    print('==-==>2')
    d={}
    d['x']
    print('==-==>3')
except Exception as e:
    print(e)

#什么时候用try ...except
#错误一定会发生,但是无法预知错误发生条件

try:
    aaaa
    print('==-==>1')
    # l=[]
    # l[3]
    # print('==-==>2')
    # d={}
    # d['x']
    # print('==-==>3')
except NameError as e:
    print(e)
except IndexError as e:
    print(e)
except KeyError as e:
    print(e)
except Exception as e:
    print(e)
else:
    print('在没有错误的时候执行')
finally:
    print('无论有无错误,都会执行')
raise TypeError('----')
class EgonException(BaseException):
    def __init__(self,msg):
        self.msg=msg
    def __str__(self):
        return '<%s>' %self.msg
raise EgonException('egon 的异常')

l=[1,2,3]
assert len(l) > 3

网络编程
加上通讯循环和链接循环的服务端和客户端代码

server端

import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
phone.bind(('127.0.0.1',8080))
phone.listen(5)
print('server start...')
while True: #链接循环
    conn,client_addr=phone.accept()
    print(conn,client_addr)

    while True: #通讯循环
        try:
            client_data=conn.recv(1024)
            if not client_data:break #针对linux系统
            # print('has rev')
            conn.send(client_data.upper())
        except Exception: #针对windwos
            break
    conn.close()

phone.close()

客户端

import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    # print('====>has send')
    server_data=phone.recv(1024)
    # print('====>has recv')
    print(server_data.decode('utf-8'))

phone.close()
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    # print('====>has send')
    server_data=phone.recv(1024)
    # print('====>has recv')
    print(server_data.decode('utf-8'))

phone.close()
client1
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    # print('====>has send')
    server_data=phone.recv(1024)
    # print('====>has recv')
    print(server_data.decode('utf-8'))

phone.close()
client2

模拟ssh远程执行命令

import socket
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
phone.bind(('127.0.0.1',8080))
phone.listen(5)
print('server start...')
while True: #链接循环
    conn,client_addr=phone.accept()
    print(conn,client_addr)

    while True: #通讯循环
        try:
            cmd=conn.recv(1024)
            if not cmd:break

            #执行命令,拿到结果
            res=subprocess.Popen(cmd.decode('utf-8'),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)

            stdout=res.stdout.read()
            stderr=res.stderr.read()

            conn.send(stdout+stderr)
        except Exception: #针对windwos
            break
    conn.close()

phone.close()
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    cmd=input('>>: ').strip()
    if not cmd:continue
    #发命令
    phone.send(cmd.encode('utf-8'))

    #收命令的执行结果
    cmd_res=phone.recv(1024)

    #打印结果
    print(cmd_res.decode('gbk'))

phone.close()

定制报头,客户端先接收报头长度,再接收报头,再接收数据

import socket
import struct
import subprocess
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
phone.bind(('127.0.0.1',8080))
phone.listen(5)
print('server start...')
while True: #链接循环
    conn,client_addr=phone.accept()
    print(conn,client_addr)

    while True: #通讯循环
        try:
            cmd=conn.recv(1024)
            if not cmd:break

            #执行命令,拿到结果
            res=subprocess.Popen(cmd.decode('utf-8'),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)

            stdout=res.stdout.read()
            stderr=res.stderr.read()


            #制作报头
            header_dic={'total_size':len(stdout)+len(stderr),'md5':None}
            header_json=json.dumps(header_dic)
            header_bytes=header_json.encode('utf-8')


            #1 先发报头的长度(固定4个bytes)
            conn.send(struct.pack('i',len(header_bytes)))


            #2 先发报头
            conn.send(header_bytes)


            #3 再发真实的数据
            conn.send(stdout)
            conn.send(stderr)

        except Exception: #针对windwos
            break
    conn.close()

phone.close()
import socket
import struct
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    cmd=input('>>: ').strip()
    if not cmd:continue
    #发命令
    phone.send(cmd.encode('utf-8'))


    #先收报头的长度
    struct_res=phone.recv(4)
    header_size=struct.unpack('i',struct_res)[0]

    #再收报头
    header_bytes=phone.recv(header_size)
    head_json=header_bytes.decode('utf-8')
    head_dic=json.loads(head_json)

    total_size=head_dic['total_size']
    #再收命令的执行结果
    recv_size=0
    data=b''
    while recv_size < total_size:
        recv_data=phone.recv(1024)
        recv_size+=len(recv_data)
        data+=recv_data

    #打印结果
    print(data.decode('gbk'))

phone.close()

sockerserver实现并发

import socketserver
class MyTcphandler(socketserver.BaseRequestHandler):
    def handle(self):
        while True: #通信循环
            data=self.request.recv(1024)
            self.request.send(data.upper())
if __name__ == '__main__':
    #取代链接循环
    server=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTcphandler)
    server.serve_forever()
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    server_data=phone.recv(1024)
    print(server_data.decode('utf-8'))

phone.close()
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    server_data=phone.recv(1024)
    print(server_data.decode('utf-8'))

phone.close()
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    phone.send(msg.encode('utf-8'))
    server_data=phone.recv(1024)
    print(server_data.decode('utf-8'))

phone.close()
clien2
原文地址:https://www.cnblogs.com/liweijing/p/7412402.html