WebSocket编解码器

WebSocket编解码器

客户端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script type="text/javascript">
    var socket;
    if (window.WebSocket) {

        socket = new WebSocket("ws://localhost:7000/hello");

        //收到服务端回送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById('responseText');
            rt.value = rt.value + "\n" + ev.data;
        }
        //连接开启
        socket.onopen = function (ev) {
            var rt = document.getElementById('responseText');
            rt.value = '连接开启了....';

        }
        //连接关闭
        socket.onclose = function (ev) {
            var rt = document.getElementById('responseText');
            rt.value = rt.value + "\n" + "连接关闭了.......";

        }
    } else {
        alert("当前浏览器不支持websocket");
    }

    function send(message) {
        if (window.WebSocket) {
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("未连接")
            }
        }

    }
</script>
<form onsubmit="return false">

    <textarea name="message" style="height: 300px; 300px"></textarea>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
    <textarea id="responseText" style="height: 300px; 300px"></textarea>
    <input type="button" value="清空" onclick="document.getElementById('responseText').value = ''"/>
</form>
</body>
</html>

服务端代码

public class WebSocketServer {
    private final int PORT = 7000;


    public static void main(String[] args) {
        new WebSocketServer().run();
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler())//在bossGroup增加一个日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //基于http协议,使用http的编码和解码器
                            channel.pipeline().addLast(new HttpServerCodec());
                            //是以块方式写,添加ChunkedWriteHandler处理器
                            channel.pipeline().addLast(new ChunkedWriteHandler());
                            //http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合
                            channel.pipeline().addLast(new HttpObjectAggregator(8192));
                            //websocket数据是以帧(frame)形式传递
                            //浏览器请求时 ws://localhost:7000/hello 表示请求的uri
                            //WebSocketServerProtocolHandler 核心功能将http协议升级为ws协议,保持长连接
                            channel.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
                            //处理业务的handler
                            channel.pipeline().addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {

                                /**
                                 *
                                 * @param ctx
                                 * @param msg 文本帧
                                 * @throws Exception
                                 */
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                                    System.out.println("服务器收到的信息:" + msg.text());
                                    ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器当前时间" + LocalDateTime.now() + msg.text()));
                                }

                                //当web客户端连接后,触发该方法
                                public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                                    //id表示唯一值,LongText是唯一的,ShortText不是唯一的
                                    System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
                                    System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
                                }

                                public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
                                    System.out.println("handlerRemoved 被调用" + ctx.channel().id().asShortText());
                                }

                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("异常信息:" + cause.getMessage());
                                    ctx.close();
                                }
                            });
                        }
                    });

            ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


}

WebSocketServerProtocolHandler的handlerAdded

初始化添加的时候,会调用handlerAdded进行处理器的添加,分别添加握手处理器WebSocketServerProtocolHandshakeHandler,UTF8文本帧验证器Utf8FrameValidator,关闭帧处理器WebSocketCloseFrameHandler:

public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
            // Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
            cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                    new WebSocketServerProtocolHandshakeHandler(serverConfig));
        }
        if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
            // Add the UFT8 checking before this one.在前面添加帧验证器
            cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        }
    }

添加完之后,管道中的处理器:

head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->WebSocketServerProtocolHandshakeHandler------>Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

WebSocketServerProtocolHandshakeHandler#channelRead

