hystrix源码之插件

HystrixPlugins

  HystrixPlugins负责获取以下6类插件:并发相关类(HystrixConcurrencyStrategy)、事件通知类(HystrixEventNotifier)、度量信息类(HystrixMetricsPublisher)、Properties配置类(HystrixPropertiesStrategy)、HystrixCommand回调函数类(HystrixCommandExecutionHook)、HystrixDynamicProperties。

插件获取:

  HystrixPlugins 首先会在properties(HystrixDynamicProperties)配置中寻找相应类型的实现类,属性名为"hystrix.plugin.<插件类名>.implementation"。

private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
        String classSimpleName = pluginClass.getSimpleName();
        // Check Archaius for plugin class.
        String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
        String implementingClass = dynamicProperties.getString(propertyName, null).get();
....

  如果返回null,则通过ServiceLoader获取相应类型的实现类。

/**
 * Resolves the implementation to use for the given plugin type.
 * <p>
 * The property-based lookup is consulted first; only when it yields {@code null}
 * is the ServiceLoader-based lookup ({@code findService}) used.
 *
 * @param pluginClass the plugin type to resolve
 * @return the resolved implementation, or {@code null} if neither lookup finds one
 */
private <T> T getPluginImplementation(Class<T> pluginClass) {
    final T configured = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
    if (configured == null) {
        // nothing configured through properties, fall back to ServiceLoader discovery
        return findService(pluginClass, classLoader);
    }
    return configured;
}
    /**
     * Looks up a provider of the given service type through {@link ServiceLoader}.
     *
     * @param spi the service interface or class to look up
     * @param classLoader the class loader handed to {@code ServiceLoader.load}
     * @return the first non-null provider in iteration order, or {@code null} if there is none
     * @throws ServiceConfigurationError declared by this method; raised by the ServiceLoader machinery
     */
    private static <T> T findService(
            Class<T> spi,
            ClassLoader classLoader) throws ServiceConfigurationError {

        for (T candidate : ServiceLoader.load(spi, classLoader)) {
            if (candidate == null) {
                continue;
            }
            // first usable provider wins
            return candidate;
        }
        return null;
    }

  如果获取成功,存储变量中。

