hystrix源码小贴士之hystrix-metrics-event-stream

hystrix-metrics-event-stream主要提供了一些servlet,可以让用户通过http请求获取metrics信息。

HystrixSampleSseServlet

  继承了HttpServlet,订阅sampleStream并把每个值以"data: ..."的形式写回客户端;在没有数据时定期发送"ping: "以检测客户端是否已断开。循环会一直持续,直到sampleStream发送complete或出现异常、写出失败(客户端断开连接),或者servlet被销毁。

 private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
        ...
                sampleSubscription = sampleStream
                        .observeOn(Schedulers.io())
                        .subscribe(new Subscriber<String>() {
                            @Override
                            public void onCompleted() {
                                logger.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
                                moreDataWillBeSent.set(false);
                            }
                            @Override
                            public void onError(Throwable e) {
                                moreDataWillBeSent.set(false);
                            }
                            @Override
                            public void onNext(String sampleDataAsString) {
                                if (sampleDataAsString != null) {
                                    try {
                                        writer.print("data: " + sampleDataAsString + "

");
                                        // explicitly check for client disconnect - PrintWriter does not throw exceptions
                                        if (writer.checkError()) {
                                            moreDataWillBeSent.set(false);
                                        }
                                        writer.flush();
                                    } catch (Exception ex) {
                                        moreDataWillBeSent.set(false);
                                    }
                                }
                            }
                        });

                while (moreDataWillBeSent.get() && !isDestroyed) {
                    try {
                        Thread.sleep(pausePollerThreadDelayInMs);
                        //in case stream has not started emitting yet, catch any clients which connect/disconnect before emits start
                        writer.print("ping: 

");
                        // explicitly check for client disconnect - PrintWriter does not throw exceptions
                        if (writer.checkError()) {
                            moreDataWillBeSent.set(false);
                        }
                        writer.flush();
                    } catch (Exception ex) {
                        moreDataWillBeSent.set(false);
                    }
                }
            }
        } finally {
            decrementCurrentConcurrentConnections();
            if (sampleSubscription != null && !sampleSubscription.isUnsubscribed()) {
                sampleSubscription.unsubscribe();
            }
        }
    }

HystrixConfigSseServlet

  继承HystrixSampleSseServlet,读取HystrixConfigurationStream流数据。

/**
 * Creates the servlet from {@code HystrixConfigurationStream.getInstance().observe()} and
 * {@code DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS}, delegating to the two-argument constructor.
 */
