Java连接MQTT服务-wss方式

特别提示:本人博客部分有参考网络其他博客,但均是本人亲手编写过并验证通过。如发现博客有错误,请及时提出以免误导其他人,谢谢!欢迎转载,但记得标明文章出处:http://www.cnblogs.com/mao2080/

说明:前面介绍的tcp、ws方式适合Java程序在局域网内使用,不涉及到安全问题。但由于Android手机APP需要通过websocket方式来连接,就必须考虑安全性问题了,这时候就采用了wss+CA证书方式进行认证,而且在数据传输中也是加密的。大致与ws方式相同,只不过是加了证书。

1、Java代码

  1 package com.mao.mqtt;
  2 
  3 import java.io.FileInputStream;
  4 import java.security.KeyStore;
  5 import java.security.cert.CertificateFactory;
  6 import java.security.cert.X509Certificate;
  7 import java.text.SimpleDateFormat;
  8 import java.util.Date;
  9 
 10 import javax.net.ssl.SSLContext;
 11 import javax.net.ssl.SSLSocketFactory;
 12 import javax.net.ssl.TrustManagerFactory;
 13 
 14 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 15 import org.eclipse.paho.client.mqttv3.MqttCallback;
 16 import org.eclipse.paho.client.mqttv3.MqttClient;
 17 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 18 import org.eclipse.paho.client.mqttv3.MqttException;
 19 import org.eclipse.paho.client.mqttv3.MqttMessage;
 20 import org.eclipse.paho.client.mqttv3.MqttTopic;
 21 
 22 /**
 23  * 
 24  * 功能描述:MQTT测试
 25  * 创建人: mao2080@sina.com
 26  * 创建时间:2017年7月4日 下午5:08:59
 27  * 修改人: mao2080@sina.com
 28  * 修改时间:2017年7月4日 下午5:08:59
 29  */
 30 public class MQTTTest_wss {
 31 
 32     /**MQTT服务端ip及端口*/
 33     private static String host = "wss://ip:443";
 34 
 35     /**账号*/
 36     private static String username = "li2080";
 37 
 38     /**密码*/
 39     private static String password = "123";
 40 
 41     /**订阅的主题*/
 42     private static String subTopic = "a/b/c";
 43     
 44     /**clientID*/
 45     private static String clientId = "li2080";
 46     
 47     /**发布的主题*/
 48     private static String pubTopic = "a/b/c";
 49 
 50     /**MQTT-Client*/
 51     private static MqttClient client;
 52     
 53     /**证书路径*/
 54     private static String caPath = "E:\mqtt-demo\certfile\CA.crt";
 55     
 56     /**
 57      * @throws InterruptedException 
 58      * @throws MqttException */
 59     public static void main(String[] args) throws InterruptedException, MqttException {
 60         
 61         // 订阅消息的方法
 62         subscribe();
 63 //        
 64         publish();
 65     }
 66 
 67     /**
 68      * 
 69      * 描述:订阅信息
 70      * @author mao2080@sina.com
 71      * @created 2017年7月4日 下午4:53:47
 72      * @since 
 73      * @return
 74      */
 75     public static void subscribe() {
 76         try {
 77             // 创建MqttClient
 78             MQTTTest_wss.getClient().setCallback(new MqttCallback() {
 79 
 80                 public void connectionLost(Throwable arg0) {
 81                     
 82                 }
 83 
 84                 public void messageArrived(String topic, MqttMessage message) throws Exception {
 85                     System.out.println("MQTT Rece:" + message.toString());
 86                 }
 87 
 88                 public void deliveryComplete(IMqttDeliveryToken token) {
 89 
 90                 }
 91 
 92             });
 93             MQTTTest_wss.getClient().subscribe(subTopic, 0);
 94             System.out.println("连接状态:" + client.isConnected());
 95         } catch (Exception e) {
 96             e.printStackTrace();
 97         }
 98     }
 99     
100     /**
101      * 
102      * 描述:获取MqttClient
103      * @author mao2080@sina.com
104      * @created 2017年7月6日 上午9:56:37
105      * @since 
106      * @return
107      * @throws MqttException
108      */
109     public static MqttClient getClient() throws MqttException{
110         try {
111             if(client == null){
112                 client = new MqttClient(host, clientId);
113                 MqttConnectOptions conOptions = new MqttConnectOptions();
114                 conOptions.setUserName(username);
115                 conOptions.setPassword(password.toCharArray());
116                 conOptions.setCleanSession(true);
117                 conOptions.setSocketFactory(getSSLSocktet(caPath));
118                 client.connect(conOptions);
119             }
120             if(!client.isConnected()){
121                 client.reconnect();
122             }
123         } catch (Exception e) {
124             e.printStackTrace();
125         }
126         return client;
127     }
128 
129     /**
130      * 
131      * 描述:发布信息
132      * @author mao2080@sina.com
133      * @throws MqttException 
134      * @created 2017年7月4日 下午4:53:32
135      * @since
136      */
137     public static void publish() throws MqttException {
138         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
139         String sendMsg = "{time:"+sdf.format(new Date())+", content:"+com.lds.iot.common.util.UUIDUtil.getLowerLetterNumber(15)+", from: java console}";
140         try {
141             MqttTopic topic = MQTTTest_wss.getClient().getTopic(pubTopic);
142             MqttMessage message = new MqttMessage(sendMsg.getBytes());
143             message.setQos(0);
144             topic.publish(message);
145             System.out.println("MQTT Send:" + sendMsg);
146         } catch (Exception e) {
147             e.printStackTrace();
148         }
149     }
150     
151     /**
152      * 获取SSLSocketFactory
153      * @param caPath
154      * @return
155      * @throws Exception
156      */
157     public static SSLSocketFactory getSSLSocktet(String caPath) throws Exception {
158         CertificateFactory cAf = CertificateFactory.getInstance("X.509");
159         FileInputStream caIn = new FileInputStream(caPath);
160         X509Certificate ca = (X509Certificate) cAf.generateCertificate(caIn);
161         KeyStore caks = KeyStore.getInstance(KeyStore.getDefaultType());
162         caks.load(null, null);
163         caks.setCertificateEntry("ca-certificate", ca);
164         TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
165         tmf.init(caks);
166         caIn.close();
167         SSLContext context = SSLContext.getInstance("TLSv1");
168         context.init(null, tmf.getTrustManagers(), null);
169         return context.getSocketFactory();
170     }
171     
172 }

2、Maven配置

1 <dependency>
2       <groupId>org.eclipse.paho</groupId>
3       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
4       <version>1.2.0</version>
5 </dependency>

3、服务端配置

请参考另一篇blog:http://www.cnblogs.com/mao2080/p/7772893.html

4、运行效果

原文地址:https://www.cnblogs.com/mao2080/p/7793849.html