final AtomicReference<HystrixEventNotifier> notifier = new AtomicReference<HystrixEventNotifier>();
public
HystrixEventNotifier getEventNotifier() { if (notifier.get() == null) { // check for an implementation from Archaius first Object impl = getPluginImplementation(HystrixEventNotifier.class); if (impl == null) { // nothing set via Archaius so initialize with default notifier.compareAndSet(null, HystrixEventNotifierDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from Archaius so use it notifier.compareAndSet(null, (HystrixEventNotifier) impl); } } return notifier.get(); }

  获取HystrixDynamicProperties对象的过程有点不同:首先使用HystrixDynamicPropertiesSystemProperties(系统属性)配置来寻找相应类型的实现类;如果找不到,则通过ServiceLoader查找;如果仍然找不到,会默认使用Archaius;如果依然没有Archaius支持,则使用HystrixDynamicPropertiesSystemProperties。

private static HystrixDynamicProperties resolveDynamicProperties(ClassLoader classLoader, LoggerSupplier logSupplier) {
        HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class, 
                HystrixDynamicPropertiesSystemProperties.getInstance());
        if (hp != null) {
            logSupplier.getLogger().debug(
                    "Created HystrixDynamicProperties instance from System property named "
                    + ""hystrix.plugin.HystrixDynamicProperties.implementation". Using class: {}", 
                    hp.getClass().getCanonicalName());
            return hp;
        }
        hp = findService(HystrixDynamicProperties.class, classLoader);
        if (hp != null) {
            logSupplier.getLogger()
                    .debug("Created HystrixDynamicProperties instance by loading from ServiceLoader. Using class: {}", 
                            hp.getClass().getCanonicalName());
            return hp;
        }
        hp = HystrixArchaiusHelper.createArchaiusDynamicProperties();
        if (hp != null) {
            logSupplier.getLogger().debug("Created HystrixDynamicProperties. Using class : {}", 
                    hp.getClass().getCanonicalName());
            return hp;
        }
        hp = HystrixDynamicPropertiesSystemProperties.getInstance();
        logSupplier.getLogger().info("Using System Properties for HystrixDynamicProperties! Using class: {}", 
                hp.getClass().getCanonicalName());
        return hp;
    }

  HystrixConcurrencyStrategy

  并发相关的策略类。

  获取线程池,实际根据配置创建ThreadPoolExecutor。

/**
     * Creates the {@link ThreadPoolExecutor} for the given thread-pool key from the supplied settings.
     * <p>
     * If the configured core size is larger than the configured maximum size, an error is logged
     * and the core size is used as the maximum size as well.
     *
     * @param threadPoolKey key of the thread pool; used for the thread factory and in the log message
     * @param corePoolSize property supplying the core pool size
     * @param maximumPoolSize property supplying the maximum pool size
     * @param keepAliveTime property supplying the keep-alive time
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue handed to the executor
     * @return a newly constructed executor
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
        final int coreSize = corePoolSize.get();
        final int configuredMaximumSize = maximumPoolSize.get();

        final boolean coreExceedsMaximum = coreSize > configuredMaximumSize;
        if (coreExceedsMaximum) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    coreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                    coreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }

        // an invalid maximum is replaced by the core size
        final int effectiveMaximumSize = coreExceedsMaximum ? coreSize : configuredMaximumSize;
        return new ThreadPoolExecutor(coreSize, effectiveMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    }

    /**
     * Creates the {@link ThreadPoolExecutor} for the given thread-pool key from its properties.
     * <p>
     * The maximum size equals the core size unless
     * {@code getAllowMaximumSizeToDivergeFromCoreSize()} is true. In that case the configured
     * maximum size is read and used, except when it is smaller than the core size: then an error
     * is logged and the core size is used instead. Keep-alive time is expressed in minutes.
     *
     * @param threadPoolKey key of the thread pool; used for the thread factory and in the log message
     * @param threadPoolProperties properties supplying core size, maximum size, keep-alive and queue size
     * @return a newly constructed executor
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
        final boolean mayDiverge = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int coreSize = threadPoolProperties.coreSize().get();
        final int keepAliveMinutes = threadPoolProperties.keepAliveTimeMinutes().get();
        final int queueCapacity = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(queueCapacity);

        // default: maximum tracks the core size
        int maximumSize = coreSize;
        if (mayDiverge) {
            final int configuredMaximumSize = threadPoolProperties.maximumSize().get();
            if (coreSize > configuredMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        coreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                        coreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            } else {
                maximumSize = configuredMaximumSize;
            }
        }
        return new ThreadPoolExecutor(coreSize, maximumSize, keepAliveMinutes, TimeUnit.MINUTES, workQueue, threadFactory);
    }
    /**
     * Returns the thread factory used for the pool identified by {@code threadPoolKey}.
     * <p>
     * When {@code PlatformSpecific.isAppEngineStandardEnvironment()} reports true, the factory from
     * {@code PlatformSpecific.getAppEngineThreadFactory()} is returned. Otherwise a factory is
     * returned that creates daemon threads named {@code hystrix-<key name>-<n>}, where {@code n}
     * starts at 1 and increases with every thread created by that factory.
     *
     * @param threadPoolKey key whose name becomes part of each thread name
     * @return the thread factory for this pool
     */
    private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
        if (PlatformSpecific.isAppEngineStandardEnvironment()) {
            return PlatformSpecific.getAppEngineThreadFactory();
        }
        return new ThreadFactory() {
            private final AtomicInteger sequence = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                final String threadName = "hystrix-" + threadPoolKey.name() + "-" + sequence.incrementAndGet();
                final Thread worker = new Thread(r, threadName);
                worker.setDaemon(true);
                return worker;
            }
        };
    }
    /**
     * Creates the work queue for a thread pool.
     *
     * @param maxQueueSize requested queue capacity
     * @return a {@link LinkedBlockingQueue} bounded to {@code maxQueueSize} when it is positive,
     *         otherwise a {@link SynchronousQueue}
     */
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (maxQueueSize > 0) {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
        // zero or negative capacity: hand tasks off directly instead of queueing them
        return new SynchronousQueue<Runnable>();
    }

   在执行callable前,为用户提供对callable进行封装的机会