之后就是客户端发来HTTP请求websocket握手。HTTP解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler了,我们来看看他做了什么。

  • 验证协议url
  • 验证GET的请求升级。
  • 设置握手处理器。
  • 移除处理器WebSocketServerProtocolHandshakeHandler.
  • 创建握手WebSocketServerHandshaker 对象,进行握手。
  • 启动一个定义任务进行超时回调。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final FullHttpRequest req = (FullHttpRequest) msg;
        if (!isWebSocketPath(req)) {//不是websocket路径就不管
            ctx.fireChannelRead(msg);
            return;
        }

        try {
            if (!GET.equals(req.method())) {//只有GET支持的升级的
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
                return;
            }
       //创建握手工厂
            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
                    serverConfig.subprotocols(), serverConfig.decoderConfig());
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
            final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
            if (handshaker == null) {//不支持的版本
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                // Ensure we set the handshaker and replace this handler before we
                // trigger the actual handshake. Otherwise we may receive websocket bytes in this handler
                // before we had a chance to replace it.
                //
                // See https://github.com/netty/netty/issues/9471.
                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
                ctx.pipeline().remove(this);//移除当前处理器

                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                handshakeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if (!future.isSuccess()) {//发送不成功
                            localHandshakePromise.tryFailure(future.cause());
                            ctx.fireExceptionCaught(future.cause());
                        } else {//发送成功
                            localHandshakePromise.trySuccess();
                            // Kept for compatibility  保持兼容性 触发事件
                            ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                            ctx.fireUserEventTriggered(//这个是新的
                                    new WebSocketServerProtocolHandler.HandshakeComplete(
                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                        }
                    }
                });
                applyHandshakeTimeout();
            }
        } finally {
            req.release();
        }
    }

isWebSocketPath验证URL

这个主要就是验证URL是否是WebSockeURL,主要就是判断创建时候传进去的这个"/hello",默认是比较整个字符串,不是比较开头。

private boolean isWebSocketPath(FullHttpRequest req) {
        String websocketPath = serverConfig.websocketPath();
        String uri = req.uri();
        boolean checkStartUri = uri.startsWith(websocketPath);
        boolean checkNextUri = checkNextUri(uri, websocketPath);
        return serverConfig.checkStartsWith() ? (checkStartUri && checkNextUri) : uri.equals(websocketPath);
    }

sendHttpResponse发送消息

如果响应的状态码不是200或者请求不是设置长连接,就关闭通道了。

    private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

WebSocketServerHandshakerFactory的newHandshaker创建握手对象

根据请求头信息的sec-websocket-version来决定要哪个版本的握手对象,一般都是13,如果都不支持就会返回null

    public WebSocketServerHandshaker newHandshaker(HttpRequest req) {

        CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);//从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
        if (version != null) {
            if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
                // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
                return new WebSocketServerHandshaker13(
                        webSocketURL, subprotocols, decoderConfig);
            } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
                // Version 8 of the wire protocol - version 10 of the draft hybi specification.
                return new WebSocketServerHandshaker08(
                        webSocketURL, subprotocols, decoderConfig);
            } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
                // Version 8 of the wire protocol - version 07 of the draft hybi specification.
                return new WebSocketServerHandshaker07(
                        webSocketURL, subprotocols, decoderConfig);
            } else {
                return null;
            }
        } else {
            // Assume version 00 where version header was not specified
            return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
        }
    }

ctx.pipeline().remove(this);

只要握手对象创建好了,就不需要响应HTTP了,直接就把当前处理器WebSocketServerProtocolHandler给移除了。

head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

WebSocketServerHandshaker的handshake

握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。

    public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                            HttpHeaders responseHeaders, final ChannelPromise promise) {

        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, version());
        }
        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
        ChannelPipeline p = channel.pipeline();
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);//删除聚合
        }
        if (p.get(HttpContentCompressor.class) != null) {//删除压缩
            p.remove(HttpContentCompressor.class);
        }
        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
        final String encoderName;
        if (ctx == null) {//不存在
            // this means the user use an HttpServerCodec
            ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
            if (ctx == null) {//也不存在,就没办法解码http了,失败了
                promise.setFailure(
                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }//在之前添加WebSocket编解码
            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder

            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
        }//监听发出事件
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }

发送回调前是这样:

head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder----->HttpResponseDecoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

回调后:

head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

applyHandshakeTimeout

发送可能会等好久,所以就给了个超时的定时任务,默认设置是10秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。

    private void applyHandshakeTimeout() {
        final ChannelPromise localHandshakePromise = handshakePromise;
        final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
        if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
            return;//完成了就不管了
        }
     //起一个定时任务
        final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                if (!localHandshakePromise.isDone() &&
                    localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) {
                    ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
                       .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
                       .close();
                }
            }
        }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
     //如果成功了,就把超时任务取消
        // Cancel the handshake timeout when handshake is finished.
        localHandshakePromise.addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> f) {
                timeoutFuture.cancel(false);
            }
        });
    }

