Hystrix的原理与使用

转载自:https://segmentfault.com/a/1190000005988895
              http://blog.csdn.net/xiaoyu411502/article/details/50601687

       Netflix的 Hystrix 是一个帮助解决分布式系统交互时超时处理和容错的类库, 它同样拥有保护系统的能力.
       Hystrix的设计原则包括:资源隔离、熔断器、命令模式。

资源隔离

       货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个,这种资源隔离减少风险的方式被称为:Bulkheads(舱壁隔离模式)。
       Hystrix将同样的模式运用到了服务调用者上,在一个高度服务化的系统中,一个业务逻辑通常会依赖多个服务,比如:商品详情展示服务会依赖商品服务,价格服务,商品评论服务。调用三个依赖服务会共享商品详情服务的线程池。如果其中的商品评论服务不可用,就会出现线程池里所有线程都因等待响应而被阻塞,从而造成服务雪崩。Hystrix通过将每个依赖服务分配独立的线程池进行资源隔离,从而避免服务雪崩。

熔断器模式

       熔断器模式定义了熔断器开关相互转换的逻辑。
       服务的健康状况 = 请求失败数 / 请求总数。熔断器开关由关闭到打开的状态转换是通过当前服务健康状况和设定阈值比较决定的。
       当熔断器开关关闭时,请求被允许通过熔断器。 如果当前健康状况高于设定阈值,开关继续保持关闭。如果当前健康状况低于设定阈值,开关则切换为打开状态。当熔断器开关打开时,请求被禁止通过。当熔断器开关处于打开状态,经过一段时间后,熔断器会自动进入半开状态,这时熔断器只允许一个请求通过。当该请求调用成功时,熔断器恢复到关闭状态。若该请求失败,熔断器继续保持打开状态,接下来的请求被禁止通过。
       熔断器的开关能保证服务调用者在调用异常服务时,快速返回结果,避免大量的同步等待,并且熔断器能在一段时间后继续侦测请求执行结果,提供恢复服务调用的可能。

命令模式

       Hystrix使用命令模式(继承HystrixCommand类)来包裹具体的服务调用逻辑(run方法),并在命令模式中添加了服务调用失败后的降级逻辑(getFallback)。
       同时在Command的构造方法中可以定义当前服务线程池和熔断器的相关参数。因此在使用了Command模式构建了服务对象之后,服务便拥有了熔断器和线程池的功能。


示例
使用命令模式封装依赖逻辑:

  1. public class HelloWorldCommand extends HystrixCommand<String> {
  2. private final String name;
  3.  
  4. public HelloWorldCommand(String name) {
  5. // //最少配置:指定命令组名(CommandGroup)
  6. // super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
  7.  
  8. super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
  9. .andCommandKey(HystrixCommandKey.Factory.asKey("query"))
  10. .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleThreadPool"))
  11. .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(20))//服务线程池数量
  12. .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
  13. .withCircuitBreakerErrorThresholdPercentage(60) //熔断器关闭到打开阈值
  14. .withCircuitBreakerSleepWindowInMilliseconds(3000)//熔断器打开到关闭的时间窗长度
  15. ));
  16. this.name = name;
  17. }
  18.  
  19. @Override
  20. protected String run() {
  21. // 依赖逻辑封装在run()方法中
  22. return "Hello " + name + ", thread: " + Thread.currentThread().getName();
  23. }
  24.  
  25. //调用实例
  26. public static void main(String[] args) throws Exception {
  27. //每个Command对象只能调用一次,不可以重复调用
  28. HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
  29.  
  30. //使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get();
  31. String result = helloWorldCommand.execute();
  32. System.out.println("result = " + result);
  33.  
  34. helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
  35. //异步调用,可自由控制获取结果时机,
  36. Future<String> future = helloWorldCommand.queue();
  37. //get操作不能超过command定义的超时时间,默认:1秒
  38. result = future.get(100, TimeUnit.MILLISECONDS);
  39. System.out.println("result = " + result);
  40. System.out.println("mainThread = " + Thread.currentThread().getName());
  41. }
  42. }

运行结果:

  1. result = Hello Synchronous-hystrix, thread: hystrix-ExampleThreadPool-1
  2. result = Hello Asynchronous-hystrix, thread: hystrix-ExampleThreadPool-2
  3. mainThread = main

       注意上面有几个参数:
       CommandKey,依赖命名,每个CommandKey代表一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖做隔离。
       CommandGroup,依赖分组,命令分组用于对依赖操作分组,便于统计,汇总等。CommandGroup是每个命令最少配置的必选参数,在不指定ThreadPoolKey的情况下,字面值用于对不同依赖的线程池/信号区分。
         ThreadPoolKey,线程池/信号,当对同一业务依赖做隔离时使用CommandGroup做区分,但是对同一依赖的不同远程调用,例如一个是redis,一个是http,可以使用HystrixThreadPoolKey做隔离区分。即最然在业务上都是相同的组,但是需要在资源上做隔离时,是可以使用HystrixThreadPoolKey区分。

