MQTT的简单Demo

MQTT是物联网收发数据的一种协议

Eclipse Paho是一个开源项目，实现了MQTT的Client，可以方便地直接操纵数据的上传和下载

The Eclipse Paho project provides open-source client implementations of MQTT and MQTT-SN messaging protocols aimed at new, existing, and emerging applications for the Internet of Things (IoT).

发送数据:

 1 import org.eclipse.paho.client.mqttv3.MqttClient;
 2 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 3 import org.eclipse.paho.client.mqttv3.MqttException;
 4 import org.eclipse.paho.client.mqttv3.MqttMessage;
 5 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 6 
 7 /**
 8  * 使用同步接口发布一个消息的例子
 9  * @author Administrator
10  * 得到的输出
11 Connecting to broker: tcp://iot.eclipse.org:1883
12 Connected
13 Publishing message: Message from MqttPublishSample
14 Message published
15 Disconnected
16  *
17  */
18 public class MQTT_Client_send {
19 
20     public static void main(String[] args) {
21         // TODO Auto-generated method stub
22 
23         String topic        = "MQTT Examples";
24         String content      = "Message from MqttPublishSample";
25         int qos             = 2;
26         String broker       = "tcp://iot.eclipse.org:1883";
27         String clientId     = "JavaSample";
28         MemoryPersistence persistence = new MemoryPersistence();
29 
30         try {
31             //Create an MqttClient that can be used to communicate with an MQTT server.就是创建client
32             MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
33             //Constructs a new MqttConnectOptions object using the default values.就是创建连接属性
34             MqttConnectOptions connOpts = new MqttConnectOptions();
35             //Sets whether the client and server should remember state across restarts and reconnects.就是说这个连接是无记忆的
36             connOpts.setCleanSession(true);
37             System.out.println("Connecting to broker: "+broker);
38             //使用连接属性,建立连接
39             sampleClient.connect(connOpts);
40             System.out.println("Connected");
41             System.out.println("Publishing message: "+content);
42             //得到一个操作系统默认的编码格式的字节数组,Constructs a message with the specified byte array as a payload, and all other values set to defaults.,就是创建一个mqtt消息
43             MqttMessage message = new MqttMessage(content.getBytes());
44             //Sets the quality of service for this message.Quality of Service 2 - indicates that a message should be delivered once.
45             message.setQos(qos);
46             //调用发布方法,发布一个主题和一个消息
47             sampleClient.publish(topic, message);
48             System.out.println("Message published");
49             //断开连接
50             sampleClient.disconnect();
51             System.out.println("Disconnected");
52             System.exit(0);
53         } catch(MqttException me) {
54             System.out.println("reason "+me.getReasonCode());
55             System.out.println("msg "+me.getMessage());
56             System.out.println("loc "+me.getLocalizedMessage());
57             System.out.println("cause "+me.getCause());
58             System.out.println("excep "+me);
59             me.printStackTrace();
60         }
61     }
62     
63     
64 
65 }

