MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

使用IBM MQTTv3实现相关的发布订阅功能

MQTTv3的发布消息的实现:

Java代码  收藏代码
  1. package com.etrip.mqttv3;  
  2.   
  3. import com.ibm.micro.client.mqttv3.MqttClient;  
  4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
  5. import com.ibm.micro.client.mqttv3.MqttMessage;  
  6. import com.ibm.micro.client.mqttv3.MqttTopic;  
  7. /** 
  8.  * MQTTV3的发布消息类 
  9.  *  
  10.  * @author longgangbai 
  11.  */  
  12. public class MQTTPub {   
  13.     public static void doTest(){   
  14.         try {   
  15.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");   
  16.             MqttTopic topic = client.getTopic("tokudu/china");   
  17.             MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());   
  18.             message.setQos(1);   
  19.             client.connect();  
  20.             while(true){  
  21.                 MqttDeliveryToken token = topic.publish(message);   
  22.                 while (!token.isComplete()){   
  23.                     token.waitForCompletion(1000);   
  24.                 }   
  25.             }  
  26.         } catch (Exception e) {   
  27.             e.printStackTrace();   
  28.         }   
  29.     }   
  30. }   

 MQTTV3的订阅消息类

Java代码  收藏代码
  1. package com.etrip.mqttv3;  
  2. import com.ibm.micro.client.mqttv3.MqttClient;  
  3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;  
  4. /** 
  5.  * MQTTV3的订阅消息类 
  6.  *  
  7.  * @author longgangbai 
  8.  */  
  9. public class MQTTSubsribe {   
  10.     public static String doTest() {   
  11.         try {   
  12.             //创建MqttClient  
  13.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");   
  14.             //回调处理类  
  15.             CallBack callback = new CallBack();   
  16.             client.setCallback(callback);   
  17.             //创建连接可选项信息  
  18.             MqttConnectOptions conOptions = new MqttConnectOptions();   
  19.             //  
  20.             conOptions.setCleanSession(false);   
  21.             //连接broker  
  22.             client.connect(conOptions);   
  23.             //发布相关的订阅  
  24.             client.subscribe("tokudu/china", 1);   
  25.             //client.disconnect();   
  26.         } catch (Exception e) {   
  27.             e.printStackTrace();   
  28.             return "failed";   
  29.         }   
  30.         return "success";   
  31.     }   
  32. }   

 回调处理类处理订阅的消息类

Java代码  收藏代码
  1. package com.etrip.mqttv3;  
  2.   
  3. import com.ibm.micro.client.mqttv3.MqttCallback;  
  4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
  5. import com.ibm.micro.client.mqttv3.MqttMessage;  
  6. import com.ibm.micro.client.mqttv3.MqttTopic;  
  7. /** 
  8.  * 回调处理类 
  9.  * 处理订阅的消息类 
  10.  *  
  11.  * @author longgangbai 
  12.  */  
  13. public class CallBack implements MqttCallback {   
  14.       
  15.     public CallBack() {   
  16.     }   
  17.     /** 
  18.      * 接收到信息的处理 
  19.      */  
  20.     public void messageArrived(MqttTopic topic, MqttMessage message) {   
  21.         try {   
  22.             System.out.println(" MQTTSubsribe  message.toString()"+message.toString());  
  23.         } catch (Exception e) {   
  24.             e.printStackTrace();   
  25.         }   
  26.     }   
  27.     public void connectionLost(Throwable cause) {  
  28.           
  29.     }   
  30.     public void deliveryComplete(MqttDeliveryToken token) {  
  31.           
  32.     }   
  33. }   

测试类:

Java代码  收藏代码
  1. package com.etrip.mqttv3;  
  2. /** 
  3.  * MQTTV3的测试类 
  4.  *  
  5.  * @author longgangbai 
  6.  */  
  7. public class MQTTMain {  
  8.     public static void main(String[] args) {  
  9.         //订阅消息的方法  
  10.         MQTTSubsribe.doTest();  
  11.         //发布消息的类  
  12.         MQTTPub.doTest();  
  13.           
  14.     }  
  15. }  
原文地址:https://www.cnblogs.com/yudar/p/4615697.html