注册异步事件回调执行:

  1. //注册观察者事件拦截
  2. Observable<String> fs = new HelloWorldCommand("World").observe();
  3. //注册结果回调事件
  4. fs.subscribe(new Action1<String>() {
  5. @Override
  6. public void call(String result) {
  7. //执行结果处理,result 为HelloWorldCommand返回的结果
  8. //用户对结果做二次处理.
  9. System.out.println("call : " + result);
  10. }
  11. });
  12.  
  13. //注册完整执行生命周期事件
  14. fs.subscribe(new Observer<String>() {
  15. @Override
  16. public void onCompleted() {
  17. // onNext/onError完成之后最后回调
  18. System.out.println("execute onCompleted");
  19. }
  20.  
  21. @Override
  22. public void onError(Throwable e) {
  23. // 当产生异常时回调
  24. System.out.println("onError " + e.getMessage());
  25. e.printStackTrace();
  26. }
  27.  
  28. @Override
  29. public void onNext(String v) {
  30. // 获取结果后回调
  31. System.out.println("onNext: " + v);
  32. }
  33. });

运行结果:

  1. call : Hello World, thread: hystrix-ExampleThreadPool-3
  2. onNext: Hello World, thread: hystrix-ExampleThreadPool-3
  3. execute onCompleted


使用Fallback() 提供降级策略:

  1. //重载HystrixCommand 的getFallback方法实现逻辑
  2. public class HelloWorldCommand extends HystrixCommand<String> {
  3. private final String name;
  4. public HelloWorldCommand(String name) {
  5. super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
  6. /*依赖超时时间,500毫秒*/
  7. .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(500)));
  8. this.name = name;
  9. }
  10. @Override
  11. protected String getFallback() {
  12. return "exeucute Falled";
  13. }
  14. @Override
  15. protected String run() throws Exception {
  16. //sleep 1 秒,调用会超时
  17. TimeUnit.MILLISECONDS.sleep(1000);
  18. return "Hello " + name +" thread:" + Thread.currentThread().getName();
  19. }
  20. public static void main(String[] args) throws Exception{
  21. HelloWorldCommand command = new HelloWorldCommand("test-Fallback");
  22. String result = command.execute();
  23. }
  24. }

运行结果:

getFallback executed

除了HystrixBadRequestException异常之外,所有从run()方法抛出的异常都算作失败,并触发降级getFallback()和断路器逻辑。HystrixBadRequestException用在非法参数或非系统故障异常等不应触发回退逻辑的场景。


Hystrix的内部处理逻辑
下图为Hystrix服务调用的内部逻辑:


       

Hystrix Metrics的实现
       Hystrix的Metrics中保存了当前服务的健康状况,包括服务调用总次数和服务调用失败次数等。根据Metrics的计数,熔断器从而能计算出当前服务的调用失败率,用来和设定的阈值比较从而决定熔断器的状态切换逻辑。因此Metrics的实现非常重要。
       Hystrix使用RxJava的Observable.window()实现滑动窗口。RxJava的window使用后台线程创建新桶,避免了并发创建桶的问题。同时RxJava的单线程无锁特性也保证了计数变更时的线程安全。从而使代码更加简洁。以下为使用RxJava的window方法实现的一个简易滑动窗口Metrics,短短几行代码便能完成统计功能,足以证明RxJava的强大:

  1. @Test
  2. public void timeWindowTest() throws Exception{
  3. Observable<Integer> source = Observable.interval(50, TimeUnit.MILLISECONDS).map(i -> RandomUtils.nextInt(2));
  4. source.window(1, TimeUnit.SECONDS).subscribe(window -> {
  5. int[] metrics = new int[2];
  6. window.subscribe(i -> metrics[i]++,
  7. InternalObservableUtils.ERROR_NOT_IMPLEMENTED,
  8. () -> System.out.println("窗口Metrics:" + JSON.toJSONString(metrics)));
  9. });
  10. TimeUnit.SECONDS.sleep(3);
  11. }


补充:

  1. //构造setter
  2. HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(group);
  3. HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(group);
  4. HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(service);
  5. HytrixBaseCommand.Setter setter = HytrixBaseCommand.Setter.withGroupKey(groupKey) //分组名
  6. .andCommandKey(commandKey) //依赖名
  7. .andThreadPoolKey(threadPoolKey); //执行线程池名
  8. //构造command
  9. HystrixCommand<String> command = new HystrixCommand<String>(setter) {
  10. protected String run() throws Exception {
  11. if(time>0)//模拟长时间操作
  12. Thread.sleep(time);
  13. if(isException) //模拟异常情况
  14. throw new RuntimeException("exception in run");
  15. return service+ ":return";
  16. }
  17. @Override
  18. protected String getFallback() {
  19. return service+":fallback";
  20. }
  21. };
  22. //阻塞执行,获取结果
  23. String rel=command.execute();
  24. return rel;


使用hystrix不仅仅有HystrixCommand,还有HystrixObservableCommand,两者的区别请参考:http://www.jianshu.com/p/b9af028efebb

原文地址:https://www.cnblogs.com/tiancai/p/9506100.html