Integrating Feign with Hystrix

I. Introduction

  The earlier article on the Feign source code already mentioned Hystrix, so this analysis picks up where that walkthrough left off.

II. FeignClientFactoryBean creates the JDK dynamic proxy

public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
            FeignContext context, Target.HardCodedTarget<T> target) {
        // Hystrix is not enabled (feign.hystrix.enabled=false): the builder is a plain Feign.Builder
        if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
            return feign.target(target);
        }
        // Hystrix is enabled
        feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
        // Resolve the client name: contextId if it is set, otherwise name
        String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
                : factory.getContextId();
        SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
        if (setterFactory != null) {
            builder.setterFactory(setterFactory);
        }
        // The fallback class declared on the annotation
        Class<?> fallback = factory.getFallback();
        if (fallback != void.class) {
            return targetWithFallback(name, context, target, builder, fallback);
        }
        // The fallbackFactory class declared on the annotation
        Class<?> fallbackFactory = factory.getFallbackFactory();
        if (fallbackFactory != void.class) {
            return targetWithFallbackFactory(name, context, target, builder,
                    fallbackFactory);
        }
        // Hystrix is enabled, but neither fallback nor fallbackFactory is configured
        return feign.target(target);
    }

targetWithFallback: the fallback logic is provided by the class named in the fallback attribute of @FeignClient.

    private <T> T targetWithFallback(String feignClientName, FeignContext context,
            Target.HardCodedTarget<T> target, HystrixFeign.Builder builder,
            Class<?> fallback) {
        // Look up the fallback bean in the client's child context by feignClientName and fallback class
        T fallbackInstance = getFromContext("fallback", feignClientName, context,
                fallback, target.type());
        // Call the target method of HystrixFeign.Builder
        return builder.target(target, fallbackInstance);
    }
    public <T> T target(Target<T> target, T fallback) {
      // Wrap the fallback in the default FallbackFactory, which stores the instance in its internal constant field
      return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
          // Let ReflectiveFeign create the proxy
          .newInstance(target);
    }

targetWithFallbackFactory: the fallback logic is provided by the class named in the fallbackFactory attribute of @FeignClient.

private <T> T targetWithFallbackFactory(String feignClientName, FeignContext context,
            Target.HardCodedTarget<T> target, HystrixFeign.Builder builder,
            Class<?> fallbackFactoryClass) {
    // Look up the FallbackFactory implementation in the child context by feignClientName and fallbackFactoryClass
    FallbackFactory<? extends T> fallbackFactory = (FallbackFactory<? extends T>) getFromContext(
            "fallbackFactory", feignClientName, context, fallbackFactoryClass,
            FallbackFactory.class);
    // Call the target method of HystrixFeign.Builder
    return builder.target(target, fallbackFactory);
}
    public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
      // Build directly from the fallbackFactory instance that was looked up
      return build(fallbackFactory).newInstance(target);
    }
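
The difference between the two paths can be sketched without any Feign or Hystrix dependency. The snippet below is a simplified model, not the library code: `SimpleFallbackFactory`, `ConstantFactory`, `StockApi`, and `FallbackFactoryDemo` are hypothetical names standing in for `FallbackFactory`, `FallbackFactory.Default`, and a Feign client interface. It shows that a constant factory ignores the failure cause and always hands back the same instance, while a custom factory can build a fallback that looks at the cause.

```java
// Hypothetical stand-in for a Feign client interface.
interface StockApi {
    String getStock(long id);
}

// Hypothetical stand-in for feign.hystrix.FallbackFactory.
interface SimpleFallbackFactory<T> {
    T create(Throwable cause);
}

// Models FallbackFactory.Default: holds one instance and ignores the cause.
final class ConstantFactory<T> implements SimpleFallbackFactory<T> {
    private final T constant;

    ConstantFactory(T constant) {
        this.constant = constant;
    }

    @Override
    public T create(Throwable cause) {
        return constant;
    }
}

final class FallbackFactoryDemo {
    // Models the fallback attribute: a fixed instance wrapped in a constant factory.
    static SimpleFallbackFactory<StockApi> fromFallback(StockApi fallback) {
        return new ConstantFactory<>(fallback);
    }

    // Models the fallbackFactory attribute: a fallback built per failure, aware of the cause.
    static SimpleFallbackFactory<StockApi> causeAware() {
        return cause -> id -> "stock " + id + " unavailable: " + cause.getMessage();
    }
}
```

In practice this is the usual reason to prefer fallbackFactory over fallback: only the factory variant gets to see the exception that triggered the fallback.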

Both fallback variants end up in build(fallbackFactory).newInstance(target).

    Feign build(final FallbackFactory<?> nullableFallbackFactory) {
        // Register the factory that creates the JDK InvocationHandler
      super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override
        public InvocationHandler create(Target target,
                                        Map<Method, MethodHandler> dispatch) {
            // Create the HystrixInvocationHandler
          return new HystrixInvocationHandler(target, dispatch, setterFactory,
              nullableFallbackFactory);
        }
      });
      // Wrap the configured contract (SpringMvcContract) so that method metadata parsing gets some Hystrix-specific adjustments
      super.contract(new HystrixDelegatingContract(contract));
      // Build the ReflectiveFeign
      return super.build();
    }
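
How swapping in a different InvocationHandler changes what the proxy does can be shown with plain JDK proxies. This is a sketch under loud assumptions: `GreetingApi`, `FallbackHandler`, and `ProxyDemo` are hypothetical names, and the handler runs the call inline with a try/catch instead of wrapping it in a HystrixCommand, so there is no thread pool, timeout, or circuit breaker here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical client interface.
interface GreetingApi {
    String greet(String name);
}

// Simplified stand-in for HystrixInvocationHandler: try the real call, then the fallback.
final class FallbackHandler implements InvocationHandler {
    private final Object delegate;
    private final Object fallback;

    FallbackHandler(Object delegate, Object fallback) {
        this.delegate = delegate;
        this.fallback = fallback;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(delegate, args);
        } catch (InvocationTargetException e) {
            // The real call threw; reflectively call the same method on the fallback.
            return method.invoke(fallback, args);
        }
    }
}

final class ProxyDemo {
    // Creates a JDK dynamic proxy for the interface, backed by FallbackHandler.
    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, T delegate, T fallback) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                new FallbackHandler(delegate, fallback));
    }
}
```

Callers only ever see the interface type, which is why Feign can change the handler behind the proxy without touching client code.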
    
@Override
public <T> T newInstance(Target<T> target) {
  // Resolve the mapping from method name (config key) to its MethodHandler
  Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);

  // ...

  // Create the HystrixInvocationHandler
  InvocationHandler handler = factory.create(target, methodToHandler);
  // Create the JDK dynamic proxy
  T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
      new Class<?>[] {target.type()}, handler);
  return proxy;
}

This creation logic was covered in the earlier Feign source article, but the invocation handler, HystrixInvocationHandler, was not. Let's go through it now.

final class HystrixInvocationHandler implements InvocationHandler {

  // The HardCodedTarget instance
  // Wraps the original FeignClient interface Class, the service name (trade-service) and the URL (http://trade-service)
  private final Target<?> target;
  // Target method -> the MethodHandler that actually handles it
  private final Map<Method, MethodHandler> dispatch;
  // The fallback factory
  private final FallbackFactory<?> fallbackFactory;
  // Target method -> the same method with setAccessible(true) already applied
  private final Map<Method, Method> fallbackMethodMap;
  // Target method -> com.netflix.hystrix.HystrixCommand.Setter
  private final Map<Method, Setter> setterMethodMap;

  // Constructor
  HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
      SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
    this.target = checkNotNull(target, "target");
    this.dispatch = checkNotNull(dispatch, "dispatch");
    this.fallbackFactory = fallbackFactory;
    // Build the method -> method map up front, mainly so that invoke does not call setAccessible(true) on every invocation
    this.fallbackMethodMap = toFallbackMethod(dispatch);
    // Build the method -> com.netflix.hystrix.HystrixCommand.Setter map
    this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
  }

  // Maps each dispatched method to itself after making it accessible
  static Map<Method, Method> toFallbackMethod(Map<Method, MethodHandler> dispatch) {
    Map<Method, Method> result = new LinkedHashMap<Method, Method>();
    for (Method method : dispatch.keySet()) {
      method.setAccessible(true);
      result.put(method, method);
    }
    return result;
  }

  // Builds one HystrixCommand.Setter per method
  static Map<Method, Setter> toSetters(SetterFactory setterFactory,
                                       Target<?> target,
                                       Set<Method> methods) {
    Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
    for (Method method : methods) {
      method.setAccessible(true);
      // Call SetterFactory's create method to build the Hystrix Setter
      result.put(method, setterFactory.create(target, method));
    }
    return result;
  }

  // ...
}

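The default SetterFactory implementation is feign.hystrix.SetterFactory.Default. The HystrixCommandGroupKey is the name property of the HardCodedTarget, that is, the service name. The HystrixCommandKey is the class name joined with the method signature, for example StockClient#getStock(Long). Because the Setter does not specify a thread pool key, Hystrix uses the group key for the thread pool. So by default the thread pool isolation of Feign with Hystrix is at the HystrixCommandGroupKey level: one thread pool per service name.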

final class Default implements SetterFactory {
  @Override
  public HystrixCommand.Setter create(Target<?> target, Method method) {
    String groupKey = target.name();
    String commandKey = Feign.configKey(target.type(), method);
    return HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
        .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
  }
}
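
To make the two keys concrete, here is a small approximation of how the default keys come out. It is a sketch, not the Feign implementation: `KeyDemo.commandKey` is a hypothetical helper that only mimics the `Type#method(ParamTypes)` shape for simple, non-generic parameter types, and the `StockClient` interface with its methods is made up for illustration.

```java
import java.lang.reflect.Method;
import java.util.StringJoiner;

// Hypothetical Feign client interface used only for this illustration.
interface StockClient {
    String getStock(Long id);

    String listStocks(String market, Integer limit);
}

final class KeyDemo {
    // Approximates the command key shape: SimpleClassName#methodName(ParamSimpleNames).
    static String commandKey(Class<?> type, Method method) {
        StringJoiner params = new StringJoiner(",", "(", ")");
        for (Class<?> param : method.getParameterTypes()) {
            params.add(param.getSimpleName());
        }
        return type.getSimpleName() + "#" + method.getName() + params;
    }

    // The group key is simply the target name, i.e. the service name.
    static String groupKey(String serviceName) {
        return serviceName;
    }
}
```

Under this scheme every method of one client shares a group key, and therefore a thread pool, while each method gets its own command key. If per-method or per-feature pools are needed, a custom SetterFactory bean is the place to set a different group key or an explicit thread pool key.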

The HystrixInvocationHandler#invoke method

final class HystrixInvocationHandler implements InvocationHandler {

  @Override
  public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {

    // Methods declared on Object are handled the same way as in ReflectiveFeign.FeignInvocationHandler
    if ("equals".equals(method.getName())) {
      try {
        Object otherHandler =
            args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
        return equals(otherHandler);
      } catch (IllegalArgumentException e) {
        return false;
      }
    } else if ("hashCode".equals(method.getName())) {
      return hashCode();
    } else if ("toString".equals(method.getName())) {
      return toString();
    }

    // Create the HystrixCommand
    HystrixCommand<Object> hystrixCommand =
        new HystrixCommand<Object>(setterMethodMap.get(method)) {
          @Override
          protected Object run() throws Exception {
            try {
              // Look up the MethodHandler and invoke it, same as ReflectiveFeign.FeignInvocationHandler
              return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            } catch (Exception e) {
              throw e;
            } catch (Throwable t) {
              throw (Error) t;
            }
          }

          @Override
          protected Object getFallback() {
            // If fallbackFactory is null, super.getFallback() throws
            // UnsupportedOperationException("No fallback available.")
            if (fallbackFactory == null) {
              return super.getFallback();
            }
            try {
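              // getExecutionException returns the exception raised while the HystrixCommand was executing
              // Call the user-defined fallbackFactory's create method to build the fallback
              Object fallback = fallbackFactory.create(getExecutionException());
              // Reflectively invoke the matching method on the fallback
              Object result = fallbackMethodMap.get(method).invoke(fallback, args);
              // Branch on the method's return type
              if (isReturnsHystrixCommand(method)) {
                // A HystrixCommand is run synchronously via execute() and its result is returned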
                return ((HystrixCommand) result).execute();
              } else if (isReturnsObservable(method)) {
                // Create a cold Observable
                return ((Observable) result).toBlocking().first();
              } else if (isReturnsSingle(method)) {
                // Create a cold Observable as a Single
                return ((Single) result).toObservable().toBlocking().first();
              } else if (isReturnsCompletable(method)) {
                ((Completable) result).await();
                return null;
              } else if (isReturnsCompletableFuture(method)) {
                return ((Future) result).get();
              } else {
                return result;
              }
            } catch (IllegalAccessException e) {
              // shouldn't happen as method is public due to being an interface
              throw new AssertionError(e);
            } catch (InvocationTargetException | ExecutionException e) {
              // Exceptions on fallback are tossed by Hystrix
              throw new AssertionError(e.getCause());
            } catch (InterruptedException e) {
              // Exceptions on fallback are tossed by Hystrix
              Thread.currentThread().interrupt();
              throw new AssertionError(e.getCause());
            }
          }
        };
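    // Inspect the method's return type to decide how the HystrixCommand is returned
    if (Util.isDefault(method)) {
      // A default method is executed synchronously right away
      return hystrixCommand.execute();
    } else if (isReturnsHystrixCommand(method)) {
      // For a HystrixCommand return type, return the HystrixCommand instance itself without executing it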
      return hystrixCommand;
    } else if (isReturnsObservable(method)) {
      // Create a cold Observable
      return hystrixCommand.toObservable();
    } else if (isReturnsSingle(method)) {
      // Create a cold Observable as a Single
      return hystrixCommand.toObservable().toSingle();
    } else if (isReturnsCompletable(method)) {
      return hystrixCommand.toObservable().toCompletable();
    } else if (isReturnsCompletableFuture(method)) {
      return new ObservableCompletableFuture<>(hystrixCommand);
    }
    // Ordinary return type: run the HystrixCommand synchronously via execute()
    return hystrixCommand.execute();
  }

  private boolean isReturnsCompletable(Method method) {
    return Completable.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsHystrixCommand(Method method) {
    return HystrixCommand.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsObservable(Method method) {
    return Observable.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsCompletableFuture(Method method) {
    return CompletableFuture.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsSingle(Method method) {
    return Single.class.isAssignableFrom(method.getReturnType());
  }

  // ...
}

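Why are the different fallback return types handled differently here?

Because a fallback should avoid blocking operations, such as network calls, wherever possible. If the business really requires a remote call inside the fallback, the fallback method can return a HystrixCommand. That way the fallback can use a different HystrixCommandGroupKey and run in a separate thread pool, with isolated resources and its own circuit-breaker metrics.

For example: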

@Service
public class OuterApiFallback implements OuterApi {

    public Response<String> getDefaultFallBack() {
        return Response.fail("out service circuit-breaker fallback");
    }

    @Override
    public Response<String> testConcurrent() {
        return getDefaultFallBack();
    }
    
    @Override
    public HystrixCommand<Response<String>> testNextConcurrent() {
        HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("fallback"));
        return new HystrixCommand<Response<String>>(setter) {
                    @Override
                    protected Response<String> run() throws Exception {
                        // Possibly another network call, for example a request to some degraded-mode service
                        // ...
                        // Then return the result
                        return Response.success("");
                    }
                };
    }
    
}
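
The return-type branching in getFallback can be imitated with the JDK alone. The sketch below is a simplification under stated assumptions: `QuoteFallback` and `FallbackUnwrapDemo` are hypothetical names, and `Callable` is used as a stand-in for a HystrixCommand return type (with `call()` playing the role of `execute()`), since the real Hystrix and RxJava types are not on the classpath here.

```java
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical fallback interface with three return styles.
interface QuoteFallback {
    String plain();

    CompletableFuture<String> async();

    Callable<String> deferred(); // stand-in for a method returning HystrixCommand
}

final class FallbackUnwrapDemo {
    // Picks the unwrapping strategy from the declared return type of the method.
    static Object unwrap(Method method, Object result) throws Exception {
        Class<?> returnType = method.getReturnType();
        if (Callable.class.isAssignableFrom(returnType)) {
            // Like executing a returned command: run the deferred work and return its value.
            return ((Callable<?>) result).call();
        } else if (CompletableFuture.class.isAssignableFrom(returnType)) {
            // Block until the future completes.
            return ((Future<?>) result).get();
        }
        // Plain values are returned as they are.
        return result;
    }

    // Invokes the named no-arg fallback method reflectively and unwraps its result.
    static Object callFallback(QuoteFallback fallback, String methodName) {
        try {
            Method method = QuoteFallback.class.getMethod(methodName);
            return unwrap(method, method.invoke(fallback));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Whatever wrapper the fallback method declares, the value that leaves this step is the unwrapped result, which is what lets a fallback defer its own work to another command.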

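spring-cloud-openfeign-core integrates with Hystrix by creating a HystrixCommand inside the InvocationHandler. This supports fallbacks but not request caching, because the HystrixCommand it creates does not override getCacheKey.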

Reference: https://juejin.cn/post/6881546816999915534

Original article: https://www.cnblogs.com/sglx/p/15784600.html