跟踪mqttv3源码(一)

Spring整合MQTT

pom.xml

<!-- MQTT消息队列 -->
        <dependency> 
            <groupId>org.eclipse.paho</groupId> 
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.0.2</version> 
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!-- 消息推送
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>
         -->
        <dependency>  
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.5.RELEASE</version>
            <exclusions>  
                <exclusion>  
                    <groupId>org.eclipse.paho</groupId>  
                    <artifactId>mqtt-client</artifactId>  
                </exclusion>  
                <exclusion>  
                    <groupId>org.springframework</groupId>  
                    <artifactId>spring-messaging</artifactId>  
                </exclusion>  
            </exclusions>  
        </dependency>  
        <!--
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.0.0.M5</version>
        </dependency>
         -->
        
        <dependency>  
            <groupId>org.fusesource.mqtt-client</groupId>  
            <artifactId>mqtt-client</artifactId>  
            <version>1.14</version>
        </dependency>

spring-mqtt.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:mqtt="http://www.springframework.org/schema/integration/mqtt"
    xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd">
    
    <!-- 引入配置文件:classpath等同于src目录,两种配置方式 -->
    <context:property-placeholder location="classpath:mqtt.properties"  ignore-unresolvable="true" />
    
    <!-- mqtt客户端订阅消息 -->
    <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="${broker.userName}"/>
        <property name="password" value="${broker.password}"/>
    </bean>

    <!-- 
    消息适配器 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter 
    org.springframework.messaging.MessageChannel
    org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler
    DefaultPahoMessageConverter
    -->
    <!-- 消息转换器
    <bean id="myConverter" class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter"></bean>
     -->
     
    <int-mqtt:message-driven-channel-adapter
        id="mqttInbound" 
        client-id="client" 
        url="${broker.host}"
        topics="activate"
        qos="1" 
        client-factory="clientFactory"
        auto-startup="true" 
        send-timeout="12" 
        recovery-interval="10000"
        channel="startCase" />
    
    <!-- 
    <int:channel id="startCase" />
     -->
    
    <!-- 对接收消息进行过滤 @tstamp + ' ' + headers.get('mqtt_topic') + ': ' + payload.toString() + @newline
    <int:transformer id="convert"
        input-channel="startCase"
        expression="payload.toString() + headers.get('mqtt_topic')"
        output-channel="toMqttService" />
     -->
         
    <!-- 方案一 -->
    <int:service-activator id="startCaseService"
        input-channel="startCase" ref="mqttCaseService" method="startCase" />
    
    <!-- 方案二   id="toMqttService" channel="toMqttService"
    <int:outbound-channel-adapter id="toMqttService"  
        ref="mqttCaseService" 
        method="startCase" />
    -->
    
    <bean id="mqttCaseService" class="com.vguang.service.impl.MqttService2"></bean>
    
</beans>

实现消息处理类

public class MqttService2{
    private static final Logger log = LoggerFactory.getLogger(MqttService2.class);

    private MqttPahoMessageHandler mqtt;
    private volatile Integer serialno = 0;
    
    public void startCase(Message<String> recmsg){
                //mqtt5.0
//        String topic = (String) recmsg.getHeaders().get("mqtt_receivedTopic");
        String topic = (String) recmsg.getHeaders().get("mqtt_topic");
        String payload = recmsg.getPayload();

        log.info("消息解析headers结果:{},{}", topic, payload);
    }
}

startCase()方法中的参数目前我知道的有三种:

原文地址:https://www.cnblogs.com/wangwanchao/p/7516733.html