WebSocket编解码器
客户端代码
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script type="text/javascript"> var socket; if (window.WebSocket) { socket = new WebSocket("ws://localhost:7000/hello"); //收到服务端回送的消息 socket.onmessage = function (ev) { var rt = document.getElementById('responseText'); rt.value = rt.value + "\n" + ev.data; } //连接开启 socket.onopen = function (ev) { var rt = document.getElementById('responseText'); rt.value = '连接开启了....'; } //连接关闭 socket.onclose = function (ev) { var rt = document.getElementById('responseText'); rt.value = rt.value + "\n" + "连接关闭了......."; } } else { alert("当前浏览器不支持websocket"); } function send(message) { if (window.WebSocket) { if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("未连接") } } } </script> <form onsubmit="return false"> <textarea name="message" style="height: 300px; 300px"></textarea> <input type="button" value="发送消息" onclick="send(this.form.message.value)"/> <textarea id="responseText" style="height: 300px; 300px"></textarea> <input type="button" value="清空" onclick="document.getElementById('responseText').value = ''"/> </form> </body> </html>
服务端代码
public class WebSocketServer { private final int PORT = 7000; public static void main(String[] args) { new WebSocketServer().run(); } public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler())//在bossGroup增加一个日志处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //基于http协议,使用http的编码和解码器 channel.pipeline().addLast(new HttpServerCodec()); //是以块方式写,添加ChunkedWriteHandler处理器 channel.pipeline().addLast(new ChunkedWriteHandler()); //http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合 channel.pipeline().addLast(new HttpObjectAggregator(8192)); //websocket数据是以帧(frame)形式传递 //浏览器请求时 ws://localhost:7000/hello 表示请求的uri //WebSocketServerProtocolHandler 核心功能将http协议升级为ws协议,保持长连接 channel.pipeline().addLast(new WebSocketServerProtocolHandler("/hello")); //处理业务的handler channel.pipeline().addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() { /** * * @param ctx * @param msg 文本帧 * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服务器收到的信息:" + msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器当前时间" + LocalDateTime.now() + msg.text())); } //当web客户端连接后,触发该方法 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //id表示唯一值,LongText是唯一的,ShortText不是唯一的 System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText()); System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText()); } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText()); System.out.println("handlerRemoved 被调用" + ctx.channel().id().asShortText()); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常信息:" + cause.getMessage()); ctx.close(); } }); } }); ChannelFuture channelFuture = bootstrap.bind(PORT).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
WebSocketServerProtocolHandler的handlerAdded
初始化添加的时候,会调用handlerAdded进行处理器的添加,分别添加握手处理器WebSocketServerProtocolHandshakeHandler,UTF8文本帧验证器Utf8FrameValidator,关闭帧处理器WebSocketCloseFrameHandler:
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.在前面添加帧验证器
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator());
}
}
添加完之后,管道中的处理器:
head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->WebSocketServerProtocolHandshakeHandler------>Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail
WebSocketServerProtocolHandshakeHandler#channelRead
之后就是客户端发来HTTP
请求websocket
握手。HTTP
解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler
了,我们来看看他做了什么。
- 验证协议
url
。 - 验证
GET
的请求升级。 - 设置握手处理器。
- 移除处理器WebSocketServerProtocolHandshakeHandler
.
- 创建握手
WebSocketServerHandshaker
对象,进行握手。 - 启动一个定义任务进行超时回调。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final FullHttpRequest req = (FullHttpRequest) msg; if (!isWebSocketPath(req)) {//不是websocket路径就不管 ctx.fireChannelRead(msg); return; } try { if (!GET.equals(req.method())) {//只有GET支持的升级的 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0))); return; } //创建握手工厂 final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器 final ChannelPromise localHandshakePromise = handshakePromise;//握手回调 if (handshaker == null) {//不支持的版本 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { // Ensure we set the handshaker and replace this handler before we // trigger the actual handshake. Otherwise we may receive websocket bytes in this handler // before we had a chance to replace it. // // See https://github.com/netty/netty/issues/9471. WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器 ctx.pipeline().remove(this);//移除当前处理器 final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) {//发送不成功 localHandshakePromise.tryFailure(future.cause()); ctx.fireExceptionCaught(future.cause()); } else {//发送成功 localHandshakePromise.trySuccess(); // Kept for compatibility 保持兼容性 触发事件 ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的 WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered(//这个是新的 new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } }); applyHandshakeTimeout(); } } finally { req.release(); } }
isWebSocketPath验证URL
这个主要就是验证URL
是否是WebSocke
的URL
,主要就是判断创建时候传进去的这个"/
hello",默认是比较整个字符串,不是比较开头。
:
private boolean isWebSocketPath(FullHttpRequest req) { String websocketPath = serverConfig.websocketPath(); String uri = req.uri(); boolean checkStartUri = uri.startsWith(websocketPath); boolean checkNextUri = checkNextUri(uri, websocketPath); return serverConfig.checkStartsWith() ? (checkStartUri && checkNextUri) : uri.equals(websocketPath); }
sendHttpResponse发送消息
如果响应的状态码不是200
或者请求不是设置长连接,就关闭通道了。
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道 f.addListener(ChannelFutureListener.CLOSE); } }
WebSocketServerHandshakerFactory的newHandshaker创建握手对象
根据请求头信息的sec-websocket-version
来决定要哪个版本的握手对象,一般都是13
,如果都不支持就会返回null
。
public WebSocketServerHandshaker newHandshaker(HttpRequest req) { CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);//从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象 if (version != null) { if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) { // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification). return new WebSocketServerHandshaker13( webSocketURL, subprotocols, decoderConfig); } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) { // Version 8 of the wire protocol - version 10 of the draft hybi specification. return new WebSocketServerHandshaker08( webSocketURL, subprotocols, decoderConfig); } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) { // Version 8 of the wire protocol - version 07 of the draft hybi specification. return new WebSocketServerHandshaker07( webSocketURL, subprotocols, decoderConfig); } else { return null; } } else { // Assume version 00 where version header was not specified return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig); } }
ctx.pipeline().remove(this);
只要握手对象创建好了,就不需要响应HTTP
了,直接就把当前处理器WebSocketServerProtocolHandler
给移除了。
head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail
WebSocketServerHandshaker的handshake
握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。
public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { if (logger.isDebugEnabled()) { logger.debug("{} WebSocket version {} server handshake", channel, version()); } FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应 ChannelPipeline p = channel.pipeline(); if (p.get(HttpObjectAggregator.class) != null) { p.remove(HttpObjectAggregator.class);//删除聚合 } if (p.get(HttpContentCompressor.class) != null) {//删除压缩 p.remove(HttpContentCompressor.class); } ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器 final String encoderName; if (ctx == null) {//不存在 // this means the user use an HttpServerCodec ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在 if (ctx == null) {//也不存在,就没办法解码http了,失败了 promise.setFailure( new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline")); return promise; }//在之前添加WebSocket编解码 p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder()); p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder()); encoderName = ctx.name(); } else { p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder encoderName = p.context(HttpResponseEncoder.class).name(); p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器 }//监听发出事件 channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = future.channel().pipeline(); p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder promise.setSuccess(); } else { promise.setFailure(future.cause()); } } }); return promise; }
发送回调前是这样:
head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder----->HttpResponseDecoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail
回调后:
head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail
applyHandshakeTimeout
发送可能会等好久,所以就给了个超时的定时任务,默认设置是10
秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。
private void applyHandshakeTimeout() { final ChannelPromise localHandshakePromise = handshakePromise; final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis(); if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) { return;//完成了就不管了 } //起一个定时任务 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() { @Override public void run() { if (!localHandshakePromise.isDone() && localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) { ctx.flush()//没完成就刷出去,触发超时事件,然后关闭 .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT) .close(); } } }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); //如果成功了,就把超时任务取消 // Cancel the handshake timeout when handshake is finished. localHandshakePromise.addListener(new FutureListener<Void>() { @Override public void operationComplete(Future<Void> f) { timeoutFuture.cancel(false); } }); }
至此WebSocketServerProtocolHandshakeHandler
做的事就完成了其实就是握手的时候用HTTP
,然后就转到WebSocket
了,所以为什么会看到会有替换和删除处理器了。
WebSocket编解码器
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
父类WebSocketServerHandshaker 只提供了抽象方法,由子类WebSocketServerHandshaker13 实现。因为 handshaker 由上文 WebSocketServerHandshakerFactory#newHandshaker 方法创建,返回结果 WebSocketServerHandshaker13。
WebSocketServerHandshaker13#newWebsocketDecoder
protected WebSocketFrameDecoder newWebsocketDecoder() { return new WebSocket13FrameDecoder(decoderConfig()); }
WebSocket13FrameDecoder只有构造方法,关键方法还是在WebSocket08FrameDecoder里面。
WebSocket08FrameDecoder解码器
属性
//读取状态 enum State { READING_FIRST,//第一次读一个字节 FIN, RSV, OPCODE READING_SECOND,//解析出MASK, PAYLOAD LEN描述 READING_SIZE,//解析具体长度PAYLOAD LEN MASKING_KEY,//解析掩码 PAYLOAD,//解析数据 CORRUPT//帧损坏了 } private static final byte OPCODE_CONT = 0x0;//连续的frame private static final byte OPCODE_TEXT = 0x1;//文本frame private static final byte OPCODE_BINARY = 0x2;//二进制frame private static final byte OPCODE_CLOSE = 0x8;//关闭帧 private static final byte OPCODE_PING = 0x9;//ping private static final byte OPCODE_PONG = 0xA;//pong
decode解码
和http类似,根据状态处理
- READING_FIRST:解析第一个字节,是不是最后一帧,扩展位怎么样,是什么帧类型。
- READING_SECOND:解析第二个字节,是否有掩码,数据长度是多少。
- READING_SIZE:处理长度,如果是0-125,那好办,如果是126,就要读取后面2个字节的数据,如果是127,就要读取后面8个字节的数据。
- MASKING_KEY:如果有掩码就解析出4字节掩码。
- PAYLOAD:解析出最后的数据。
- CORRUPT:帧数据可能损坏了,可能要关闭连接。
READING_FIRST
用了位操作去解析第一个字节,这里的Opcode实际上就是帧类型,比如0
表示持续的帧,1
表示文本帧,2
表示二进制帧等等。
case READING_FIRST: if (!in.isReadable()) { return; } framePayloadLength = 0; // FIN, RSV, OPCODE byte b = in.readByte(); frameFinalFlag = (b & 0x80) != 0;//取出FIN,表示是不是一帧的最后一段 frameRsv = (b & 0x70) >> 4;//取出RSV frameOpcode = b & 0x0F;//取出Opcode if (logger.isTraceEnabled()) { logger.trace("Decoding WebSocket Frame opCode={}", frameOpcode); } state = State.READING_SECOND;
READING_SECOND
然后读取掩码位,读取长度,进行一些合法性的检查,如果违反协议了,就直接发送关闭帧。
case READING_SECOND: if (!in.isReadable()) { return; } // MASK, PAYLOAD LEN 1 b = in.readByte();//再读一个字节 frameMasked = (b & 0x80) != 0;//读取掩码,1表示存在,4字节,0不存在 framePayloadLen1 = b & 0x7F;//获取内容长度 if (frameRsv != 0 && !config.allowExtensions()) {//有扩展标志位,但是不允许扩展 protocolViolation(ctx, in, "RSV != 0 and no extension negotiated, RSV:" + frameRsv); return; } if (!config.allowMaskMismatch() && config.expectMaskedFrames() != frameMasked) {//需要掩码加密,但是发来的没进行掩码加密 protocolViolation(ctx, in, "received a frame that is not masked as expected"); return; } //控制操作,关闭,ping,pong if (frameOpcode > 7) { // control frame (have MSB in opcode set) // control frames MUST NOT be fragmented if (!frameFinalFlag) {//控制帧不用分段了 protocolViolation(ctx, in, "fragmented control frame"); return; } // control frames MUST have payload 125 octets or less if (framePayloadLen1 > 125) {//长度超过125 protocolViolation(ctx, in, "control frame with payload length > 125 octets"); return; } //不为控制帧 // check for reserved control frame opcodes if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { protocolViolation(ctx, in, "control frame using reserved opcode " + frameOpcode); return; } //关闭帧有内容的话,必须是2个字节的无符号整形表示状态码 // close frame : if there is a body, the first two bytes of the // body MUST be a 2-byte unsigned integer (in network byte // order) representing a getStatus code if (frameOpcode == 8 && framePayloadLen1 == 1) { protocolViolation(ctx, in, "received close control frame with payload len 1"); return; } } else { // data frame 数据帧,不是持续,文本,二进制帧的话也违反协议了 // check for reserved data frame opcodes if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) { protocolViolation(ctx, in, "data frame using reserved opcode " + frameOpcode); return; } // check opcode vs message fragmentation state 1/2 if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {//是持续帧,帧个数为0 protocolViolation(ctx, in, "received continuation data frame outside fragmented message"); return; } //帧的端数不为0,但是不是持续帧,也不是ping // check opcode vs message fragmentation state 2/2 if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) { protocolViolation(ctx, in, "received non-continuation data frame while inside fragmented message"); return; } } state = State.READING_SIZE;
protocolViolation违反协议
如果发现有违反协议的,直接把数据丢弃,如果通道没关闭,且设置了违反协议就关闭通道的话就发送关闭帧,抛出异常。
private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, String reason) { protocolViolation(ctx, in, WebSocketCloseStatus.PROTOCOL_ERROR, reason); } private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, WebSocketCloseStatus status, String reason) { protocolViolation(ctx, in, new CorruptedWebSocketFrameException(status, reason)); } private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, CorruptedWebSocketFrameException ex) { state = State.CORRUPT;//帧损坏的状态 int readableBytes = in.readableBytes(); if (readableBytes > 0) { // Fix for memory leak, caused by ByteToMessageDecoder#channelRead: // buffer 'cumulation' is released ONLY when no more readable bytes available. in.skipBytes(readableBytes);//略过,能帮助释放内存 } if (ctx.channel().isActive() && config.closeOnProtocolViolation()) {//帧坏了就关闭通道 Object closeMessage; if (receivedClosingHandshake) { closeMessage = Unpooled.EMPTY_BUFFER;//空帧 } else { WebSocketCloseStatus closeStatus = ex.closeStatus(); String reasonText = ex.getMessage(); if (reasonText == null) { reasonText = closeStatus.reasonText(); } closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);//封装成关闭帧 } ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE);//发出去,成功后关闭通道 } throw ex;//抛出异常 }
READING_SIZE
处理长度的几种情况。
case READING_SIZE: // Read frame payload length if (framePayloadLen1 == 126) {//如果是126的话,紧跟着后面需要有两个字节的长度 if (in.readableBytes() < 2) { return; } framePayloadLength = in.readUnsignedShort();//读取2次节长度 if (framePayloadLength < 126) {//长度无效 protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)"); return; } } else if (framePayloadLen1 == 127) {//如果是127,后面需要8个字节 if (in.readableBytes() < 8) { return; } framePayloadLength = in.readLong(); // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe // just check if it's negative? if (framePayloadLength < 65536) {//小于等于2字节的 protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)"); return; } } else { framePayloadLength = framePayloadLen1;//0-125的情况 } //大于最大长度默认65536 if (framePayloadLength > config.maxFramePayloadLength()) { protocolViolation(ctx, in, WebSocketCloseStatus.MESSAGE_TOO_BIG, "Max frame length of " + config.maxFramePayloadLength() + " has been exceeded."); return; } if (logger.isTraceEnabled()) { logger.trace("Decoding WebSocket Frame length={}", framePayloadLength); } state = State.MASKING_KEY;
MASKING_KEY
解析出掩码,其实这个掩码加密解密只是用了异或^
。
case MASKING_KEY://解析掩码 if (frameMasked) {//有掩码 4字节的 if (in.readableBytes() < 4) { return; } if (maskingKey == null) { maskingKey = new byte[4]; } in.readBytes(maskingKey); } state = State.PAYLOAD;
PAYLOAD
有掩码先解码,然后根据不同的Opcode
类型封装成对应的帧数据。
case PAYLOAD://解析数据 if (in.readableBytes() < framePayloadLength) { return; } ByteBuf payloadBuffer = null; try { payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength)); // Now we have all the data, the next checkpoint must be the next // frame state = State.READING_FIRST;//回到初始要解析的状态 // Unmask data if needed if (frameMasked) {//如果有掩码,要解码 unmask(payloadBuffer); } // Processing ping/pong/close frames because they cannot be // fragmented if (frameOpcode == OPCODE_PING) {//如果是ping out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } if (frameOpcode == OPCODE_PONG) {//如果是pong out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } if (frameOpcode == OPCODE_CLOSE) {//收到关闭帧,也要回一个关闭帧 receivedClosingHandshake = true; checkCloseFrameBody(ctx, payloadBuffer); out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } // Processing for possible fragmented messages for text and binary // frames if (frameFinalFlag) {//是最后一帧 // Final frame of the sequence. Apparently ping frames are // allowed in the middle of a fragmented message if (frameOpcode != OPCODE_PING) {//允许中间发心跳帧,心跳帧不算,不是心跳帧才要清零 fragmentedFramesCount = 0; } } else { // Increment counter fragmentedFramesCount++;//帧个数+1,为持续帧 } // Return the frame if (frameOpcode == OPCODE_TEXT) {//文本类型 out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else if (frameOpcode == OPCODE_BINARY) {//二进制 out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else if (frameOpcode == OPCODE_CONT) {//持续帧 out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else { throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: " + frameOpcode); } } finally { if (payloadBuffer != null) {//没有解析出来要释放 payloadBuffer.release(); } }
unmask解码
其实就是取出4
字节掩码,封装成一个整数,然后跟数据进行每次8
位的轮询的异或运算解码。
private void unmask(ByteBuf frame) { int i = frame.readerIndex(); int end = frame.writerIndex(); ByteOrder order = frame.order(); // Remark: & 0xFF is necessary because Java will do signed expansion from // byte to int which we don't want. int intMask = ((maskingKey[0] & 0xFF) << 24) | ((maskingKey[1] & 0xFF) << 16) | ((maskingKey[2] & 0xFF) << 8) | (maskingKey[3] & 0xFF); // If the byte order of our buffers it little endian we have to bring our mask // into the same format, because getInt() and writeInt() will use a reversed byte order if (order == ByteOrder.LITTLE_ENDIAN) { intMask = Integer.reverseBytes(intMask); } for (; i + 3 < end; i += 4) { int unmasked = frame.getInt(i) ^ intMask; frame.setInt(i, unmasked); } for (; i < end; i++) { frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]); } }
CORRUPT
一般是有违反协议了,就丢弃了,但是就怕其他问题要读一帧,不然父类处理会出问题。
case CORRUPT://帧坏了 if (in.isReadable()) { // If we don't keep reading Netty will throw an exception saying // we can't return null if no bytes read and state not changed. in.readByte();;//要读一下,否则父类会报错 } return;
问题在于父类ByteToMessageDecoder的callDecode中有:
if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); }
Utf8FrameValidator
这个是验证文本帧是否是UTF8编码的。其实他就是检查是否是最后一帧,如果是文本帧的话就检测内容,不是UTF8的就抛异常。如果是持续帧,只有第一帧是文本的才会开始检测,所以后续来的肯定是文本帧,就不用判断是不是文本帧了,只要判断是不是在检测就好了。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof WebSocketFrame) { WebSocketFrame frame = (WebSocketFrame) msg; try { // Processing for possible fragmented messages for text and binary // frames if (((WebSocketFrame) msg).isFinalFragment()) {//是最后帧 // Final frame of the sequence. Apparently ping frames are // allowed in the middle of a fragmented message if (!(frame instanceof PingWebSocketFrame)) { fragmentedFramesCount = 0; // Check text for UTF8 correctness 监测文本帧 if ((frame instanceof TextWebSocketFrame) || (utf8Validator != null && utf8Validator.isChecking())) { // Check UTF-8 correctness for this payload checkUTF8String(frame.content()); // This does a second check to make sure UTF-8 // correctness for entire text message utf8Validator.finish();//如果不是就报异常 } } } else {//不是最后帧 // Not final frame so we can expect more frames in the // fragmented sequence if (fragmentedFramesCount == 0) {//是第一帧,只检测文本 // First text or binary frame for a fragmented set if (frame instanceof TextWebSocketFrame) { checkUTF8String(frame.content());//检测内容 } } else {//不是第一帧,继续检测,因为前面是文本的,所以持续帧也肯定是 // Subsequent frames - only check if init frame is text if (utf8Validator != null && utf8Validator.isChecking()) { checkUTF8String(frame.content()); } } // Increment counter fragmentedFramesCount++;//帧数累加 } } catch (CorruptedWebSocketFrameException e) { frame.release(); throw e; } } super.channelRead(ctx, msg); }
WebSocketServerProtocolHandler#decode
主要是判断是不是关闭帧,是的话就拿出开始创建的握手对象,然后实现关闭,其实就是发送关闭帧。否则的话就让父类WebSocketProtocolHandler处理。
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception { if (serverConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) {//如果要处理关闭帧 WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel()); if (handshaker != null) { frame.retain(); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);//握手处理器来处理关闭 } else { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);//直接处理 } return; } super.decode(ctx, frame, out); }
WebSocketProtocolHandler#decode
如果是心跳ping,pong帧的就响应,然后继续监听读消息,否则就将数据帧加进消息列表中。
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception { if (frame instanceof PingWebSocketFrame) {//ping帧,写回pong,继续监听读事件,直接返回 frame.content().retain(); ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content())); readIfNeeded(ctx); return; } if (frame instanceof PongWebSocketFrame && dropPongFrames) {//丢弃pong帧 readIfNeeded(ctx); return; } out.add(frame.retain()); }
WebSocket08FrameEncoder解码器
解码器就是编码器反的来,其实就根据情况吧数据封装成协议的格式,比如封装成帧。
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { final ByteBuf data = msg.content(); byte[] mask; byte opcode; if (msg instanceof TextWebSocketFrame) { opcode = OPCODE_TEXT; } else if (msg instanceof PingWebSocketFrame) { opcode = OPCODE_PING; } else if (msg instanceof PongWebSocketFrame) { opcode = OPCODE_PONG; } else if (msg instanceof CloseWebSocketFrame) { opcode = OPCODE_CLOSE; } else if (msg instanceof BinaryWebSocketFrame) { opcode = OPCODE_BINARY; } else if (msg instanceof ContinuationWebSocketFrame) { opcode = OPCODE_CONT; } else { throw new UnsupportedOperationException("Cannot encode frame of type: " + msg.getClass().getName()); } int length = data.readableBytes(); if (logger.isTraceEnabled()) { logger.trace("Encoding WebSocket Frame opCode={} length={}", opcode, length); } //封装第一个字节: int b0 = 0; if (msg.isFinalFragment()) { b0 |= 1 << 7; } b0 |= msg.rsv() % 8 << 4; b0 |= opcode % 128; if (opcode == OPCODE_PING && length > 125) { throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was " + length); } //处理长度: boolean release = true; ByteBuf buf = null; try { int maskLength = maskPayload ? 4 : 0; if (length <= 125) {//长度0-125 int size = 2 + maskLength; if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { size += length; } buf = ctx.alloc().buffer(size);//前面2个字节+掩码长度(4字节)+内容长度 buf.writeByte(b0); byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length); buf.writeByte(b); } else if (length <= 0xFFFF) {//内容2字节长度 int size = 4 + maskLength; if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { size += length; } buf = ctx.alloc().buffer(size); buf.writeByte(b0); buf.writeByte(maskPayload ? 0xFE : 126); buf.writeByte(length >>> 8 & 0xFF); buf.writeByte(length & 0xFF); } else { //内容8字节长度 int size = 10 + maskLength; if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { size += length; } buf = ctx.alloc().buffer(size); buf.writeByte(b0); buf.writeByte(maskPayload ? 0xFF : 127); buf.writeLong(length); } //处理掩码,这里默认服务器返回一般不用掩码,而且这里有一种优化,数据不过不太大的话,就合并成一个缓冲区一起发送: // Write payload if (maskPayload) {//掩码编码 int random = (int) (Math.random() * Integer.MAX_VALUE); mask = ByteBuffer.allocate(4).putInt(random).array(); buf.writeBytes(mask); ByteOrder srcOrder = data.order(); ByteOrder dstOrder = buf.order(); int counter = 0; int i = data.readerIndex(); int end = data.writerIndex(); if (srcOrder == dstOrder) { // Use the optimized path only when byte orders match // Remark: & 0xFF is necessary because Java will do signed expansion from // byte to int which we don't want. int intMask = ((mask[0] & 0xFF) << 24) | ((mask[1] & 0xFF) << 16) | ((mask[2] & 0xFF) << 8) | (mask[3] & 0xFF); // If the byte order of our buffers it little endian we have to bring our mask // into the same format, because getInt() and writeInt() will use a reversed byte order if (srcOrder == ByteOrder.LITTLE_ENDIAN) { intMask = Integer.reverseBytes(intMask); } for (; i + 3 < end; i += 4) { int intData = data.getInt(i); buf.writeInt(intData ^ intMask); } } for (; i < end; i++) { byte byteData = data.getByte(i); buf.writeByte(byteData ^ mask[counter++ % 4]); } out.add(buf); } else { if (buf.writableBytes() >= data.readableBytes()) {//可写长度如果大于等于内容大度,就合并成一个就发一次 // merge buffers as this is cheaper then a gathering write if the payload is small enough buf.writeBytes(data); out.add(buf); } else { out.add(buf); out.add(data.retain()); } } release = false; } finally { if (release && buf != null) { buf.release(); } } }