dubbo源码阅读-远程暴露(七)之Transport

接口定义

/**
 * SPI extension point that creates the server and client ends of a transport.
 * Both methods are {@code @Adaptive}; the keys listed on each annotation are the
 * URL parameter names considered when choosing the implementation
 * (presumably in the listed order, falling back to the {@code @SPI} default —
 * confirm against ExtensionLoader).
 */
@SPI("netty") // default extension name: "netty"
public interface Transporter {

    /**
     * Bind a server.
     *
     * @param url     server url; the implementation is selected from the URL
     *                parameters named by {@code Constants.SERVER_KEY} or
     *                {@code Constants.TRANSPORTER_KEY}
     * @param handler channel handler that will receive the server's events
     * @return server
     * @throws RemotingException if the server cannot be bound
     * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     *
     * @param url     server url; the implementation is selected from the URL
     *                parameters named by {@code Constants.CLIENT_KEY} or
     *                {@code Constants.TRANSPORTER_KEY}
     * @param handler channel handler that will receive the client's events
     * @return client
     * @throws RemotingException if the connection cannot be established
     * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

类图

说明

接上一篇《dubbo源码阅读-远程暴露(七)之Exchangers》

@Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // Step 1: decorate the caller's handler twice, innermost first:
        // HeaderExchangeHandler wraps it, then DecodeHandler wraps that.
        HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
        ChannelHandler decodingHandler = new DecodeHandler(exchangeHandler);

        // <1> Step 2: hand the decorated handler to the transport layer. According to this
        // walkthrough, that is where Netty starts listening on the port and the
        // serialization implementation is configured.
        Server transportServer = Transporters.bind(url, decodingHandler);

        /*
         * <6> Wrap the transport server in a HeaderExchangeServer. Per the walkthrough's notes
         * this adds two features:
         *   a. idle-time detection on channels, closing connections that exceed it;
         *   b. on server shutdown, notifying clients so they stop sending requests here.
         */
        return new HeaderExchangeServer(transportServer);
    }

Transporters

<1>bind

com.alibaba.dubbo.remoting.Transporters#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)

    /**
     * Binds a server for {@code url}, funnelling all supplied handlers into a single one.
     *
     * @param url      address and parameters of the server to bind; must not be null
     * @param handlers one or more handlers; on the export path described here the caller passes
     *                 a single chain decorated as DecodeHandler -> HeaderExchangeHandler ->
     *                 com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
     * @return the server returned by the adaptive {@link Transporter}
     * @throws RemotingException        propagated from {@code Transporter#bind}
     * @throws IllegalArgumentException if {@code url} is null, or {@code handlers} is null or empty
     */
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        // More than one handler: fan events out through a ChannelHandlerDispatcher so that
        // every handler observes them. Exactly one handler: use it as-is, no extra wrapper.
        final ChannelHandler target = handlers.length > 1
                ? new ChannelHandlerDispatcher(handlers)
                : handlers[0];
        // <2> resolve the adaptive Transporter, then <3> delegate the actual bind to it.
        final Transporter transporter = getTransporter();
        return transporter.bind(url, target);
    }

<2>getTransporter

com.alibaba.dubbo.remoting.Transporters#getTransporter

 /**
     * Looks up the adaptive {@link Transporter} extension.
     *
     * @return the adaptive extension instance obtained from the Transporter ExtensionLoader
     */
    public static Transporter getTransporter() {
        // Transporter is an SPI extension point; its declared default is @SPI("netty").
        ExtensionLoader<Transporter> loader = ExtensionLoader.getExtensionLoader(Transporter.class);
        return loader.getAdaptiveExtension();
    }

NettyTransporter

  /**
     * Creates a Netty-backed server for the given URL.
     *
     * @param url      server url
     * @param listener handler passed straight to the NettyServer constructor; on the export path
     *                 described here it is the chain DecodeHandler -> HeaderExchangeHandler ->
     *                 com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
     * @return the newly constructed NettyServer
     * @throws RemotingException if constructing the server fails
     */
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        // <3> all of the work happens inside the NettyServer constructor.
        Server nettyServer = new NettyServer(url, listener);
        return nettyServer;
    }

<3>NettyServer

com.alibaba.dubbo.remoting.transport.netty4.NettyServer#NettyServer

 /**
     * Builds the server by delegating everything to the superclass constructor.
     * Two things happen inside the single super(...) call, in this order:
     *   <4> ChannelHandlers.wrap decorates {@code handler}; the URL it receives is the result of
     *       ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), not the raw {@code url};
     *   <6> the superclass constructor then runs with the original {@code url} and the
     *       wrapped handler.
     *
     * @param url     server url
     * @param handler handler to be wrapped before it is handed to the superclass
     * @throws RemotingException propagated from the superclass constructor
     */
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

