Socket

IP 地址、端口

IP 组成与分类

  1. IP地址根据网络号和主机号来分,分为A、B、C三类及特殊地址D、E。全0和全1的都保留不用:
  • A类:(1.0.0.0-126.0.0.0)(默认子网掩码:255.0.0.0或 0xFF000000)第一个字节为网络号,后三个字节为主机号。该类IP地址的最前面为“0”,所以地址的网络号取值于1~126之间。一般用于大型网络。
  • B类:(128.0.0.0-191.255.0.0)(默认子网掩码:255.255.0.0或0xFFFF0000)前两个字节为网络号,后两个字节为主机号。该类IP地址的最前面为“10”,所以地址的网络号取值于128~191之间。一般用于中等规模网络。
  • C类:(192.0.0.0-223.255.255.0)(子网掩码:255.255.255.0或 0xFFFFFF00)前三个字节为网络号,最后一个字节为主机号。该类IP地址的最前面为“110”,所以地址的网络号取值于192~223之间。一般用于小型网络。
  • D类:是多播地址。该类IP地址的最前面为“1110”,所以地址的网络号取值于224~239之间。一般用于多路广播用户[1] 。
  • E类:是保留地址。该类IP地址的最前面为“1111”,所以地址的网络号取值于240~255之间。
  1. 在IP地址3种主要类型里,各保留了3个区域作为私有地址,其地址范围如下:
  • A类地址:10.0.0.0~10.255.255.255
  • B类地址:172.16.0.0~172.31.255.255
  • C类地址:192.168.0.0~192.168.255.255
  • 回送地址:127.0.0.1
  1. 总结:反正知道啥是[192.168.1.1] - [192.168.1.253]可以了

端口

  1. 端口是通过端口号来标记的
  2. 在linux系统中,端口可以有65536(2的16次方)!
  3. 常用就是[8080]端口

Tcp 与 Udp

TCP 介绍

  1. TCP/IP 是互联网相关的各类协议族的总称,比如:TCP,UDP,IP,FTP,HTTP,ICMP,SMTP 等都属于 TCP/IP 族内的协议。
  2. TCP划分为4层:
  • 链路层:负责封装和解封装IP报文,发送和接受ARP/RARP报文等。
  • 网络层:负责路由以及把分组报文发送给目标网络或主机。
  • 传输层:负责对报文进行分组和重组,并以TCP或UDP协议格式封装报文。
  • 应用层:负责向用户提供应用程序,比如HTTP、FTP、Telnet、DNS、SMTP等。

TCP 特点

  1. 面向连接,这种连接是一对一的,因此TCP不适用于广播的应用程序
  2. 可靠传输
  3. TCP采用应答机制
  4. 超时重传
  5. 错误校验
  6. 流量控制和阻塞管理
  7. 可靠、稳定
  8. 传输速度慢
  9. 占用系统资源高

UDP 特点

  1. 面向无连接
  2. 有单播,多播,广播的功能
  3. 面向报文
  4. 不可靠性

头文件

import socket
socket.socket(Address, Type)

# Address:可以选择 AF_INET(用于 Internet 进程间通信) 或者 AF_UNIX(用于同一台机器进程间通信),咱们常用是 AF_INET
# Type:SOCK_STREAM(流式套接字,主要用于 TCP 协议)或者 SOCK_DGRAM(数据报套接字,主要用于 UDP 协议),区分是 TCP 还是 UDP

入门

  • 服务端
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date     : 2020/05/14 14:32:15
# @File     : TCP - 0__服务端.py
# @Link     : https://www.cnblogs.com/BenLam/

import os
import socket

def services():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 1111))
    server.listen(128)
    # 复用端口
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 设置为非阻塞, 默认是 True 阻塞
    # 开启非阻塞就会报错,用 try 去捕获
    server.setblocking(False)
    conn_list = []
    print("就绪......")

    while True:
        try:
            conn, addr = server.accept()  # 被动接受TCP客户的连接,等待连接的到来,收不到时会报异常
            print("[客户端接入] %s connected!" %str(addr))
            conn_list.append(conn)
        except BlockingIOError as e:
            pass

        tmp_list = [conn for conn in conn_list]
        for conn in tmp_list:
            try:
                data = conn.recv(1024) # 接收数据1024字节
                if data:
                    print(f"{data}| 客户端地址- {str(addr)}")
                    conn.send(b"response #> %s" %data)
                else:
                    print("[客户端断开] %s disconnected! " % str(addr))
                    conn.close()
                    conn_list.remove(conn)
                    print(f"当前在线人数 => {len(conn_list)}")
            except IOError:
                pass

