Netty 实现 websocket

现在网上网站为了实现推送基本都采用轮询的方式,比较新的轮询技术是comet,采用ajax,但是还是得发送请求,为了解决html效率低下的问题,html5定义了websocket协议。

服务端代码:

import java.util.concurrent.TimeUnit;

import org.apache.activemq.util.TimeUtils;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {

    public void bind(int port) throws Exception {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast("http-codec",
                                    new HttpServerCodec());

                            ch.pipeline().addLast("aggregator",
                                    new HttpObjectAggregator(65536));

                            ch.pipeline().addLast("http-chunked",
                                    new ChunkedWriteHandler());

                            ch.pipeline().addLast("handler",
                                    new WebSocketServerHandler());
                        }
                        
                    });
            Channel f = b.bind(port).sync().channel();
            f.closeFuture().sync();
        } finally{
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        new WebSocketServer().bind(8080);
    }

}

handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import java.util.Date;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // TODO Auto-generated method stub
        if(msg instanceof FullHttpRequest){
            handleHttpRequest(ctx, (FullHttpRequest)msg);
        }else if(msg instanceof WebSocketFrame){
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{
        System.out.println("handleHttpRequest");
        if(!req.decoderResult().isSuccess() || !"websocket".equals(req.headers().get("Upgrade"))){
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
        handshaker = factory.newHandshaker(req);
        
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            handshaker.handshake(ctx.channel(), req);
        }
    }
    
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        System.out.println("handleWebSocketFrame");
        if(frame instanceof CloseWebSocketFrame){
            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
            return;
        }
        if(frame instanceof PingWebSocketFrame){
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if(!(frame instanceof TextWebSocketFrame)){
            throw new UnsupportedOperationException(String.format("%s frame types not support", frame.getClass().getName()));
        }
        String req = ((TextWebSocketFrame) frame).text();
        
        System.out.println(String.format("%s received %s", ctx.channel(), req));
        
        for(int i = 0; i < 10; i++){
            ctx.channel().writeAndFlush(new TextWebSocketFrame(req+",欢迎使用Netty Websocket服务,现在时刻:" + new Date().toString()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) throws Exception{
        if(res.status().code() != 200){
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
        }
    
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if(!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200){
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

}

html调用:

<html>
    <head>
        <meta charset="utf-8">
        Netty Socket时间服务器
    </head>
    <body>
        <script type="text/javascript">
            var socket;
            if(!window.WebSocket){
                window.WebSocket = window.MozWebSocket;
            }
        
            if(window.WebSocket){
                socket = new WebSocket("ws://localhost:8080/websocket");
                socket.onmessage = function(event){
                    var ta = document.getElementById("responseText");
                    ta.value = "";
                    ta.value = event.data;
                };
                socket.onopen = function(event){
                    var ta = document.getElementById("responseText");
                    ta.value ="打开websocket服务正常,浏览器支持websocket";                
                };
                socket.onclose = function(event){
                    var ta = document.getElementById("responseText");
                    ta.value = "";
                    ta.value = "websocket关闭";
                };
            }else{
                alert("抱歉,浏览器不支持websocket");
            }
            
            function send(msg){
                if(socket.readyState = WebSocket.OPEN){
                    socket.send(msg);
                }else{
                    alert("没有建立连接");
                }
            }
        </script>
        
        <form>
            <input type="text" name="message" value="Netty最佳"/>
            <input type="button" value="发送websocket请求信息" onclick="send(this.form.message.value)"/>
            <hr color="blue">
            <textarea id="responseText" style="500px;height:300px"></textarea>
            
        </form>
    </body>
</html>
原文地址:https://www.cnblogs.com/momofeng/p/5482827.html