WebSocket

WebSocket

一、服务向客户端推送消息

1.1 轮询

  • 原理:让浏览器每隔几秒钟通过ajax朝服务端发送请求来获取数据,eg:每隔5s中朝服务端发送一次请求
  • 优缺点:
  1. 消息延迟太高
  2. 消耗资源较多
  3. 请求次数较多

1.2 长轮询

  • 原理:服务端给每个客户端建立队列,让浏览器通过ajax向后端偷偷的发送请求,去各自对应的队列中获取数据,如果没有数据会阻塞,但是不会一直阻塞,会通过timeout参数及异常处理的方式限制阻塞事件,比如30s后返回客户端触发回调函数让浏览器再次发送请求
  • 相对于轮询优缺点:
  1. 消息基本没有延迟
  2. 请求次数减少了
  3. 消耗资源较少
  4. 现在web版本的qq和微信还是基于长轮询实现的(大公司web项目可能都会使用)

1.3 实现群聊功能

基于ajax及队列自己实现简易版本的长轮询群聊功能

django中的应用,可以有自己的urls.py,static静态文件夹,templates模版文件夹

如果有多个地方都有模版文件夹,那么查找模版的顺序会按照配置文件中注册了的app的顺序从上往下依次查找

前端:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
    <link href="https://cdn.bootcss.com/twitter-bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcss.com/twitter-bootstrap/3.3.1/js/bootstrap.min.js"></script>
</head>
<body>

<h1>聊天室: <span id="d3">{{ name }}</span></h1>
<div>
    <input type="text" name="content" id="d1">
    <button id="d2">发送</button>
</div>
<h1>聊天纪录</h1>
<div class="record">

</div>