if __name__ == '__main__':
    services()
  • 客户端
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date     : 2020/05/14 15:28:47
# @File     : TCP - 0__客户端.py
# @Link     : https://www.cnblogs.com/BenLam/

import socket
import threading

def client():
    obj = socket.socket()
    obj.connect(("127.0.0.1", 1111))
    for _ in range(1):
        obj.sendall(bytes("test...........", encoding="utf-8"))
        content = str(obj.recv(1024), encoding="utf-8")
    # obj.close()


def main():
    thread = []
    for _ in range(3):
        x = threading.Thread(target=client, args=())
        thread.append(x)
    
    for _ in thread:
        _.start()


if __name__ == '__main__':
    main()

TCP 例子

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020/3/31 11:54
# @File    : 0-socket.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/
# @Version : PyCharm


from socket import *

server_ip = "127.0.0.1"
server_port = 1111
server_addr = (server_ip, server_port)
tcp_server = socket(AF_INET, SOCK_STREAM)


def server():
    """ TCP 服务端"""
    tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 复用端口
    address = ("", server_port)
    tcp_server.bind(address)
    tcp_server.listen(128)
    print("等待客户端访问.........")
    try:
        while True:
            client_sendtext, client_Addr = tcp_server.accept()
            print(f"当前客户端连接地址: - {client_Addr}")
            while True:
                recv_data = client_sendtext.recv(1024)  # 接收最大为 1024 个字节
                # print("接收客户端数据为:", recv_data.decode("gbk"))
                if not recv_data: continue
                if recv_data == b"exit":
                    client_sendtext.send(b"Goodbye!~")
                    break
                client_sendtext.send(b"response #> %s" % recv_data)
            client_sendtext.close()
    except Exception as e:
        print(e)


