本人对于netty框架的一些理解,怎么与网站上的websock建立连接

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。


private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
        .group(boss, work)
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(nettyPort))
        //保持长连接
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new HeartbeatInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
    log.info("启动 Netty 成功");


}

上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面

public class HeartbeatInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中
                .addLast(new IdleStateHandler(5, 0, 0))

                // HttpServerCodec:将请求和应答消息解码为HTTP消息
                .addLast("http-codec",new HttpServerCodec())
                // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
                .addLast("aggregator",new HttpObjectAggregator(65536))
                // ChunkedWriteHandler:向客户端发送HTML5文件
                .addLast("http-chunked",new ChunkedWriteHandler())

                .addLast(new HeartBeatSimpleHandle());
    }
}

上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求

public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handShaker;

    /**
     * 取消绑定
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        NettySocketHolder.remove((NioSocketChannel) ctx.channel());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest req = (FullHttpRequest) msg;
            handleHttpRequest(ctx, req);
            //获取url后置参数
            String uri=req.uri();
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
            Map<String, List<String>> parameters = queryStringDecoder.parameters();
            Integer userId = Integer.valueOf(parameters.get("userId").get(0));
            // 存储当前登录ctx
            if(NettySocketHolder.get((long)userId) == null){
                NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel());
            }
            // WebSocket接入
        } else if (msg instanceof WebSocketFrame) {
            if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) {
                handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
            log.info("收到msg={},id={}", msg);
            //保存客户端与 Channel 之间的关系
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // 如果HTTP解码失败,返回HTTP异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        //获取url后置参数
        HttpMethod method=req.method();
        String uri=req.uri();
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
        Map<String, List<String>> parameters = queryStringDecoder.parameters();
        if(method==HttpMethod.GET&&"/websocket".equals(uri)){
            //...处理
            ctx.channel().attr(AttributeKey.valueOf("type")).set("live");
        }
        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false);
        handShaker = wsFactory.newHandshaker(req);
        if (handShaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handShaker.handshake(ctx.channel(), req);
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回应答给客户端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        }
    }

HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应

  var websocket;

    $(function(){
        $.ajax({
            type: "POST",
            url: "/login",
            data: {
                userId: 2,
                account: "135541",
                userName: "123"
            },
            contentType: "application/x-www-form-urlencoded; charset=utf-8",
            dataType: "json",
            success: function (result) {
                websoketCannel(result.data.userId, result.data.account, result.data.userName);
            }
        });
    });

    function websoketCannel(userId, account, userName){

        //如果浏览器支持WebSocket
        if(window.WebSocket){
            websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +"");  //获得WebSocket对象

            //当有消息过来的时候触发
            websocket.onmessage = function(event){
                var data = JSON.parse(event.data);
                $("#message").text(data.msg);
            };

            //连接关闭的时候触发
            websocket.onclose = function(event){
                console.log("断开连接");
            };

            //连接打开的时候触发
            websocket.onopen = function(event){
                console.log("建立连接");
            }
        }else{
            alert("浏览器不支持WebSocket");
        }
    }

    function sendMsg(msg) { //发送消息
        if(window.WebSocket){
            if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打开状态
                websocket.send(msg); //send()发送消息
            }
        }else{
            return;
        }
    }

上面代码是客户端如何连netty

原文地址:https://www.cnblogs.com/kangniuniu/p/11107768.html