MQTT 消息服务器 Moquette

在 GitHub 上找到了一个面向物联网(IoT)的 MQTT 消息服务器:

https://github.com/moquette-io/moquette

注意:以下代码尚未经过测试,因为公司里没有让我实践的机会,仅供参考。

Moquette 支持二次开发,下面是我自己做的一些二次开发。

验证用户名和密码(实现 IAuthenticator 接口):

public class DBAuthenticator implements IAuthenticator {

    @Override
    public boolean checkValid(String clientId, String username, byte[] password) {
        // Check Username / Password in DB using sqlQuery
       
          SqlSession sqlSession=null;
          try
          {
              sqlSession=SqlSessionFactoryUtil.openSqlSession();
              RoleMapper roleMapper=sqlSession.getMapper(RoleMapper.class);
       
               Cliword cliword=new Cliword();
               cliword.setClino(username);
               String passstring=new String(password);
               cliword.setClipass(passstring);
                 
                long result=roleMapper.checkUserValidate(cliword);
               
                if (result>0)
                {
                   Loginlog loginlog=new Loginlog();
                   loginlog.setClino(username);
                   roleMapper.insertLoginlog(loginlog);
                   roleMapper.insertLoginlog(loginlog);
                   sqlSession.commit();
                   return true;
                }
                else
                {
                    Sysexplog sysexplog=new Sysexplog();
                    String expmsgstr="用户名"+username+"认证失败";
                    sysexplog.setExpmsg(expmsgstr);
                    roleMapper.insertSysexplog(sysexplog);
                    sqlSession.commit();
                    return false;
                }
             
       
          }
          catch(Exception ex)
          {
              Additionalog addtionlog=new Additionalog();
              try
              {
                 
                  String expstr="用户名"+username+"登录发生异常,登录异常原因:"+ex.toString();
                  addtionlog.logtofile(expstr);
                  String topic="用户名"+username+"登录时发生并且无法记录";
                  addtionlog.sendmyemail(topic, expstr);
                 
              }
              catch (Exception e)
              {
                  String expstr="用户名"+username+"登录发生异常,甚至记录异常时也出错,登录异常原因:"+ex.toString()+",记录时异常的原因是"+e.toString();
                  addtionlog.logtofile(expstr);
                 
              }
              sqlSession.rollback();
                 return false;
          }
          finally{
              if (sqlSession!=null)
                  sqlSession.close();
                     
          } 
      
    }
}


控制每个用户的读写权限(实现 IAuthorizatorPolicy 接口):

public class PermitAllAuthorizatorPolicy  implements IAuthorizatorPolicy  {

   
        @Override   
        public boolean canRead(Topic topic, String user, String client) {       
       
         String topicaname="/orders/"+user.toLowerCase().trim();
         if (topic.toString().equals(topicaname))
         
             return true;
         
         else
             return false;
        }
   
    @Override   
    public boolean canWrite(Topic topic, String user, String client) {       
       
        if(user.equals("sender"))
            return true;
        else
            return false;
    }   
  
}

最后,启动服务器并注册认证器和授权策略:

public class Main {
   

   
   
   
    static class PublisherListener extends AbstractInterceptHandler {

       
         
        @Override
        public String getID() {
            return "EmbeddedLauncherPublishListener";
        }

        @Override
        public void onPublish(InterceptPublishMessage msg) {
             final Logger log = Logger.getLogger(PublisherListener.class);
            final String decodedPayload = new String(msg.getPayload().array(), UTF_8);
            System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
            log.info("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
        }
       
        @Override
        public void onConnect(InterceptConnectMessage msg) {
            System.out.println("onConnect");
        }

        @Override
        public void onDisconnect(InterceptDisconnectMessage msg) {
            System.out.println("onDisconnect");
        }

        

        @Override
        public void onSubscribe(InterceptSubscribeMessage msg) {
            System.out.println("onSubscribe");

        }

        @Override
        public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
            System.out.println("onUnsubscribe");
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
     
         org.apache.log4j.BasicConfigurator.configure();
        final Logger log = Logger.getLogger(PublisherListener.class);
    //    IResourceLoader classpathLoader = new ClasspathResourceLoader();
     //   final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);
      
        String configPath = System.getProperty("moquette.conf", null);
        File defaultConfigurationFile = new File(configPath, "src\main\resources\" + IConfig.DEFAULT_CONFIG);
        System.err.println("Starting Moquette server. Configuration file path={}" +  defaultConfigurationFile.getAbsolutePath());
        IResourceLoader filesystemLoader = new FileResourceLoader(defaultConfigurationFile);
        final IConfig config = new ResourceLoaderConfig(filesystemLoader);
        
         
        final Server mqttBroker = new Server();
        List<? extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
        
      
        IAuthenticator authenticator=new DBAuthenticator();
        IAuthorizatorPolicy authorizatorPolicy=new PermitAllAuthorizatorPolicy();
        mqttBroker.startServer(config, userHandlers,null,authenticator,authorizatorPolicy);

        System.out.println("Broker started press [CTRL+C] to stop");
        log.info("Broker started press [CTRL+C] to stop");
        //Bind  a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Stopping broker");
            log.info("Stopping broker");
            mqttBroker.stopServer();
            System.out.println("Broker stopped");
            log.info("Broker stopped");
        }));

        Thread.sleep(20000);
        System.out.println("Before self publish");
        log.info("Before self publish");
        MqttPublishMessage message = MqttMessageBuilders.publish()
            .topicName("/exit")
            .retained(true)
//        qos(MqttQoS.AT_MOST_ONCE);
//        qQos(MqttQoS.AT_LEAST_ONCE);
            .qos(MqttQoS.EXACTLY_ONCE)
            .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
            .build();

        mqttBroker.internalPublish(message, "INTRLPUB");
        System.out.println("After self publish");
        log.info("After self publish");
       
   
    }

    private Main() {
    }
   
   
}

原文地址:https://www.cnblogs.com/redmondfan/p/14251384.html