netty-bind

ServerBootstrap在bind时,主要做了3个操作:init、register、bind

init

 1 void init(Channel channel) throws Exception {
 2   final Map<ChannelOption<?>, Object> options = options0();
 3   synchronized (options) {
 4     setChannelOptions(channel, options, logger);
 5   }
 6 
 7   final Map<AttributeKey<?>, Object> attrs = attrs0();
 8   synchronized (attrs) {
 9     for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
10       @SuppressWarnings("unchecked")
11       AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
12       channel.attr(key).set(e.getValue());
13     }
14   }
15 
16   ChannelPipeline p = channel.pipeline();
17 
18   final EventLoopGroup currentChildGroup = childGroup;
19   final ChannelHandler currentChildHandler = childHandler;
20   final Entry<ChannelOption<?>, Object>[] currentChildOptions;
21   final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
22   synchronized (childOptions) {
23     currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
24   }
25   synchronized (childAttrs) {
26     currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
27   }
28     //register后,ChannelInitializer的initChannel就会通过ChannelPipeline的fireChannelRegistered被调用,
29     //initChannel方法返回后,ChannelInitializer会被从ChannelPipeline中删除。
30     //pipeline.addLast(handler)会将我们设置的handler添加在pipeline中,如果我们添加的是一个ChannelInitializer,
31     //执行完这个initChannel后,ChannelPipeline中就会有ChannelInitializer、ServerBootstrapAcceptor两个Handler,
32     //registered事件继续在ChannelPipeline中传播,传至新添加的ChannelInitializer时,又会执行initChannel逻辑,
33     //我们通常会在initChannel方法中进行添加handler操作,假设添加了h1、h2,
34     //如果不通过execute执行添加ServerBootstrapAcceptor的操作(execute会将任务入队),
35     //最后ChannelPipeline中就会有ServerBootstrapAcceptor、h1、h2。
36   p.addLast(new ChannelInitializer<Channel>() {
37     @Override
38     public void initChannel(final Channel ch) throws Exception {
39       final ChannelPipeline pipeline = ch.pipeline();
40       ChannelHandler handler = config.handler();
41       if (handler != null) {
42         pipeline.addLast(handler);
43       }
44 
45       ch.eventLoop().execute(new Runnable() {
46         @Override
47         public void run() {
48             //ServerBootstrapAcceptor
49           pipeline.addLast(new ServerBootstrapAcceptor(
50                   ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
51         }
52       });
53     }
54   });
55 }

ServerBootstrapAcceptor主要就是将channel注册到selector上

 1 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
 2     //......
 3     
 4     public void channelRead(ChannelHandlerContext ctx, Object msg) {
 5         //io.netty.channel.Channel,该channel是对jdk的channel的包装,
 6         //在serverSocket accept到socket后,socket就会被包装为io.netty.channel.Channel,
 7         //然后通过pipeline.fireChannelRead传到这里,目前这一系列操作都是在boss线程中进行的。
 8       final Channel child = (Channel) msg;
 9         //将childHandler添加到accept的channel上
10       child.pipeline().addLast(childHandler);
11 
12       setChannelOptions(child, childOptions, logger);
13 
14       for (Entry<AttributeKey<?>, Object> e: childAttrs) {
15         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
16       }
17 
18       try {
19           //注册Channel,注册后channel就绑定在了特定的EventLoop上,
20           //并且注册在与EventLoop对应的selector上,该channel之后的IO操作都是在EventLoop上进行的。
21         childGroup.register(child).addListener(new ChannelFutureListener() {
22           @Override
23           public void operationComplete(ChannelFuture future) throws Exception {
24             if (!future.isSuccess()) {
25               forceClose(child, future.cause());
26             }
27           }
28         });
29       } catch (Throwable t) {
30         forceClose(child, t);
31       }
32     }
33     
34     //......
35 }

register

register和bind都涉及到一个接口io.netty.channel.Channel.Unsafe

 1 //Unsafe提供了提供了与真正的IO操作,比如从socket读数据、写数据,将channel注册到selector等。
 2 interface Unsafe {
 3     //......
 4     
 5     void bind(SocketAddress localAddress, ChannelPromise promise);
 6     
 7     void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
 8 
 9     void write(Object msg, ChannelPromise promise);
10 
11     void flush();
12     
13     /**
14      * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
15      * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
16      */
17     void beginRead();
18     
19     /**
20      * Register the {@link Channel} of the {@link ChannelPromise} and notify
21      * the {@link ChannelFuture} once the registration was complete.
22      */
23     void register(EventLoop eventLoop, ChannelPromise promise);
24      
25     //......
26 }
 1 //Unsafe实现的register方法,有删减
 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3     //如果在worker线程中,则直接注册(即将channel注册在selector上)
 4     if (eventLoop.inEventLoop()) {
 5         register0(promise);
 6     } else {
 7         //如果不在worker线程中,则异步注册,如果worker线程未开启,execute会先开启线程再入队。
 8         eventLoop.execute(new Runnable() {
 9             @Override
10             public void run() {
11                 register0(promise);
12             }
13         });
14     }
15 }

bind

注册之后,就可以bind了,bind仍然会在eventLoop中执行,此后serverSocket就开始监听client请求了。

原文地址:https://www.cnblogs.com/holoyong/p/7436078.html