hystrix源码之线程池

HystrixThreadPool

  定义了hystrix线程池接口

  获取ExecutorService对象,即jdk定义的线程池。
/**
 * Returns the {@link ExecutorService} (the JDK thread-pool abstraction) backing this Hystrix thread pool.
 *
 * @return the underlying executor
 */
public
ExecutorService getExecutor();
  获取rxjava定义的Scheduler对象。
/**
 * Returns the RxJava {@link Scheduler} for this thread pool.
 *
 * @return an RxJava scheduler
 */
public Scheduler getScheduler();
  获取rxjava定义的Scheduler对象,并通过shouldInterruptThread指定超时时是否中断底层线程(无参版本默认中断)。
/**
 * Returns the RxJava {@link Scheduler} for this thread pool, configured with an interrupt decision function.
 *
 * @param shouldInterruptThread function yielding a Boolean; presumably consulted to decide whether the
 *                              underlying thread is interrupted on timeout — confirm against the scheduler implementation
 * @return an RxJava scheduler
 */
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
  记录一次线程池执行
/**
 * Records one execution on this thread pool.
 */
public void markThreadExecution();
  记录一次线程池执行完成
/**
 * Records that one execution on this thread pool has completed.
 */
public void markThreadCompletion();
  记录一次线程池执行拒绝
/**
 * Records that one submission to this thread pool was rejected.
 */
public void markThreadRejection();
  队列是否有空闲
/**
 * Reports whether the pool's queue still has room for more work.
 *
 * @return {@code true} if queue space is available
 */
public boolean isQueueSpaceAvailable();

  通过工厂模式创建hystrix线程池,并设置了缓存,每一个threadkey对应一个HystrixThreadPoolDefault线程池。

/**
 * Creates {@link HystrixThreadPool} instances and caches them, one per thread-pool key name.
 */
static class Factory {
    /** Cache of pools, keyed by the value of {@code HystrixThreadPoolKey.name()}. */
    static final ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    /**
     * Returns the pool cached under the key's name, first building a
     * {@code HystrixThreadPoolDefault} when the cache has no entry for it.
     *
     * @param threadPoolKey     key whose {@code name()} is used as the cache key
     * @param propertiesBuilder handed to the {@code HystrixThreadPoolDefault} constructor when a pool must be created
     * @return the pool stored in {@code threadPools} for this key
     */
    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        final String cacheKey = threadPoolKey.name();

        // Fast path: an unsynchronized lookup serves every call after the first.
        HystrixThreadPool pool = threadPools.get(cacheKey);
        if (pool == null) {
            // Creation is serialized on HystrixThreadPool.class and the map is re-checked
            // under the lock, so a caller that waited here does not build a second pool.
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(cacheKey)) {
                    threadPools.put(cacheKey, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            pool = threadPools.get(cacheKey);
        }
        return pool;
    }
}

HystrixThreadPoolDefault

  线程池默认实现。

  通过HystrixThreadPoolMetrics实现metric记录。

        /**
         * Records one thread-pool execution by delegating to {@code metrics.markThreadExecution()}.
         */
        @Override
        public void markThreadExecution() {
            metrics.markThreadExecution();
        }
        /**
         * Records one completed thread-pool execution by delegating to {@code metrics.markThreadCompletion()}.
         */
        @Override
        public void markThreadCompletion() {
            metrics.markThreadCompletion();
        }
        /**
         * Records one thread-pool rejection by delegating to {@code metrics.markThreadRejection()}.
         */
        @Override
        public void markThreadRejection() {
            metrics.markThreadRejection();
        }

  内部通过jdk的ThreadPoolExecutor实现线程池功能。

     /**
      * Reports whether the pool's queue can take more work.
      * <p>
      * Always {@code true} when {@code queueSize} is zero or negative; otherwise {@code true} only while
      * the executor's current queue length is below {@code properties.queueSizeRejectionThreshold()}.
      *
      * @return {@code true} if queue space is available
      */
     @Override
        public boolean isQueueSpaceAvailable() {
            // Short-circuit: the queue is only inspected when queueSize is positive.
            return queueSize <= 0
                    || threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
        }
        /**
         * Returns the {@link ThreadPoolExecutor} held in the {@code threadPool} field.
         *
         * @return the underlying JDK executor
         */
        @Override
        public ThreadPoolExecutor getExecutor() {
            return threadPool;
        }

  通过HystrixContextScheduler实现rxjava定义的Scheduler。

    /**
     * Returns a scheduler by calling {@link #getScheduler(Func0)} with a function that always yields {@code true}.
     *
     * @return the scheduler produced by the one-argument overload
     */
    @Override
        public Scheduler getScheduler() {
            // by default, interrupt underlying threads on timeout
            final Func0<Boolean> alwaysInterrupt = new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return Boolean.TRUE;
                }
            };
            return getScheduler(alwaysInterrupt);
        }
        /**
         * Builds a new {@code HystrixContextScheduler} from the current concurrency strategy, this pool,
         * and the supplied interrupt decision function.
         *
         * @param shouldInterruptThread passed through unchanged to the {@code HystrixContextScheduler} constructor
         * @return a newly constructed scheduler (a new instance on every call)
         */
        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            // touchConfig() is defined outside this excerpt; it runs before the scheduler is built.
            // NOTE(review): presumably refreshes the pool's configuration — confirm.
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

   构造函数中通过HystrixConcurrencyStrategy创建线程池,并通过HystrixThreadPoolMetrics的静态方法getInstance获得该threadkey对应的HystrixThreadPoolMetrics。

/**
 * Wires up the pool for the given key: resolves its properties, obtains the metrics object
 * (handing it an executor from the current concurrency strategy), takes the executor back
 * from the metrics, and registers a metrics publisher.
 *
 * @param threadPoolKey      key identifying this pool
 * @param propertiesDefaults passed to {@code HystrixPropertiesFactory.getThreadPoolProperties} together with the key
 */
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);

    // The executor is requested from the strategy before getInstance runs, then passed to it.
    metrics = HystrixThreadPoolMetrics.getInstance(
            threadPoolKey,
            HystrixPlugins.getInstance().getConcurrencyStrategy().getThreadPool(threadPoolKey, properties),
            properties);

    // Use the executor the metrics object reports rather than keeping a separate reference.
    threadPool = metrics.getThreadPool();

    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, metrics, properties);
}
原文地址:https://www.cnblogs.com/zhangwanhua/p/7874227.html