Packaging Your MQTT Cluster as a Helm Chart

Background

MQTT has become foundational infrastructure for the Internet of Things, playing an important role in smart campuses, smart cities, smart buildings, smart transportation, and similar domains. Large-scale deployments, however, have exposed several shortcomings of MQTT v3.1.1. For one, sessions never expire on their own: they can only be cleared when the client reconnects with the clean session flag set, so if a client goes offline without doing so, its session lingers on the server indefinitely. Moreover, when the server actively closes a client connection, it has no way to tell the client why; the client cannot learn the specific reason for the disconnect and therefore cannot handle different events differently. Finally, the server cannot limit the number of QoS 1 and QoS 2 messages a client publishes, so a single client can flood the server with QoS 1/2 messages and drive its memory usage up sharply. Shortcomings and limitations like these have turned attention toward the newer version of the protocol: MQTT 5.0.
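
MQTT 5.0 addresses these points with, among other things, a session expiry interval property, disconnect reason codes, and a receive maximum. As a quick illustration, the mosquitto command-line clients (version 1.6 or later) can set MQTT 5.0 properties; the broker address below is a placeholder:

# Connect with MQTT 5.0 and ask the broker to expire the session
# 300 seconds after the client disconnects (no more orphaned sessions).
mosquitto_sub -h broker.example.com -V mqttv5 \
  -D connect session-expiry-interval 300 \
  -t 'demo/topic'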

Since the MQTT 5.0 specification was published, many implementations have appeared. Most of them, however, do not ship with a Kubernetes-oriented deployment, so in cloud-native environments people end up packaging their MQTT broker themselves. This article aims to help readers build their own MQTT cluster more quickly.

The k8s Environment

Before we begin, we need a Kubernetes cluster. If you have never used k8s before, I recommend the managed k8s services offered by the major cloud providers; if you prefer to experiment locally, I recommend KubeSphere for a one-click installation. In what follows, we assume your k8s cluster is up and running.
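
The rest of the article uses kubectl and helm throughout. A quick sanity check that both tools are installed and can reach the cluster (nothing assumed beyond a working kubeconfig):

# Verify cluster connectivity and Helm availability.
kubectl get nodes
helm version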

gmqtt and SailMQ

Here we choose SailMQ, which is built on gmqtt, as our MQTT implementation. gmqtt is a flexible, extensible, high-performance MQTT broker written in Go that fully implements MQTT v3.1.1 and v5. On top of it, I developed SailMQ, which provides a system model aimed at smart campuses, smart buildings, and similar domains: it defines concepts such as devices, device administrators, apps, app administrators, and topic channels, and implements its own access-control and rule-engine subsystems.

Building Our Helm Chart

First, we wrap the configuration file in a ConfigMap, as follows:

kind: ConfigMap
apiVersion: v1
metadata:
  name: {{ include "sailmq.fullname" . }}
data:
  sailmq.yml: |
    activation_code: {{ .Values.activationCode }}
    email: {{ .Values.email }}
    phone: {{ .Values.phone }}

    postgresql: {{ .Values.postgresql }}

    redis:
      address: {{ .Values.redis.address }}
      password: {{ .Values.redis.password }}
      db: {{ .Values.redis.db }}

    # Path to pid file.
    # If not set, there will be no pid file.
    # pid_file: /var/run/gmqttd.pid

    listeners:
      # bind address
      - address: ":1883"
    #    tls:
    #      cacert: "path_to_ca_cert_file"
    #      cert: "path_to_cert_file"
    #      key: "path_to_key_file"
      - address: ":8883"
        # websocket setting
        websocket:
          path: "/"

    api:
      grpc:
        # The gRPC server listen address. Supports unix socket and tcp socket.
        - address: "tcp://127.0.0.1:8084"
        #- address: "unix:///var/run/gmqttd.sock"
    #      tls:
    #        cacert: "path_to_ca_cert_file"
    #        cert: "path_to_cert_file"
    #        key: "path_to_key_file"
      http:
        # The HTTP server listen address. This is a reverse proxy in front of the gRPC server.
        - address: "tcp://0.0.0.0:8083"
          map: "tcp://127.0.0.1:8084" # The backend gRPC server endpoint.
    #      tls:
    #        cacert: "path_to_ca_cert_file"
    #        cert: "path_to_cert_file"
    #        key: "path_to_key_file"

    mqtt:
      session_expiry: 2h
      session_expiry_check_timer: 20s
      message_expiry: 2h
      max_packet_size: 268435456
      server_receive_maximum: 100
      max_keepalive: 60
      topic_alias_maximum: 10
      subscription_identifier_available: true
      wildcard_subscription_available: true
      shared_subscription_available: true
      maximum_qos: 2
      retain_available: true
      max_queued_messages: 1000
      max_inflight: 100
      queue_qos0_messages: true
      delivery_mode: onlyonce # overlap or onlyonce
      allow_zero_length_clientid: true

    persistence:
      type: memory  # memory | redis
      # The redis configuration only takes effect when type == redis.
      redis:
        # redis server address
        addr: "127.0.0.1:6379"
        # The maximum number of idle connections in the redis connection pool.
        max_idle: 1000
        # The maximum number of connections allocated by the redis connection pool at a given time.
        # If zero, there is no limit on the number of connections in the pool.
        max_active: 0
        # The connection idle timeout. A connection is closed after remaining idle for this duration.
        # If the value is zero, idle connections are not closed.
        idle_timeout: 240s
        password: ""
        # The number of the redis database.
        database: 0

    # The topic alias manager setting. The topic alias feature is introduced by MQTT V5.
    # This setting controls how the broker manages topic aliases.
    topic_alias_manager:
      # Currently, only the FIFO strategy is supported.
      type: fifo

    plugins:
      prometheus:
        path: "/metrics"
        listen_address: ":8082"
      auth:
        # Password hash type. (plain | md5 | sha256 | bcrypt)
        # Defaults to md5.
        hash: md5
        # The file that stores passwords. If it is a relative path, it is resolved
        # relative to the config file's directory.
        # (e.g. ./gmqtt_password => /etc/gmqtt/gmqtt_password.yml)
        # Defaults to ./gmqtt_password.yml
        # password_file:
      federation:
        # node_name is the unique identifier for the node in the federation. Defaults to hostname.
        # node_name:
        # fed_addr is the gRPC server listening address for federation-internal communication. Defaults to :8901
        fed_addr: :8901
        # advertise_fed_addr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
        # Defaults to "fed_addr". However, in some cases there may be a routable address that cannot be bound.
        # If the port is missing, the default federation port (8901) will be used.
        advertise_fed_addr: :8901
        # gossip_addr is the address that the gossip protocol will listen on. It is used for both UDP and TCP gossip. Defaults to :8902
        gossip_addr: :8902
        # advertise_gossip_addr is used to change the gossip server address that we advertise to other nodes in the cluster.
        # Defaults to "gossip_addr", or the private IP address of the node if the IP in "gossip_addr" is 0.0.0.0.
        # If the port is missing, the default gossip port (8902) will be used.
        advertise_gossip_addr: :8902

        # retry_join is the list of addresses of other nodes to join upon starting up.
        # If the port is missing, the default gossip port (8902) will be used.
        #retry_join:
        #  - 127.0.0.1:8902

        # rejoin_after_leave is passed to "RejoinAfterLeave" in the serf configuration.
        # It controls our interaction with the snapshot file.
        # When set to false (default), a leave causes Serf not to rejoin the cluster until an explicit join is received.
        # If set to true, we ignore the leave and rejoin the cluster on start.
        rejoin_after_leave: true
        # snapshot_path is passed to "SnapshotPath" in the serf configuration.
        # When Serf is started with a snapshot, it attempts to join all previously known nodes
        # until one succeeds, and also avoids replaying old user events.
        snapshot_path:

    # plugin loading order
    plugin_order:
      # Uncomment auth to enable authentication.
      # - auth
      # - prometheus
      - admin
      - federation

    log:
      level: info # debug | info | warn | error
      format: text # json | text
      # whether to dump MQTT packets at debug level
      dump_packet: false
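
To sanity-check the rendered configuration before deploying, you can render the template locally. A minimal sketch, assuming the chart lives in the current directory and the ConfigMap template file is templates/configmap.yaml (the file name and the --set values here are assumptions):

# Render only the ConfigMap and inspect the generated sailmq.yml.
helm template my-sailmq . \
  --set activationCode=dummy \
  --set email=ops@example.com \
  -s templates/configmap.yaml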



One setting deserves special attention: plugins -> federation -> retry_join. This is the key setting for interconnecting the cluster nodes. We will set the environment variable SAILMQ_FEDERATION_SERVERS in the StatefulSet and then merge it into the configuration with the following startup script:

#!/usr/bin/env sh
set -eux
set -o pipefail

sailmq mergeconfig -o sailmq2.yml
sailmq start -c sailmq2.yml
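
The exact merge logic is internal to the sailmq binary, but given the retry_join setting above, the merged sailmq2.yml can be expected to contain one gossip address per peer taken from SAILMQ_FEDERATION_SERVERS. A hypothetical illustration for three replicas (the pod and service names assume the chart renders a fullname of "sailmq"):

# Illustration only: expected shape of the merged federation section.
cat <<'EOF'
plugins:
  federation:
    retry_join:
      - sailmq-0.sailmq-headless:8902
      - sailmq-1.sailmq-headless:8902
      - sailmq-2.sailmq-headless:8902
EOF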

Next, we configure a Service and a headless Service. The Service exposes the brokers to the outside world, while the headless Service is used for node-to-node communication within the cluster:

apiVersion: v1
kind: Service
metadata:
  name: {{ include "sailmq.fullname" . }}
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  type: {{ .Values.service.type }}
  ports:
    - port: 1883
      targetPort: 1883
      protocol: TCP
      name: mqtt
    - port: 8883
      targetPort: 8883
      protocol: TCP
      name: ws
    - port: 8083
      targetPort: 8083
      protocol: TCP
      name: api
    - port: 8082
      targetPort: 8082
      protocol: TCP
      name: metric
  selector:
    {{- include "sailmq.selectorLabels" . | nindent 4 }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ include "sailmq.fullname" . }}-headless
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - port: 1883
      targetPort: 1883
      protocol: TCP
      name: mqtt
    - port: 8883
      targetPort: 8883
      protocol: TCP
      name: ws
    - port: 8083
      targetPort: 8083
      protocol: TCP
      name: api
    - port: 8082
      targetPort: 8082
      protocol: TCP
      name: metric
    - port: 8901
      targetPort: 8901
      protocol: TCP
      name: fed
    - port: 8902
      targetPort: 8902
      protocol: TCP
      name: gossip
  selector:
    {{- include "sailmq.selectorLabels" . | nindent 4 }}

Finally, we configure the StatefulSet:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ include "sailmq.fullname" . }}
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  replicas: {{ .Values.replicaCount }}
  serviceName: {{ include "sailmq.fullname" . }}-headless 
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      {{- include "sailmq.selectorLabels" . | nindent 6 }}
  template:
    metadata:
    {{- with .Values.podAnnotations }}
      annotations:
        {{- toYaml . | nindent 8 }}
    {{- end }}
      labels:
        {{- include "sailmq.selectorLabels" . | nindent 8 }}
    spec:
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      serviceAccountName: {{ include "sailmq.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.podSecurityContext | nindent 8 }}
      containers:
        - name: {{ .Chart.Name }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "hevienz/sailmq:latest"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          command: ["run.sh"]
          env:
            - name: SAILMQ_FEDERATION_SERVERS
              {{- $replicaCount := int .Values.replicaCount }}
              {{- $sailmqFullname := include "sailmq.fullname" . }}
              {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63  }}
              value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
          ports:
            - name: mqtt
              containerPort: 1883
              protocol: TCP
            - name: ws
              containerPort: 8883
              protocol: TCP
            - name: api
              containerPort: 8083
              protocol: TCP
            - name: metric
              containerPort: 8082
              protocol: TCP
            - name: fed
              containerPort: 8901
              protocol: TCP
            - name: gossip
              containerPort: 8902
              protocol: TCP
          volumeMounts:
          - name: sailmqv
            mountPath: /etc/sailmq
            readOnly: true
          livenessProbe:
            tcpSocket:
              port: 1883
          readinessProbe:
            tcpSocket:
              port: 1883
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
      volumes:
        - name: sailmqv
          configMap:
            name: {{ include "sailmq.fullname" . }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
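
With the templates in place, the chart can be installed and cluster formation verified. A minimal sketch, assuming the chart directory is named sailmq; the release name is a placeholder, and the object names below assume the "sailmq.fullname" helper renders to "my-sailmq":

# Install (or upgrade) the chart with three broker replicas.
helm upgrade --install my-sailmq ./sailmq --set replicaCount=3

# Wait until all ordinal pods are ready.
kubectl rollout status statefulset/my-sailmq

# Inspect one pod's logs for federation/gossip join activity.
kubectl logs my-sailmq-0 | grep -iE 'federation|gossip'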

Pay particular attention to the environment variable SAILMQ_FEDERATION_SERVERS mentioned above: it generates the list of cluster nodes based on the headless Service. It is worth extracting here:

          env:
            - name: SAILMQ_FEDERATION_SERVERS
              {{- $replicaCount := int .Values.replicaCount }}
              {{- $sailmqFullname := include "sailmq.fullname" . }}
              {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63  }}
              value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
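
For concreteness, with replicaCount set to 3 and a fullname of "sailmq" (an assumed value; the real name comes from the sailmq.fullname helper), the range loop above renders to a space-separated peer list:

# Rendered result of the template above for three replicas (names assumed).
SAILMQ_FEDERATION_SERVERS="sailmq-0.sailmq-headless:8902 sailmq-1.sailmq-headless:8902 sailmq-2.sailmq-headless:8902"
export SAILMQ_FEDERATION_SERVERS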

Summary

Packaging your MQTT cluster as a Helm chart greatly improves operational efficiency and is the recommended way to deploy the application. SailMQ is an MQTT server implementation; building on gmqtt's complete support for MQTT v3.1.1 and v5, it provides users with future-proof smart-campus infrastructure.