<script>
    // 朝后端发送用户消息
    $('#d2').click(function () {
        $.ajax({
            url: '/send_msg/',
            type: 'post',
            data: {
                'content': $("#d1").val(),
                'name': $("#d3").text(),
                'csrfmiddlewaretoken': '{{ csrf_token }}'
            },
            {#dataType:"JSON",#}
            success: function (args) {

            }
        })
    });

    // 书写偷偷跟服务端要数据的代码
    function getMsg() {
        $.ajax({
            url: '/get_msg/',
            type: 'get',
            data: {'name': '{{ name }}'},
            {#dataType:"JSON",#}
            success: function (args) {
                if (args.status) {
                    // 有消息 应该DOM操作渲染到页面上
                    // 1 创建标签
                    var pEle = $('<p>');
                    // 2 给标签添加文本内容
                    pEle.text(args.msg.name +" : "+ args.msg.content);
                    // 3 添加到div中
                    $('.record').append(pEle)
                } else {
                    // 没有则继续发送
                }
                getMsg()
            }
        })
    }

    // 页面加载完毕之后自动触发getMsg函数的执行
    $(function () {  // 等待页面加载完毕之后自动调用getMsg函数
        getMsg()
    })
</script>

</body>
</html>

后端:

from django.shortcuts import render,HttpResponse
import queue

# Create your views here.



# 全局定义一个字典用来存储客户端浏览器与队列关系
q_dict = {}



def home(request):
    # 获取用户唯一标识
    name = request.GET.get('name')
    # 给每个客户端浏览器创建独有的队列
    q_dict[name] = queue.Queue()
    return render(request,'index.html',locals())


def send_msg(request):
    if request.method == 'POST':
        con = request.POST.get('content')
        name = request.POST.get('name')
        # 将该消息往所有的群聊的客户端队列中添加
        content = {'name': name, 'content': con, }
        for q in q_dict.values():
            q.put(content)
        return HttpResponse('OK')

import json
from django.http import JsonResponse
def get_msg(request):
    name = request.GET.get("name")
    # 去对应的队列中获取数据
    q = q_dict.get(name)
    back_dic = {'status':True,'msg':''}
    try:
        data = q.get(timeout=10)  # 等10s
        back_dic['msg'] = data
    except queue.Empty as e:
        back_dic['status'] = False
    # return HttpResponse(json.dumps(back_dic))
    return JsonResponse(back_dic)
# 大公司一般情况下都会使用上面长轮询的方式,因为兼容性好

二、WebSocket

websocket(主流浏览器和框架都支持),它也是一个网络协议,并且基于该协议传输数据,数据是加密处理的,

1.WebSocket 与 HTTP
WebSocket 的最大特点就是:服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是全 双工通信。

TTP 有 1.1 和 1.0 之说,也就是所谓的 keep-alive ,把多个 HTTP 请求合并为一个,但是 Websocket 其实是一 个新协议,跟 HTTP 协议基本没有关系,只是为了兼容现有浏览器,所以在握手阶段使用了 HTTP

"""
http  网络协议  不加密传输 
https  网络协议  加密传输
	上面这两个协议都是短链接
	
websocket  网络协议 加密传输	 类似长连接 
	websocket的诞生 真正意义上实现了服务端给客户端主动推送消息
"""

2.1 websocket内部原理

1582527578706

1. 握手环节

  • 目的:验证服务端是否支持Websocket协议

  • 流程:

  1. 客户端浏览器第一次访问服务器的时候,浏览器内部会自动生成一个随机字符串,将该随机字符串发送给服务端(基于http)协议)浏览器也保留随机生成的字符串(在请求头里面)
  2. 服务端接收随机字符串之后,会将字符串与magic string(全球统一)做字符串拼接,然后利用加密算法对拼接好的字符串进行加密处理(sha1/base64),此时客户端也对产生的随机字符串做上述的拼接和加密操作
  3. 接着服务器将产生好的随机字符串发送给客户端的浏览器(响应头里面),客户端浏览器会对比服务器发送的随机字符串与浏览器本地操作的随机字符串进行对比,如果一致说明该服务端支持websocket,如果不一致服务端则不支持。
GET /chatsocket HTTP/1.1 
Host: 127.0.0.1:8002 
Connection: Upgrade 
Pragma: no-cache 
Cache-Control: no-cache 
Upgrade: websocket 
Origin: http://localhost:63342 
Sec-WebSocket-Version: 13               #版本 
Sec-WebSocket-Key: mnwFxiOlctXFN/DeMt1Amg== 
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

  • Sec-WebSocket-Key 是一个 Base64 encode 的值,这个是浏览器随机生成的,发送给服务器
  • 服务端从请求(HTTP的请求头)信息中提取 Sec-WebSocket-Key,利用magic_string 和 Sec-WebSocket-Key 先进行拼接,然后采用hmac1加密,再进行base64加密
  • 将加密结果响应给客户端,服务器会返回下列东西,表示已经接受到请求, 成功建立 WebSocket

2. 收发数据(send/onmessage)

验证成功之后就可以数据交互了 但是交互的数据是加密的 需要解密处理

  • 数据基于网络传输都是二进制格式,单位换算 8bit = 1bytes
  • 步骤一:读取第二个字节的后七位称之为payload,根据payload大小决定不同的处理方式:
  1. =127 再读取8个字节 作为数据报
  2. =126 再读取2个字节 作为数据报
  3. <=125 不再往后读了
  • 步骤二:
# 步骤1之后 会对剩下的数据再读取4个字节(masking-key)		
# 之后依据masking-key算出真实数据
var DECODED = "";
for(var i = 0; i < ENCODED.length; i++) {
       DECODED[i] = ENCODED[i] ^ MASK[i % 4];
  }

3.代码验证

因为我们再实际开发中,不可能去书写上述这么复杂的代码逻辑,我们是直接使用别人封装好的模块即可

这里知识做一个验证而已

后端:

import socket
import base64
import hashlib

# 正常的socket代码
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 防止linux/mac报错
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 8080))
sock.listen(5)


conn, address = sock.accept()
data = conn.recv(1024)  # 获取客户端发送的消息


def get_data(info):
    payload_len = info[1] & 127
    if payload_len == 126:
        extend_payload_len = info[2:4]
        mask = info[4:8]
        decoded = info[8:]
    elif payload_len == 127:
        extend_payload_len = info[2:10]
        mask = info[10:14]
        decoded = info[14:]
    else:
         extend_payload_len = None
         mask = info[2:6]
         decoded = info[6:]
    bytes_list = bytearray()
    for i in range(len(decoded)):
        chunk = decoded[i] ^ mask[i % 4]
        bytes_list.append(chunk)
    body = str(bytes_list, encoding='utf-8')

    return body

