MQTTnet 3.0 用法

.NET 常用的 MQTT 类库有两个:M2Mqtt 和 MQTTnet。

当然了,这两个库的教程网上一搜一大把

但网上能搜到的 MQTTnet 教程全是 2.7 及以下版本的,而 3.0 版的语法不再向下兼容,直接升级版本会导致很多问题。今天成功完成了升级,现记录下来。

参考文档地址:https://github.com/chkr1011/MQTTnet/wiki/Client

上代码:

  1 ///开源库地址:https://github.com/chkr1011/MQTTnet
  2 ///对应文档:https://github.com/chkr1011/MQTTnet/wiki/Client
  3 
  4 using MQTTnet;
  5 using MQTTnet.Client;
  6 using MQTTnet.Client.Options;
  7 using System;
  8 using System.Text;
  9 using System.Threading;
 10 using System.Threading.Tasks;
 11 using System.Windows.Forms;
 12 
 13 namespace MqttServerTest
 14 {
 15     public partial class mqtt测试工具 : Form
 16     {
 17         private IMqttClient mqttClient = null;
 18         private bool isReconnect = true;
 19 
 20         public mqtt测试工具()
 21         {
 22             InitializeComponent();
 23         }
 24 
 25         private void Form1_Load(object sender, EventArgs e)
 26         {
 27 
 28         }
 29 
 30         private async void BtnPublish_Click(object sender, EventArgs e)
 31         {
 32             await Publish();
 33         }
 34 
 35         private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
 36         {
 37             await Subscribe();
 38         }
 39 
 40         private async Task Publish()
 41         {
 42             string topic = txtPubTopic.Text.Trim();
 43 
 44             if (string.IsNullOrEmpty(topic))
 45             {
 46                 MessageBox.Show("发布主题不能为空!");
 47                 return;
 48             }
 49 
 50             string inputString = txtSendMessage.Text.Trim();
 51             try
 52             {
 53 
 54                 var message = new MqttApplicationMessageBuilder()
 55         .WithTopic(topic)
 56         .WithPayload(inputString)
 57         .WithExactlyOnceQoS()
 58         .WithRetainFlag()
 59         .Build();
 60 
 61                 await mqttClient.PublishAsync(message);
 62             }
 63             catch (Exception ex)
 64             {
 65 
 66                 Invoke((new Action(() =>
 67                 {
 68                     txtReceiveMessage.AppendText($"发布主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
 69                 })));
 70             }
 71 
 72 
 73 
 74 
 75         }
 76 
 77         private async Task Subscribe()
 78         {
 79             string topic = txtSubTopic.Text.Trim();
 80 
 81             if (string.IsNullOrEmpty(topic))
 82             {
 83                 MessageBox.Show("订阅主题不能为空!");
 84                 return;
 85             }
 86 
 87             if (!mqttClient.IsConnected)
 88             {
 89                 MessageBox.Show("MQTT客户端尚未连接!");
 90                 return;
 91             }
 92 
 93             // Subscribe to a topic
 94             await mqttClient.SubscribeAsync(new TopicFilterBuilder()
 95                 .WithTopic(topic)
 96                 .WithAtMostOnceQoS()
 97                 .Build()
 98                 );
 99             Invoke((new Action(() =>
100             {
101                 txtReceiveMessage.AppendText($"已订阅[{topic}]主题{Environment.NewLine}");
102             })));
103 
104         }
105 
106         private async Task ConnectMqttServerAsync()
107         {
108             // Create a new MQTT client.
109 
110             if (mqttClient == null)
111             {
112                 try
113                 {
114                     var factory = new MqttFactory();
115                     mqttClient = factory.CreateMqttClient();
116 
117                     var options = new MqttClientOptionsBuilder()
118                         .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text)).WithCredentials(txtUsername.Text, txtPsw.Text).WithClientId(txtClientId.Text) // Port is optional
119                         .Build();
120 
121 
122                     await mqttClient.ConnectAsync(options, CancellationToken.None);
123                     Invoke((new Action(() =>
124                     {
125                         txtReceiveMessage.AppendText($"连接到MQTT服务器成功!" + txtIp.Text);
126                     })));
127                     mqttClient.UseApplicationMessageReceivedHandler(e =>
128                     {
129 
130                         Invoke((new Action(() =>
131                         {
132                             txtReceiveMessage.AppendText($"收到订阅消息!" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
133                         })));
134                    
135                     });
136                 }
137                 catch (Exception ex)
138                 {
139 
140                     Invoke((new Action(() =>
141                     {
142                         txtReceiveMessage.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
143                     })));
144                 }
145             }
146         }
147 
148         private void MqttClient_Connected(object sender, EventArgs e)
149         {
150             Invoke((new Action(() =>
151             {
152                 txtReceiveMessage.Clear();
153                 txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
154             })));
155         }
156 
157         private void MqttClient_Disconnected(object sender, EventArgs e)
158         {
159             Invoke((new Action(() =>
160             {
161                 txtReceiveMessage.Clear();
162                 DateTime curTime = new DateTime();
163                 curTime = DateTime.UtcNow;
164                 txtReceiveMessage.AppendText($">> [{curTime.ToLongTimeString()}]");
165                 txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine);
166             })));
167 
168             //Reconnecting
169             if (isReconnect)
170             {
171                 Invoke((new Action(() =>
172                 {
173                     txtReceiveMessage.AppendText("正在尝试重新连接" + Environment.NewLine);
174                 })));
175 
176                 var options = new MqttClientOptionsBuilder()
177                     .WithClientId(txtClientId.Text)
178                     .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text))
179                     .WithCredentials(txtUsername.Text, txtPsw.Text)
180                     //.WithTls()
181                     .WithCleanSession()
182                     .Build();
183                 Invoke((new Action(async () =>
184                 {
185                     await Task.Delay(TimeSpan.FromSeconds(5));
186                     try
187                     {
188                         await mqttClient.ConnectAsync(options);
189                     }
190                     catch
191                     {
192                         txtReceiveMessage.AppendText("### RECONNECTING FAILED ###" + Environment.NewLine);
193                     }
194                 })));
195             }
196             else
197             {
198                 Invoke((new Action(() =>
199                 {
200                     txtReceiveMessage.AppendText("已下线!" + Environment.NewLine);
201                 })));
202             }
203         }
204 
205         private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
206         {
207             Invoke((new Action(() =>
208             {
209                 txtReceiveMessage.AppendText($">> {"### RECEIVED APPLICATION MESSAGE ###"}{Environment.NewLine}");
210             })));
211             Invoke((new Action(() =>
212             {
213                 txtReceiveMessage.AppendText($">> Topic = {e.ApplicationMessage.Topic}{Environment.NewLine}");
214             })));
215             Invoke((new Action(() =>
216             {
217                 txtReceiveMessage.AppendText($">> Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
218             })));
219             Invoke((new Action(() =>
220             {
221                 txtReceiveMessage.AppendText($">> QoS = {e.ApplicationMessage.QualityOfServiceLevel}{Environment.NewLine}");
222             })));
223             Invoke((new Action(() =>
224             {
225                 txtReceiveMessage.AppendText($">> Retain = {e.ApplicationMessage.Retain}{Environment.NewLine}");
226             })));
227         }
228 
229         private void btnLogIn_Click(object sender, EventArgs e)
230         {
231             isReconnect = true;
232             Task.Run(async () => { await ConnectMqttServerAsync(); });
233         }
234 
235         private void btnLogout_Click(object sender, EventArgs e)
236         {
237             isReconnect = false;
238             Task.Run(async () => { await mqttClient.DisconnectAsync(); });
239         }
240 
241     }
242 }
原文地址:https://www.cnblogs.com/bjjjunjie/p/mqtt.html