至此WebSocketServerProtocolHandshakeHandler做的事就完成了其实就是握手的时候用HTTP,然后就转到WebSocket了,所以为什么会看到会有替换和删除处理器了。

WebSocket编解码器

p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());

父类WebSocketServerHandshaker 只提供了抽象方法,由子类WebSocketServerHandshaker13 实现。因为 handshaker 由上文 WebSocketServerHandshakerFactory#newHandshaker 方法创建,返回结果 WebSocketServerHandshaker13。

WebSocketServerHandshaker13#newWebsocketDecoder

    protected WebSocketFrameDecoder newWebsocketDecoder() {
        return new WebSocket13FrameDecoder(decoderConfig());
    }

WebSocket13FrameDecoder只有构造方法,关键方法还是在WebSocket08FrameDecoder里面。

WebSocket08FrameDecoder解码器

属性

    //读取状态
    enum State {
        READING_FIRST,//第一次读一个字节 FIN, RSV, OPCODE
        READING_SECOND,//解析出MASK, PAYLOAD LEN描述
        READING_SIZE,//解析具体长度PAYLOAD LEN
        MASKING_KEY,//解析掩码
        PAYLOAD,//解析数据
        CORRUPT//帧损坏了
    }


    private static final byte OPCODE_CONT = 0x0;//连续的frame
    private static final byte OPCODE_TEXT = 0x1;//文本frame
    private static final byte OPCODE_BINARY = 0x2;//二进制frame
    private static final byte OPCODE_CLOSE = 0x8;//关闭帧
    private static final byte OPCODE_PING = 0x9;//ping
    private static final byte OPCODE_PONG = 0xA;//pong

decode解码

和http类似,根据状态处理

  1. READING_FIRST:解析第一个字节,是不是最后一帧,扩展位怎么样,是什么帧类型。
  2. READING_SECOND:解析第二个字节,是否有掩码,数据长度是多少。
  3. READING_SIZE:处理长度,如果是0-125,那好办,如果是126,就要读取后面2个字节的数据,如果是127,就要读取后面8个字节的数据。
  4. MASKING_KEY:如果有掩码就解析出4字节掩码。
  5. PAYLOAD:解析出最后的数据。
  6. CORRUPT:帧数据可能损坏了,可能要关闭连接。

READING_FIRST

用了位操作去解析第一个字节,这里的Opcode实际上就是帧类型,比如0表示持续的帧,1表示文本帧,2表示二进制帧等等。

case READING_FIRST:
            if (!in.isReadable()) {
                return;
            }

            framePayloadLength = 0;

            // FIN, RSV, OPCODE
            byte b = in.readByte();
            frameFinalFlag = (b & 0x80) != 0;//取出FIN,表示是不是一帧的最后一段
            frameRsv = (b & 0x70) >> 4;//取出RSV
            frameOpcode = b & 0x0F;//取出Opcode

            if (logger.isTraceEnabled()) {
                logger.trace("Decoding WebSocket Frame opCode={}", frameOpcode);
            }

            state = State.READING_SECOND;

READING_SECOND