/**
 * Extension point that lets a strategy wrap (decorate) a {@link Callable}.
 * <p>
 * This implementation performs no wrapping and returns the argument as is.
 *
 * @param callable the callable to wrap
 * @return the same {@code callable} instance
 */
public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

  获取HystrixRequestVariable

/**
 * Creates the {@code HystrixRequestVariable} for the given lifecycle.
 * <p>
 * This implementation wraps {@code rv} in a new {@code HystrixLifecycleForwardingRequestVariable}.
 *
 * @param rv lifecycle callbacks passed to the created request variable
 * @return a new request variable backed by {@code rv}
 */
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
        return new HystrixLifecycleForwardingRequestVariable<T>(rv);
    }

HystrixCommandExecutionHook

   hystrix命令执行过程中的回调接口,提供了以下回调方法。

    /**
     * Invoked before the command is executed.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command about to be executed
     */
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when a value is received from the command.
     * <p>
     * The default implementation returns {@code value} unchanged.
     *
     * @param commandInstance the command that produced the value
     * @param value the value received
     * @return the value to pass on
     */
    public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the command fails with an exception.
     * <p>
     * The default implementation returns {@code e} unchanged.
     *
     * @param commandInstance the command that failed
     * @param failureType the kind of failure reported
     * @param e the exception that occurred
     * @return the exception to pass on
     *
     * @since 1.2
     */
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked after the command has completed successfully.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command that succeeded
     *
     * @since 1.4
     */
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked before the command is executed on the thread pool.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command about to run on the thread pool
     */
    public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked after execution on the thread pool has completed.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command that ran on the thread pool
     */
    public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
        // do nothing by default
    }

    /**
     * Invoked when execution of the command starts.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command whose execution is starting
     */
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when execution of the command emits a value.
     * <p>
     * The default implementation returns {@code value} unchanged.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     * @return the value to pass on
     *
     * @since 1.4
     */
    public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when execution of the command throws an exception.
     * <p>
     * The default implementation returns {@code e} unchanged.
     *
     * @param commandInstance the command whose execution failed
     * @param e the exception thrown
     * @return the exception to pass on
     */
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when execution of the command has finished successfully.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command whose execution succeeded
     *
     * @since 1.4
     */
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} starts.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
     * <p>
     * The default implementation returns {@code value} unchanged.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     * @return the value to pass on
     *
     * @since 1.4
     */
    public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
     * <p>
     * The default implementation returns {@code e} unchanged.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     * @return the exception to pass on
     *
     * @since 1.2
     */
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
        return e;
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} completes successfully.
     * <p>
     * NOTE(review): the previous text described the "user-defined execution method", which looks
     * copied from the execution hook; the wording here follows the method name and the sibling
     * fallback hooks - confirm against the call site.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command's data is obtained from the cache.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command served from the cache
     */
    public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command is unsubscribed.
     * <p>
     * The default implementation does nothing.
     *
     * @param commandInstance the command being unsubscribed
     *
     * @since 1.5.9
     */
    public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

 HystrixEventNotifier

  hystrix命令执行过程中,接收相应的事件通知。