<4>ChannelHandlers.wrap

  /**
     * Static entry point for handler decoration; forwards to the shared instance.
     *
     * @param handler handler to decorate
     * @param url     url passed through to {@code wrapInternal}
     * @return the decorated handler produced by {@code wrapInternal}
     */
    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        // <5> the instance method performs the actual wrapping.
        ChannelHandlers wrapper = ChannelHandlers.getInstance();
        return wrapper.wrapInternal(handler, url);
    }

<5>wrapInternal

 /**
     * Decorates {@code handler} in three layers, innermost first: the adaptive Dispatcher's
     * result, then HeartbeatHandler, then MultiMessageHandler.
     *
     * @param handler handler to decorate
     * @param url     url given to the dispatcher
     * @return the outermost MultiMessageHandler
     */
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // The Dispatcher is resolved through SPI; the original note says the default is "all".
        Dispatcher dispatcher = ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension();
        ChannelHandler dispatched = dispatcher.dispatch(handler, url);
        return new MultiMessageHandler(new HeartbeatHandler(dispatched));
    }

<7>doOpen

com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen

    /**
     * Configures the Netty ServerBootstrap, binds it to {@code getBindAddress()} and stores
     * the resulting channel in {@code channel}.
     *
     * @throws Throwable any failure raised while configuring or binding
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        // Boss group: a single thread. In Netty's boss/worker model it accepts incoming
        // connections and hands each accepted channel to the worker group.
        // The `true` flag on DefaultThreadFactory requests daemon threads, so they do not
        // keep the JVM alive on their own.
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        // Worker group: performs channel reads/writes. Thread count comes from the URL
        // parameter Constants.IO_THREADS_KEY, defaulting to Constants.DEFAULT_IO_THREADS.
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));
        // Handler chain as seen from Netty: NettyServerHandler -> this NettyServer -> ... ->
        // the protocol's request handler. `this` is passed as the handler argument below.
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        // Keep a reference to the channel collection owned by the Netty handler.
        channels = nettyServerHandler.getChannels();
        // Option reference: https://blog.csdn.net/zhongzunfa/article/details/94590670
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // disable Nagle's algorithm on accepted channels
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) // enable SO_REUSEADDR (local address reuse) on accepted channels
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // buffer allocator (not a TCP option): Netty's default pooled allocator
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // A new codec adapter is created for every accepted channel; the
                        // NettyServerHandler instance is shared by all of them.
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder()) // decoder
                                .addLast("encoder", adapter.getEncoder()) // encoder
                                .addLast("handler", nettyServerHandler); // business handler
                    }
                });
        // Start the server: bind, wait (uninterruptibly) for the bind to complete, then keep
        // the bound channel.
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

<6>AbstractServer

    /**
     * Initialises the address and limit fields from the URL, opens the server through
     * {@link #doOpen()}, and finally looks up the executor from the DataStore.
     *
     * NOTE(review): doOpen() is an overridable method invoked from this constructor, so it
     * runs before any subclass field initialisers or constructor body have executed.
     *
     * @param url     server url
     * @param handler handler passed to the superclass constructor
     * @throws RemotingException if doOpen() throws; the original Throwable is kept as the cause
     */
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        /*
         * Field: the address taken from the URL itself, e.g. for dubbo://127.0.0.1:23888....
         * Example value noted by the original author: /192.168.2.1:23888
         */
        localAddress = getUrl().toInetSocketAddress();
        // Bind IP: the URL parameter Constants.BIND_IP_KEY (bind.ip per the original note) wins;
        // otherwise the URL's own host is used.
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        // Bind port: the URL parameter Constants.BIND_PORT_KEY (bind.port per the original note)
        // wins; otherwise the URL's own port is used.
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        // Fall back to NetUtils.ANYHOST when the URL sets the anyhost flag or the bind IP is
        // rejected by NetUtils.isInvalidLocalHost.
        // See: https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-12-0
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        // Field: the address the server socket is actually bound to.
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        /*
         * Field: `accepts` URL parameter, which limits how many client connections the server
         * accepts (the original note says the default is 0).
         * Docs: http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html
         */
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        /*
         * Field: idle timeout URL parameter (idle.timeout, in milliseconds per the original note).
         */
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            // <7> Open the server; doOpen() is the template-method hook supplied by subclasses.
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            // Any failure while opening is rethrown as a RemotingException with t as its cause.
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        // Fetch the executor from the default DataStore extension, keyed by
        // Constants.EXECUTOR_SERVICE_COMPONENT_KEY and the URL port as a string. This happens
        // only after doOpen() has succeeded.
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
原文地址:https://www.cnblogs.com/LQBlog/p/12517551.html