然后读取掩码位,读取长度,进行一些合法性的检查,如果违反协议了,就直接发送关闭帧。

        case READING_SECOND:
            if (!in.isReadable()) {
                return;
            }
            // MASK, PAYLOAD LEN 1
            b = in.readByte();//再读一个字节
            frameMasked = (b & 0x80) != 0;//读取掩码,1表示存在,4字节,0不存在
            framePayloadLen1 = b & 0x7F;//获取内容长度

            if (frameRsv != 0 && !config.allowExtensions()) {//有扩展标志位,但是不允许扩展
                protocolViolation(ctx, in, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
                return;
            }

            if (!config.allowMaskMismatch() && config.expectMaskedFrames() != frameMasked) {//需要掩码加密,但是发来的没进行掩码加密
                protocolViolation(ctx, in, "received a frame that is not masked as expected");
                return;
            }
        //控制操作,关闭,ping,pong
            if (frameOpcode > 7) { // control frame (have MSB in opcode set)

                // control frames MUST NOT be fragmented
                if (!frameFinalFlag) {//控制帧不用分段了
                    protocolViolation(ctx, in, "fragmented control frame");
                    return;
                }

                // control frames MUST have payload 125 octets or less
                if (framePayloadLen1 > 125) {//长度超过125
                    protocolViolation(ctx, in, "control frame with payload length > 125 octets");
                    return;
                }  
          //不为控制帧
                // check for reserved control frame opcodes
                if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
                      || frameOpcode == OPCODE_PONG)) {
                    protocolViolation(ctx, in, "control frame using reserved opcode " + frameOpcode);
                    return;
                }
          //关闭帧有内容的话,必须是2个字节的无符号整形表示状态码
                // close frame : if there is a body, the first two bytes of the
                // body MUST be a 2-byte unsigned integer (in network byte
                // order) representing a getStatus code
                if (frameOpcode == 8 && framePayloadLen1 == 1) {
                    protocolViolation(ctx, in, "received close control frame with payload len 1");
                    return;
                }
            } else { // data frame  数据帧,不是持续,文本,二进制帧的话也违反协议了
                // check for reserved data frame opcodes
                if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
                      || frameOpcode == OPCODE_BINARY)) {
                    protocolViolation(ctx, in, "data frame using reserved opcode " + frameOpcode);
                    return;
                }

                // check opcode vs message fragmentation state 1/2
                if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {//是持续帧,帧个数为0
                    protocolViolation(ctx, in, "received continuation data frame outside fragmented message");
                    return;
                }
          //帧的端数不为0,但是不是持续帧,也不是ping
                // check opcode vs message fragmentation state 2/2
                if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
                    protocolViolation(ctx, in,
                                      "received non-continuation data frame while inside fragmented message");
                    return;
                }
            }

            state = State.READING_SIZE;

protocolViolation违反协议

如果发现有违反协议的,直接把数据丢弃,如果通道没关闭,且设置了违反协议就关闭通道的话就发送关闭帧,抛出异常。

    private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, String reason) {
        protocolViolation(ctx, in, WebSocketCloseStatus.PROTOCOL_ERROR, reason);
    }

    private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, WebSocketCloseStatus status, String reason) {
        protocolViolation(ctx, in, new CorruptedWebSocketFrameException(status, reason));
    }

    private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, CorruptedWebSocketFrameException ex) {
        state = State.CORRUPT;//帧损坏的状态
        int readableBytes = in.readableBytes();
        if (readableBytes > 0) {
            // Fix for memory leak, caused by ByteToMessageDecoder#channelRead:
            // buffer 'cumulation' is released ONLY when no more readable bytes available.
            in.skipBytes(readableBytes);//略过,能帮助释放内存
        }
        if (ctx.channel().isActive() && config.closeOnProtocolViolation()) {//帧坏了就关闭通道
            Object closeMessage;
            if (receivedClosingHandshake) {
                closeMessage = Unpooled.EMPTY_BUFFER;//空帧
            } else {
                WebSocketCloseStatus closeStatus = ex.closeStatus();
                String reasonText = ex.getMessage();
                if (reasonText == null) {
                    reasonText = closeStatus.reasonText();
                }
                closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);//封装成关闭帧
            }
            ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE);//发出去,成功后关闭通道
        }
        throw ex;//抛出异常
    }

READING_SIZE

处理长度的几种情况。

        case READING_SIZE:

            // Read frame payload length
            if (framePayloadLen1 == 126) {//如果是126的话,紧跟着后面需要有两个字节的长度
                if (in.readableBytes() < 2) {
                    return;
                }
                framePayloadLength = in.readUnsignedShort();//读取2次节长度
                if (framePayloadLength < 126) {//长度无效
                    protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                    return;
                }
            } else if (framePayloadLen1 == 127) {//如果是127,后面需要8个字节
                if (in.readableBytes() < 8) {
                    return;
                }
                framePayloadLength = in.readLong();
                // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
                // just check if it's negative?

                if (framePayloadLength < 65536) {//小于等于2字节的
                    protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                    return;
                }
            } else {
                framePayloadLength = framePayloadLen1;//0-125的情况
            }
        //大于最大长度默认65536
            if (framePayloadLength > config.maxFramePayloadLength()) {
                protocolViolation(ctx, in, WebSocketCloseStatus.MESSAGE_TOO_BIG,
                    "Max frame length of " + config.maxFramePayloadLength() + " has been exceeded.");
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Decoding WebSocket Frame length={}", framePayloadLength);
            }

            state = State.MASKING_KEY;