def get_headers(data):
    """
    将请求头格式化成字典
    :param data:
    :return:
    """
    header_dict = {}
    data = str(data, encoding='utf-8')

    header, body = data.split('\r\n\r\n', 1)
    header_list = header.split('\r\n')
    for i in range(0, len(header_list)):
        if i == 0:
            if len(header_list[i].split(' ')) == 3:
                header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[i].split(' ')
        else:
            k, v = header_list[i].split(':', 1)
            header_dict[k] = v.strip()
    return header_dict

# 想将http协议的数据处理成字典的形式方便后续取值
header_dict = get_headers(data)  # 将一大堆请求头转换成字典数据  类似于wsgiref模块
client_random_string = header_dict['Sec-WebSocket-Key']  # 获取浏览器发送过来的随机字符串

# magic string拼接
magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'  # 全球共用的随机字符串 一个都不能写错
value = client_random_string + magic_string  # 拼接

# 算法加密
ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())  # 加密处理

# 将处理好的结果再发送给客户端校验
tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
      "Upgrade:websocket\r\n" \
      "Connection: Upgrade\r\n" \
      "Sec-WebSocket-Accept: %s\r\n" \
      "WebSocket-Location: ws://127.0.0.1:8080\r\n\r\n"
response_str = tpl %ac.decode('utf-8')  # 处理到响应头中

# 将随机字符串给浏览器返回回去
conn.send(bytes(response_str, encoding='utf-8'))


# 基于websocket通信
while True:
    data = conn.recv(1024)  # 数据是加密处理的
    # print(data)
    # 对data进行解密操作
    value = get_data(data)
    print(value)

前端:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link href="https://cdn.bootcss.com/twitter-bootstrap/3.4.1/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdn.bootcss.com/twitter-bootstrap/3.4.1/js/bootstrap.min.js"></script>
</head>
<body>


<script>
    var ws = new WebSocket('ws://127.0.0.1:8080/')
    // 上面这句代码干了很多事
    // 1 产生随机字符串并发送给服务端  Sec-WebSocket-Key: EMy5N4dwjl/jHoU0eYDDGQ==
    // 2 服务端发送处理好的内容过来之后 自动校验
</script>
</body>
</html>
通过浏览器控制台发送消息

img

三、后端框架实现websocket通信

"""
django 
	-默认不支持
	-下载第三方模块:channels
flask
	-默认不支持
	-下载第三方模块:geventwebsocket
tornado
	-默认就是支持的
"""

# 如果你想在django支持websocket的话 需要安装对应的模块
"""
在django安装channles时候不要直接安装最新版本 可能会自动将的解释器中的django版本改为最新版
python解释器环境推荐使用3.6(官网:3.5可能会出问题,3.7也可能会出问题,没有给出具体的解释)
"""
pip3 install channles==2.3

四、channel-layers配置

#### channels基础配置

总共分为三步

#### channel-layers实现群聊功能

- 配置文件中配置


  # channel-layers配置
  CHANNEL_LAYERS = {
      'default':{
          'BACKEND':'channels.layers.InMemoryChannelLayer'
      }
  }



- 需要在websocket_connect中提前创建群聊


  # 如何获取无名有名分组的正则表达式匹配到的数据
  task_id = self.scope['url_route']['kwargs'].get("task_id")
  
  # task_id:去群名类似qq群名 3452
  async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)


- 如何群发消息


  def websocket_receive(self, message):
      async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})
  # 第二个参数字典的键是固定的 就叫type message
              # type指向的是一个方法 这个方法就是用来发送数据的 数据就是message后面的值
              # 如果type后面是my.send 那么你就需要创建my_send方法
              # xxx.ooo    xxx_ooo方法
              # my.send    my_send方法
  # 会将群聊中所有的对象取出指向下面的方法
  def my_send(self,event):
          # 发送数据
          message = event['message']  # {'code':'init','data':node_list}
          self.send(json.dumps(message))
          


- 如何踢出群成员

  def websocket_disconnect(self, message):
  		async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)


### 

实现群聊

"""
@author RansySun
@create 2020-02-28-20:44
"""
from channels.generic.websocket import WebsocketConsumer
from channels.exceptions import StopConsumer
import json
from asgiref.sync import async_to_sync
from Code import models
import time
import threading
import subprocess
import shutil


