ServerBootstrap

ServerBootstrap: the server-side bootstrap class

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

The channel(NioServerSocketChannel.class) method

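channel() creates a ReflectiveChannelFactory, which looks up the no-argument constructor of the given class via reflection and stores it in this.constructor; each later call to newChannel() then instantiates a new Channel through that constructor. NIO is the most mature and most widely used transport in Netty, so NioServerSocketChannel is the recommended Channel type on the server side and NioSocketChannel on the client side. Netty ships several other Channel implementations that you can switch to as needed, for example OioServerSocketChannel (blocking I/O, deprecated in Netty 4.1) and EpollServerSocketChannel (native transport, Linux only).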

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
    }
}
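
To see the idea in isolation, here is a minimal sketch of the same reflective-factory pattern using only the JDK. The names ReflectiveFactoryDemo and ReflectiveFactory are made up for illustration and are not Netty classes; StringBuilder merely stands in for a Channel class that has a public no-arg constructor.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactoryDemo {

    // Hypothetical stand-in for ReflectiveChannelFactory: looks up the public
    // no-arg constructor once and reuses it for every new instance.
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        clazz.getSimpleName() + " has no public no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Unable to create " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> factory =
                new ReflectiveFactory<StringBuilder>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        // Each call goes through the cached constructor and builds a new object.
        System.out.println(first != second);
    }
}
```

The constructor lookup happens once, when the factory is built, so a class without a suitable constructor is rejected at configuration time rather than when the first instance is requested.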

The channelFactory method

Sets the channel factory and returns the bootstrap itself so that the call chain can continue.

// Accepts io.netty.channel.ChannelFactory and delegates to the overload below.
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

// The unqualified ChannelFactory here is the one in AbstractBootstrap's own package.
// The factory may only be set once.
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}
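
The "return self() and keep chaining" idea can be sketched without Netty. The following is only an illustration under assumed names (FluentBuilderDemo, Base, Server are hypothetical); it mimics the self-typed generic parameter B and the set-once check, nothing more.

```java
class FluentBuilderDemo {

    // Hypothetical base builder: B is the concrete subclass type, so setters
    // defined here can still return the subclass for further chaining.
    abstract static class Base<B extends Base<B>> {
        private String factory;

        @SuppressWarnings("unchecked")
        B self() {
            return (B) this;
        }

        B factory(String factory) {
            if (factory == null) {
                throw new NullPointerException("factory");
            }
            if (this.factory != null) {
                throw new IllegalStateException("factory set already");
            }
            this.factory = factory;
            return self();
        }

        String factory() {
            return factory;
        }
    }

    // Hypothetical concrete builder with one extra setter of its own.
    static final class Server extends Base<Server> {
        private int port;

        Server port(int port) {
            this.port = port;
            return this;
        }

        int port() {
            return port;
        }
    }

    public static void main(String[] args) {
        // factory(...) is declared in Base but returns Server, so port(...) can follow.
        Server server = new Server().factory("nio").port(8080);
        System.out.println(server.factory() + ":" + server.port()); // prints "nio:8080"
    }
}
```

Calling factory(...) a second time on the same builder throws IllegalStateException, which mirrors the "channelFactory set already" check.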

childHandler(ChannelHandler childHandler)

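In Netty, a ChannelPipeline can hold multiple ChannelHandlers, each with its own responsibility. ServerBootstrap's childHandler() method, however, takes a single ChannelHandler. ChannelInitializer is an abstract class that implements the ChannelHandler interface; the example instantiates an anonymous subclass of it and passes that to childHandler(), and its initChannel() then adds the actual handlers to the pipeline.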

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
    return this;
}

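Every Channel is bound to a Pipeline when it is initialized; the Pipeline is mainly used to orchestrate processing. It manages multiple ChannelHandlers, I/O events propagate through them in order, and each ChannelHandler carries out its part of the business logic.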

b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast("codec", new HttpServerCodec())
                .addLast("compressor", new HttpContentCompressor())
                .addLast("aggregator", new HttpObjectAggregator(65536)) 
                .addLast("handler", new HttpServerHandler());
    }
});

The example above chains several ChannelHandlers into the pipeline: an HTTP codec, an HTTP content compressor, an HTTP message aggregator, and a custom business-logic handler.

b.option(ChannelOption.SO_KEEPALIVE, true);

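Sets Channel options. ServerBootstrap offers two methods for this, option and childOption. option applies to the listening server Channel, which is served by the Boss thread group, while childOption applies to the Channel of each accepted connection, which is served by the Worker thread group.

The commonly used options and their meanings are listed below; set them as your scenario requires.

Parameter: Meaning
SO_KEEPALIVE: When set to true, enables the TCP SO_KEEPALIVE option, so TCP actively probes the connection state (connection keep-alive).
SO_BACKLOG: Maximum length of the queue of connections that have completed the three-way handshake. The server may be handling many connections at the same moment, so this value should be raised in high-concurrency scenarios with very large numbers of connections.
TCP_NODELAY: Defaults to true in Netty, meaning data is sent immediately. Setting it to false enables Nagle's algorithm, which holds back TCP packets until enough data has accumulated; this reduces the number of packets sent but adds some latency. To minimize transmission latency, Netty disables Nagle's algorithm by default.
SO_SNDBUF: Size of the TCP send buffer.
SO_RCVBUF: Size of the TCP receive buffer.
SO_LINGER: Sets the linger time on close, waiting for the data still in the buffer to be sent.
CONNECT_TIMEOUT_MILLIS: Timeout for establishing a connection.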
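
Several of these names correspond to standard socket options that the JDK also exposes. As a rough, Netty-free sketch of the split between the listening socket and per-connection sockets, the following uses java.nio directly. The class name SocketOptionsDemo, the 64 KiB buffer size and the backlog of 128 are arbitrary choices for illustration, and this is not how Netty applies options internally.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class SocketOptionsDemo {

    // Listening socket: receive buffer and accept backlog (comparable to option()).
    static int configureServer(ServerSocketChannel server) throws IOException {
        server.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
        // Port 0 asks the OS for a free port; the second argument is the backlog.
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    // Per-connection socket: keep-alive and Nagle settings (comparable to childOption()).
    static void configureConnection(SocketChannel connection) throws IOException {
        connection.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        connection.setOption(StandardSocketOptions.TCP_NODELAY, true);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel connection = SocketChannel.open()) {
            int port = configureServer(server);
            configureConnection(connection);
            System.out.println("listening on port " + port);
            System.out.println("keepalive=" + connection.getOption(StandardSocketOptions.SO_KEEPALIVE));
            System.out.println("nodelay=" + connection.getOption(StandardSocketOptions.TCP_NODELAY));
        }
    }
}
```

The connection channel here is never connected; options can be set on it beforehand, which is enough to show which kind of socket each option belongs to.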

 

ServerBootstrap.bind(hostname, port)

[Flowchart of the bind process; image not available]

bind() ends up calling the doBind(final SocketAddress localAddress) method of AbstractBootstrap.

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
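
The control flow of doBind (bind immediately if registration has already finished, otherwise bind from a completion callback) can be imitated with the JDK's CompletableFuture. This is only a sketch under simplifying assumptions: BindAfterRegisterDemo and bind0 are hypothetical names, "binding" just completes a promise with a string, and a failed registration fails a new promise instead of returning the registration future itself as Netty does.

```java
import java.util.concurrent.CompletableFuture;

class BindAfterRegisterDemo {

    // Hypothetical stand-in for doBind0: "binding" simply completes the promise.
    static void bind0(int port, CompletableFuture<String> promise) {
        promise.complete("bound:" + port);
    }

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already succeeded: bind right away.
            bind0(port, promise);
        } else {
            // Registration is still pending (or failed): decide once it completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    bind0(port, promise);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(registration, 8080);
        System.out.println(bound.isDone()); // prints "false": registration has not finished
        registration.complete(null);
        System.out.println(bound.join());   // prints "bound:8080"
    }
}
```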

The initAndRegister() method

Creates and initializes the channel. The channelFactory.newChannel() call here creates the NioServerSocketChannel instance that was configured earlier via channel(NioServerSocketChannel.class).

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

Instantiating NioServerSocketChannel

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

newSocket returns the JDK's default implementation, ServerSocketChannelImpl, which belongs to Java NIO. SelectionKey.OP_ACCEPT is the NIO event for accepting incoming connections.

super(null, channel, SelectionKey.OP_ACCEPT)

Stores the underlying JDK channel in the NioServerSocketChannel and records the event it is interested in, readInterestOp.

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch; // the JDK ServerSocketChannel instance (ServerSocketChannelImpl)
    this.readInterestOp = readInterestOp; // the event of interest: OP_ACCEPT
    try {
        ch.configureBlocking(false); // switch to non-blocking mode
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
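
For readers less familiar with the JDK side, the following sketch shows the plain java.nio steps these constructors build on: open a ServerSocketChannel through the SelectorProvider, switch it to non-blocking mode, and register it with a Selector for OP_ACCEPT. The class name NioAcceptSetupDemo is made up, and registering OP_ACCEPT directly at registration time is a simplification for the demo; the listing above only stores the interest op in a field at this stage.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

class NioAcceptSetupDemo {

    // Opens a server channel, makes it non-blocking and registers it for OP_ACCEPT.
    // Returns the interest ops recorded on the resulting selection key.
    static int registerForAccept() throws IOException {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel channel = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a selector.
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
            return key.interestOps();
        }
    }

    public static void main(String[] args) throws IOException {
        int ops = registerForAccept();
        System.out.println(ops == SelectionKey.OP_ACCEPT);
    }
}
```

The channel does not need to be bound to a port before it is registered, which is why creation and registration can happen before the bind step.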

super(parent);

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId(); // unique channel id
    unsafe = newUnsafe(); // creates a NioMessageUnsafe, which carries out the low-level operations
    pipeline = newChannelPipeline(); // the channel's pipeline
}

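This creates the ChannelPipeline instance, whose actual type is DefaultChannelPipeline. It also creates the id, of type DefaultChannelId, which implements ChannelId; its asLongText() method returns the unique identifier of the channel.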

The init method

Initializes the channel.

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

The addLast(new ChannelInitializer<Channel>() {...}) method

Adds the ChannelInitializer at the second-to-last position of the pipeline, that is, directly before the tail context.

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx); // link the new ChannelHandlerContext into the doubly linked list of contexts

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) { // registered starts as false and becomes true only once, when callHandlerAddedForAllHandlers runs
            newCtx.setAddPending(); // update the handler state (uses an AtomicIntegerFieldUpdater)
            callHandlerCallbackLater(newCtx, true); // park the callback in a singly linked list to run later
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

newContext(group, filterName(name, handler), handler)

Creates the ChannelHandlerContext. By default this is a DefaultChannelHandlerContext, an implementation of ChannelHandlerContext, and both the pipeline and the handler are passed into it.

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

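addLast0 does not append at the very end

It links the ChannelHandlerContext created above into the pipeline; the handler is now wrapped by its channel handler context. The new node is inserted at the second-to-last position, directly before tail.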

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
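
The pointer updates are easier to follow on a toy list. Below is a small sketch of a doubly linked list with fixed head and tail sentinels, where addLast inserts just before tail in the same four steps. PipelineListDemo, Node and Pipeline are illustrative names only and carry none of the real pipeline's behavior.

```java
class PipelineListDemo {

    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    static final class Pipeline {
        final Node head = new Node("head");
        final Node tail = new Node("tail");

        Pipeline() {
            // An empty pipeline is just the two sentinels linked to each other.
            head.next = tail;
            tail.prev = head;
        }

        // Same four pointer updates as addLast0: insert directly before tail.
        Pipeline addLast(String name) {
            Node newNode = new Node(name);
            Node prev = tail.prev;
            newNode.prev = prev;
            newNode.next = tail;
            prev.next = newNode;
            tail.prev = newNode;
            return this;
        }

        // Walks the list from head to tail and joins the node names.
        String names() {
            StringBuilder sb = new StringBuilder();
            for (Node n = head; n != null; n = n.next) {
                if (sb.length() > 0) {
                    sb.append(" -> ");
                }
                sb.append(n.name);
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline().addLast("codec").addLast("handler");
        System.out.println(pipeline.names()); // prints "head -> codec -> handler -> tail"
    }
}
```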

The callHandlerCallbackLater(newCtx, true) method: building the pipeline's chain of pending tasks

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead; // null by default
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
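
As a rough illustration of this deferral, the sketch below queues callbacks in a singly linked list while a "registered" flag is false and runs them in insertion order once registration happens. All names (PendingCallbackDemo, PendingTask, addHandler, onRegistered) are hypothetical, and the real pipeline also deals with executors and removal tasks, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

class PendingCallbackDemo {

    // Hypothetical counterpart of PendingHandlerCallback: one node of the pending list.
    static final class PendingTask {
        final Runnable action;
        PendingTask next;

        PendingTask(Runnable action) {
            this.action = action;
        }
    }

    static final class Pipeline {
        private PendingTask pendingHead;
        private boolean registered;

        // Runs the callback at once if already registered, otherwise appends it to the list.
        void addHandler(Runnable handlerAdded) {
            if (registered) {
                handlerAdded.run();
                return;
            }
            PendingTask task = new PendingTask(handlerAdded);
            if (pendingHead == null) {
                pendingHead = task;
                return;
            }
            PendingTask pending = pendingHead;
            while (pending.next != null) { // find the tail of the linked list
                pending = pending.next;
            }
            pending.next = task;
        }

        // Flips the flag and drains the pending list in insertion order.
        void onRegistered() {
            registered = true;
            PendingTask task = pendingHead;
            pendingHead = null;
            while (task != null) {
                task.action.run();
                task = task.next;
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Pipeline pipeline = new Pipeline();
        pipeline.addHandler(() -> log.add("initializer"));
        pipeline.addHandler(() -> log.add("handler"));
        System.out.println(log); // prints "[]": nothing has run yet
        pipeline.onRegistered();
        System.out.println(log); // prints "[initializer, handler]"
    }
}
```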

The initChannel(final Channel ch) method

public void initChannel(final Channel ch) {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = config.handler();
    if (handler != null) {
        pipeline.addLast(handler);
    }

    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

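This submits a task to the channel's event loop that adds the ServerBootstrapAcceptor to the pipeline. This is what is usually called the connection acceptor (Acceptor).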

 

Original article: https://www.cnblogs.com/xiaojiesir/p/15309836.html