Spring Cloud-hystrix(六)

作用

防止 多个服务相互交互时某个服务运行缓慢导致调用方线程挂起,高并发情况下 导致挂起线太多 引起调用方的服务不可用

能够在服务发生故障或者通过断路器监控向调用方返回一个错误 而不是长时间的等待

Spring Cloud Hystrix 实现了线程隔离 断路器等功能 是基于开源框架Netflix 实现的

Hystrix具备服 务降级、 服务熔断、 线程和信号隔离、 请求缓存、 请求合并以及服务监控等强大功能

简单例子

1.Provier增加一个增对hystirx测试的Contorller

@Controller
public class HistryxTestServiceContorller {

    @Qualifier("eurekaRegistration")
    @Autowired
    private Registration registration; // 服务注册


    @RequestMapping(value = "/histryxTest1")
    @ResponseBody
    public String histryxTest1(){
        List<String> serverInfo = new ArrayList<String>();
        serverInfo.add("ServiceId:"+registration.getServiceId());
        serverInfo.add("ServiceUri:"+registration.getUri());
        serverInfo.add("ServiceHost:"+registration.getHost());
        serverInfo.add("ServiceSchema:"+ registration.getScheme());
        serverInfo.add("ServicePort:"+registration.getPort());
        serverInfo.add("ServiceMetadata:"+ registration.getMetadata());
        return StringUtils.join(serverInfo,",");
    }


    @RequestMapping(value = "/histryxTimeOutTest")
    @ResponseBody
    public String histryxTimeOutTest() throws InterruptedException {
        int sleepIndex=new Random().nextInt(4000);
        Thread.sleep(sleepIndex);
        List<String> serverInfo = new ArrayList<String>();

        serverInfo.add("ServiceId:"+registration.getServiceId());
        serverInfo.add("ServiceUri:"+registration.getUri());
        serverInfo.add("ServiceHost:"+registration.getHost());
        serverInfo.add("ServiceSchema:"+ registration.getScheme());
        serverInfo.add("ServicePort:"+registration.getPort());
        serverInfo.add("ServiceMetadata:"+ registration.getMetadata());
        return StringUtils.join(serverInfo,",")+",sleep:"+sleepIndex+"ms";
    }

}

2.Concumer增加一个测试Hystrix的Contorller

@Controller
public class HystrixTestContorller {
    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/histryxTest1")
    @ResponseBody
    public String histryxTest1(){
        return restTemplate.getForEntity("http://PROVIDER/histryxTest1",String.class).getBody();

    }

    @RequestMapping(value = "/histryxTimeOutTest")
    @ResponseBody
    public String histryxTimeOutTest() throws InterruptedException {
        return restTemplate.getForEntity("http://PROVIDER/histryxTimeOutTest",String.class).getBody();

    }

    public String failback(){
        return "error";
    }
}

3.Provider增加2个配置文件对应不同端口

application-peer1.yml

spring:
  application:
    name: provider #服务名字
server:
  port: 8081
eureka:
  instance:
    # 10s未收到心跳,剔除instance 要比心跳时间大
    lease-expiration-duration-in-seconds: 30000
    # 心跳时间
    lease-renewal-interval-in-seconds: 5999
    hostname: localhost   #当前实例的主机名字
  client:
    serviceUrl:
      defaultZone: http://peer1:1111/eureka #,http://peer2:1112/eureka #注册中心地址

application-peer2.yml

spring:
  application:
    name: provider #服务名字
server:
  port: 8082
eureka:
  instance:
    # 10s未收到心跳,剔除instance 要比心跳时间大
    lease-expiration-duration-in-seconds: 30000
    # 心跳时间
    lease-renewal-interval-in-seconds: 5999
    hostname: localhost   #当前实例的主机名字
  client:
    serviceUrl:
      defaultZone: http://peer1:1111/eureka #,http://peer2:1112/eureka #注册中心地址
#management:

4.打包provider 启动2个provider指向不同的配置文件

java -jar /Users/liqiang/Desktop/java开发环境/javadom/springcloudhelloword/spring-cloud-stream/target/spring-cloud-stream-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1
java -jar /Users/liqiang/Desktop/java开发环境/javadom/springcloudhelloword/spring-cloud-stream/target/spring-cloud-stream-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2