/**
    接收到消息后执行
  接受一下消息:
RESPONSE_FROM_CACHE如果该command从缓存中获取数据
CANCELLED 如果command在完成前unsubscrible
EXCEPTION_THROWN 如果command出现异常
EMIT 如果command发送数据
THREAD_POOL_REJECTED 如果线程池抛出reject异常
FAILURE 执行失败异常
  FALLBACK_EMIT执行fallback返回数据
  FALLBACK_SUCCESS执行fallback成功
  FALLBACK_FAILURE执行fallback发生异常
  FALLBACK_REJECTION超过信号量,fallback未被执行。
  FALLBACK_MISSING
  SEMAPHORE_REJECTED信号量模型执行被拒绝。
  SHORT_CIRCUITED熔断
*/ public void markEvent(HystrixEventType eventType, HystrixCommandKey key) { // do nothing } /** * Called after a command is executed using thread isolation. * Will not get called if a command is rejected, short-circuited etc.*/ public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) { // do nothing }

 HystrixMetricsPublisher

  HystrixMetricsPublisherFactory使用该插件来创建HystrixMetricsPublisherCommand、HystrixMetricsPublisherThreadPool、HystrixMetricsPublisherCollapser,并调用相应的initialize方法,这些类用来向第三方publish hystrix的metrics信息。

// Publishers already registered, keyed by thread-pool key name.
private final ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool> threadPoolPublishers = new ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool>();

    /**
     * Returns the metrics publisher registered for the given thread pool, creating and
     * registering one on first use.
     * <p>
     * A newly created publisher is stored with {@code putIfAbsent}; {@code initialize()} is called
     * only on the instance that actually got stored. If another thread stored one first, that
     * instance is returned and the one created here is dropped without being initialized.
     *
     * @param threadPoolKey key whose name identifies the cached publisher
     * @param metrics metrics passed to the plugin when a publisher has to be created
     * @param properties properties passed to the plugin when a publisher has to be created
     * @return the publisher held in the cache for this key
     */
    /* package */ HystrixMetricsPublisherThreadPool getPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
        // fast path: something is already registered for this key
        final HystrixMetricsPublisherThreadPool cached = threadPoolPublishers.get(threadPoolKey.name());
        if (cached != null) {
            return cached;
        }

        // nothing cached yet, so ask the metrics-publisher plugin for a new instance
        final HystrixMetricsPublisherThreadPool created = HystrixPlugins.getInstance().getMetricsPublisher().getMetricsPublisherForThreadPool(threadPoolKey, metrics, properties);

        // try to register it; a non-null result means another thread got there first
        final HystrixMetricsPublisherThreadPool winner = threadPoolPublishers.putIfAbsent(threadPoolKey.name(), created);
        if (winner != null) {
            // lost the race: hand back the registered instance, ours is never initialized
            return winner;
        }

        // won the race: initialize the instance that is now cached
        created.initialize();
        return created;
    }

  

 HystrixPropertiesStrategy

  创建HystrixCommandProperties、HystrixThreadPoolProperties、HystrixCollapserProperties、HystrixTimerThreadPoolProperties

/**
     * Creates the {@code HystrixCommandProperties} for a command.
     * <p>
     * This implementation returns a new {@code HystrixPropertiesCommandDefault}.
     *
     * @param commandKey key of the command the properties belong to
     * @param builder setter passed to the created properties object
     * @return a new command-properties instance
     */
    public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKey, HystrixCommandProperties.Setter builder) {
        return new HystrixPropertiesCommandDefault(commandKey, builder);
    }

    /**
     * Creates the {@code HystrixThreadPoolProperties} for a thread pool.
     * <p>
     * This implementation returns a new {@code HystrixPropertiesThreadPoolDefault}.
     *
     * @param threadPoolKey key of the thread pool the properties belong to
     * @param builder setter passed to the created properties object
     * @return a new thread-pool-properties instance
     */
    public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
        return new HystrixPropertiesThreadPoolDefault(threadPoolKey, builder);
    }

    /**
     * Creates the {@code HystrixCollapserProperties} for a collapser.
     * <p>
     * This implementation returns a new {@code HystrixPropertiesCollapserDefault}.
     *
     * @param collapserKey key of the collapser the properties belong to
     * @param builder setter passed to the created properties object
     * @return a new collapser-properties instance
     */
    public HystrixCollapserProperties getCollapserProperties(HystrixCollapserKey collapserKey, HystrixCollapserProperties.Setter builder) {
        return new HystrixPropertiesCollapserDefault(collapserKey, builder);
    }
/**
     * Creates the {@code HystrixTimerThreadPoolProperties}.
     * <p>
     * This implementation returns a new {@code HystrixPropertiesTimerThreadPoolDefault}.
     *
     * @return a new timer-thread-pool-properties instance
     */
    public HystrixTimerThreadPoolProperties getTimerThreadPoolProperties() {
        return new HystrixPropertiesTimerThreadPoolDefault();
    }
原文地址:https://www.cnblogs.com/zhangwanhua/p/7878515.html