IP 地址、端口
IP 组成与分类
- IP地址根据网络号和主机号来分,分为A、B、C三类及特殊地址D、E。全0和全1的都保留不用:
- A类:(1.0.0.0-126.0.0.0)(默认子网掩码:255.0.0.0或 0xFF000000)第一个字节为网络号,后三个字节为主机号。该类IP地址的最前面为“0”,所以地址的网络号取值于1~126之间。一般用于大型网络。
- B类:(128.0.0.0-191.255.0.0)(默认子网掩码:255.255.0.0或0xFFFF0000)前两个字节为网络号,后两个字节为主机号。该类IP地址的最前面为“10”,所以地址的网络号取值于128~191之间。一般用于中等规模网络。
- C类:(192.0.0.0-223.255.255.0)(子网掩码:255.255.255.0或 0xFFFFFF00)前三个字节为网络号,最后一个字节为主机号。该类IP地址的最前面为“110”,所以地址的网络号取值于192~223之间。一般用于小型网络。
- D类:是多播地址。该类IP地址的最前面为“1110”,所以地址的网络号取值于224~239之间。一般用于多路广播用户[1] 。
- E类:是保留地址。该类IP地址的最前面为“1111”,所以地址的网络号取值于240~255之间。
- 在IP地址3种主要类型里,各保留了3个区域作为私有地址,其地址范围如下:
- A类地址:10.0.0.0~10.255.255.255
- B类地址:172.16.0.0~172.31.255.255
- C类地址:192.168.0.0~192.168.255.255
- 回送地址:127.0.0.1
- 总结:反正知道啥是[192.168.1.1] - [192.168.1.253]可以了
端口
- 端口是通过端口号来标记的
- 在linux系统中,端口可以有65536(2的16次方)!
- 常用就是[8080]端口
Tcp 与 Udp
TCP 介绍
- TCP/IP 是互联网相关的各类协议族的总称,比如:TCP,UDP,IP,FTP,HTTP,ICMP,SMTP 等都属于 TCP/IP 族内的协议。
- TCP划分为4层:
- 链路层:负责封装和解封装IP报文,发送和接受ARP/RARP报文等。
- 网络层:负责路由以及把分组报文发送给目标网络或主机。
- 传输层:负责对报文进行分组和重组,并以TCP或UDP协议格式封装报文。
- 应用层:负责向用户提供应用程序,比如HTTP、FTP、Telnet、DNS、SMTP等。
TCP 特点
- 面向连接,这种连接是一对一的,因此TCP不适用于广播的应用程序
- 可靠传输
- TCP采用应答机制
- 超时重传
- 错误校验
- 流量控制和阻塞管理
- 可靠、稳定
- 传输速度慢
- 占用系统资源高
UDP 特点
- 面向无连接
- 有单播,多播,广播的功能
- 面向报文
- 不可靠性
头文件
import socket
socket.socket(Address, Type)
# Address:可以选择 AF_INET(用于 Internet 进程间通信) 或者 AF_UNIX(用于同一台机器进程间通信),咱们常用是 AF_INET
# Type:SOCK_STREAM(流式套接字,主要用于 TCP 协议)或者 SOCK_DGRAM(数据报套接字,主要用于 UDP 协议),区分是 TCP 还是 UDP
入门
- 服务端
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020/05/14 14:32:15
# @File : TCP - 0__服务端.py
# @Link : https://www.cnblogs.com/BenLam/
import os
import socket
def services():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 1111))
server.listen(128)
# 复用端口
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 设置为非阻塞, 默认是 True 阻塞
# 开启非阻塞就会报错,用 try 去捕获
server.setblocking(False)
conn_list = []
print("就绪......")
while True:
try:
conn, addr = server.accept() # 被动接受TCP客户的连接,等待连接的到来,收不到时会报异常
print("[客户端接入] %s connected!" %str(addr))
conn_list.append(conn)
except BlockingIOError as e:
pass
tmp_list = [conn for conn in conn_list]
for conn in tmp_list:
try:
data = conn.recv(1024) # 接收数据1024字节
if data:
print(f"{data}| 客户端地址- {str(addr)}")
conn.send(b"response #> %s" %data)
else:
print("[客户端断开] %s disconnected! " % str(addr))
conn.close()
conn_list.remove(conn)
print(f"当前在线人数 => {len(conn_list)}")
except IOError:
pass
if __name__ == '__main__':
services()
- 客户端
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020/05/14 15:28:47
# @File : TCP - 0__客户端.py
# @Link : https://www.cnblogs.com/BenLam/
import socket
import threading
def client():
obj = socket.socket()
obj.connect(("127.0.0.1", 1111))
for _ in range(1):
obj.sendall(bytes("test...........", encoding="utf-8"))
content = str(obj.recv(1024), encoding="utf-8")
# obj.close()
def main():
thread = []
for _ in range(3):
x = threading.Thread(target=client, args=())
thread.append(x)
for _ in thread:
_.start()
if __name__ == '__main__':
main()
TCP 例子
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020/3/31 11:54
# @File : 0-socket.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
# @Version : PyCharm
from socket import *
server_ip = "127.0.0.1"
server_port = 1111
server_addr = (server_ip, server_port)
tcp_server = socket(AF_INET, SOCK_STREAM)
def server():
""" TCP 服务端"""
tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 复用端口
address = ("", server_port)
tcp_server.bind(address)
tcp_server.listen(128)
print("等待客户端访问.........")
try:
while True:
client_sendtext, client_Addr = tcp_server.accept()
print(f"当前客户端连接地址: - {client_Addr}")
while True:
recv_data = client_sendtext.recv(1024) # 接收最大为 1024 个字节
# print("接收客户端数据为:", recv_data.decode("gbk"))
if not recv_data: continue
if recv_data == b"exit":
client_sendtext.send(b"Goodbye!~")
break
client_sendtext.send(b"response #> %s" % recv_data)
client_sendtext.close()
except Exception as e:
print(e)
def client():
""" TCP 客户端"""
while True:
connect = socket(AF_INET, SOCK_STREAM)
connect.connect(server_addr)
print("> 连接成功....")
while True:
send_data = input("请输入内容,输入'kill'指令即可退出程序,输入'exit'重连:
")
if send_data == "kill": kill()
if not send_data: continue
connect.send(send_data.encode("gbk"))
recvData = connect.recv(1024)
print("接收服务端返回数据: ", recvData.decode("gbk"))
if recvData == b"Goodbye!~":
break
connect.close()
def main():
import threading
t = threading.Thread(target=server, args=()).start()
t2 = threading.Thread(target=client, args=()).start()
def kill():
import os
os.system("taskkill /f /t /im python.exe")
os.system("taskkill /f /t /im python36.exe")
if __name__ == '__main__':
main()
with 实现非阻塞
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020/5/15 11:54
# @File : 0-socket.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import socket
def server():
online_client = []
HOST = ("", 1111)
with socket.socket() as sock:
sock.bind(HOST)
sock.listen(6)
sock.setblocking(False)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("就绪...")
while True:
try:
conn, addr = sock.accept()
print("[客户端接入] %s connected! " % str(addr))
online_client.append(conn)
except BlockingIOError as e:
pass
_client = [conn for conn in online_client]
for conn in _client:
try:
data = conn.recv(1024)
conn.sendall(b"response #> " + data)
# print(f" #> 消息- {str(data, encoding='utf-8')}| 客户端地址- {addr}")
if not data:
print("[-客户端断开] %s disconnected! " % str(addr))
conn.close()
online_client.remove(conn)
print(f"当前在线剩余人数 => {len(online_client)}")
break
except OSError:
pass
def client():
with socket.socket() as obj:
obj.connect(("127.0.0.1", 1111))
obj.sendall(bytes("测试", encoding="utf-8"))
content = str(obj.recv(1024), encoding="utf-8")
# print(content)
obj.close()
def main():
import threading
li = []
_server = threading.Thread(target=server).start()
for _ in range(10):
a = threading.Thread(target=client)
li.append(a)
for _ in li:
_.start()
if __name__ == '__main__':
main()
-------------------------------------------
就绪...
[客户端接入] ('127.0.0.1', 62904) connected!
[-客户端断开] ('127.0.0.1', 62904) disconnected!
当前在线剩余人数 => 0
(......)
UDP 例子
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020/3/31 12:54
# @File : 0-socket.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
# @Version : PyCharm
import socket
import threading
import time
HOST = "127.0.0.1"
BUFSIZ = 1024
server_ADDR = (HOST, 1111)
client_ADDR = (HOST, 2222)
def udp_server():
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 创建客户端套接字
udp.bind(server_ADDR)
print(" UDP - 启动成功 ".center(28, "-"))
while True:
data, addr = udp.recvfrom(BUFSIZ) # 连续接收指定字节的数据,接收到的是字节数组
if not data: udp.sendto(b"send the message is Null
", addr)
if data == b"exit":
udp.sendto(b"#Restart Service! > Hello
", addr)
continue
if data == b"kill":
udp.sendto(b"Good bye!", addr)
kill()
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
udp.sendto(b"[%s] UDP server response #> Hello %s!
" %(now.encode('gbk'), data), addr)
udp.close()
def send_text(udp):
while True:
send_data = input("请输入内容,输入'kill'指令即可退出程序,输入'exit'重连:
")
udp.sendto(bytes(send_data, 'gbk'), server_ADDR) # 客户端发送消息,必须发送字节数组
if send_data == "close":
break
def recv_text(udp):
while True:
data, _ADDR = udp.recvfrom(BUFSIZ) # 接收回应消息,接收到的是字节数组
print(f'{_ADDR}- 收到消息-> {data.decode("gbk")}') # 打印回应消息
if not data: # 如果接收服务器信息失败,或没有消息回应
break
elif data == b"exit":
continue
elif data in b"Good bye!":
print(" UDP 服务端终止通讯 ".center(28, "-"))
kill()
udp.close()
def udp_client():
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 创建客户端套接字
udp.bind(client_ADDR)
t = threading.Thread(target=send_text, args=(udp,)).start()
t2 = threading.Thread(target=recv_text, args=(udp,)).start()
def main():
t = threading.Thread(target=udp_server, args=()).start()
t2 = threading.Thread(target=udp_client, args=()).start()
def kill():
import os
os.system("taskkill /f /t /im python.exe")
os.system("taskkill /f /t /im python36.exe")
if __name__ == '__main__':
main()
模拟 WSGI 服务实现
- 通过 Python - socket 库实现 WSGI服务
- 分成三部分:
- 静态 HTML 页面
- 静态 Views 接口
- 静态 WSGI 服务接口
HTML 静态页面比较简单
@File : tset.py
文件名 :demo.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Ben_WSGI_Server</title>
<meta name="description" content="">
<meta name="keywords" content="">
<link href="" rel="stylesheet">
</head>
<body>
<h1> Hello World! </h1>
<h1> 这是一个 WSGI_demo </h1>
<br >
<h1><a href="./register.json">注册页面</a></h1>
<h1><a href="./index.json">首页</a></h1>
<h1><a href="./center.json">个人主页</a></h1>
<h1><a href="./help.json">帮助中心</a></h1>
</body>
</html>
Views 接口
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020-04-01 17:40:28
# @File : framework.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
# @Version : $Id$
import time
class route():
"""
Views 接口页面
"""
def __init__(self, env, set_header):
"""
env → {'PATH_INFO': '/center.py'}
set_header → 200 OK [('Content-Type', 'text/html; charset=UTF-8')]
"""
status = "200 OK"
response_headers = [("Content-Type", "text/html; charset=UTF-8")]
set_header(status, response_headers)
self.path_info = env["PATH_INFO"]
def views(self):
if self.path_info == "/index.json":
response_body = index()
elif self.path_info == "/center.json":
response_body = center()
elif self.path_info == "/register.json":
response_body = register()
elif self.path_info == "/help.json":
response_body = help()
else:
response_body = "----Not Found---- <br> 当前时间 %s" % time.ctime()
return response_body
def index():
return "<h1>----主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())
def center():
return "<h1>----中心主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())
def register():
return "<h1>----注册主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())
def help():
return "这是个帮助页面 <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())
WSGI 接口
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020-04-01 17:40:28
# @File : WSGI_demo.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
# @Version : 1.0v
import re
import time
import socket
import threading
import multiprocessing
from framework import route
import gevent
from gevent import monkey
monkey.patch_all()
class WSGIServer(object):
"""
迷你 WSGI 服务页面
"""
def __init__(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(('', 1111))
self.server_socket.listen(128)
def request_handler(self, client_socket):
recv_data = client_socket.recv(1024)
if not recv_data:
print("客户端已经断开连接.")
client_socket.close()
return
# 对请求报文进行切割,获取请求行中的请求路径:demo.html
request_str_data = recv_data.decode()
# 使用正则表达式获取url
ret = re.match(r"[^/]+([^ ]+)", request_str_data)
if ret:
path_info = ret.group(1) # /index.html
else:
path_info = "/"
print("用户访问路径: %s" % path_info)
if path_info == "/":
path_info = "/demo.html"
# 区分动态请求/静态请求
if not path_info.endswith(".json"):
try:
with open("./" + path_info, "rb") as f:
file_data = f.read()
except Exception as e:
response_line = "HTTP/1.1 404 Not Found
"
response_header = "Server: Ben_WSGI_Server
"
response_body = "Error!!!!!!"
response_data = response_line + response_header + "
" + response_body
client_socket.send(response_data.encode())
else:
response_header = "HTTP/1.1 200 OK
"
response_header += "Server: Ben_WSGI_Server
"
response_header += "Content-Type: text/html; charset=UTF-8
"
response_header += "
"
response_body = file_data
response_data = response_header.encode("utf-8") + response_body
client_socket.send(response_data)
client_socket.close()
else:
env = dict()
env["PATH_INFO"] = path_info
# 响应体
_route = route(env, self.set_headers) # 实例化
response_body = _route.views() # 执行 views 类方法
response = (self.response_header + response_body).encode("utf-8")
client_socket.send(response)
def set_headers(self, status, headers):
response_header = "HTTP/1.1 %s
" % status
for temp in headers:
response_header += "%s: %s
" % (temp[0], temp[1])
response_header += "
"
self.response_header = response_header
def start(self, level):
print("Ben_WSGI_Server 启动中......")
while True:
client_socket, client_addr = self.server_socket.accept()
# 配置运行方式
if level == "thread":
thread = threading.Thread(target=request_handler, args=(client_socket,))
thread.start()
elif level == "process":
process = multiprocessing.Process(target=self.request_handler, args=(client_socket,))
process.start()
client_socket.close()
elif level == "gevents" or level == "gevent":
gevent.spawn(self.request_handler, client_socket)
else:
print("请配置正确的运行方式")
break
def main():
wsgi_server = WSGIServer()
wsgi_server.start(level="gevent")
if __name__ == '__main__':
main()
ORM 接口
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020-04-08 14:40:28
# @File : ORM.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
# @Version : 1.0v
class ModelMetaclass(type):
def __new__(cls, tab_name, bases, attrs):
mappings = dict()
# 判断是否需要保存
for k, v in attrs.items():
# 判断是否是指定的StringField或者IntegerField的实例对象
if isinstance(v, tuple):
print('对象为: %s -→ %s' % (k, v))
mappings[k] = v
# 删除这些已经在字典中存储的属性
for k in mappings.keys():
attrs.pop(k)
# 将之前的object_id/name/email/password以及对应的对象引用、类名字
attrs['__mappings__'] = mappings # 保存属性和列的映射关系
attrs['__table__'] = tab_name # 数据库表名称
return type.__new__(cls, tab_name, bases, attrs)
class User(metaclass=ModelMetaclass):
"""
数据库 User 表结构,有数据库基础比较好理解
# 当指定元类之后,以上的类属性将不在类中,而是在__mappings__属性指定的字典中存储
# 以上User类中有
# __mappings__ = {
# "object_id": ('object_id', "int unsigned")
# "name": ('username', "varchar(30)")
# "email": ('email', "varchar(30)")
# "password": ('password', "varchar(30)")
# }
# __table__ = "User"
"""
object_id = ('object_id', "int unsigned")
name = ('username', "varchar(30)")
password = ('password', "varchar(30)")
email = ('email', "varchar(30)")
def __init__(self, **kwargs):
for name, value in kwargs.items():
setattr(self, name, value)
def save(self):
fields = []
args = []
for k, v in self.__mappings__.items():
fields.append(v[0])
args.append(getattr(self, k, None))
# 格式化输出修改
fields = str(fields).replace("[", "").replace("]", "")
args = str([str(i) for i in args]).replace("[", "").replace("]", "")
sql = f"insert into {self.__table__} ({fields}) values ({args})"
print('SQL: %s' % sql)
if __name__ == '__main__':
u = User(object_id='0000000000000001', name='张三',
password='密码', email='xxxxxxxx@BenLam.com')
u.save()
# print(u.__dict__)
"""
输出:
对象为: object_id -→ ('object_id', 'int unsigned')
对象为: name -→ ('username', 'varchar(30)')
对象为: password -→ ('password', 'varchar(30)')
对象为: email -→ ('email', 'varchar(30)')
SQL: insert into User ('object_id', 'username', 'password', 'email') values ('0000000000000001', '张三', '密码', 'xxxxxxxx@BenLam.com')
[Finished in 0.5s]
"""