MQTT专题(Spring boot + maven整合MQTT、EMQ搭建MQTT服务器和客户端模拟工具)

MQTT 协议基本介绍   较详细,含EMQ搭建服务器

MQTT--入门

 

使用Eclipse创建Maven项目参考:使用eclipse创建Spring Boot项目

Spring提供了对多种消息中间件的整合,其中也包括MQTT。具体请参见以下链接:

https://docs.spring.io/spring-integration/reference/html/

Spring整合MQTT步骤如下:

1、创建Spring Boot Maven工程,在pom.xml中引入如下依赖:

        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、application.properties文件中增加MQTT配置参数

#MQTT Config
com.mqtt.url=tcp://mqttServerhost:1883
com.mqtt.inboundclientid=in_clientid
com.mqtt.outboundclientid=out_clientid
com.mqtt.topics=+/V1/T/+/topic

3、增加MQTT配置类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import org.springframework.integration.mqtt.support.MqttHeaders;


/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Declares one inbound flow (broker -> {@code mqttInputChannel} -> {@link #handler()})
 * and one outbound flow ({@link MyGateway} -> {@code mqttOutboundChannel} ->
 * {@link #mqttOutbound()}). Both flows obtain their connections from the single
 * {@link #mqttClientFactory()} bean, so connection settings added to that factory
 * apply to the subscriber and the publisher alike.
 */
@Configuration
public class MqttConfiguration {

    /** Broker URI, e.g. {@code tcp://host:1883}. */
    @Value("${com.mqtt.url}")
    private String url;

    /** Topic filter the inbound adapter subscribes to. */
    @Value("${com.mqtt.topics}")
    private String topics;

    /** Prefix of the client id used by the inbound (subscribing) connection. */
    @Value("${com.mqtt.inboundclientid}")
    private String inclientid;

    /** Client id used by the outbound (publishing) connection. */
    @Value("${com.mqtt.outboundclientid}")
    private String outclientid;

    /**
     * Creates the Paho client factory shared by the inbound adapter and the outbound handler.
     *
     * @return a factory pointing at the configured broker URI
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(url);
        return factory;
    }

    /* ---------- Inbound: message receiving side ---------- */

    /**
     * Channel onto which the inbound adapter places messages received from the broker.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter that subscribes to the configured topic filter
     * and forwards every received message to {@link #mqttInputChannel()}.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        // A timestamp suffix gives each application start its own client id.
        String clientid = inclientid + "_" + System.currentTimeMillis();

        // Build the adapter from the shared factory (instead of the raw URL) and hand the
        // topic to the constructor, so the subscription already exists when setQos runs.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientid, mqttClientFactory(), topics);

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());

        return adapter;
    }

    /**
     * Handler that consumes the messages arriving on {@code mqttInputChannel}.
     * The {@code inputChannel} attribute of {@code @ServiceActivator} names that channel.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MqttInboundMessageHandler();
    }

    /* ---------- Outbound: message sending side ---------- */

    /**
     * Channel the gateway writes to; consumed by {@link #mqttOutbound()}.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the handler that publishes messages from {@code mqttOutboundChannel} to the
     * broker. It uses the same {@link MqttPahoClientFactory} as the inbound adapter.
     *
     * @return the configured outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outclientid, mqttClientFactory());
        // With async enabled, sending does not block the caller.
        messageHandler.setAsync(true);
        // Used when a message carries no topic header.
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    /**
     * Gateway through which application code publishes MQTT messages. Every call is sent
     * to {@code mqttOutboundChannel}.
     */
    @Component
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Publishes a payload without a topic header.
         *
         * @param data message payload
         */
        void sendToMqtt(String data);

        /**
         * Publishes a payload to an explicit topic.
         *
         * @param payload message payload
         * @param topic   destination topic, sent as the {@code MqttHeaders.TOPIC} header
         */
        void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

        /**
         * Publishes a payload to an explicit topic with an explicit QoS.
         *
         * @param topic   destination topic, sent as the {@code MqttHeaders.TOPIC} header
         * @param qos     quality of service, sent as the {@code MqttHeaders.QOS} header
         * @param payload message payload
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    }

}
View Code

4、创建类MqttInboundMessageHandler实现MessageHandler用于处理MQTT消息  

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

public class MqttInboundMessageHandler implements MessageHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        //String topic = message.getHeaders().get("mqtt_topic").toString();  //低版本使用 mqtt_topic
        String payload = message.getPayload().toString();        
        //mqtt消息写入日志
        StringBuilder sb = new StringBuilder();
        sb.append("
").append("topic:").append(topic).append("
").append("payload:").append(payload);
        logger.info(sb.toString());
    }
}
View Code

日志配置文件为resources下的logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Logback configuration: one console appender plus four rolling file appenders,
  one per level (debug, info, warn, error). With scan enabled, the file is
  re-read for changes using the scanPeriod given below.
-->
<configuration  scan="true" scanPeriod="10 seconds">

    <contextName>logback</contextName>
    <!-- Base directory under which every file appender writes its logs. -->
    <property name="log.path" value="D:/logs/mqttlogs" />
    <!-- Conversion words used by the console pattern: coloured output and exception rendering. -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
    <!-- Console pattern; an externally supplied CONSOLE_LOG_PATTERN takes precedence over the default below. -->
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
    <!-- Console output; the threshold filter drops events below info. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <!--
      File for debug events only (LevelFilter accepts an exact level match and denies the rest).
      Rolls by day and by size (100MB per file), with maxHistory set to 15.
      NOTE(review): the root logger below is set to info, so this appender presumably
      receives no events unless some logger is configured at debug; confirm.
    -->
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/debug/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>debug</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- File for info events only; same rolling settings as DEBUG_FILE. -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/info/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- File for warn events only; same rolling settings as DEBUG_FILE. -->
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/warn/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>warn</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- File for error events only; same rolling settings as DEBUG_FILE. -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/error/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    
    <!-- Root logger at info, attached to the console and to all four file appenders. -->
    <root level="info">
        <appender-ref ref="CONSOLE" />
        <appender-ref ref="DEBUG_FILE" />
        <appender-ref ref="INFO_FILE" />
        <appender-ref ref="WARN_FILE" />
        <appender-ref ref="ERROR_FILE" />
    </root>

</configuration>
View Code

 5、创建RestController,提供通过http发送MQTT消息的API

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mqtt/send")
public class MqttController {

    @Autowired
    private MyGateway mqttGateway;

    @RequestMapping(value="/sendMessage", method=RequestMethod.POST)
    public String send(@RequestParam String topic, @RequestParam String message) {
        mqttGateway.sendToMqtt(message, topic);
        return topic + "send message : " + message;
    }
    
    @RequestMapping(value="/getTest", method=RequestMethod.GET)
    public String getTest() {
        return "success";
    }
    
    @RequestMapping(value="/postTest", method=RequestMethod.POST)
    public String getTest(String data) {
        return "success,data=" + data;
    }

}
View Code

 6、打包方式改为war包,发布到Tomcat。入口类App需继承SpringBootServletInitializer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;


/**
 * Application entry point.
 *
 * <p>Can be started directly through {@link #main(String[])}, and also extends
 * {@link SpringBootServletInitializer} so that it can be deployed as a WAR.
 */
@SpringBootApplication
public class App extends SpringBootServletInitializer {

    /**
     * Starts the application when it is launched from the command line.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(App.class).run(args);
    }

    /**
     * Registers this class as the configuration source for WAR deployment.
     *
     * @param builder the builder supplied by the servlet initializer
     * @return the builder with {@code App} added as a source
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        SpringApplicationBuilder configured = builder.sources(App.class);
        return configured;
    }
}
View Code

打包和发布完整pom.xml文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; the artifact is packaged as a WAR. -->
    <groupId>com.demo</groupId>
    <artifactId>utiot-mqtt</artifactId>
    <version>1.0.0.200311</version>
    <packaging>war</packaging>

    <name>utiot-mqtt</name>
    <!-- <url>http://maven.apache.org</url> -->
    <!-- Dependencies and plugins are resolved through the team Nexus repository. -->
    <repositories>
        <repository>
            <id>nexus</id>
            <name>Team Nexus Repository</name>
            <url>http://192.168.80.194:8081/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus</id>
            <name>Team Nexus Repository</name>
            <url>http://192.168.80.194:8081/nexus/content/groups/public</url>
        </pluginRepository>
    </pluginRepositories>

    <!-- The Spring Boot parent supplies the versions of the dependencies declared below without one. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Scope "provided": presumably supplied by the external Tomcat the WAR is deployed to. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
    </dependencies>

    <!-- Deployment targets: separate Nexus repositories for releases and snapshots. -->
    <distributionManagement>
        <repository>
            <id>nexus-releases</id>
            <name>nexus releases</name>
            <url>http://192.168.80.194:8081/nexus/content/repositories/releases/</url>
        </repository>
        <snapshotRepository>
            <id>nexus-snapshots</id>
            <name>nexus snapshots</name>
            <url>http://192.168.80.194:8081/nexus/content/repositories/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>
    
    <build>
    <!--     <finalName>mqtt</finalName> -->
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!-- Compile with the Java version from the java.version property, using UTF-8 sources. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Tests are skipped during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
View Code

打包方法:右键项目,选择“Show In”—“Terminal”,输入mvn deploy(发布到服务器),或 mvn install (本地打包)

其他参考资料:

Spring 整合MQTT    订阅和推送、代码示例

springboot整合mqtt   连接配置(含用户名、密码)

Spring整合MQTT   参数说明

SpringBoot2.0集成MQTT功能之消息订阅处理  参数配置

物联网架构成长之路(32)-SpringBoot集成MQTT客户端

 

MQTT服务器

EMQ官网下载

使用EMQ搭建MQTT服务器

客户端模拟工具:

MQTTBox

MQTT.fx的安装和使用

原文地址:https://www.cnblogs.com/cheng2015/p/12143279.html