接收数据:

  1 import java.io.BufferedWriter;
  2 import java.io.FileWriter;
  3 import java.io.IOException;
  4 import java.util.ArrayList;
  5 import java.util.List;
  6 
  7 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  8 import org.eclipse.paho.client.mqttv3.MqttCallback;
  9 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
 10 import org.eclipse.paho.client.mqttv3.MqttClient;
 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 12 import org.eclipse.paho.client.mqttv3.MqttException;
 13 import org.eclipse.paho.client.mqttv3.MqttMessage;
 14 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 15 public class MQTT_Client_Receive {
 16     
 17     public static void Write2Files(String filePath,String text) throws IOException {
 18         FileWriter fileWriter=new FileWriter(filePath,true);
 19         fileWriter.write("");
 20         BufferedWriter bufferedWriter=new BufferedWriter(fileWriter);
 21         bufferedWriter.append(text+"
");
 22         bufferedWriter.close();
 23         fileWriter.close();
 24     }
 25     
 26     public static void Mqtt_Client_2_File(String Topic,String Path) throws MqttException, InterruptedException {
 27 //        String broker="tcp://iotdevrd.chinacloudapp.cn:1889";
 28 //        String clientID="5036cf062ade4a28bf74726e5bff895a";
 29 //        int qos = 2;
 30 //        String topic=Topic;
 31 //        String userName="121";
 32 //        String passWord="121";
 33         //内网测试数据
 34         String broker="tcp://3.1.2.244:1889";
 35         String clientID="5036cf062ade4a28bf74726e5bff895a";
 36         int qos = 2;
 37         String topic=Topic;
 38         String userName="121";
 39         String passWord="121";
 40         //内网决赛数据地址
 41 //        String broker="tcp://3.1.2.244:1889";
 42 //        String clientID="5036cf062ade4a28bf74726e5bff895a";
 43 //        int qos = 2;
 44 //        String topic=Topic;
 45 //        String userName="121";
 46 //        String passWord="121";
 47         
 48         MemoryPersistence persistence = new MemoryPersistence();
 49         
 50         MqttClient myClient=new MqttClient(broker, clientID, persistence);
 51         MqttConnectOptions connectOptions=new MqttConnectOptions();
 52         connectOptions.setUserName(userName);
 53         connectOptions.setPassword(passWord.toCharArray());
 54         connectOptions.setCleanSession(true);
 55         
 56         myClient.connect(connectOptions);
 57         MqttCallback callback=new MqttCallbackExtended() {
 58             @Override
 59             public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
 60                 //String filePath="C:\eclipse-workspace\Projects\IOTProjectPrepare\src\123.txt";
 61                 String filePath=Path;
 62                 Write2Files(filePath, arg1.toString());
 63                 System.out.println(new String(arg1.toString()));
 64             }
 65             @Override
 66             public void deliveryComplete(IMqttDeliveryToken arg0) {
 67             }
 68             @Override
 69             public void connectionLost(Throwable arg0) {
 70             }
 71             @Override
 72             public void connectComplete(boolean arg0, String arg1) {
 73             }
 74         };
 75         myClient.setCallback(callback);
 76         myClient.subscribe(topic, qos);
 77         //监听10秒钟最后断开连接
 78         Thread.sleep(20000);
 79         if(myClient.isConnected())
 80             myClient.disconnect();
 81         //myClient.disconnect();
 82         //System.exit(0);
 83         
 84     }
 85 
 86     
 87     public static void Topics2Files(int index, String test_final,long time) throws MqttException, InterruptedException {
 88         
 89         //循环接收的逻辑
 90 //        List<String> topicList=new ArrayList<>();
 91 ////        topicList.add("GPSLocation/test1/1");
 92 ////        topicList.add("GPSLocation/test1/2");
 93 ////        topicList.add("GPSLocation/test2/1");
 94 ////        topicList.add("GPSLocation/test2/2");
 95 //        topicList.add("GPSLocation/test3/1");
 96 //        topicList.add("GPSLocation/test3/2");
 97 ////        topicList.add("GPSLocation/test4/1");
 98 ////        topicList.add("GPSLocation/test4/2");
 99 //        topicList.add("GPSLocation/test6/1");
100 //        topicList.add("GPSLocation/test6/2");
101 //        for (int i = 0; i < topicList.size(); ++i) {
102 //            String array_element = topicList.get(i);
103 //            String Path="D:\"+(index)+".json";
104 //            System.out.println(Path);
105 //            Mqtt_Client_2_File(array_element, Path);
106 //        }
107         //正式比赛一个一个接收,使用下面的逻辑
108         String topic="";
109         if(test_final.equals("test")) {
110             topic="GPSLocation/test"+index+"/2";
111         }else if(test_final.equals("final")) {
112             topic="GPULocation/"+index;
113         }
114         System.out.println("当前订阅路径是:"+topic);
115         String path="D:\"+(index)+"_"+time+".json";
116         System.out.println("当前订阅的保存路径是:"+path);
117         Mqtt_Client_2_File(topic, path);
118         
119     }
120     public static void main(String[] args) throws MqttException, InterruptedException  {
121         //Topics2Files();
122         long time=System.currentTimeMillis()/1000;
123         Topics2Files(7, "test",time);
124 
125     }
126 
127 }

万事走心 精益求美


原文地址:https://www.cnblogs.com/kongchung/p/9914134.html