ServerBootstrap
ServerBootstrap is the bootstrap class used to start a Netty server.

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ServerHandler());
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);
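The channel(NioServerSocketChannel.class) method
channel() creates a ReflectiveChannelFactory, which looks up the public no-arg constructor of the given class via reflection and stores it in this.constructor. Each later call to newChannel() uses that constructor to create a Channel instance. The NIO model is the most mature and most widely used transport in Netty, so NioServerSocketChannel is the recommended Channel type for servers and NioSocketChannel for clients. Netty also provides other Channel implementations that you can switch to as needed, such as OioServerSocketChannel and EpollServerSocketChannel.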
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

        private final Constructor<? extends T> constructor;

        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            ObjectUtil.checkNotNull(clazz, "clazz");
            try {
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                        " does not have a public non-arg constructor", e);
            }
        }

        @Override
        public T newChannel() {
            try {
                return constructor.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
            }
        }

        @Override
        public String toString() {
            return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                    '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
        }
    }
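The reflective-factory idea can be tried out with the JDK alone. The following is a minimal sketch, not Netty's code: ReflectiveFactory and ReflectiveFactoryDemo are hypothetical names, and StringBuilder merely stands in for a Channel class that has a public no-arg constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for ReflectiveChannelFactory, using only the JDK.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look up the public no-arg constructor once, at configuration time.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    // Each call creates a fresh instance through the cached constructor.
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create " + constructor.getDeclaringClass(), e);
        }
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> factory = new ReflectiveFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        // The factory hands out a new object on every call.
        System.out.println(first != second);
    }
}
```

Resolving the constructor in the factory's own constructor means a class without a suitable constructor is rejected when the bootstrap is configured, rather than later when the first channel is created.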
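The channelFactory method
Sets the channel factory and returns the bootstrap itself so that calls can keep chaining. The factory can be set only once: a second call throws IllegalStateException.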
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
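The "return self()" idiom is what lets a setter declared in an abstract base class keep a fluent chain typed as the concrete subclass. Below is a small sketch of that pattern under assumed names (FluentConfig, ServerConfig, factory and port are all hypothetical and only loosely mirror AbstractBootstrap and ServerBootstrap).

```java
// Hypothetical self-typed builder: B is the concrete subclass type.
abstract class FluentConfig<B extends FluentConfig<B>> {
    private String factoryName;

    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }

    // Set-once setter that returns the concrete type so chaining continues.
    B factory(String name) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        if (factoryName != null) {
            throw new IllegalStateException("factory set already");
        }
        factoryName = name;
        return self();
    }

    String factoryName() {
        return factoryName;
    }
}

// The subclass adds its own setters; factory(...) still returns ServerConfig.
final class ServerConfig extends FluentConfig<ServerConfig> {
    private int port;

    ServerConfig port(int port) {
        this.port = port;
        return this;
    }

    int port() {
        return port;
    }
}

final class FluentConfigDemo {
    public static void main(String[] args) {
        // factory(...) is declared in the base class, yet port(...) can follow it.
        ServerConfig config = new ServerConfig().factory("nio").port(8080);
        System.out.println(config.factoryName() + ":" + config.port());
    }
}
```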
childHandler(ChannelHandler childHandler)
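In Netty, multiple ChannelHandlers can be registered through a ChannelPipeline, each with its own responsibility. ServerBootstrap's childHandler() method takes a single ChannelHandler. ChannelInitializer is an abstract class that implements the ChannelHandler interface; here an anonymous subclass of it is instantiated and passed to childHandler(), and its initChannel() method then adds the real handlers to each new channel's pipeline.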
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

Every Channel is bound to a Pipeline when it is created. The Pipeline is mainly used to orchestrate processing: it manages multiple ChannelHandlers, I/O events propagate through those ChannelHandlers in order, and the ChannelHandlers carry out the business logic.
    b.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline()
                    .addLast("codec", new HttpServerCodec())
                    .addLast("compressor", new HttpContentCompressor())
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("handler", new HttpServerHandler());
        }
    });

The example above chains several ChannelHandlers onto the pipeline: an HTTP codec, an HTTP content compressor, an HTTP message aggregator, and a custom business-logic handler.
b.option(ChannelOption.SO_KEEPALIVE, true);
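Sets Channel options. ServerBootstrap offers two methods for this, option and childOption. option configures the server (parent) channel that listens for connections and is served by the Boss thread group, while childOption configures each accepted child channel, which is served by the Worker thread group.
The meanings of commonly used options are listed below; set them as your business scenario requires.
Option | Meaning |
---|---|
SO_KEEPALIVE | When set to true, enables the TCP SO_KEEPALIVE property: TCP actively probes the connection state to keep the connection alive |
SO_BACKLOG | Maximum length of the queue of connections that have completed the three-way handshake. The server may be handling many connections at the same moment, so this value should be raised appropriately under high concurrency with very large numbers of connections |
TCP_NODELAY | Defaults to true in Netty, meaning data is sent immediately. Setting it to false enables the Nagle algorithm, which accumulates TCP packets up to a certain size before sending; this reduces the number of segments sent but adds some latency. Netty disables the Nagle algorithm by default to minimize transmission latency |
SO_SNDBUF | Size of the TCP send buffer |
SO_RCVBUF | Size of the TCP receive buffer |
SO_LINGER | Sets how long close is delayed while waiting for data in the buffer to be sent |
CONNECT_TIMEOUT_MILLIS | Timeout for establishing a connection |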
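The split between option and childOption corresponds to two different sockets at the JDK level: the listening socket and each accepted connection. The sketch below uses plain java.nio rather than Netty to show where each kind of setting lands; SocketOptionsSketch is a hypothetical name, the backlog value 128 is arbitrary, and the code assumes a loopback interface is available.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class SocketOptionsSketch {
    // Returns {SO_REUSEADDR on the listening socket,
    //          TCP_NODELAY and SO_KEEPALIVE on the accepted socket}.
    static boolean[] configure() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Comparable to option(...): applies to the listening channel.
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            // The second argument of bind is the backlog, the JDK counterpart of SO_BACKLOG.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                // Comparable to childOption(...): applies to each accepted connection.
                accepted.setOption(StandardSocketOptions.TCP_NODELAY, true);
                accepted.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                return new boolean[] {
                        server.getOption(StandardSocketOptions.SO_REUSEADDR),
                        accepted.getOption(StandardSocketOptions.TCP_NODELAY),
                        accepted.getOption(StandardSocketOptions.SO_KEEPALIVE)
                };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (boolean enabled : configure()) {
            System.out.println(enabled);
        }
    }
}
```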
ServerBootstrap.bind(hostname, port)
bind() ends up calling the doBind(final SocketAddress localAddress) method of the AbstractBootstrap class.

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
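The control flow of doBind (bind right away if registration has already finished, otherwise bind from a listener once it does) can be imitated with the JDK's CompletableFuture. This is only an analogy under assumed names: BindAfterRegister, its doBind helper and the "bound to" string are hypothetical, and CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegister {
    // Hypothetical helper: "binds" only after registration has completed successfully.
    static CompletableFuture<String> doBind(CompletableFuture<Void> registration, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone()) {
            // Fast path: registration already finished, so finish the bind immediately.
            finish(registration, address, promise);
        } else {
            // Slow path: defer the bind until the registration future completes.
            registration.whenComplete((ignored, cause) -> finish(registration, address, promise));
        }
        return promise;
    }

    private static void finish(CompletableFuture<Void> registration, String address,
                               CompletableFuture<String> promise) {
        if (registration.isCompletedExceptionally()) {
            // A failed registration fails the bind promise without attempting to bind.
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound to " + address);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(pending, ":8080");
        pending.complete(null); // registration finishes later and triggers the deferred bind
        System.out.println(bound.join());
    }
}
```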
The initAndRegister() method
Creates and initializes the channel. The call channelFactory.newChannel() here creates the NioServerSocketChannel instance that was configured earlier with channel(NioServerSocketChannel.class).
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
Instantiating NioServerSocketChannel

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

newSocket returns the JDK's default ServerSocketChannelImpl, which belongs to Java NIO. SelectionKey.OP_ACCEPT is the NIO operation bit for accepting incoming connections.
super(null, channel, SelectionKey.OP_ACCEPT)
Stores the underlying channel for the NioServerSocketChannel together with the event it is interested in, readInterestOp.

    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch; // the underlying JDK channel, here a ServerSocketChannelImpl
        this.readInterestOp = readInterestOp; // the interest op, here OP_ACCEPT
        try {
            ch.configureBlocking(false); // switch to non-blocking mode
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
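For readers less familiar with raw NIO, the two settings stored by this constructor map onto the following JDK calls. This is a sketch with a hypothetical class name (AcceptInterestSketch); it registers OP_ACCEPT directly for brevity, whereas Netty only remembers readInterestOp at this point and applies it to the selection key later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class AcceptInterestSketch {
    // Returns {channel is blocking, key is interested in exactly OP_ACCEPT}.
    static boolean[] openAndRegister() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector.
            ch.configureBlocking(false);
            // Ask the selector to report when a connection is ready to be accepted.
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return new boolean[] {
                    ch.isBlocking(),
                    key.interestOps() == SelectionKey.OP_ACCEPT
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = openAndRegister();
        System.out.println("blocking=" + result[0] + ", acceptInterest=" + result[1]);
    }
}
```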
super(parent);
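    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId(); // unique channel id
        unsafe = newUnsafe(); // a NioMessageUnsafe here; low-level operations go through it
        pipeline = newChannelPipeline(); // the channel's pipeline
    }

This creates the ChannelPipeline instance, whose actual type is DefaultChannelPipeline. It also creates the id, a DefaultChannelId that implements ChannelId; this is the channel's unique identifier, available as text through asLongText().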
The init method
Initializes the channel.

    @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
The addLast(new ChannelInitializer<Channel>() {}) call
Adds the ChannelInitializer at the second-to-last position of the pipeline, that is, immediately before the tail node.
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx); // link the new context into the pipeline's doubly linked list

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) { // registered starts as false and becomes true only after callHandlerAddedForAllHandlers, which runs once
                newCtx.setAddPending(); // mark the handler state as add-pending (uses an AtomicIntegerFieldUpdater)
                callHandlerCallbackLater(newCtx, true); // queue it in a singly linked list to run later
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
newContext(group, filterName(name, handler), handler)
Creates the ChannelHandlerContext. By default this is a DefaultChannelHandlerContext, an implementation of ChannelHandlerContext, and both the pipeline and the handler are passed into it.

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
addLast0: not actually an append at the very end
Adds the ChannelHandlerContext created above to the pipeline. The handler is wrapped by its channel handler context, and the new node is inserted at the second-to-last position, just before tail.

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
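The four pointer assignments in addLast0 are a standard insertion before a tail sentinel in a doubly linked list. The sketch below reproduces them in a tiny self-contained list; MiniPipeline and its Node class are hypothetical, with plain strings standing in for handler contexts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a pipeline: a doubly linked list with head and tail sentinels.
final class MiniPipeline {
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Same four assignments as addLast0: the new node goes just before tail.
    MiniPipeline addLast(String name) {
        Node newCtx = new Node(name);
        Node prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Walks from head to tail and collects the node names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(new MiniPipeline().addLast("initializer").addLast("acceptor").names());
    }
}
```

Because head and tail always exist, the insertion never needs a null check, and "last" always means "last before tail".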
The callHandlerCallbackLater(newCtx, true) method: builds the pipeline's chain of pending tasks

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead; // null until the first task is queued
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
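The pending-callback chain is a plain singly linked list that is appended to before registration and drained once afterwards. A minimal sketch of that idea follows; PendingCallbacks, addLater and fireRegistered are hypothetical names, and Runnable stands in for PendingHandlerCallback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the pending handlerAdded callbacks.
final class PendingCallbacks {
    private static final class Task {
        final Runnable action;
        Task next;

        Task(Runnable action) {
            this.action = action;
        }
    }

    private Task head; // null until the first task is queued
    private boolean registered;

    // Appends a task at the end of the list; only legal before registration.
    void addLater(Runnable action) {
        if (registered) {
            throw new IllegalStateException("already registered");
        }
        Task task = new Task(action);
        if (head == null) {
            head = task;
            return;
        }
        Task pending = head;
        while (pending.next != null) { // find the tail of the linked list
            pending = pending.next;
        }
        pending.next = task;
    }

    // Runs every queued task once, in insertion order, and clears the list.
    void fireRegistered() {
        registered = true;
        Task task = head;
        head = null;
        while (task != null) {
            task.action.run();
            task = task.next;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PendingCallbacks callbacks = new PendingCallbacks();
        callbacks.addLater(() -> log.add("initializer added"));
        callbacks.addLater(() -> log.add("handler added"));
        callbacks.fireRegistered();
        System.out.println(log);
    }
}
```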
The initChannel(final Channel ch) method

    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

This submits a task to the channel's event loop that adds a ServerBootstrapAcceptor to the pipeline. The ServerBootstrapAcceptor is the connection acceptor (Acceptor) mentioned earlier.