hystrix源码之请求合并

请求合并

  使用HystrixObservableCollapser可以将参数不同,但执行过程相同的调用合并执行。当调用observe、toObservable方法时,会向RequestCollapser提交getRequestArgument方法获取的参数。用户需要实现getRequestArgument方法来设定请求参数,RequestCollapser是执行批量的类。

    public Observable<ResponseType> toObservable(Scheduler observeOn) {
        return Observable.defer(new Func0<Observable<ResponseType>>() {
            @Override
            public Observable<ResponseType> call() {
                final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
                //是否开启缓存,如果开启缓存并且实现了getCacheKey方法,则首先从HystrixRequestCache获取结果数据
                if (isRequestCacheEnabled) {
                    HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
                    if (fromCache != null) {
                        metrics.markResponseFromCache();
                        return fromCache.toObservable();
                    }
                }
           //获取合并请求对象,提交请求
                RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
                Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
                metrics.markRequestBatched();
        // 是否开启缓存,如果开启缓存并且实现了getCacheKey方法,将结果放到HystrixRequestCache
if (isRequestCacheEnabled) { HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response); HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache); if (fromCache == null) { return toCache.toObservable(); } else { return fromCache.toObservable(); } } return response; } }); }

RequestCollapserFactory

  创建RequestCollapser,如果是GLOBAL作用域,会创建一个RequestCollapser,并且使用一个static的ConcurrentHashMap作为一个全局的缓存。如果是REQUEST作用域,会创建一个RequestCollapser,使用HystrixRequestVariableHolder来存储。

复制代码
private static ConcurrentHashMap<String, RequestCollapser<?, ?, ?>> globalScopedCollapsers = new ConcurrentHashMap<String, RequestCollapser<?, ?, ?>>();
private static ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>> requestScopedCollapsers = new ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>>();

public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
        if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
            return getCollapserForUserRequest(commandCollapser);
        } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
            return getCollapserForGlobalScope(commandCollapser);
        } else {
            logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());
            return getCollapserForUserRequest(commandCollapser);
        }
    }
复制代码

 RequestCollapser

   内部有一个RequestBatch对象,代表当前的一批请求,当定时器到达一定间隔,会执行该批量请求对象中所有的请求,然后新建一个RequestBatch对象。


...
private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
....

private class CollapsedTask implements TimerListener {
        final Callable<Void> callableWithContextOfParent;

        CollapsedTask() {
            // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
            // so we create the callable now where we can capture the thread context
            callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
                // the wrapCallable call allows a strategy to capture thread-context if desired

                @Override
                public Void call() throws Exception {
                    try {
                        // we fetch current so that when multiple threads race
                        // we can do compareAndSet with the expected/new to ensure only one happens
                        RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
                        // 1) it can be null if it got shutdown
                        // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
                        if (currentBatch != null && currentBatch.getSize() > 0) {
                            // do execution within context of wrapped Callable
                            createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                        }
                    } catch (Throwable t) {
                        logger.error("Error occurred trying to execute the batch.", t);
                        t.printStackTrace();
                        // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
                    }
                    return null;
                }

            });
        }

        @Override
        public void tick() {
            try {
                callableWithContextOfParent.call();
            } catch (Exception e) {
                logger.error("Error occurred trying to execute callable inside CollapsedTask from Timer.", e);
                e.printStackTrace();
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return properties.timerDelayInMilliseconds().get();
        }

    }

   使用HystrixContextCallable是为了batch执行共享父亲HystrixRequestContext。

@Override
    public K call() throws Exception {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            return actual.call();
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

 RequestBatch

  内部有一个ConcurrentMap来存储请求对象。key为:请求参数对象,value为CollapsedRequest对象。

private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();

   当执行批量请求方法时,遍历存储的CollapsedRequest对象,通过commandCollapser的createObservableCommand方法创建执行批量请求的命令,通过commandCollapser的mapResponseToRequests方法将执行结果和请求进行匹配。

try {
                // shard batches
                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                // for each shard execute its requests 
                for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                    try {
                        // create a new command to handle this batch of requests
                        Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);

                        commandCollapser.mapResponseToRequests(o, shardRequests)
...

  

原文地址:https://www.cnblogs.com/zhangwanhua/p/7851283.html