5.查看知否成功注册

6.访问consumer

会发现 会线性轮训8081 和8082 

7.关闭其中一个服务 轮训到这个服务的时候回报

8测试超时熔断

不加熔断的情况下会一致等待执行完毕

9 consumer增加熔断

pom文件增加hystrix依赖

 <!--histrix-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

10.@EnableCircuitBreake开启熔断自动化配置

@SpringBootApplication
@EnableDiscoveryClient //开启服务发现
@EnableCircuitBreaker//开启断路器功能或者整体使用@SpringCloudApplication注解替代。详情点进去看
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    //LoadBalanced 通过代理RestTemplate 实现客户端负载均衡的能力
    @LoadBalanced
    @Bean
    RestTemplate restTemplate(){
        return new RestTemplate();
    }
}

11.在需要在启用熔断的类增加注解

@Controller
public class HystrixTestContorller {
    @Autowired
    RestTemplate restTemplate;

    //指定熔断调用的方法
    @HystrixCommand(fallbackMethod = "failback")
    @RequestMapping(value = "/histryxTest1")
    @ResponseBody
    public String histryxTest1(){
        return restTemplate.getForEntity("http://PROVIDER/histryxTest1",String.class).getBody();

    }
    //指定熔断调用的方法
    @HystrixCommand(fallbackMethod = "failback")
    @RequestMapping(value = "/histryxTimeOutTest")
    @ResponseBody
    public String histryxTimeOutTest() throws InterruptedException {
        return restTemplate.getForEntity("http://PROVIDER/histryxTimeOutTest",String.class).getBody();

    }
    public String failback(){
        return "error";
    }
}

根据上面的测试关闭服务后会直接返回erro  或者hystirx默认1秒超时  上面超时也会直接返回error (这样就解决了服务依赖某个服务响应慢导致服务挂起 引起雪崩效应)

hystrix超时是默认开启的 可以通过hystrix.command.default.execution.timeout.enabled=true #是否启用超时

HystrixCommand&HystrixObservableCommand原理

1.创建HystrixCommand或HystrixObservableCommand对象(命令模式  分为命令执行者Receiver 抽象命令Command 调用者 IInvoker实现 命令执行者和调用者的解耦)

HystrixCommand 用于依赖服务返回单个结果

HystrixObservableCommand 用于依赖服务返回多个服务

个人理解命令模式

/**
 * 抽象命令
 */
public interface ICommand {
    public String execute();
}
/**
 * 命令具体实现
 */
public class Command implements  ICommand {
    public Receiver receiver;
    public Command(Receiver receiver){
        this.receiver=receiver;
    }

    @Override
    public String execute() {
        /**
         * 命令和调用者的解耦
         * 这里就可以判断是否熔断 是否超时 是否错误 调用 receiver.failback();
         * 或者一些数据指标的统计
         *
         */
        return receiver.action();
    }
}
/**
 * 命令执行者
 */
public class Receiver {
    public RestTemplate restTemplate;
    public Receiver(RestTemplate restTemplate){
        this.restTemplate=restTemplate;
    }
    //执行命令
    public  String action(){
        return restTemplate.getForEntity("http://PROVIDER/hello",String.class).getBody();
    }
    //命令失败后执行操作(撤回逻辑等)
    public String failback(){
        return  "error";
    }
}
/**
 * 调用者
 */
public class Invoker {
    ICommand command;
    public Invoker(ICommand iCommand){
        this.command=iCommand;
    }
    public String action(){
        return  this.command.execute();
    }

}

    Receiver receiver=new Receiver(restTemplate);
    ICommand command=new Command(receiver);
    Invoker invoker=new Invoker(command);
    invoker.action();

2、执行命令

HystrixComrnand

execute()  同步执行  发生错误直接跑出异常

queue() 返回 Future 异步执行

通过HystrixCommand.getFallback()来 实现服务降级逻辑。

HystrixObservableCommand

observe()  返回Observable对象,它代表了操作的多个结果,它是 一个Hot Observable。

