MQTT客户端编程

1.导入maven依赖

<!-- Eclipse Paho MQTT v3 client library (provides the org.eclipse.paho.client.mqttv3 classes used below) -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

2.建立连接

  • serverURI:EMQ X 服务器的连接地址,例如 tcp://localhost:1883
  • clientId:标识客户端的唯一ID,必须确保该ID在同一EMQ X服务器中是唯一的,否则该服务器在处理会话时会遇到问题
  • MqttClientPersistence:本地消息持久化的实现。当服务器繁忙或不可用、消息尚未完成投递时,需要用它在本地持久保存这些消息;创建客户端时可以传入一个持久化类的实例(例如下面代码中的 MemoryPersistence)。
package paho_demo;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal connection demo: opens an MQTT connection to the broker, reports the outcome on
 * standard output, and then closes the connection again.
 */
public class Demo {
    /**
     * Connects to the broker at {@code tcp://localhost:1883} with a clean session.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSample";
        // Use the memory persistence
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            // The trailing space keeps the output identical to the documented sample output.
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            // Nothing else to do in this demo: end the session and free the client's resources
            // instead of leaving the connection open.
            sampleClient.disconnect();
            sampleClient.close();
        } catch (MqttException me) {
            // Dump everything the exception carries to make connection problems easy to locate.
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

执行此代码后,如果可以成功连接到服务器,则会在控制台中打印以下内容。如果发生异常,请根据异常信息定位并解决问题。

Connecting to broker: tcp://localhost:1883
Connected

3.订阅

只有在成功建立连接后才能进行订阅。MqttClient提供了多种subscribe方法,可以使用不同的方式来订阅主题。主题可以是明确的单个主题或通配符。

通过 setCallback 设置一个 MqttCallback 回调实例,客户端接收到消息时会调用该实例的 messageArrived 方法。消息订阅部分的代码是:

String topic = "demo/topics";

// Register the callback before subscribing so that no message can arrive while no handler is set.
sampleClient.setCallback(new MqttCallback() {
    /** Prints the payload and the topic of every message that arrives. */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform charset.
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", payload, topic);
        System.out.println(theMsg);
    }

    /** Nothing to do once an outgoing message has been delivered. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /** Reports a lost connection instead of dropping the event silently. */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("Connection lost: " + throwable);
    }
});

System.out.println("Subscribe to topic: " + topic);
sampleClient.subscribe(topic);

4.发布

MqttClient 的 publish 方法用于发布消息。

MqttClient 还为用户提供了一种在发布消息时指定 QoS 以及是否保留消息(retained)的方式:

String topic = "demo/topics";
String content = "Message from MqttPublishSample";
// QoS level requested for this message
int qos = 2;
System.out.println("Publishing message: " + content);
// Encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) so the payload bytes do not
// depend on the platform default charset.
MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");

5.完整示例

package paho_demo;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Demo {
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSample";
        //Use the memory persistence
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker:" + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            String topic = "demo/topics";
            System.out.println("Subscribe to topic:" + topic);
            sampleClient.subscribe(topic);
            sampleClient.setCallback(new MqttCallback() {
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
                    System.out.println(theMsg);
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                public void connectionLost(Throwable throwable) {
                }
            });


            String content = "Message from MqttPublishSample";
            int qos = 2;
            System.out.println("Publishing message:" + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");

        } catch (MqttException me) {
            System.out.println("reason" + me.getReasonCode());
            System.out.println("msg" + me.getMessage());
            System.out.println("loc" + me.getLocalizedMessage());
            System.out.println("cause" + me.getCause());
            System.out.println("excep" + me);
            me.printStackTrace();
        }
    }
}

运行结果:

Connecting to broker: tcp://localhost:1883
Connected
Subscribe to topic: demo/topics
Publishing message: Message from MqttPublishSample
Message published
Message from MqttPublishSample is arrived for topic demo/topics.
原文地址:https://www.cnblogs.com/zhou-tt/p/12696704.html