MQTT 测试DEMO

mqtt消息客户端

package com.cjcx.inter.apimall.beijing.aibee;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Mqtt 简易客户端
 * Topic naming Examples:
 * Valid Topic subscriptions
 * Single topic subscriptions
 * <p>
 * /
 * /house
 * house/room/main-light
 * house/room/side-light
 * Using Wildcards
 * Subscribing to topic house/#
 * <p>
 * Covers
 * <p>
 * house/room1/main-light
 * house/room1/alarm
 * house/garage/main-light
 * house/main-door
 * etc
 * Subscribing to topic house/+/main-light
 * <p>
 * covers
 * <p>
 * house/room1/main-light
 * house/room2/main-light
 * house/garage/main-light
 * but doesn’t cover
 * <p>
 * house/room1/side-light
 * house/room2/side-light
 * Invalid Topic Subscriptions
 * house+ – Reason- no topic level
 * house# – Reason- no topic level
 * Publishing to Topics
 * A client can only publish to an individual topic. That is, using wildcards when publishing is not allowed.
 * <p>
 * E.G- To publish a message to two topics you need to publish the message twice
 */
@Component
public class AibeeMqttClient {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private String url;
    private String clientId = null;

    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;

    public AibeeMqttClient() {
        logger.info("AibeeMqttClient come int");
    }

    public void init(String url, String clientId) {
        if (!StringUtils.hasLength(clientId)) {
            logger.warn("MQTT clientId is null");
            return;
        }
        this.url = url;
        this.clientId = clientId;

        // 初始化连接设置对象
        mqttConnectOptions = new MqttConnectOptions();
        // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
        mqttConnectOptions.setCleanSession(true);
        // 设置连接超时
        mqttConnectOptions.setConnectionTimeout(30);
        // 设置持久化方式
        memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(url, clientId, memoryPersistence);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.warn("MQTT new MqttClient() 异常:{}", e);
            return;
        }

        // 设置连接和回调
        if (!mqttClient.isConnected()) {
            // 客户端添加回调函数
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectionLost(Throwable throwable) {
                    try {
                        logger.info("MQTT 连接已断开, 60秒后重新连接");
                        Thread.sleep(60 * 1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    reConnect();
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("Client 接收消息主题 : " + topic);
                    System.out.println("Client 接收消息Qos : " + message.getQos());
                    System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    logger.info("deliveryComplete");
                }

                @Override
                public void connectComplete(boolean b, String s) {
                    logger.info("connectComplete:{}, s:{}", b, s);
                }
            });
            //    创建连接
            try {
                mqttClient.connect(mqttConnectOptions);
                logger.info("MQTT 连接状态: {}", (mqttClient.isConnected() ? "已连接" : "未连接"));
            } catch (MqttException e) {
                e.printStackTrace();
                logger.warn("MQTT 连接异常:{}", e);
            }
        } else {
            logger.info("MQTT 连接状态已经连接..");
        }
    }

    //关闭连接
    public void closeConnect() {
        // 关闭存储方式
        if (null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } else {
            logger.info("memoryPersistence is null");
        }

        // 关闭连接
        if (null != mqttClient) {
            if (mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                    mqttClient.close();
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                logger.info("mqttClient is not connect");
            }
        } else {
            logger.info("mqttClient is null");
        }
    }

    //    发布消息
    public void publishMessage(String pubTopic, String message, int qos) {
        if (null != mqttClient && mqttClient.isConnected()) {
            logger.info("发布消息   " + mqttClient.isConnected());
            logger.info("id:" + mqttClient.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = mqttClient.getTopic(pubTopic);
            if (null != topic) {
                try {
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if (!publish.isComplete()) {
                        logger.info("消息发布成功");
                    } else {
                        logger.info("消息发布失败");
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        } else {
            reConnect();
        }

    }

    //    重新连接
    public void reConnect() {
        if (null != mqttClient) {
            if (!mqttClient.isConnected()) {
                if (null != mqttConnectOptions) {
                    try {
                        mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                } else {
                    logger.info("MQTT 重连 mqttConnectOptions is null");
                }
            } else {
                logger.info("MQTT 重连 mqttClient is null or connect");
            }
        } else {
            init(url, clientId);
        }
    }

    //    订阅主题
    public void subTopic(String topic, int qos) {
        if (null != mqttClient && mqttClient.isConnected()) {
            try {
                mqttClient.subscribe(topic, qos);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } else {
            logger.info("mqttClient is error");
        }
    }

    //    清空主题
    public void cleanTopic(String topic) {
        if (null != mqttClient && !mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } else {
            logger.info("mqttClient is error");
        }
    }
}
View Code

测试demo

package com.cjcx.inter.apimall.beijing.aibee;

import org.apache.commons.lang3.StringUtils;

public class ClientTest {

    public static void main(String[] args) {
        AibeeMqttClient aibeeMqttClient = new AibeeMqttClient();
        aibeeMqttClient.init("tcp://127.0.0.1:1883", "123");

        aibeeMqttClient.subTopic("/topic", 0);
        aibeeMqttClient.subTopic("house/room/a", 1);
        aibeeMqttClient.subTopic("house/room/b", 1);

        java.util.Scanner sc = new java.util.Scanner(System.in);
        int i = 1;
        StringBuilder sb = new StringBuilder();
        sb.append("使用指南:
");
        sb.append(i++ + "、发送消息,输入 'send topic message'.
");
        sb.append(i++ + "、退出程序,输入 'exit'.
");

        System.out.println(sb);

        String line = sc.nextLine(); // 这个就是用户输入的数据
        while (true) {
            if ("exit".equalsIgnoreCase(line)) {
                System.out.println("Thanks for using! bye bye.");
                break;
            } else if ("?".equals(line)) {
                System.out.println(sb);
            }
            System.out.println("line:" + line);

            processCommand(aibeeMqttClient, line);

            line = sc.nextLine(); // 这个就是用户输入的数据
        }

        // aibeeMqttClient.publishMessage("/topic", "i am comming...", 1);
        aibeeMqttClient.closeConnect();
    }

    private static void processCommand(AibeeMqttClient aibeeMqttClient, String line) {
        try {
            if (StringUtils.isBlank(line)) {
                return;
            }

            String[] arr = line.split(" ");//StrUtil.split(line, " ");
            String command = arr[0];
            System.out.println("command:" + command);
            if ("send".equalsIgnoreCase(command)) {
                aibeeMqttClient.publishMessage(arr[1], arr[2], 1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
View Code
原文地址:https://www.cnblogs.com/eason-d/p/12089934.html