MQTT协议笔记之mqtt.io项目Websocket协议支持

前言

MQTT协议专注于网络、资源受限环境,建立之初不曾考虑WEB环境,倒也正常。虽然如此,但不代表它不适合HTML5环境。

HTML5 Websocket是建立在TCP基础上的双通道通信,和TCP通信方式很类似,适用于WEB浏览器环境。虽然MQTT基因层面选择了TCP作为通信通道,但我们添加个编解码方式,MQTT Over Websocket也可以的。

这样做的好处,MQTT的使用范畴被扩展到HTML5、桌面端浏览器、移动端WebApp、Hybrid等,多了一些想像空间。这样看来,无论是移动端,还是WEB端,MQTT都会有自己的使用空间。

浏览器支持

话说,现代化浏览器都已经支持Websocket,这里有一个所有浏览器支持列表:

b84feb4f-ff91-45fc-9798-ce6f44e76af9

更详细列表,请直接访问:http://caniuse.com/websockets

毫无疑问,火狐和谷歌浏览器带动了现代浏览器的发展,对HTML5标准的支持也是如此。支持Websocket的浏览器单纯从上面数字来讲,73.88%的支持率。但实际上还得参考浏览器市场占有率:

1[1]

上图数据,来源于: 2014年4月份全球主流浏览器市场份额排行榜

超过60%用户机器上浏览器的支持Websocket,数据很可观。

移动hybrid型应用会很欢迎Websocket

  • 内置浏览器支持HTML5,Javascript操作Websocket连接MQTT
  • 借助于原生TCP socket通道连接MQTT服务器,暴露JavaScript接口,间接使用

不支持Websocket的桌面浏览器,可以考虑Flash socket来帮忙!

针对不支持websocker的部分历史浏览器,可以考虑一下Flash socket,虽然使用Flashsocket用以模拟Websocket就很容易理解,但条件如下: - 需要单独占用一个端口专用于安全跨域访问策略 - 需要浏览器支持二进制Blob 支持二进制操作的浏览器现状:

xhr2

来源于:http://caniuse.com/xhr2

比较一下支持Websocket和XHR2的桌面浏览器,重叠率很高,使用Flash Socket用以模拟Websocket必要性不大,在类似于IE平台上,不如直接使用Flash版本的

https://github.com/yangboz/as3MQTT/tree/master/MQTTClient_AS3

不支持Websocket浏览器怎么办

不是所有浏览器都支持Websocket,尤其是阻碍历史发展的IE6/IE7/IE8/IE9。MQTT协议为二进制协议压根和HTTP纯文本不兼容,尤其浏览器端JavaScript处理文本很合适,但二进制就显得笨手笨角,除非支持XHR2。

  1. 单独使用Flash MQTT Client,这这方面见解不深,实践很少,不便多说,您要是很了解,不妨告知一二。
  2. HTTP纯文本方式进行二进制对接

这部分后面专门会讲到。

服务器端支持

现有一些解决方案可能是后面为MQTT Broker,前面是添加一层代理。比如:例如 mod_websocket ,对应在线示范:http://test.mosquitto.org/ws.html

表面上看着很解藕的,实际上模仿的还是传统型的短连接反向代理架构:Nginx/Apache +Java/PHP/Python/Ruby。

客户端建立一条连接,服务器端需要使用到至少两个文件句柄,中间多了一层路径。优雅的解决方案,可以向socket.io看起。一套服务端程序,同时提供若干种协议供终端选择。其实,一台MQTT Broker中间件服务器,可以绑定多个端口,一个面向纯TCP的1883端口,一个面向Websocket的80/8080端口,共享基础逻辑,面向不同协议。

Websocket协议适配

服务器添加对Websocket支持,基本不用做多大改动。对比Tcp的附加到单个Channel的处理器列表:

1234567891011
public class TcpChannelInitializer extends ChannelInitializer<SocketChannel> {
 
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new MqttMessageNewEncoder(),
new MqttMessageNewDecoder(),
new MqttMessageHandler());
}
}

Websocket对应单个Channel的处理器列表:

12345678910111213141516171819202122232425262728293031323334
import io.mqtt.handler.HttpRequestHandler;
import io.mqtt.handler.MqttMessageHandler;
import io.mqtt.handler.coder.MqttMessageWebSocketFrameDecoder;
import io.mqtt.handler.coder.MqttMessageWebSocketFrameEncoder;
import io.mqtt.handler.http.HttpJsonpTransport;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
 
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
private final static String websocketUri = "/websocket";
 
private HttpRequestHandler httpRequestHandler = new HttpRequestHandler(
websocketUri);
 
static {
HttpJsonpTransport httpJsonpTransport = new HttpJsonpTransport();
HttpRequestHandler.registerTransport(httpJsonpTransport);
}
 
@Override
public void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new MqttMessageWebSocketFrameEncoder(),
new HttpObjectAggregator(65536),
httpRequestHandler,
new WebSocketServerProtocolHandler(websocketUri),
new MqttMessageWebSocketFrameDecoder(),
new MqttMessageHandler());
}
}

为了支持Websocket协议,仅仅额外增加了:

1234567891011121314
@Sharable
public class MqttMessageWebSocketFrameEncoder extends
MessageToMessageEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg,
List<Object> out) throws Exception {
if (msg == null)
return;
 
byte[] data = ((Message) msg).toBytes();
 
out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data)));
}
}
1234567891011121314151617
public class MqttMessageWebSocketFrameDecoder extends
MessageToMessageDecoder<BinaryWebSocketFrame> {
 
private MqttMessageNewDecoder messageNewDecoder;
 
public MqttMessageWebSocketFrameDecoder() {
messageNewDecoder = new MqttMessageNewDecoder();
}
 
@Override
protected void decode(ChannelHandlerContext ctx,
BinaryWebSocketFrame wsFrame, List<Object> out) throws Exception {
ByteBuf buf = wsFrame.content();
 
this.messageNewDecoder.decode(ctx, buf, out);
}
}

小结

啰啰嗦嗦的讲了一大通Websocket,总之对Websocket的支持还算容易。后面有时间写写如何使用HTTP协议达到MQTT OVER HTTP的效果。

原文 http://www.blogjava.net/yongboy/archive/2014/05/26/414130.html

原文地址:https://www.cnblogs.com/yudar/p/4642412.html