netty学习

在项目中使用到netty框架,因此,想对其进行一个梳理。
最好还是结合具体的代码来展开思考和阐述吧。

从 关于HTTP(S)说起

// Configure SSL.
final SslContext sslCtx;
if (Config.SSL) {
    SelfSignedCertificate ssc = new SelfSignedCertificate();
    sslCtx = SslContext.newServerContext(SslProvider.JDK, ssc.certificate(), ssc.privateKey());
} else {
    sslCtx = null;
}

public static SslContext newServerContext(
        SslProvider provider, File certChainFile, File keyFile) throws SSLException {
    return newServerContext(provider, certChainFile, keyFile, null, null, null, 0, 0);
}

以上只是给出了代码中的概要,通过查看详细的代码之后,可以发现,这个功能是为了HTTPS加密传输做准备工作,比如生成公钥和私钥,然后存储到指定目录之后初始化环境配置。

ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.option(ChannelOption.SO_REUSEADDR, true);
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new RiskServerInitializer(sslCtx));

上面的代码段中生成了秘钥并进行了相关的环境配置,最终产生了一个SslContext 实例对象。这个对象很快就被使用到了,b.childHandler(new RiskServerInitializer(sslCtx));这里就使用到了。


public class RiskServerInitializer extends ChannelInitializer<SocketChannel> {
    
    private final SslContext sslCtx;

    public RiskServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
			// X-Requested-With在文件上传时会用到
		String[] headers = {"Origin", "X-Requested-With", "Content-Type", "Accept", "x-user-token", "X-User-Mobile"};
	    CorsConfig corsConfig = CorsConfig.withAnyOrigin().allowedRequestHeaders(headers).build();    	
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("encoder", new HttpResponseEncoder());
        // inserted HttpObjectAggregator into the pipeline, we get FullHttpRequest object with all the contnet of the payload
        // 这个handler是了文件上传用的.
        pipeline.addLast("aggregator", new HttpObjectAggregator(8388608)); // 8MB
        pipeline.addLast("deflater", new SmartHttpContentCompressor());
        
        pipeline.addLast("cors", new CorsHandler(corsConfig));
        pipeline.addLast("uploads", new RiskUploadFileServerHandler());        
        pipeline.addLast("handler", new RiskServerHandler());
    }
}

从上面的代码中可以看到,sslCtx对象被传给了RiskServerInitializer 类之后作为了一个成员变量来存储。并在其initChannel()方法中被使用到了。这个方法体内的代码,仍然是针对HTTP/S请求展开的操作,比如服务端对客户端的HTTP请求进行解码,发送返回时进行编码,还需要设置内容的聚合与分段压缩,跨域资源请求(CORS)的配置,文件上传功能等。此外,还核心的业务功能RiskServerHandler进行了加载和初始化。
这里的sslCtx.newHandler(ch.alloc()),ch.alloc()的方法原型为ByteBufAllocator alloc();,返回一个ByteBufAllocator 对象,以便分配ByteBuf容器来存储数据。外层的sslCtx.newHandler的方法原型是:

public final SslHandler newHandler(ByteBufAllocator alloc) {
    return newHandler(newEngine(alloc));
}

显然,这样做的目的是将SslHandler添加到ChannelPipeline中。
但是,initChannel()方法又是如何被调用的呢?我们从它的父类ChannelInitializer中可以看到,在channelRegistered方法中对其进行了调用。


@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);

    /**
     * This method will be called once the {@link Channel} was registered. After the method returns this instance
     * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
     *
     * @param ch            the {@link Channel} which was registered.
     * @throws Exception    is thrown if an error occurs. In that case the {@link Channel} will be closed.
     */
    protected abstract void initChannel(C ch) throws Exception;

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            pipeline.remove(this);
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

我没打算使用所谓的“各个击破”战略,也没打算使用“自定向下”的策略,但是我想适当的去看待一个问题,或者说从某个角度去看一个问题。然后,见好就收,回到总体上来,然后继续从另一个角度去看,去思考这个新的事物。
前面是从HTTP(S)的角度去看,下面回到总体来看一下代码。

整体梳理

// Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(Config.threadNum);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.option(ChannelOption.SO_REUSEADDR, true);
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new RiskServerInitializer(sslCtx));

            Channel ch = b.bind(Config.PORT).sync().channel();

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

首先是ServerBootstrap 。Bootstrap 的含义是引导,而ServerBootstrap 的含义是服务端引导,既然有服务端引导,就会有客户端引导,而在netty中,客户端引导就是Bootstrap。

原文地址:https://www.cnblogs.com/ioveNature/p/7940562.html