MASKING_KEY

解析出掩码,其实这个掩码加密解密只是用了异或^

        case MASKING_KEY://解析掩码
            if (frameMasked) {//有掩码 4字节的
                if (in.readableBytes() < 4) {
                    return;
                }
                if (maskingKey == null) {
                    maskingKey = new byte[4];
                }
                in.readBytes(maskingKey);
            }
            state = State.PAYLOAD;

PAYLOAD

有掩码先解码,然后根据不同的Opcode类型封装成对应的帧数据。

        case PAYLOAD://解析数据
            if (in.readableBytes() < framePayloadLength) {
                return;
            }

            ByteBuf payloadBuffer = null;
            try {
                payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));

                // Now we have all the data, the next checkpoint must be the next
                // frame
                state = State.READING_FIRST;//回到初始要解析的状态

                // Unmask data if needed
                if (frameMasked) {//如果有掩码,要解码
                    unmask(payloadBuffer);
                }

                // Processing ping/pong/close frames because they cannot be
                // fragmented
                if (frameOpcode == OPCODE_PING) {//如果是ping
                    out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }
                if (frameOpcode == OPCODE_PONG) {//如果是pong
                    out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }
                if (frameOpcode == OPCODE_CLOSE) {//收到关闭帧,也要回一个关闭帧
                    receivedClosingHandshake = true;
                    checkCloseFrameBody(ctx, payloadBuffer);
                    out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }

                // Processing for possible fragmented messages for text and binary
                // frames
                if (frameFinalFlag) {//是最后一帧
                    // Final frame of the sequence. Apparently ping frames are
                    // allowed in the middle of a fragmented message
                    if (frameOpcode != OPCODE_PING) {//允许中间发心跳帧,心跳帧不算,不是心跳帧才要清零
                        fragmentedFramesCount = 0;
                    }
                } else {
                    // Increment counter
                    fragmentedFramesCount++;//帧个数+1,为持续帧
                }

                // Return the frame
                if (frameOpcode == OPCODE_TEXT) {//文本类型
                    out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else if (frameOpcode == OPCODE_BINARY) {//二进制
                    out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else if (frameOpcode == OPCODE_CONT) {//持续帧
                    out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
                                                           payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else {
                    throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
                                                            + frameOpcode);
                }
            } finally {
                if (payloadBuffer != null) {//没有解析出来要释放
                    payloadBuffer.release();
                }
            }

unmask解码

其实就是取出4字节掩码,封装成一个整数,然后跟数据进行每次8位的轮询的异或运算解码。

    private void unmask(ByteBuf frame) {
        int i = frame.readerIndex();
        int end = frame.writerIndex();

        ByteOrder order = frame.order();

        // Remark: & 0xFF is necessary because Java will do signed expansion from
        // byte to int which we don't want.
        int intMask = ((maskingKey[0] & 0xFF) << 24)
                    | ((maskingKey[1] & 0xFF) << 16)
                    | ((maskingKey[2] & 0xFF) << 8)
                    | (maskingKey[3] & 0xFF);

        // If the byte order of our buffers it little endian we have to bring our mask
        // into the same format, because getInt() and writeInt() will use a reversed byte order
        if (order == ByteOrder.LITTLE_ENDIAN) {
            intMask = Integer.reverseBytes(intMask);
        }

        for (; i + 3 < end; i += 4) {
            int unmasked = frame.getInt(i) ^ intMask;
            frame.setInt(i, unmasked);
        }
        for (; i < end; i++) {
            frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]);
        }
    }

CORRUPT