public HystrixConfigSseServlet() {
        this(HystrixConfigurationStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
    }

    /**
     * Creates the servlet from an explicit configuration stream and poll delay.
     *
     * <p>Each emitted {@code HystrixConfiguration} is converted to a string with
     * {@code SerialHystrixConfiguration.toJsonString} before being handed to the superclass.
     *
     * @param sampleStream               source of {@code HystrixConfiguration} values
     * @param pausePollerThreadDelayInMs delay value passed through to the superclass constructor
     */
    /* package-private */ HystrixConfigSseServlet(Observable<HystrixConfiguration> sampleStream, int pausePollerThreadDelayInMs) {
        super(sampleStream.map(new Func1<HystrixConfiguration, String>() {
            @Override
            public String call(HystrixConfiguration hystrixConfiguration) {
                return SerialHystrixConfiguration.toJsonString(hystrixConfiguration);
            }
        }), pausePollerThreadDelayInMs);
    }

HystrixMetricsStreamServlet

  继承HystrixSampleSseServlet,读取HystrixDashboardStream流数据。

/**
 * Creates the servlet from {@code HystrixDashboardStream.getInstance().observe()} and
 * {@code DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS}, delegating to the two-argument constructor.
 */
public HystrixMetricsStreamServlet() {
        this(HystrixDashboardStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
    }

    /**
     * Creates the servlet from an explicit dashboard-data stream and poll delay.
     *
     * <p>Unlike the sibling servlets, one input value can yield several output strings: each
     * {@code DashboardData} is passed to {@code SerialHystrixDashboardData.toMultipleJsonStrings},
     * and the results are flattened into the output stream with {@code concatMap}.
     *
     * @param sampleStream               source of {@code HystrixDashboardStream.DashboardData} values
     * @param pausePollerThreadDelayInMs delay value passed through to the superclass constructor
     */
    /* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
        super(sampleStream.concatMap(new Func1<HystrixDashboardStream.DashboardData, Observable<String>>() {
            @Override
            public Observable<String> call(HystrixDashboardStream.DashboardData dashboardData) {
                return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData));
            }
        }), pausePollerThreadDelayInMs);
    }

HystrixRequestEventsSseServlet

  继承HystrixSampleSseServlet,读取HystrixRequestEventsStream流数据。

/**
 * Creates the servlet from {@code HystrixRequestEventsStream.getInstance().observe()} and
 * {@code DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS}, delegating to the two-argument constructor.
 */
public HystrixRequestEventsSseServlet() {
        this(HystrixRequestEventsStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
    }

    /**
     * Creates the servlet from an explicit request-events stream and poll delay.
     *
     * <p>Each emitted {@code HystrixRequestEvents} is converted to a string with
     * {@code SerialHystrixRequestEvents.toJsonString} before being handed to the superclass.
     *
     * @param sampleStream               source of {@code HystrixRequestEvents} values
     * @param pausePollerThreadDelayInMs delay value passed through to the superclass constructor
     */
    /* package-private */ HystrixRequestEventsSseServlet(Observable<HystrixRequestEvents> sampleStream, int pausePollerThreadDelayInMs) {
        super(sampleStream.map(new Func1<HystrixRequestEvents, String>() {
            @Override
            public String call(HystrixRequestEvents requestEvents) {
                return SerialHystrixRequestEvents.toJsonString(requestEvents);
            }
        }), pausePollerThreadDelayInMs);
    }

HystrixUtilizationSseServlet

  继承HystrixSampleSseServlet,读取HystrixUtilizationStream流数据。

/**
 * Creates the servlet from {@code HystrixUtilizationStream.getInstance().observe()} and
 * {@code DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS}, delegating to the two-argument constructor.
 */
public HystrixUtilizationSseServlet() {
        this(HystrixUtilizationStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
    }

    /**
     * Creates the servlet from an explicit utilization stream and poll delay.
     *
     * <p>Each emitted {@code HystrixUtilization} is converted to a string with
     * {@code SerialHystrixUtilization.toJsonString} before being handed to the superclass.
     *
     * @param sampleStream               source of {@code HystrixUtilization} values
     * @param pausePollerThreadDelayInMs delay value passed through to the superclass constructor
     */
    /* package-private */ HystrixUtilizationSseServlet(Observable<HystrixUtilization> sampleStream, int pausePollerThreadDelayInMs) {
        super(sampleStream.map(new Func1<HystrixUtilization, String>() {
            @Override
            public String call(HystrixUtilization hystrixUtilization) {
                return SerialHystrixUtilization.toJsonString(hystrixUtilization);
            }
        }), pausePollerThreadDelayInMs);
    }

 使用hystrix-metrics-event-stream

  • 引入hystrix-metrics-event-stream-*.jar包
  • 添加web.xml配置
<!-- Declares the servlet; servlet-name is the key that the servlet-mapping below refers to. -->
<servlet>
   <description></description>
   <display-name>HystrixUtilizationSseServlet</display-name>
   <servlet-name>HystrixUtilizationSseServlet</servlet-name>
   <servlet-class>com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet</servlet-class>
</servlet>
 <!-- Maps the URL /hystrix/utilization.stream to the servlet declared above. -->
 <servlet-mapping>
   <servlet-name>HystrixUtilizationSseServlet</servlet-name>
   <url-pattern>/hystrix/utilization.stream</url-pattern>
 </servlet-mapping>
原文地址:https://www.cnblogs.com/zhangwanhua/p/8183095.html