# 将创建节点的代码封装到函数中
def create_node(task_id, task_obj):
    node_object_list = []
    # 先判断当前任务是否已经生成过节点了
    node_queryset = models.Node.objects.filter(task_id=task_id)
    if not node_queryset:
        # 动态创建数据并发送给前端
        start_node = models.Node.objects.create(text='开始', task_id=task_id)
        node_object_list.append(start_node)

        # 判断是否有下载前的钩子脚本
        if task_obj.before_download_script:
            """
            在创建钩子节点的时候 需要注意箭头指向问题
            解决方式 利用变量名指向的问题
                没有钩子的时候start_node指向的是开始节点
                如果有钩子的时候statrt_node指向钩子节点
            """
            start_node = models.Node.objects.create(
                text='下载前',
                task_id=task_id,
                parent=start_node
            )
            node_object_list.append(start_node)

        download_node = models.Node.objects.create(text='下载', task_id=task_id, parent=start_node)
        node_object_list.append(download_node)

        # 判断是否有下载后的钩子脚本
        # 同理
        if task_obj.after_download_script:
            """
            在创建钩子节点的时候 需要注意箭头指向问题
            解决方式 利用变量名指向的问题
                没有钩子的时候start_node指向的是开始节点
                如果有钩子的时候statrt_node指向钩子节点
            """
            download_node = models.Node.objects.create(
                text='下载后',
                task_id=task_id,
                parent=download_node
            )
            node_object_list.append(download_node)

        upload_node = models.Node.objects.create(text='上传', task_id=task_id, parent=download_node)
        node_object_list.append(upload_node)

        # 一个项目可能需要部署到多个服务器 也就意味着服务器节点可能有多个
        task_obj = models.DeployTask.objects.filter(pk=task_id).first()
        # 根据任务对象获取项目对象从而获取到对应的服务器对象
        for server_obj in task_obj.project.servers.all():
            server_node = models.Node.objects.create(
                text=server_obj.hostname,
                task_id=task_id,
                parent=upload_node,
                server=server_obj
            )
            node_object_list.append(server_node)

            # 发布前钩子节点
            if task_obj.before_deploy_script:
                server_node = models.Node.objects.create(
                    text='发布前',
                    task_id=task_id,
                    parent=server_node,
                    server=server_obj
                )
                node_object_list.append(server_node)

            # 在服务器节点的后面 再创建一个发布节点
            deploy_node = models.Node.objects.create(
                text='发布',
                task_id=task_id,
                parent=server_node,
                server=server_obj
            )
            node_object_list.append(deploy_node)

            # 发布后钩子节点
            # 发布前钩子节点
            if task_obj.after_deploy_script:
                after_deploy_node = models.Node.objects.create(
                    text='发布后',
                    task_id=task_id,
                    parent=deploy_node,
                    server=server_obj
                )
                node_object_list.append(after_deploy_node)

        # node_object_list = [obj1,obj2,obj3...]
    else:
        node_object_list = node_queryset

    return node_object_list


# 将构造返回给gojs数据的代码封装到函数
def convert_object_to_gojs(node_object_list):
    node_list = []
    for node_obj in node_object_list:
        tmp = {
            'key': str(node_obj.pk),  # 用户主键作为key 用来区分箭头关系
            'text': node_obj.text
        }
        # 判断当前节点是否有父节点
        if node_obj.parent:
            # 还需要添加一组简直对
            tmp['parent'] = str(node_obj.parent_id)
        node_list.append(tmp)
    return node_list