toObservable()  同样会返回Observable对象, 也代表了操作的多个结果, 但它返回的是 一个Cold Observable。

通过Command.resumeW江hFallback()

3.判断结果是否被缓存如果缓存则直接返回缓存

4.如果缓存没命中判断缓存是否打开如果打开则直接执行failback 否则直接跳到第五步

5.判断 请求队列/信号量/线程池是否占满(线程池塘是判断 依赖服务特有的线程池塘)

6HystrixObservableCommand.construct()或HystrixCommand.run() 取决于是用HystrixCommand还是HystrixObservableCommand

7.Hystrix会将“成功”、 “失败”、 “拒绝”、 “ 超时” 等信息报告给断路器,而断路器会维 护 一 组计数器来统计这些数据。 断路器会根据这些统计信息来判断是否熔断/断路

8.failback处理 

   第4步 第5步   第6步发生异常时 HystrixComrnand通过HystrixCommand.getFallback()来 实现服务降级逻辑。 通过Command.resumeWithFallback()

• execute(): 抛出异常。

• queue(): 正常返回Future对象,但是当 调用get()来获取结果的时候会抛出异 常。

• observe(): 正常返回Observable 对象, 当订阅它的时候, 将立即通过调用订 阅者的onError方法来通知中止请求。

• toObservable(): 正常返回Observable对象, 当订阅它的时候, 将通过调用 订阅者的onError方法来通知中止请求。

9.响应结果

• toObservable(): 返回最原始的 Observable, 必须通过订阅它才会真正触发

命令的执行流程。

• observe(): 在toObservable()产生原始Observable 之后立即 订阅它, 让 命令能够马上开始异步执行 , 并返回一 个Observable 对象, 当调用它的 subscribe 时, 将重新产生结果和通知给订阅者。

• queue(): 将 toObservable()产生的原始Observable通过toBlocking() 方法转换成BlockingObservable对象, 并调用它的toFuture()方法 返回异

步的Future对象。

• execute():在queue()产生异步结果Future对象之后,通过调用get()方法 阻塞并等待结果的返回。

hystirx都是以Observable返回结果 只是通过Observable可以转换成多种结果 值  future

断路器原理

HystrixCircuitBreaker

public interface HystrixCircuitBreaker {
    //判断是否被执行
    boolean allowRequest();
    //断路器是否打开
    boolean isOpen();
    //闭合断路器 成功时调用
    void markSuccess();
    //将熔断器状态重新置为开启状态,并把circuitOpened设置为当前的时间戳。 error调用
    void markNonSuccess();
    boolean attemptExecution();
    /**
     * 定义了一个什么都不做的断路器实现,它允许所有 请求, 并且断路器状态始终闭合。
     */
    public static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
    }

    /**
     * HystrixCircuitBreaker 实现类
     * 成员变量:HystrixCommandProperties 定义HystrixCommand的配置信息
     * 成员变量 HystrixCommandMetrics 定义hystrixCommand的度量指标
     * 成员变量 AtomicBoolean 定义断路器是否打开
     * 成员变量AtomicLong c江cuitOpenedOrLastTestedTime 定义上一次打开断路器的时间戳
     */
    public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {......}
    /**
     * 维护了一个Hystrix命令与HystrixCircuitBreaker的关系
     * 集合: ConcurrentHashMap<String, HystrixCircui七Breaker> circuit一 BreakersByCommand,
     * 其中 String 类型的 key 通过 HystrixCommandKey 定义,
     * 每一个 Hystrix 命令需要有一个 key 来标识, 同时一个 Hystrix
     * 命令也会在该集合中 找到它对应的断路器 HystrixCircuitBreaker 实例。
     */
    public static class Factory {
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap();

        public Factory() {
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            } else {
                HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics));
                return cbForCommand == null ? (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()) : cbForCommand;
            }
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
        }

        static void reset() {
            circuitBreakersByCommand.clear();
        }
    }
}

