Pigeon源码分析(一) -- 服务发布源码分析

@Configuration
public class WaybillQueryFacadeConfiguration {

    @Autowired
    private WaybillQueryFacade waybillQueryFacade;

    @Bean
    public boolean waybillQueryFacade() throws Exception {
        ServiceFactory.addService("http://service.ymm.com/trade/om/waybillQueryFacade_1.0.0",
                WaybillQueryFacade.class, waybillQueryFacade);
        ServiceFactory.publishService("http://service.ymm.com/trade/om/waybillQueryFacade_1.0.0");
        return true;
    }
}

  上面是发布一个rpc服务端的代码,看得出来发布的入口就是ServiceFactory。ServiceFactory的初始化方式是通过静态代码块完成。

  先看ServiceFactory的静态代码块 ServiceFactory # 

static {
        try {
            ProviderBootStrap.init();
            String appname = ConfigManagerLoader.getConfigManager().getAppName();
            if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) {
                throw new RuntimeException("appname is not assigned");
            }
        } catch (Throwable t) {
            t.printStackTrace();
            logger.error("error while initializing service factory:", t);
            System.exit(1);
        }
    }

  再看 ProviderBootStrap.init(),该方法也是一个static方法,看来作者还真是喜欢static方法。主要就是干了三件事,初始化各种handler,初始化序列化器,初始化写zk的客户端