class PublishConsumer(WebsocketConsumer):

    # 由于待封装的代码中出现了self,所以封装成类的方法更加的合适
    def deploy(self, task_id, task_obj):
        # 第一步:开始 去数据库中找到该节点数据该为绿色即可 同时将状态给前端返回
        start_node = models.Node.objects.filter(text='开始', task_id=task_id).first()
        start_node.status = 'green'
        start_node.save()
        # 群发  gojs如何修改图表颜色  数据的主键值以及颜色参数
        async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                               'message': {'code': 'update', 'node_id': start_node.pk,
                                                                           'color': 'green'}})

        # 第二步:下载前
        import os
        from django.conf import settings

        project_name = task_obj.project.title
        uid = task_obj.uid
        # 脚本文件文件夹路径
        script_folder = os.path.join(settings.DEPLOY_CODE_PATH, project_name, uid, 'scripts')
        # 代码文件文件夹路径
        project_folder = os.path.join(settings.DEPLOY_CODE_PATH, project_name, uid, project_name)

        # 压缩文件文件夹路径
        package_folder = os.path.join(settings.PACKAGE_PATH, project_name)
        # 通过代码动态创建文件夹
        if not os.path.exists(script_folder):
            # mkdir只能创建单层  如果你想一次性创建多层 makedirs
            os.makedirs(script_folder)
        if not os.path.exists(project_folder):
            os.makedirs(project_folder)

        if not os.path.exists(package_folder):
            os.makedirs(package_folder)

        if task_obj.before_download_script:
            # TODO:指向钩子脚本内容 根据指向结果的不同返回不同的颜色
            """
            在发布机上执行钩子脚本内容
            1.将钩子脚本内容写入到本地文件   这个地方我们为了方便 直接规定成python脚本文件

            2.在本地执行这个脚本文件 如果成功则是绿色 否则是红色
            """
            # 需要提前创建一个颜色的变量
            status = 'green'

            # 脚本文件的文件名  这个地方我们为了方便 直接规定成python脚本文件
            script_name = 'before_download_script.py'
            script_path = os.path.join(script_folder, script_name)
            # 文件操作读写数据
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(task_obj.before_download_script)
            try:
                # 执行脚本文件内容
                subprocess.check_output('python {0}'.format(script_name), shell=True, cwd=script_folder)
                # 先切换到script_folder文件夹下 再执行脚本文件  python a.txt
                # 执行成功与否我修改绿色或红色
            except Exception as e:
                status = 'red'
            before_download_node = models.Node.objects.filter(text='下载前', task_id=task_id).first()
            before_download_node.status = status
            before_download_node.save()
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'update',
                                                                               'node_id': before_download_node.pk,
                                                                               'color': status}})

            # 如果脚本文件执行失败了 你觉得后续的流程还有必要执行吗???
            # 一旦执行失败 那么后续的操作根本就没有必要执行了
            if status == 'red':
                return

        # 第三步:下载
        # TODO:gitpython模块去远程仓库拉取代码
        """
        1.获取仓库地址:task_obj.project.repo

        2.仓库中下载代码
            git clone -b v1 https://github.com/xxx.git

            gitpython模块
        """
        # 将我们封装好的gitpython代码拷贝到一个文件中 这里之际引用即可
        # TODO:课下自己拷贝 地址换成你们自己仓库的地址试试
        # 需要提前创建一个颜色的变量
        from app01.utils.ab_gitpython import GitRepository
        status = 'green'
        try:
            git_obj = GitRepository(project_folder, task_obj.project.repo)
            git_obj.pull()
        except Exception as e:
            status = 'red'
        download_node = models.Node.objects.filter(text='下载', task_id=task_id).first()
        download_node.status = status
        download_node.save()
        async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                               'message': {'code': 'update',
                                                                           'node_id': download_node.pk,
                                                                           'color': status}})
        if status == 'red':
            return

        # 第四步:下载后
        if task_obj.after_download_script:
            # TODO:指向钩子脚本内容 根据指向结果的不同返回不同的颜色
            # 跟下载前一样的逻辑
            # 需要提前创建一个颜色的变量
            status = 'green'

            # 脚本文件的文件名  这个地方我们为了方便 直接规定成python脚本文件
            script_name = 'after_download_script.py'
            script_path = os.path.join(script_folder, script_name)
            # 文件操作读写数据
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(task_obj.after_download_script)
            try:
                # 执行脚本文件内容
                subprocess.check_output('python {0}'.format(script_name), shell=True, cwd=script_folder)
                # 先切换到script_folder文件夹下 再执行脚本文件  python a.txt
                # 执行成功与否我修改绿色或红色
            except Exception as e:
                status = 'red'
            after_download_node = models.Node.objects.filter(text='下载后', task_id=task_id).first()
            after_download_node.status = status
            after_download_node.save()
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'update',
                                                                               'node_id': after_download_node.pk,
                                                                               'color': status}})

            # 如果脚本文件执行失败了 你觉得后续的流程还有必要执行吗???
            # 一旦执行失败 那么后续的操作根本就没有必要执行了
            if status == 'red':
                return
            after_download_node = models.Node.objects.filter(text='下载后', task_id=task_id).first()
            after_download_node.status = status
            after_download_node.save()
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'update',
                                                                               'node_id': after_download_node.pk,
                                                                               'color': status}})

        # 第五步:上传  直接变绿 不考虑错误情况
        upload_node = models.Node.objects.filter(text='上传', task_id=task_id).first()
        upload_node.status = 'green'
        upload_node.save()
        async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                               'message': {'code': 'update',
                                                                           'node_id': upload_node.pk,
                                                                           'color': 'green'}})

        # 第六步:链接服务器
        for server_obj in task_obj.project.servers.all():
            # 6.1  上传代码
            # TODO:通过paramiko模块将代码上传到服务器  对代码文件进行压缩处理
            status = 'green'
            try:
                # 要上传的文件路径
                upload_folder_path = os.path.join(settings.DEPLOY_CODE_PATH, project_name, uid)

                # 压缩包路径
                package_path = shutil.make_archive(
                    base_name=os.path.join(package_folder, uid + '.zip'),  # 压缩包路径
                    format='zip',
                    root_dir=upload_folder_path
                )
                # 上传压缩文件
                """
                主机名
                """
                # TODO:拷贝封装好的paramiko模块代码实现上传
                from app01.utils.ab_paramiko import SSHProxy

                with SSHProxy(server_obj.hostname, settings.SSH_PORT, settings.SSH_USER, settings.SSH_PASSWORD) as ssh:
                    remote_folder = os.path.join(settings.SERVER_PACHAGE_PATH, project_name)
                    ssh.command('mkdir -p {0}'.format(remote_folder))
                    ssh.upload(package_path, os.path.join(remote_folder, uid + '.zip'))

            except Exception as e:
                status = 'red'

            server_node = models.Node.objects.filter(text=server_obj.hostname, task_id=task_id,
                                                     server=server_obj).first()
            server_node.status = status
            server_node.save()
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'update',
                                                                               'node_id': server_node.pk,
                                                                               'color': status}})
            # 这里应该用continue 因为是多台服务器 一台不成功不影响其他台
            if status == 'red':
                continue

            # 6.2 发布前的钩子
            # TODO:在远程服务器上指向发布前的钩子脚本内容
            """
            方案1
                1 在本地生成一个脚本文件
                2 把脚本文件上传到远程服务器
                3 执行脚本

            方案2  我们采用的方案2
                1 在上传代码之前 将脚本写入到scripts文件夹
                2 代码与脚本文件一起压缩
                3 解压执行
            """
            if task_obj.before_deploy_script:
                before_deploy_node = models.Node.objects.filter(text='发布前', task_id=task_id, server=server_obj).first()
                before_deploy_node.status = 'green'
                before_deploy_node.save()
                async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                       'message': {'code': 'update',
                                                                                   'node_id': before_deploy_node.pk,
                                                                                   'color': 'green'}})
            # 6.3  发布
            deploy_node = models.Node.objects.filter(text='发布', task_id=task_id, server=server_obj).first()
            deploy_node.status = 'green'
            deploy_node.save()
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'update',
                                                                               'node_id': deploy_node.pk,
                                                                               'color': 'green'}})

            # 6.4  发布后钩子
            # TODO:发布后的脚本钩子执行
            """
            方案1
                1 在本地生成一个脚本文件
                2 把脚本文件上传到远程服务器
                3 执行脚本

            方案2  我们采用的方案2
                1 在上传代码之前 将脚本写入到scripts文件夹
                2 代码与脚本文件一起压缩
                3 解压执行 
            """
            if task_obj.after_deploy_script:
                after_deploy_node = models.Node.objects.filter(text='发布后', task_id=task_id, server=server_obj).first()
                after_deploy_node.status = 'green'
                after_deploy_node.save()
                async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                       'message': {'code': 'update',
                                                                                   'node_id': after_deploy_node.pk,
                                                                                   'color': 'green'}})

    def websocket_connect(self, message):
        self.accept()

        # 获取路由中有名分组的数据
        task_id = self.scope['url_route']['kwargs'].get("task_id")
        # 无名就换成args
        # self.scope['url_route']是一个大字典,里存放了很多前端发送的数据 cookie session等信息

        async_to_sync(self.channel_layer.group_add)(task_id, self.channel_name)
        # 第一个参数是群号 需要我们自己定义
        # self.channel_name 自动给每一个客户端创建唯一标识

        # 为了实现不同的任务之间不干扰的情况 所以我们将任务的id作为群号

        # 判断 数据库是否有对应任务的节点 如果有应该查出返回 让用户无需再次点击按钮初始化
        node_queryset = models.Node.objects.filter(task_id=task_id)
        if node_queryset:
            node_list = convert_object_to_gojs(node_queryset)
            # 这个地方是群发还是单独发送
            # 这个地方是单独发送
            self.send(text_data=json.dumps({'code': 'init', 'data': node_list}))

    def websocket_receive(self, message):
        txt = message.get('text')
        task_id = self.scope['url_route']['kwargs'].get("task_id")
        task_obj = models.DeployTask.objects.filter(pk=task_id).first()
        # 对txt做判断
        if txt == 'init':
            # 先用假数据模拟后端的数据
            # node_list = [
            #     {"key": "start", "text": '开始', "figure": 'Ellipse', "color": "lightgreen"},
            #     {"key": "download", "parent": 'start', "text": '下载代码', "color": "lightgreen", "link_text": '执行中...'},
            #     {"key": "compile", "parent": 'download', "text": '本地编译', "color": "lightgreen"},
            # ]

            # 创建好数据之后 还需要将数据返回给前端 并且需要构造成gojs需要的数据格式列表套字典的格式
            # 1 调用创建节点的方法
            node_object_list = create_node(task_id, task_obj)

            # 2 调用构造返回给gojs的数据格式方法
            node_list = convert_object_to_gojs(node_object_list)
            # 发送给前端
            # self.send(text_data=json.dumps({"code":'init','data':node_list}))

            # 给特定群号里面的所有人发送消息
            # 群发
            async_to_sync(self.channel_layer.group_send)(task_id, {'type': 'my.send',
                                                                   'message': {'code': 'init', 'data': node_list}})
            # 第二个参数字典的键是固定的 就叫type message
            # type指向的是一个方法 这个方法就是用来发送数据的 数据就是message后面的值
            # 如果type后面是my.send 那么你就需要创建my_send方法
            # xxx.ooo    xxx_ooo方法
            # my.send    my_send方法

        if txt == 'deploy':
            # 调用封装好的执行流程方法
            # self.deploy(task_id,task_obj)  # 内部是单线程的 没法实现实时展示 而是等待所有的结果执行完毕一次性返回

            # 通用的解决方式  开线程单独处理  记住此用法即可!!!
            thread = threading.Thread(target=self.deploy, args=(task_id, task_obj))
            thread.start()

    # 会将群聊中所有的对象取出指向下面的方法
    def my_send(self, event):
        # 发送数据
        message = event['message']  # {'code':'init','data':node_list}
        self.send(json.dumps(message))

    def websocket_disconnect(self, message):
        # 用户断开链接之后 需要将用户踢出群聊
        task_id = self.scope['url_route']['kwargs'].get("task_id")
        async_to_sync(self.channel_layer.group_discard)(task_id, self.channel_name)
        raise StopConsumer()