def client():
    """ TCP 客户端"""
    while True:
        connect = socket(AF_INET, SOCK_STREAM)
        connect.connect(server_addr)
        print("> 连接成功....")
        while True:
            send_data = input("请输入内容,输入'kill'指令即可退出程序,输入'exit'重连:
")
            if send_data == "kill": kill()
            if not send_data: continue
            connect.send(send_data.encode("gbk"))
            recvData = connect.recv(1024)
            print("接收服务端返回数据: ", recvData.decode("gbk"))
            if recvData == b"Goodbye!~":
                break
        connect.close()


def main():
    import threading
    t = threading.Thread(target=server, args=()).start()
    t2 = threading.Thread(target=client, args=()).start()


def kill():
    import os
    os.system("taskkill /f /t /im python.exe")
    os.system("taskkill /f /t /im python36.exe")

if __name__ == '__main__':
    main()

with 实现非阻塞

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020/5/15 11:54
# @File    : 0-socket.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import socket

def server():
    online_client = []
    HOST = ("", 1111)
    with socket.socket() as sock:
        sock.bind(HOST)
        sock.listen(6)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        print("就绪...")
        while True:
            try:
                conn, addr = sock.accept()
                print("[客户端接入] %s connected! " % str(addr))
                online_client.append(conn)
            except BlockingIOError as e:
                pass

            _client = [conn for conn in online_client]
            for conn in _client:
                try:
                    data = conn.recv(1024)
                    conn.sendall(b"response #> " + data)
                    # print(f" #> 消息- {str(data, encoding='utf-8')}| 客户端地址- {addr}")
                    if not data:
                        print("[-客户端断开] %s disconnected! " % str(addr))
                        conn.close()
                        online_client.remove(conn)
                        print(f"当前在线剩余人数 => {len(online_client)}")
                        break
                except OSError:
                    pass

def client():
    with socket.socket() as obj:
        obj.connect(("127.0.0.1", 1111))
        obj.sendall(bytes("测试", encoding="utf-8"))
        content = str(obj.recv(1024), encoding="utf-8")
        # print(content)
        obj.close()

def main():
    import threading

    li = []
    _server = threading.Thread(target=server).start()

    for _ in range(10):
        a = threading.Thread(target=client)
        li.append(a)
    for _ in li:
        _.start()

if __name__ == '__main__':
    main()
    
-------------------------------------------
就绪...
[客户端接入] ('127.0.0.1', 62904) connected! 
[-客户端断开] ('127.0.0.1', 62904) disconnected! 
当前在线剩余人数 => 0
(......)

UDP 例子

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020/3/31 12:54
# @File    : 0-socket.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/
# @Version : PyCharm


import socket
import threading
import time

HOST = "127.0.0.1"
BUFSIZ = 1024
server_ADDR = (HOST, 1111)
client_ADDR = (HOST, 2222)

def udp_server():
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # 创建客户端套接字
    udp.bind(server_ADDR)
    print(" UDP - 启动成功 ".center(28, "-"))
    while True:
        data, addr = udp.recvfrom(BUFSIZ)  # 连续接收指定字节的数据,接收到的是字节数组
        if not data: udp.sendto(b"send the message is Null
", addr)
        if data == b"exit":
            udp.sendto(b"#Restart Service! > Hello 
", addr)
            continue
        if data == b"kill":
            udp.sendto(b"Good bye!", addr)
            kill()
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        udp.sendto(b"[%s] UDP server response #>  Hello %s!
" %(now.encode('gbk'), data), addr)
    udp.close()

def send_text(udp):
    while True:
        send_data = input("请输入内容,输入'kill'指令即可退出程序,输入'exit'重连:
")
        udp.sendto(bytes(send_data, 'gbk'), server_ADDR)  # 客户端发送消息,必须发送字节数组
        if send_data == "close":
            break

def recv_text(udp):
    while True:
        data, _ADDR = udp.recvfrom(BUFSIZ)  # 接收回应消息,接收到的是字节数组
        print(f'{_ADDR}- 收到消息-> {data.decode("gbk")}')  # 打印回应消息
        if not data:  # 如果接收服务器信息失败,或没有消息回应
            break
        elif data == b"exit":
            continue
        elif data in b"Good bye!":
            print(" UDP 服务端终止通讯 ".center(28, "-"))
            kill()
    udp.close()

def udp_client():
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # 创建客户端套接字
    udp.bind(client_ADDR)
    t = threading.Thread(target=send_text, args=(udp,)).start()
    t2 = threading.Thread(target=recv_text, args=(udp,)).start()

def main():
    t = threading.Thread(target=udp_server, args=()).start()
    t2 = threading.Thread(target=udp_client, args=()).start()

def kill():
    import os
    os.system("taskkill /f /t /im python.exe")
    os.system("taskkill /f /t /im python36.exe")

if __name__ == '__main__':
    main()

模拟 WSGI 服务实现

  • 通过 Python - socket 库实现 WSGI服务
  • 分成三部分:
  1. 静态 HTML 页面
  2. 静态 Views 接口
  3. 静态 WSGI 服务接口

HTML 静态页面比较简单

@File : tset.py
文件名 :demo.html

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Ben_WSGI_Server</title>
<meta name="description" content="">
<meta name="keywords" content="">
<link href="" rel="stylesheet">
</head>
<body>
    <h1> Hello World! </h1>
    <h1> 这是一个 WSGI_demo </h1>
    <br >
    <h1><a href="./register.json">注册页面</a></h1>
    <h1><a href="./index.json">首页</a></h1>
    <h1><a href="./center.json">个人主页</a></h1>
    <h1><a href="./help.json">帮助中心</a></h1>
</body>
</html>

Views 接口

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020-04-01 17:40:28
# @File    : framework.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/
# @Version : $Id$


import time


class route():
    """
    Views 接口页面
    """

    def __init__(self, env, set_header):
        """
        env → {'PATH_INFO': '/center.py'}
        set_header → 200 OK [('Content-Type', 'text/html; charset=UTF-8')]
        """
        status = "200 OK"
        response_headers = [("Content-Type", "text/html; charset=UTF-8")]
        set_header(status, response_headers)
        self.path_info = env["PATH_INFO"]

    def views(self):

        if self.path_info == "/index.json":
            response_body = index()
        elif self.path_info == "/center.json":
            response_body = center()
        elif self.path_info == "/register.json":
            response_body = register()
        elif self.path_info == "/help.json":
            response_body = help()
        else:
            response_body = "----Not Found---- <br> 当前时间 %s" % time.ctime()
        return response_body


def index():
    return "<h1>----主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())


def center():
    return "<h1>----中心主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())


def register():
    return "<h1>----注册主页---- <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())


def help():
    return "这是个帮助页面 <br> 当前时间 %s</h1>" % time.strftime("%Y--%m--%d %H:%M:%S", time.localtime())

WSGI 接口

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020-04-01 17:40:28
# @File    : WSGI_demo.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/
# @Version : 1.0v

import re
import time
import socket
import threading
import multiprocessing

from framework import route
import gevent
from gevent import monkey
monkey.patch_all()


class WSGIServer(object):
    """
    迷你 WSGI 服务页面
    """

    def __init__(self):
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('', 1111))
        self.server_socket.listen(128)

    def request_handler(self, client_socket):

        recv_data = client_socket.recv(1024)
        if not recv_data:
            print("客户端已经断开连接.")
            client_socket.close()
            return

        # 对请求报文进行切割,获取请求行中的请求路径:demo.html
        request_str_data = recv_data.decode()
        # 使用正则表达式获取url
        ret = re.match(r"[^/]+([^ ]+)", request_str_data)
        if ret:
            path_info = ret.group(1)    # /index.html
        else:
            path_info = "/"
        print("用户访问路径: %s" % path_info)
        if path_info == "/":
            path_info = "/demo.html"

        # 区分动态请求/静态请求
        if not path_info.endswith(".json"):
            try:
                with open("./" + path_info, "rb") as f:
                    file_data = f.read()
            except Exception as e:
                response_line = "HTTP/1.1 404 Not Found
"
                response_header = "Server: Ben_WSGI_Server
"
                response_body = "Error!!!!!!"
                response_data = response_line + response_header + "
" + response_body
                client_socket.send(response_data.encode())
            else:
                response_header = "HTTP/1.1 200 OK
"
                response_header += "Server: Ben_WSGI_Server
"
                response_header += "Content-Type: text/html; charset=UTF-8
"
                response_header += "
"
                response_body = file_data
                response_data = response_header.encode("utf-8") + response_body
                client_socket.send(response_data)
            client_socket.close()
        else:
            env = dict()
            env["PATH_INFO"] = path_info

            # 响应体
            _route = route(env, self.set_headers) # 实例化
            response_body = _route.views() # 执行 views 类方法
            response = (self.response_header + response_body).encode("utf-8")
            client_socket.send(response)

    def set_headers(self, status, headers):

        response_header = "HTTP/1.1 %s
" % status
        for temp in headers:
            response_header += "%s: %s
" % (temp[0], temp[1])
        response_header += "
"
        self.response_header = response_header

    def start(self, level):

        print("Ben_WSGI_Server 启动中......")
        while True:
            client_socket, client_addr = self.server_socket.accept()
            # 配置运行方式
            if level == "thread":
                thread = threading.Thread(target=request_handler, args=(client_socket,))
                thread.start()
            elif level == "process":
                process = multiprocessing.Process(target=self.request_handler, args=(client_socket,))
                process.start()
                client_socket.close()
            elif level == "gevents" or level == "gevent":
                gevent.spawn(self.request_handler, client_socket)
            else:
                print("请配置正确的运行方式")
                break

def main():
    wsgi_server = WSGIServer()
    wsgi_server.start(level="gevent")

if __name__ == '__main__':
    main()

ORM 接口

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2020-04-08 14:40:28
# @File    : ORM.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/
# @Version : 1.0v

class ModelMetaclass(type):
    def __new__(cls, tab_name, bases, attrs):
        mappings = dict()
        # 判断是否需要保存
        for k, v in attrs.items():
            # 判断是否是指定的StringField或者IntegerField的实例对象
            if isinstance(v, tuple):
                print('对象为: %s -→ %s' % (k, v))
                mappings[k] = v

        # 删除这些已经在字典中存储的属性
        for k in mappings.keys():
            attrs.pop(k)

        # 将之前的object_id/name/email/password以及对应的对象引用、类名字
        attrs['__mappings__'] = mappings  # 保存属性和列的映射关系
        attrs['__table__'] = tab_name  # 数据库表名称
        return type.__new__(cls, tab_name, bases, attrs)


class User(metaclass=ModelMetaclass):
    """
    数据库 User 表结构,有数据库基础比较好理解
    # 当指定元类之后,以上的类属性将不在类中,而是在__mappings__属性指定的字典中存储
    # 以上User类中有
    # __mappings__ = {
    #     "object_id": ('object_id', "int unsigned")
    #     "name": ('username', "varchar(30)")
    #     "email": ('email', "varchar(30)")
    #     "password": ('password', "varchar(30)")
    # }
    # __table__ = "User"
    """

    object_id = ('object_id', "int unsigned")
    name = ('username', "varchar(30)")
    password = ('password', "varchar(30)")
    email = ('email', "varchar(30)")

    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)

    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, None))

        # 格式化输出修改
        fields = str(fields).replace("[", "").replace("]", "")
        args = str([str(i) for i in args]).replace("[", "").replace("]", "")
        sql = f"insert into {self.__table__} ({fields}) values ({args})"
        print('SQL: %s' % sql)


if __name__ == '__main__':

    u = User(object_id='0000000000000001', name='张三',
             password='密码', email='xxxxxxxx@BenLam.com')
    u.save()
    # print(u.__dict__)

"""
输出:
    对象为: object_id -→ ('object_id', 'int unsigned')
    对象为: name -→ ('username', 'varchar(30)')
    对象为: password -→ ('password', 'varchar(30)')
    对象为: email -→ ('email', 'varchar(30)')
    SQL: insert into User ('object_id', 'username', 'password', 'email') values ('0000000000000001', '张三', '密码', 'xxxxxxxx@BenLam.com')
    [Finished in 0.5s]
"""
原文地址:https://www.cnblogs.com/BenLam/p/12435955.html