HystrixCircuitBreakerImpl

 public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        //断路器对应实例的属性集合对象
        private final HystrixCommandProperties properties;
        //用来让HystrixCommand记录各类度量指标的对象
        private final HystrixCommandMetrics metrics;
        //用来记录断路器的状态,默认是关闭状态
        private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status;
        //断路器打开的时间戳,默认-1,表示断路器未打开
        private final AtomicLong circuitOpened;
       // 这个是通过Rxjava实现的对HystrixCommandMetrics结果的观察者对象,当HystrixCommandMetrics值发生变化时会通知观察者。
        private final AtomicReference<Subscription> activeSubscription;

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED);
            this.circuitOpened = new AtomicLong(-1L);
            this.activeSubscription = new AtomicReference((Object)null);
            this.properties = properties;
            this.metrics = metrics;
            Subscription s = this.subscribeToStream();
            this.activeSubscription.set(s);
        }

        private Subscription subscribeToStream() {
            //观察者 当HystrixCommandMetrics的度量指标发生变化时,观察者实现的业务逻辑
            return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber<HealthCounts>() {
                public void onCompleted() {
                }

                public void onError(Throwable e) {
                }

                public void onNext(HealthCounts hc) {
                    if (hc.getTotalRequests() >= (long)(Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get() && hc.getErrorPercentage() >= (Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                        HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis());
                    }

                }
            });
        }

        public void markSuccess() {
            if (this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) {
                this.metrics.resetStream();
                Subscription previousSubscription = (Subscription)this.activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }

                Subscription newSubscription = this.subscribeToStream();
                this.activeSubscription.set(newSubscription);
                this.circuitOpened.set(-1L);
            }

        }

        public void markNonSuccess() {
            if (this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                this.circuitOpened.set(System.currentTimeMillis());
            }

        }

        public boolean isOpen() {
            //强制开启断路器 配置
            if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
                return true;
            } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {//强制关闭断路器 配置
                return false;
            } else {
                return this.circuitOpened.get() >= 0L;//根据断路器开启时间判断断路器的开启状态
            }
        }
        //判断是否允许请求接口(每次请求接口都会判断)
        public boolean allowRequest() {
            if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
                return false;
            } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
                return true;
            } else if (this.circuitOpened.get() == -1L) {
                return true;
            } else {
                return ((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN) ? false : this.isAfterSleepWindow();
            }
        }
        //判断时间有没有过休眠期
        private boolean isAfterSleepWindow() {
            long circuitOpenTime = this.circuitOpened.get();
            long currentTime = System.currentTimeMillis();
            long sleepWindowTime = (long)(Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get();
            return currentTime > circuitOpenTime + sleepWindowTime;
        }
          //尝试执行接口请求
        public boolean attemptExecution() {
            if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
                return false;
            } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
                return true;
            } else if (this.circuitOpened.get() == -1L) {
                return true;
            } else if (this.isAfterSleepWindow()) {
                return this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN);
            } else {
                return false;
            }
        }

        /**
         *CLOSED 关闭
         *OPEN  打开
         *HALF_OPEN :
         * 当断路器打开后,对应接口的请求会有段休眠期,这个休眠期内接口请求不会被正真的执行,但是如果休眠期时间过了
         * 这个时候断路器允许一次真实的接口请求,如果这次请求失败,则断路器打开(OPEN),循环上面的动作,如果请求成功则断路器关闭(CLOSED)。
         */
        static enum Status {
            CLOSED,
            OPEN,
            HALF_OPEN;
            private Status() {
            }
        }
    }

线程隔离

hystrix采用每个依赖服务都拥有各自的线程池

好处:

     1.某个依赖服务自身出线问题不会影响其他依赖服务

     2.不受其他不稳定因素依赖服务的影响

     3.某个依赖服务关闭  快速释放

面临的问题:

     各个依赖服务独立开启线程池塘的额外开销 不过在hystix的统计信息中 微乎其微

hystrix除了可以通过线程池控制并发数还可以对单个依赖服务通过信号量控制  信号量控制比线程池开销更小

只需要将隔离策略参数execution.isolation.strategy设置为SEMAPHORE' Hystrix 会使用信号量替代线程池来控制依赖服务的并发。(默认10)

原文地址:https://www.cnblogs.com/LQBlog/p/10087765.html