{% extends 'base.html' %}
{% load staticfiles %}

{% block content %}
    <h1>channels发布/部署</h1>
    <div style="margin: 10px">
        <button class="btn btn-primary" onclick="createTable()">初始化图表</button>
        <button class="btn btn-primary" onclick="releaseTask()">发布任务</button>
    </div>
    <table class="table-hover table table-bordered table-striped">
        <tbody>
        <tr>
            <td>项目名称:{{ project_obj.title }}</td>
            <td>环境:{{ project_obj.get_env_display }}</td>
        </tr>
        <tr>
            <td>版本:{{ task_obj.tag }}</td>
            <td>状态:{{ task_obj.get_status_display }}</td>
        </tr>
        <tr>
            <td colspan="2">仓库地址:{{ project_obj.repo }}</td>
        </tr>
        <tr>
            <td colspan="2">线上路径:{{ project_obj.path }}</td>
        </tr>
        <tr>
            <td colspan="2">
                <p>关联服务器</p>
                <ul>
                    {% for server_obj in project_obj.servers.all %}
                        <li>{{ server_obj.hostname }}</li>
                    {% endfor %}

                </ul>
            </td>
        </tr>
        </tbody>
    </table>
    <div id="diagramDiv" style="100%; min-height:450px; background-color: #DAE4E4;"></div>


