Netty 服务端创建过程

1.首先通过构造函数创建ServerBootstrap 实例,ServerBootStrap是Netty的启动辅助类。用于设置服务端启动相关的参数

ServerBootstrap bootstrap = new ServerBootstrap();

2.设置并绑定Reactor线程池,也就是创建EventLoopGroup对象,管理相关业务。

 EventLoopGroup bossGroup = new NioEventLoopGroup();
 EventLoopGroup wordGroup = new NioEventLoopGroup();
NioEventLoopGroup实际上就是Reactor线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。

NettyServerBootStrap.Group()源码:
  public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        } else if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = childGroup;
            return this;
        }
    }

3.设置并且绑定服务端通道(Channel),Netty中也就是NioServerSocketChannel。

4.在链路创建的时候初始化ChannelPipeline。ChannelPipeline主要负责管理执行相关的网络事件(根据ChannelHandler的执行策略调度ChannelHandler执行),它本质上是一个负责处理网络事件的职责链(用参数配置),负责管理和执行ChannelHandler。

第3步和第4步通过链式:

serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChildChannelInitializer());

调度的ChannelPipeline:

class ChildChannelInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("http-codec", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        pipeline.addLast("handler", new WebSocketServerHandler());
    }
}

5.ChannelPipeline调度ChannelHandler。ChannelHandler(下面的WebSocketServerHandler)是处理业务的关键接口,是Netty提供给用户定制和扩展需要实现的接口。比如下面这里就实现了处理网络请求、发送响应信息等。ChannelHandler提供了很多定制化的处理类供使用:

系统编解码框架

通用基于长度的半包解码器

码流日志打印

SSL安全认证

链路空闲监测

流量整形

Base64编解码等

class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{

    private WebSocketServerHandshaker handshaker;



    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        System.out.println("解析请求");
        // 传统http接入
        if (o instanceof FullHttpRequest){
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) o);
        } else if (o instanceof WebSocketFrame){
            handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) o);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    // 解码http请求
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){

        // 如果http解码失败 返回http异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
            sentHttpResponse(ctx,req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            System.out.println("解析异常");
            return;
        }
        String url = "ws://localhost:"+ SocketEnum.PORT.getNum()+"/websocket";
        System.out.println("url:"+url);
        // 握手工厂 构造握手响应返回 本地测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(url, null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (null == handshaker){
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else {
            handshaker.handshake(ctx.channel(), req);
        }

    }

    // 解码websocket请求
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame){
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是ping消息
        if (frame instanceof PingWebSocketFrame){
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        //判断如果不是文本消息则抛出异常
        if (!(frame instanceof TextWebSocketFrame)){
            throw new UnsupportedOperationException(String.format("%s frame types noe supported", frame.getClass().getName()));
        }

        String  request = ((TextWebSocketFrame)frame).text();
        ctx.channel().write(
                new TextWebSocketFrame(request
                + ", 欢迎使用Netty WebSocket 服务,现在时刻:"
                + new Date().toString()));
    }


    //发送响应
    private static void sentHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res){
        // 返回响应给客户端
        if (res.status().code() != 200){
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }

        // 如果是非Keep-Aliv,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code()!= 200){
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

6.在这些初始化和处理方式都搭建完毕以后即可启动并且监听端口。

  

Channel channel = serverBootstrap.bind(port).sync().channel();

内部最后执行:

protected void doBind(SocketAddress localAddress) throws Exception{
  javaChannel().socket.bind(localAdress, config.getBacklog());  
}

然后Selector会轮询,遇到准备就绪的Channel之后就由Reactor线程NioLoop执行ChannelPipeline的对应方法调度Handler去实现具体业务。  

原文地址:https://www.cnblogs.com/meijsuger/p/11222931.html