一般是有违反协议了,就丢弃了,但是就怕其他问题要读一帧,不然父类处理会出问题。

        case CORRUPT://帧坏了
            if (in.isReadable()) {
                // If we don't keep reading Netty will throw an exception saying
                // we can't return null if no bytes read and state not changed.
                in.readByte();;//要读一下,否则父类会报错
            }
            return;

问题在于父类ByteToMessageDecoder的callDecode中有:

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

Utf8FrameValidator

这个是验证文本帧是否是UTF8编码的。其实他就是检查是否是最后一帧,如果是文本帧的话就检测内容,不是UTF8的就抛异常。如果是持续帧,只有第一帧是文本的才会开始检测,所以后续来的肯定是文本帧,就不用判断是不是文本帧了,只要判断是不是在检测就好了。

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof WebSocketFrame) {
            WebSocketFrame frame = (WebSocketFrame) msg;

            try {
                // Processing for possible fragmented messages for text and binary
                // frames
                if (((WebSocketFrame) msg).isFinalFragment()) {//是最后帧
                    // Final frame of the sequence. Apparently ping frames are
                    // allowed in the middle of a fragmented message
                    if (!(frame instanceof PingWebSocketFrame)) {
                        fragmentedFramesCount = 0;

                        // Check text for UTF8 correctness 监测文本帧
                        if ((frame instanceof TextWebSocketFrame) ||
                                (utf8Validator != null && utf8Validator.isChecking())) {
                            // Check UTF-8 correctness for this payload
                            checkUTF8String(frame.content());

                            // This does a second check to make sure UTF-8
                            // correctness for entire text message
                            utf8Validator.finish();//如果不是就报异常
                        }
                    }
                } else {//不是最后帧
                    // Not final frame so we can expect more frames in the
                    // fragmented sequence
                    if (fragmentedFramesCount == 0) {//是第一帧,只检测文本
                        // First text or binary frame for a fragmented set
                        if (frame instanceof TextWebSocketFrame) {
                            checkUTF8String(frame.content());//检测内容
                        }
                    } else {//不是第一帧,继续检测,因为前面是文本的,所以持续帧也肯定是
                        // Subsequent frames - only check if init frame is text
                        if (utf8Validator != null && utf8Validator.isChecking()) {
                            checkUTF8String(frame.content());
                        }
                    }

                    // Increment counter
                    fragmentedFramesCount++;//帧数累加
                }
            } catch (CorruptedWebSocketFrameException e) {
                frame.release();
                throw e;
            }
        }

        super.channelRead(ctx, msg);
    }

WebSocketServerProtocolHandler#decode

主要是判断是不是关闭帧,是的话就拿出开始创建的握手对象,然后实现关闭,其实就是发送关闭帧。否则的话就让父类WebSocketProtocolHandler处理。

    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (serverConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) {//如果要处理关闭帧
            WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel());
            if (handshaker != null) {
                frame.retain();
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);//握手处理器来处理关闭
            } else {
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);//直接处理
            }
            return;
        }
        super.decode(ctx, frame, out);
    }

WebSocketProtocolHandler#decode

如果是心跳ping,pong帧的就响应,然后继续监听读消息,否则就将数据帧加进消息列表中。

    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (frame instanceof PingWebSocketFrame) {//ping帧,写回pong,继续监听读事件,直接返回
            frame.content().retain();
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
            readIfNeeded(ctx);
            return;
        }
        if (frame instanceof PongWebSocketFrame && dropPongFrames) {//丢弃pong帧
            readIfNeeded(ctx);
            return;
        }

        out.add(frame.retain());
    }

WebSocket08FrameEncoder解码器

