ASP.NET Core 实现 MQTT通讯协议 Demo(开源库:MQTTnet)

1、什么是MQTT

  MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。简单来说HQTT是一种通信协议,要实现发布/订阅就必须遵循这个协议。

2、实现MQTT通讯协议.NET开源库有哪些?

  MQTTnet、MqttDotNet、nMQTT、M2MQTT等,这里我们使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我们使用的是3.0.9版本)

3、展示MQTT实现效果图

  

  例:客户端1只要订阅了positon主题,客户端2、客户端3、客户端4.....同样订阅了position主题则他们之间就能共享position主题的所发的内容了

  如果客户端1订阅了position主题,客户端2订阅了beautiful主题,1发给消息2是收不到的。

4、创建.NETCore项目(Server和Client)

  

5、服务器

  添加Nuget包:安装MQTTnet

  

class Program
    {
        public static IMqttServer mqttServer;
        static void Main(string[] args)
        {
            StartMqttServer();
        }

        //启动Mqtt服务器
        private static async void StartMqttServer()
        {
            try
            {
                //验证客户端信息
                var options = new MqttServerOptions
                {
                    //连接验证
                    ConnectionValidator = new MqttServerConnectionValidatorDelegate(p =>
                    {
                        if (p.ClientId == "SpecialClient")
                        {
                            if (p.Username != "USER" || p.Password != "PASS")
                            {
                                p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            }
                        }
                    })
                };

                //设置端口号
                options.DefaultEndpointOptions.Port = 8031;

                //创建Mqtt服务器
                mqttServer = new MqttFactory().CreateMqttServer();

                //开启订阅事件
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                //取消订阅事件
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                //客户端消息事件
                mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived);

                //客户端连接事件
                mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                //客户端断开事件
                mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                //启动服务器
                await mqttServer.StartAsync(options);

                Console.WriteLine("服务器启动成功!输入任意内容并回车停止服务!");
                Console.ReadLine();

                await mqttServer.StopAsync();
            }
            catch (Exception e)
            {
                Console.Write($"服务器启动失败 Msg:{e}");
            }

        }

        /// <summary>
        /// 客户订阅
        /// </summary>
        private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            //客户端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter.Topic;
            Console.WriteLine($"客户端[{ClientId}]已订阅主题:{Topic}");
        }

        /// <summary>
        /// 客户取消订阅
        /// </summary>
        private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            //客户端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter;
            Console.WriteLine($"客户端[{ClientId}]已取消订阅主题:{Topic}");
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            var ClientId = e.ClientId;
            var Topic = e.ApplicationMessage.Topic;
            var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            var Qos = e.ApplicationMessage.QualityOfServiceLevel;
            var Retain = e.ApplicationMessage.Retain;
            Console.WriteLine($"客户端[{ClientId}]>> 主题:[{Topic}] 负载:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]");
        }

        /// <summary>
        /// 客户连接
        /// </summary>
        private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            Console.WriteLine($"客户端[{ClientId}]已连接");
        }

        /// <summary>
        /// 客户连接断开
        /// </summary>
        private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            Console.WriteLine($"客户端[{ClientId}]已断开连接");
        }
    }

6、客户端

  也要添加Nuget包:安装MQTTnet

 public static IMqttClient mqttClient;

        static void Main(string[] args)
        {
            ConnectMqttServerAsync();
            ImportData();
        }
        private static async void ConnectMqttServerAsync()
        {
            try
            {

                var factory = new MqttFactory();

                mqttClient = factory.CreateMqttClient();

                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer("127.0.0.1", 8031)
                    .WithCredentials("test", "test")
                    .WithClientId(Guid.NewGuid().ToString().Substring(0, 5))
                    .Build();

                //消息
                mqttClient.UseApplicationMessageReceivedHandler(e =>
                {
                    Console.WriteLine("### 收到的信息 ###");
                    Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主题
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//页面信息
                    Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等级
                    Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留
                    Console.WriteLine();
                });

                //重连机制
                mqttClient.UseDisconnectedHandler(async e =>
                {
                    Console.WriteLine("与服务器断开连接!");
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    try
                    {
                        await mqttClient.ConnectAsync(options);
                    }
                    catch (Exception exp)
                    {
                        Console.Write($"重新连接服务器失败 Msg:{exp}");
                    }
                });

               await mqttClient.ConnectAsync(options);

               Console.Write("连接服务器成功!输入任意内容并回车进入菜单页面!");
            }
            catch (Exception exp)
            {
                Console.Write($"连接服务器失败 Msg:{exp}");
            }
        }

        private static void ImportData()
        {
            Console.ReadLine();
            bool isExit = false;
            while (!isExit)
            {
                Console.WriteLine(@"请输入
                    1.订阅主题
                    2.取消订阅
                    3.发送消息
                    4.退出");
                var input = Console.ReadLine();

                switch (input)
                {
                    case "1":
                        Console.WriteLine(@"请输入主题名称:");
                        var topicName = Console.ReadLine();
                        Subscribe(topicName);
                        break;
                    case "2":
                        Console.WriteLine(@"请输入需要取消订阅主题名称:");
                        topicName = Console.ReadLine();
                        Unsubscribe(topicName);
                        break;
                    case "3":
                        Console.WriteLine("请输入需要发送的主题名称");
                        topicName = Console.ReadLine();
                        Console.WriteLine("请输入需要发送的消息");
                        var message = Console.ReadLine();
                        Publish(topicName, message);
                        break;
                    case "4":
                        isExit = true;
                        break;
                    default:
                        Console.WriteLine("请输入正确指令!");
                        break;
                }
            }
        }

        /// <summary>
        /// 订阅
        /// </summary>
        /// <param name="topicName"></param>
        private static async void Subscribe(string topicName)
        {
            string topic = topicName.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("订阅主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客户端尚未连接!");
                return;
            }
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topicName"></param>
        private static async void Unsubscribe(string topicName)
        {
            string topic = topicName.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("订阅主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客户端尚未连接!");
                return;
            }
            await mqttClient.UnsubscribeAsync(topic);
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message"></param>
        private static async void Publish(string topicName, string message)
        {
            string topic = topicName.Trim();
            string msg = message.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("主题不能为空!");
                return;
            }
            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客户端尚未连接!");
                return;
            }

            var MessageBuilder = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(msg)
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            await mqttClient.PublishAsync(MessageBuilder);

        }

  源代码:

  链接:https://pan.baidu.com/s/1rxTBZHHAmkDVcO6XmJPXng
  提取码:05qr
  后续会陆续更新其他资料,喜欢请关注哦!

原文地址:https://www.cnblogs.com/duhaoran/p/12718305.html