public static void init() {
        if (!isInitialized) {
            LoggerLoader.init();
            ConfigManager configManager = ConfigManagerLoader.getConfigManager();
            RegistryConfigLoader.init();
            ProviderProcessHandlerFactory.init();//各种和业务相关的handler
            SerializerFactory.init();//初始化各种序列化器
            ClassUtils.loadClasses("com.dianping.pigeon");
            Monitor monitor = MonitorLoader.getMonitor();
            if (monitor != null) {
                monitor.init();
            }
            Thread shutdownHook = new Thread(new ShutdownHookListener());
            shutdownHook.setDaemon(true);
            shutdownHook.setPriority(Thread.MAX_PRIORITY);
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            ServerConfig config = new ServerConfig();
            config.setProtocol(Constants.PROTOCOL_HTTP);
            String poolStrategy = ConfigManagerLoader.getConfigManager().getStringValue(
                    "pigeon.provider.pool.strategy", "shared");
            if ("server".equals(poolStrategy)) {
                int corePoolSize = configManager.getIntValue("pigeon.provider.http.corePoolSize", 5);
                int maxPoolSize = configManager.getIntValue("pigeon.provider.http.maxPoolSize", 300);
                int workQueueSize = configManager.getIntValue("pigeon.provider.http.workQueueSize", 300);
                config.setCorePoolSize(corePoolSize);
                config.setMaxPoolSize(maxPoolSize);
                config.setWorkQueueSize(workQueueSize);
            }
            RegistryManager.getInstance();//写zk的工具类
            List<Server> servers = ExtensionLoader.getExtensionList(Server.class);
            for (Server server : servers) {
                if (!server.isStarted()) {
                    if (server.support(config)) {
                        server.start(config);
                        httpServer = server;
                        serversMap.put(server.getProtocol() + server.getPort(), server);
                        logger.warn("pigeon " + server + "[version:" + VersionUtils.VERSION
                                + "] has been started");
                    }
                }
            }
            //spring 正常启动后回调信息
            SpringEventBinder.regOnSpringLoaded(new Runnable() {
                @Override
                public void run() {
                    try {
                        ServiceStatusChangeTask.start("publish");
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            isInitialized = true;
        }
    }

注意下 ,所以在公司里windows电脑,在工程代码的同磁盘下这个路径  /data/webapps/appenv 其中 appenv就是文件名 没有后缀

public class RegistryConfigLoader {

    private static final Logger logger = LoggerLoader.getLogger(RegistryConfigLoader.class);

    private static final String ENV_FILE = "/data/webapps/appenv";

    static volatile boolean isInitialized = false;

上述代码首先分析了 ServiceFactory的静态代码块,初始化工作主要包括三个

1 业务handlers

2 序列化器

3 注册客户端,比如curator

继续分析 ServiceFactory.addService

public static <T> void addService(String url, Class<T> serviceInterface, T service, int port) throws RpcException {
        ProviderConfig<T> providerConfig = new ProviderConfig<T>(serviceInterface, service);
        providerConfig.setUrl(url);
        providerConfig.getServerConfig().setPort(port);
        addService(providerConfig);
    }

一个rpc服务端最重要的三个参数 1 接口 2 实现类对象 3 url

这三个参数会构造成ProviderConfig

public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
        if (StringUtils.isBlank(providerConfig.getUrl())) {
            providerConfig.setUrl(getServiceUrl(providerConfig));
        }
        try {
            ServicePublisher.addService(providerConfig);
            ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
            providerConfig.setServerConfig(serverConfig);
            ServicePublisher.publishService(providerConfig, false);
        } catch (RegistryException t) {
            throw new RpcException("error while adding service:" + providerConfig, t);
        } catch (Throwable t) {
            throw new RpcException("error while adding service:" + providerConfig, t);
        }
    }

ServicePublisher#addService

public static <T> void addService(ProviderConfig<T> providerConfig) throws Exception {
        if (logger.isInfoEnabled()) {
            logger.info("add service:" + providerConfig);
        }
        String version = providerConfig.getVersion();
        String url = providerConfig.getUrl();
        if (StringUtils.isBlank(version)) {// default version 我们一般都不会加version的
            serviceCache.put(url, providerConfig);
        } else {
            String urlWithVersion = getServiceUrlWithVersion(url, version);
            if (serviceCache.containsKey(url)) {
                serviceCache.put(urlWithVersion, providerConfig);
                ProviderConfig<?> providerConfigDefault = serviceCache.get(url);
                String defaultVersion = providerConfigDefault.getVersion();
                if (!StringUtils.isBlank(defaultVersion)) {
                    if (VersionUtils.compareVersion(defaultVersion, providerConfig.getVersion()) < 0) {
                        // replace existing service with this newer service as
                        // the default provider
                        serviceCache.put(url, providerConfig);
                    }
                }
            } else {
                serviceCache.put(urlWithVersion, providerConfig);
                // use this service as the default provider
                serviceCache.put(url, providerConfig);
            }
        }
        T service = providerConfig.getService();
        if (service instanceof InitializingService) {
            ((InitializingService) service).initialize();
        }
        ServiceMethodFactory.init(url);//如果该url是第一次注册,会建立url和method的本地缓存
    }

其实就是建立各种缓存

ProviderBootStrap# startup(providerConfig);

public static ServerConfig startup(ProviderConfig<?> providerConfig) {
        ServerConfig serverConfig = providerConfig.getServerConfig();
        if (serverConfig == null) {
            throw new IllegalArgumentException("server config is required");
        }
        Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());//找tcp服务器
        if (server != null) {
            server.addService(providerConfig);//继续调用server的addService
            return server.getServerConfig();
        } else {
            synchronized (ProviderBootStrap.class) {
                List<Server> servers = ExtensionLoader.newExtensionList(Server.class);//通过classloader的方式实例化
                for (Server s : servers) {
                    if (!s.isStarted()) {
                        if (s.support(serverConfig)) {
                            s.start(serverConfig);//启动netty
                            s.addService(providerConfig);
                            serversMap.put(s.getProtocol() + serverConfig.getPort(), s);
                            logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION
                                    + "] has been started");
                            break;
                        }
                    }
                }
                server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
                if (server != null) {
                    return server.getServerConfig();
                }
                return null;
            }
        }
    }
AbstractServer#addService
public <T> void addService(ProviderConfig<T> providerConfig) {
        requestProcessor.addService(providerConfig);//下面的代码不重要,重点就是这个
        doAddService(providerConfig);
        List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners();
        for (ServiceChangeListener listener : listeners) {
            listener.notifyServiceAdded(providerConfig);
        }
    }

RequestThreadPoolProcessor # addService

 看代码其实就是根据配置,给方法配置执行的线程池

@Override
    public synchronized <T> void addService(ProviderConfig<T> providerConfig) {
        String url = providerConfig.getUrl();
        Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods();
        ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url);
        Set<String> methodNames = methodCache.getMethodMap().keySet();
        if (needStandalonePool(providerConfig)) {
            if (methodThreadPools == null) {
                methodThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
            }
            if (serviceThreadPools == null) {
                serviceThreadPools = new ConcurrentHashMap<String, DynamicThreadPool>();
            }
            if (providerConfig.getActives() > 0 && CollectionUtils.isEmpty(methodConfigs)) {
                String key = url;
                DynamicThreadPool pool = serviceThreadPools.get(key);
                if (pool == null) {
                    int actives = providerConfig.getActives();
                    int coreSize = (int) (actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int) (actives / DEFAULT_POOL_RATIO_CORE)
                            : actives;
                    int maxSize = actives;
                    int queueSize = actives;
                    pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-service", coreSize, maxSize,queueSize);
                    serviceThreadPools.putIfAbsent(key, pool);
                }
            }
            if (!CollectionUtils.isEmpty(methodConfigs)) {
                for (String name : methodNames) {
                    if (!methodConfigs.containsKey(name)) {
                        continue;
                    }
                    String key = url + "#" + name;
                    DynamicThreadPool pool = methodThreadPools.get(key);
                    if (pool == null) {
                        int actives = DEFAULT_POOL_ACTIVES;
                        ProviderMethodConfig methodConfig = methodConfigs.get(name);
                        if (methodConfig != null && methodConfig.getActives() > 0) {
                            actives = methodConfig.getActives();
                        }
                        int coreSize = (int) (actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int) (actives / DEFAULT_POOL_RATIO_CORE)
                                : actives;
                        int maxSize = actives;
                        int queueSize = actives;
                        pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-method", coreSize, maxSize, queueSize);
                        methodThreadPools.putIfAbsent(key, pool);
                    }
                }
            }
        }
    }

  最后到了 ServicePublisher.publishService(providerConfig, false);

  就是这个方法进行写zk

  核心代码在这里

void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight)
            throws RegistryException {
        String weightPath = Utils.getWeightPath(serviceAddress);/DP/WEIGHT/ip:port
        String servicePath = Utils.getServicePath(serviceName, group); 
        try {
            if (weight > 0) {
                client.set(weightPath, "" + weight);
            }
            if (client.exists(servicePath, false)) {
                Stat stat = new Stat();
                String addressValue = client.get(servicePath, stat);
                String[] addressArray = addressValue.split(",");
                List<String> addressList = new ArrayList<String>();
                for (String addr : addressArray) {
                    addr = addr.trim();
                    if (addr.length() > 0 && !addressList.contains(addr)) {
                        addressList.add(addr.trim());
                    }
                }
                if (!addressList.contains(serviceAddress)) {
                    addressList.add(serviceAddress);
                    Collections.sort(addressList);
                    client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
                }
            } else {
                client.create(servicePath, serviceAddress);
            }
            if (logger.isInfoEnabled()) {
                logger.info("registered service to persistent node: " + servicePath);
            }
        } catch (Throwable e) {
            if(e instanceof BadVersionException || e instanceof NodeExistsException) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ie) {
                    //ignore
                }
                registerPersistentNode(serviceName, group, serviceAddress, weight);
            } else {
                logger.error("failed to register service to " + servicePath, e);
                throw new RegistryException(e);
            }

        }
    }

[zk: localhost:2181(CONNECTED) 1] ls /DP/SERVER
[@HTTP@http:^^service.dianping.com^rpcserver^commonService_1.0.0, http:^^service.dianping.com^rpcserver^commonService_1.0.0]

dp/server/中实际的数据格式如下

[zk: localhost:2181(CONNECTED) 2] get /DP/SERVER/http:^^service.dianping.com^rpcserver^commonService_1.0.0
10.190.38.63:6088 

/DP/WEIGHT/ 中实际的值如下

[zk: localhost:2181(CONNECTED) 5] ls /DP/WEIGHT
[10.190.38.63:4080, 10.190.38.63:4081, 10.190.38.63:6088]
[zk: localhost:2181(CONNECTED) 6] get /DP/WEIGHT/10.190.38.63:6088
10

原文地址:https://www.cnblogs.com/juniorMa/p/14832970.html