解码器就是编码器反的来,其实就根据情况吧数据封装成协议的格式,比如封装成帧。

    protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        final ByteBuf data = msg.content();
        byte[] mask;

        byte opcode;
        if (msg instanceof TextWebSocketFrame) {
            opcode = OPCODE_TEXT;
        } else if (msg instanceof PingWebSocketFrame) {
            opcode = OPCODE_PING;
        } else if (msg instanceof PongWebSocketFrame) {
            opcode = OPCODE_PONG;
        } else if (msg instanceof CloseWebSocketFrame) {
            opcode = OPCODE_CLOSE;
        } else if (msg instanceof BinaryWebSocketFrame) {
            opcode = OPCODE_BINARY;
        } else if (msg instanceof ContinuationWebSocketFrame) {
            opcode = OPCODE_CONT;
        } else {
            throw new UnsupportedOperationException("Cannot encode frame of type: " + msg.getClass().getName());
        }

        int length = data.readableBytes();

        if (logger.isTraceEnabled()) {
            logger.trace("Encoding WebSocket Frame opCode={} length={}", opcode, length);
        }
     //封装第一个字节:
        int b0 = 0;
        if (msg.isFinalFragment()) {
            b0 |= 1 << 7;
        }
        b0 |= msg.rsv() % 8 << 4;
        b0 |= opcode % 128;

        if (opcode == OPCODE_PING && length > 125) {
            throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was "
                    + length);
        }
     //处理长度:
        boolean release = true;
        ByteBuf buf = null;
        try {
            int maskLength = maskPayload ? 4 : 0;
            if (length <= 125) {//长度0-125
                int size = 2 + maskLength;
                if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                    size += length;
                }
                buf = ctx.alloc().buffer(size);//前面2个字节+掩码长度(4字节)+内容长度
                buf.writeByte(b0);
                byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
                buf.writeByte(b);
            } else if (length <= 0xFFFF) {//内容2字节长度
                int size = 4 + maskLength;
                if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                    size += length;
                }
                buf = ctx.alloc().buffer(size);
                buf.writeByte(b0);
                buf.writeByte(maskPayload ? 0xFE : 126);
                buf.writeByte(length >>> 8 & 0xFF);
                buf.writeByte(length & 0xFF);
            } else {  //内容8字节长度
                int size = 10 + maskLength;
                if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                    size += length;
                }
                buf = ctx.alloc().buffer(size);
                buf.writeByte(b0);
                buf.writeByte(maskPayload ? 0xFF : 127);
                buf.writeLong(length);
            }
       //处理掩码,这里默认服务器返回一般不用掩码,而且这里有一种优化,数据不过不太大的话,就合并成一个缓冲区一起发送:
            // Write payload
            if (maskPayload) {//掩码编码
                int random = (int) (Math.random() * Integer.MAX_VALUE);
                mask = ByteBuffer.allocate(4).putInt(random).array();
                buf.writeBytes(mask);

                ByteOrder srcOrder = data.order();
                ByteOrder dstOrder = buf.order();

                int counter = 0;
                int i = data.readerIndex();
                int end = data.writerIndex();

                if (srcOrder == dstOrder) {
                    // Use the optimized path only when byte orders match
                    // Remark: & 0xFF is necessary because Java will do signed expansion from
                    // byte to int which we don't want.
                    int intMask = ((mask[0] & 0xFF) << 24)
                                | ((mask[1] & 0xFF) << 16)
                                | ((mask[2] & 0xFF) << 8)
                                | (mask[3] & 0xFF);

                    // If the byte order of our buffers it little endian we have to bring our mask
                    // into the same format, because getInt() and writeInt() will use a reversed byte order
                    if (srcOrder == ByteOrder.LITTLE_ENDIAN) {
                        intMask = Integer.reverseBytes(intMask);
                    }

                    for (; i + 3 < end; i += 4) {
                        int intData = data.getInt(i);
                        buf.writeInt(intData ^ intMask);
                    }
                }
                for (; i < end; i++) {
                    byte byteData = data.getByte(i);
                    buf.writeByte(byteData ^ mask[counter++ % 4]);
                }
                out.add(buf);
            } else {
                if (buf.writableBytes() >= data.readableBytes()) {//可写长度如果大于等于内容大度,就合并成一个就发一次
                    // merge buffers as this is cheaper then a gathering write if the payload is small enough
                    buf.writeBytes(data);
                    out.add(buf);
                } else {
                    out.add(buf);
                    out.add(data.retain());
                }
            }
            release = false;
        } finally {
            if (release && buf != null) {
                buf.release();
            }
        }
    }
原文地址:https://www.cnblogs.com/xiaojiesir/p/15522341.html