{% endblock %}



{% block js %}
    <script src="{% static 'go.js' %}"></script>
    <script>
        // 由于操作图表需要使用diagram 发送websocket请求需要使用ws
        // 为了能够让所有的函数都能够使用到 将上述两个变量做成全局变量
        var diagram;
        var ws;
        function initTable() {
            var $ = go.GraphObject.make;
            diagram = $(go.Diagram, "diagramDiv", {
                layout: $(go.TreeLayout, {
                    angle: 0,
                    nodeSpacing: 20,
                    layerSpacing: 70
                })
            });

            // 先创建一个模版
            diagram.nodeTemplate = $(go.Node, "Auto",
                $(go.Shape, {
                    figure: "RoundedRectangle",
                    fill: 'yellow',
                    stroke: 'yellow'
                }, new go.Binding("figure", "figure"), new go.Binding("fill", "color"), new go.Binding("stroke", "color")),
                $(go.TextBlock, {margin: 8}, new go.Binding("text", "text"))
            );

            // 先创建一个模版
            diagram.linkTemplate = $(go.Link,
                {routing: go.Link.Orthogonal},
                $(go.Shape, {stroke: 'yellow'}, new go.Binding('stroke', 'link_color')),
                $(go.Shape, {toArrow: "OpenTriangle", stroke: 'yellow'}, new go.Binding('stroke', 'link_color'))
            );

            // 数据格式是列表套字典 也就意味着可以从后端构造数据
            {#var nodeDataArray = [#}
            {#    {key: "start", text: '开始', figure: 'Ellipse', color: "lightgreen"},#}
            {#    {key: "download", parent: 'start', text: '下载代码', color: "lightgreen", link_text: '执行中...'},#}
            {#    {key: "compile", parent: 'download', text: '本地编译', color: "lightgreen"},#}
            {#    {key: "zip", parent: 'compile', text: '打包', color: "red", link_color: 'red'},#}
            {#    {key: "c1", text: '服务器1', parent: "zip"},#}
            {#    {key: "c11", text: '服务重启', parent: "c1"},#}
            {#    {key: "c2", text: '服务器2', parent: "zip"},#}
            {#    {key: "c21", text: '服务重启', parent: "c2"},#}
            {#    {key: "c3", text: '服务器3', parent: "zip"},#}
            {#    {key: "c31", text: '服务重启', parent: "c3"}#}
            {#];#}
            {#diagram.model = new go.TreeModel(nodeDataArray);#}

            // 动态控制节点颜色变化  先找到节点之后改变

            /*
            var node = diagram.model.findNodeDataForKey("zip");
            diagram.model.setDataProperty(node, "color", "lightgreen");
            */
        }

        function initWebSocket() {
            ws = new WebSocket('ws://127.0.0.1:8000/publish/{{ task_obj.pk }}/');
            // ws.onopen ws.onmessage  ws.close
            // 一旦服务端有消息就会自动触发
            ws.onmessage = function (event) {
                // 对数据进行返序列话操作
                var res = JSON.parse(event.data);
                // 判断code的类型
                if(res.code==='init'){
                    diagram.model = new go.TreeModel(res.data);
                }else if (res.code==='update'){
                    var node = diagram.model.findNodeDataForKey(res.node_id);
                    diagram.model.setDataProperty(node, "color", res.color);
                }

            }
        }

        // 页面加载完毕之后立刻执行上述两个方法 获取两个操作对象
        $(function () {
            initTable();
            initWebSocket()
        });

        // 给初始化图表绑定事件 朝后端获取图表数据
        function createTable() {
            // 基于websocket协议发送数据
            ws.send('init')
        }

        // 给发布任务按钮绑定事件 朝后端发送消息 让后端指向发布流程
        function releaseTask() {
            ws.send('deploy')
        }
    </script>
{% endblock %}

四、总结

客户端第一次访问服务端,客户端会随机生成一个字符串发送给服务端Sec-WebSocket-Key,浏览器也会保存一份,然后服务器解析获取随机字符串与magic string进行拼接,然后通过base64和sha1进行加密,返回给浏览器,浏览器保留的字符串也会经过上面操作之后与后端生成的加密字符串进行比较,如果相同则说明服务端支持websocket,如果不一致则不支持(握手环节)。验证成功之后,就可以数据交互,交互的数据是加密的,所以需要解密,解密:首先读取第二个字节后七位,后七位也称之为payload,根据payload的大小进行不同的操作,如果 =127 再读取后面8个字节作为数据报,如果=126在读取后面2个字节作为数据报,如果<125 则不再往后读取数据,最后会对剩下的数据再次读取4个字节之后,依据masking-key 算出真实的数据结果。

原文地址:https://www.cnblogs.com/randysun/p/15517757.html