A Brief Look at the Service Framework in Google Guava's Concurrent Package

Original article | Translated article | Translator: 何一昕 | Proofreader: 方腾飞

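Overview

Guava's Service interface represents an object with an operational state, with methods such as start and stop. Web servers, RPC servers, and timers, for example, can implement this interface. Managing the state of such services is not easy: startup and shutdown have to be handled properly, and this becomes especially complicated in multithreaded environments. Guava provides some skeleton classes that manage the state-transition logic and the synchronization details for you.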

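Using a Service

The normal lifecycle of a service is:

  • Service.State.NEW to
  • Service.State.STARTING to
  • Service.State.RUNNING to
  • Service.State.STOPPING to
  • Service.State.TERMINATED

A stopped service cannot be restarted. If the service fails while starting, running, or stopping, it enters the Service.State.FAILED state. Calling startAsync() starts a service asynchronously and returns this, which allows method chaining. Note: startAsync() may only be called while the service is NEW, so it is best to initialize your services in one well-defined place in the application. Stopping a service works the same way, through the asynchronous stopAsync() method. Unlike startAsync(), it is safe to call stopAsync() more than once. This makes it easier to handle the race conditions that can occur while services are shutting down.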
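The rules above can be tried out with a minimal sketch. It assumes Guava is on the classpath; NoOpService and LifecycleDemo are hypothetical names introduced only for this illustration.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {
  /** Hypothetical service with nothing to do on startup or shutdown. */
  static class NoOpService extends AbstractIdleService {
    @Override protected void startUp() {}
    @Override protected void shutDown() {}
  }

  /** Walks one service through its lifecycle and records what was observed. */
  static List<String> observe() {
    List<String> seen = new ArrayList<>();
    Service service = new NoOpService();
    seen.add(service.state().name());        // a fresh service is NEW

    service.startAsync().awaitRunning();     // startAsync() returns this, so calls chain
    seen.add(service.state().name());        // RUNNING once awaitRunning() returns

    service.stopAsync();
    service.stopAsync().awaitTerminated();   // calling stopAsync() again is allowed
    seen.add(service.state().name());        // TERMINATED

    try {
      service.startAsync();                  // a stopped service cannot be restarted
      seen.add("restarted");
    } catch (IllegalStateException expected) {
      seen.add("restart rejected");
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(observe());
  }
}
```

The second startAsync() call is rejected because the service is no longer NEW, which is why a single, well-defined initialization point is recommended.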

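Service also provides several ways to wait for a state transition to complete:

  • Asynchronously, using addListener(). This method lets you add a Service.Listener that is invoked on every state transition of the service. Note: it is best to add the listener before the service is started (while it is still NEW). Transitions that have already happened are not replayed on a listener that is added later.
  • Synchronously, using awaitRunning(). This method is uninterruptible, throws no checked exceptions, and returns as soon as the service has started. If the service fails to start, it throws IllegalStateException. Similarly, awaitTerminated() waits for the service to reach a terminal state (TERMINATED or FAILED). Both methods have overloads that accept a timeout.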
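Both styles of waiting can be combined, as in the following sketch. It assumes Guava 18 or later (for MoreExecutors.directExecutor()); NoOpService, ListenerDemo, and the recorded strings are hypothetical choices made for this illustration.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ListenerDemo {
  static class NoOpService extends AbstractIdleService {
    @Override protected void startUp() {}
    @Override protected void shutDown() {}
  }

  /** Records every transition reported to a listener that was added while NEW. */
  static List<String> recordTransitions() {
    final List<String> events = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    Service service = new NoOpService();

    // Register before starting so that no transition is missed.
    service.addListener(new Service.Listener() {
      @Override public void starting() { events.add("starting"); }
      @Override public void running() { events.add("running"); }
      @Override public void stopping(State from) { events.add("stopping from " + from); }
      @Override public void terminated(State from) {
        events.add("terminated from " + from);
        done.countDown();
      }
      @Override public void failed(State from, Throwable failure) {
        events.add("failed from " + from);
        done.countDown();
      }
    }, MoreExecutors.directExecutor());

    try {
      service.startAsync().awaitRunning(5, TimeUnit.SECONDS);    // overload with a timeout
      service.stopAsync().awaitTerminated(5, TimeUnit.SECONDS);
      // The last callback may complete slightly after awaitTerminated() returns.
      done.await(5, TimeUnit.SECONDS);
    } catch (TimeoutException | InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(recordTransitions());
  }
}
```

The latch is there because listener callbacks are dispatched separately from the state change itself, so the sketch waits for the terminal callback before reading the list.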

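The Service interface is subtle and complicated to implement, and it is easy to run into hard-to-diagnose problems. We therefore do not recommend implementing it directly. Instead, extend one of the abstract base classes that Guava already provides. Each base class supports a specific threading model.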

Base Implementations

AbstractIdleService

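AbstractIdleService is a skeleton implementation of Service that performs no action while it is in the running state, and therefore needs no thread while running, but it does have startup and shutdown actions to perform. To implement such a service, extend AbstractIdleService and implement the startUp() and shutDown() methods.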

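    // Runs once while the service is STARTING.
    protected void startUp() {
      servlets.add(new GcStatsServlet());
    }
    // Runs once while the service is STOPPING; nothing to clean up here.
    protected void shutDown() {}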

In the example above, any request to GcStatsServlet already has a thread to run on, so the service needs no extra work while it is running.
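A runnable variation of that idea is sketched below, assuming Guava on the classpath. REGISTRY is a hypothetical stand-in for the servlet collection in the snippet above, and unlike that snippet this sketch also undoes the registration in shutDown().

```java
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class IdleServiceDemo {
  /** Hypothetical stand-in for the servlet registry used in the article's snippet. */
  static final Set<String> REGISTRY = ConcurrentHashMap.newKeySet();

  /** Does work only on startup and shutdown; no thread is needed in between. */
  static class StatsEndpointService extends AbstractIdleService {
    @Override protected void startUp() { REGISTRY.add("/gc-stats"); }
    @Override protected void shutDown() { REGISTRY.remove("/gc-stats"); }
  }

  /** Returns {registered while running, registered after stop}. */
  static boolean[] registeredWhileRunningAndAfterStop() {
    StatsEndpointService service = new StatsEndpointService();
    service.startAsync().awaitRunning();
    boolean whileRunning = REGISTRY.contains("/gc-stats");
    service.stopAsync().awaitTerminated();
    boolean afterStop = REGISTRY.contains("/gc-stats");
    return new boolean[] {whileRunning, afterStop};
  }

  public static void main(String[] args) {
    boolean[] result = registeredWhileRunningAndAfterStop();
    System.out.println("while running: " + result[0] + ", after stop: " + result[1]);
  }
}
```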

AbstractExecutionThreadService

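AbstractExecutionThreadService performs its startup, running, and shutdown actions in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might do the work in a loop: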

    public void run() {
      while (isRunning()) {
        // perform a unit of work
      }
    }

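Alternatively, you can override triggerShutdown() in any way that causes run() to return.

Overriding startUp() and shutDown() is optional and does not affect how the service state is managed.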

    protected void startUp() {
      dispatcher.listenForConnections(port, queue);
    }
    protected void run() {
      Connection connection;
      // Assign first, then compare against the POISON sentinel.
      while ((connection = queue.take()) != POISON) {
        process(connection);
      }
    }
    protected void triggerShutdown() {
      dispatcher.stopListeningForConnections(queue);
      // Wake up run() so that its loop can exit.
      queue.put(POISON);
    }

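Starting the service calls your startUp() method, creates a thread for you, and invokes run() in that thread. Stopping the service calls triggerShutdown(), and the service terminates once that thread has finished.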
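The poison-pill pattern from the snippet above can be made self-contained as follows. This is only a sketch, assuming Guava on the classpath; QueueWorker, its two queues, and the "handled" prefix are hypothetical and stand in for the connection handling in the original snippet.

```java
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorkerDemo {
  static class QueueWorker extends AbstractExecutionThreadService {
    // Distinct String instance, compared by identity, used as the stop signal.
    private static final String POISON = new String("poison");
    final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    final BlockingQueue<String> processed = new LinkedBlockingQueue<>();

    @Override protected void run() throws InterruptedException {
      String item;
      while ((item = inbox.take()) != POISON) {
        processed.put("handled " + item);
      }
    }

    @Override protected void triggerShutdown() {
      // The queue is unbounded, so offer() neither blocks nor throws.
      inbox.offer(POISON);
    }
  }

  static List<String> runWorker() {
    QueueWorker worker = new QueueWorker();
    worker.startAsync().awaitRunning();
    List<String> results = new ArrayList<>();
    try {
      worker.inbox.put("a");
      worker.inbox.put("b");
      results.add(worker.processed.take());   // blocks until run() has handled the item
      results.add(worker.processed.take());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      worker.stopAsync().awaitTerminated();   // triggerShutdown() lets run() return
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(runWorker());
  }
}
```

The sketch waits for both results before stopping. Stopping immediately after awaitRunning() could end the service before run() has picked up any queued items.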

AbstractScheduledService

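AbstractScheduledService performs a periodic task while it is running. Subclasses implement runOneIteration() to define one iteration of the task, along with the familiar startUp() and shutDown() methods. To describe the execution schedule, you must implement the scheduler() method. Typically you will use one of the two schedules provided by AbstractScheduledService.Scheduler: newFixedRateSchedule(initialDelay, period, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit), which correspond to the two scheduling modes of ScheduledExecutorService in the JDK concurrency package. Custom schedules can be implemented with the help of CustomScheduler; see the Javadoc for details.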
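As a small illustration, the sketch below runs a counter on a fixed-delay schedule. It assumes Guava on the classpath; Heartbeat, the 10 ms delay, and the three-iteration latch are hypothetical choices made for the example.

```java
import com.google.common.util.concurrent.AbstractScheduledService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatDemo {
  static class Heartbeat extends AbstractScheduledService {
    final AtomicInteger beats = new AtomicInteger();
    final CountDownLatch threeBeats = new CountDownLatch(3);

    @Override protected void runOneIteration() {
      beats.incrementAndGet();
      threeBeats.countDown();
    }

    @Override protected Scheduler scheduler() {
      // Run immediately, then wait 10 ms after each iteration completes.
      return Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
    }
  }

  /** Starts the service, waits for three iterations, stops it, and returns the count. */
  static int beatsAfterThreeIterations() {
    Heartbeat heartbeat = new Heartbeat();
    heartbeat.startAsync().awaitRunning();
    try {
      heartbeat.threeBeats.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      heartbeat.stopAsync().awaitTerminated();
    }
    return heartbeat.beats.get();
  }

  public static void main(String[] args) {
    System.out.println("iterations: " + beatsAfterThreeIterations());
  }
}
```

The exact count depends on timing, so the sketch only relies on at least three iterations having run.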

AbstractService

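When you need to manage threads yourself, extend AbstractService directly. In most cases one of the implementations above is sufficient, but if the service has specific threading requirements while it runs, extending AbstractService is the recommended approach.

To extend AbstractService you must implement two methods:

  • doStart(): called directly by the first call to startAsync(). It must perform all initialization, and then call notifyStarted() if startup succeeds or notifyFailed() if it fails.
  • doStop(): called directly by the first call to stopAsync(). It must shut the service down, and then call notifyStopped() if shutdown succeeds or notifyFailed() if it fails.

Your doStart and doStop methods should be fast and return with as little delay as possible. If initialization is expensive, such as reading files, opening network connections, or any other operation that might block, consider moving it to a separate thread.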
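The advice about keeping doStart() fast might look like the following sketch. It assumes Guava on the classpath and Java 8 or later; SlowStartService and the 50 ms sleep are hypothetical and merely stand in for real blocking initialization.

```java
import com.google.common.util.concurrent.AbstractService;

class CustomServiceDemo {
  static class SlowStartService extends AbstractService {
    volatile boolean initialized = false;

    @Override protected void doStart() {
      // Return quickly: the slow work happens on a separate thread.
      new Thread(() -> {
        try {
          Thread.sleep(50);      // stands in for reading files or opening connections
          initialized = true;
          notifyStarted();       // STARTING -> RUNNING
        } catch (Throwable t) {
          notifyFailed(t);       // STARTING -> FAILED
        }
      }, "slow-start").start();
    }

    @Override protected void doStop() {
      initialized = false;
      notifyStopped();           // STOPPING -> TERMINATED
    }
  }

  /** Returns "state after start, initialized flag, state after stop". */
  static String lifecycle() {
    SlowStartService service = new SlowStartService();
    service.startAsync().awaitRunning();
    String afterStart = service.state() + "," + service.initialized;
    service.stopAsync().awaitTerminated();
    return afterStart + "," + service.state();
  }

  public static void main(String[] args) {
    System.out.println(lifecycle());
  }
}
```

Here startAsync() returns as soon as the helper thread has been launched, and awaitRunning() is what blocks until notifyStarted() has been called.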

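Using ServiceManager

In addition to the skeleton implementations of Service, Guava provides the ServiceManager class, which makes operations that involve several Service instances easier. You create a ServiceManager from a collection of services, and then manage them with the following methods:

  • startAsync(): starts all services under management. As with Service#startAsync(), you can call this method only once, and only if all services are NEW.
  • stopAsync(): stops all services under management.
  • addListener: adds a ServiceManager.Listener that is invoked on state transitions.
  • awaitHealthy(): waits for all services to reach the RUNNING state.
  • awaitStopped(): waits for all services to reach a terminal state.

The inspection methods are:

  • isHealthy(): returns true if all services are RUNNING.
  • servicesByState(): returns a snapshot of all current services, indexed by their state.
  • startupTimes(): returns a Map that records how long each managed service took to start, in milliseconds. The map is ordered by startup time.

We recommend managing the whole service lifecycle through ServiceManager, but state transitions that are triggered by other mechanisms do not affect the correctness of its methods. For example, if a service is started by some mechanism other than startAsync(), the listeners are still invoked as expected and awaitHealthy() still works. The only requirement that ServiceManager enforces is that all services must be NEW when the ServiceManager is constructed.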
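A compact sketch of the methods listed above follows. It assumes Guava on the classpath; IdleStub and ServiceManagerDemo are hypothetical names used only for this illustration.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.ServiceManager;
import java.util.Arrays;
import java.util.List;

class ServiceManagerDemo {
  /** Hypothetical service with empty startup and shutdown hooks. */
  static class IdleStub extends AbstractIdleService {
    @Override protected void startUp() {}
    @Override protected void shutDown() {}
  }

  /** Returns "healthy, number running, number terminated" for two managed services. */
  static String manageTwoServices() {
    List<Service> services = Arrays.<Service>asList(new IdleStub(), new IdleStub());
    // Every service must still be NEW when the manager is constructed.
    ServiceManager manager = new ServiceManager(services);

    manager.startAsync().awaitHealthy();        // blocks until all services are RUNNING
    boolean healthy = manager.isHealthy();
    int running = manager.servicesByState().get(State.RUNNING).size();

    manager.stopAsync().awaitStopped();         // blocks until all services are terminal
    int terminated = manager.servicesByState().get(State.TERMINATED).size();
    return healthy + "," + running + "," + terminated;
  }

  public static void main(String[] args) {
    System.out.println(manageTwoServices());
  }
}
```

Starting and stopping through the manager keeps the calling code independent of how many services there are.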

Appendix: test cases, which can also serve as practice demos

ServiceTest

    /*
     * Copyright (C) 2013 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static com.google.common.util.concurrent.Service.State.FAILED;
    import static com.google.common.util.concurrent.Service.State.NEW;
    import static com.google.common.util.concurrent.Service.State.RUNNING;
    import static com.google.common.util.concurrent.Service.State.STARTING;
    import static com.google.common.util.concurrent.Service.State.STOPPING;
    import static com.google.common.util.concurrent.Service.State.TERMINATED;

    import junit.framework.TestCase;

    /**
     * Unit tests for {@link Service}
     */
    public class ServiceTest extends TestCase {

      /** Assert on the comparison ordering of the State enum since we guarantee it. */
      public void testStateOrdering() {
        // List every valid (direct) state transition.
        assertLessThan(NEW, STARTING);
        assertLessThan(NEW, TERMINATED);

        assertLessThan(STARTING, RUNNING);
        assertLessThan(STARTING, STOPPING);
        assertLessThan(STARTING, FAILED);

        assertLessThan(RUNNING, STOPPING);
        assertLessThan(RUNNING, FAILED);

        assertLessThan(STOPPING, FAILED);
        assertLessThan(STOPPING, TERMINATED);
      }

      private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
        if (a.compareTo(b) >= 0) {
          fail(String.format("Expected %s to be less than %s", a, b));
        }
      }
    }

AbstractIdleServiceTest

    /*
     * Copyright (C) 2009 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static org.truth0.Truth.ASSERT;

    import com.google.common.collect.Lists;

    import junit.framework.TestCase;

    import java.util.List;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * Tests for {@link AbstractIdleService}.
     *
     * @author Chris Nokleberg
     * @author Ben Yu
     */
    public class AbstractIdleServiceTest extends TestCase {

      // Functional tests using real thread. We only verify publicly visible state.
      // Interaction assertions are done by the single-threaded unit tests.

      public static class FunctionalTest extends TestCase {

        private static class DefaultService extends AbstractIdleService {
          @Override protected void startUp() throws Exception {}
          @Override protected void shutDown() throws Exception {}
        }

        public void testServiceStartStop() throws Exception {
          AbstractIdleService service = new DefaultService();
          service.startAsync().awaitRunning();
          assertEquals(Service.State.RUNNING, service.state());
          service.stopAsync().awaitTerminated();
          assertEquals(Service.State.TERMINATED, service.state());
        }

        public void testStart_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void startUp() throws Exception {
              throw exception;
            }
          };
          try {
            service.startAsync().awaitRunning();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }

        public void testStop_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void shutDown() throws Exception {
              throw exception;
            }
          };
          service.startAsync().awaitRunning();
          try {
            service.stopAsync().awaitTerminated();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }
      }

      public void testStart() {
        TestService service = new TestService();
        assertEquals(0, service.startUpCalled);
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.RUNNING, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStart_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void startUp() throws Exception {
            super.startUp();
            throw exception;
          }
        };
        assertEquals(0, service.startUpCalled);
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStop_withoutStart() {
        TestService service = new TestService();
        service.stopAsync().awaitTerminated();
        assertEquals(0, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates).isEmpty();
      }

      public void testStop_afterStart() {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        service.stopAsync().awaitTerminated();
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testStop_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void shutDown() throws Exception {
            super.shutDown();
            throw exception;
          }
        };
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        try {
          service.stopAsync().awaitTerminated();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testServiceToString() {
        AbstractIdleService service = new TestService();
        assertEquals("TestService [NEW]", service.toString());
        service.startAsync().awaitRunning();
        assertEquals("TestService [RUNNING]", service.toString());
        service.stopAsync().awaitTerminated();
        assertEquals("TestService [TERMINATED]", service.toString());
      }

      public void testTimeout() throws Exception {
        // Create a service whose executor will never run its commands
        Service service = new TestService() {
          @Override protected Executor executor() {
            return new Executor() {
              @Override public void execute(Runnable command) {}
            };
          }
        };
        try {
          service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
          fail("Expected timeout");
        } catch (TimeoutException e) {
          ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
        }
      }

      private static class TestService extends AbstractIdleService {
        int startUpCalled = 0;
        int shutDownCalled = 0;
        final List<State> transitionStates = Lists.newArrayList();

        @Override protected void startUp() throws Exception {
          assertEquals(0, startUpCalled);
          assertEquals(0, shutDownCalled);
          startUpCalled++;
          assertEquals(State.STARTING, state());
        }

        @Override protected void shutDown() throws Exception {
          assertEquals(1, startUpCalled);
          assertEquals(0, shutDownCalled);
          shutDownCalled++;
          assertEquals(State.STOPPING, state());
        }

        @Override protected Executor executor() {
          transitionStates.add(state());
          return MoreExecutors.sameThreadExecutor();
        }
      }
    }

AbstractScheduledServiceTest

    /*
     * Copyright (C) 2011 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    import com.google.common.util.concurrent.Service.State;

    import junit.framework.TestCase;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Unit test for {@link AbstractScheduledService}.
     *
     * @author Luke Sandberg
     */

    public class AbstractScheduledServiceTest extends TestCase {

      volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
      volatile ScheduledFuture<?> future = null;

      volatile boolean atFixedRateCalled = false;
      volatile boolean withFixedDelayCalled = false;
      volatile boolean scheduleCalled = false;

      final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
            long delay, TimeUnit unit) {
          return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
      };

      public void testServiceStartStop() throws Exception {
        NullService service = new NullService();
        service.startAsync().awaitRunning();
        assertFalse(future.isDone());
        service.stopAsync().awaitTerminated();
        assertTrue(future.isCancelled());
      }

      private class NullService extends AbstractScheduledService {
        @Override protected void runOneIteration() throws Exception {}
        @Override protected Scheduler scheduler() { return configuration; }
        @Override protected ScheduledExecutorService executor() { return executor; }
      }

      public void testFailOnExceptionFromRun() throws Exception {
        TestService service = new TestService();
        service.runException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.runSecondBarrier.await();
        try {
          future.get();
          fail();
        } catch (ExecutionException e) {
          // An execution exception holds a runtime exception (from Throwables.propagate) that holds our
          // original exception.
          assertEquals(service.runException, e.getCause().getCause());
        }
        assertEquals(service.state(), Service.State.FAILED);
      }

      public void testFailOnExceptionFromStartUp() {
        TestService service = new TestService();
        service.startUpException = new Exception();
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.startUpException, e.getCause());
        }
        assertEquals(0, service.numberOfTimesRunCalled.get());
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testFailOnExceptionFromShutDown() throws Exception {
        TestService service = new TestService();
        service.shutDownException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        try {
          service.awaitTerminated();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.shutDownException, e.getCause());
        }
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testRunOneIterationCalledMultipleTimes() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
      }

      public void testExecutorOnlyCalledOnce() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        // It should be called once during startup.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
        // Only called once overall.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
      }

      public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
        final CountDownLatch terminationLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void terminated(State from) {
                  terminationLatch.countDown();
                }
              }, MoreExecutors.sameThreadExecutor());
            }
            return executorService;
          }

          @Override protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
          }};

        service.startAsync();
        assertFalse(service.executor().isShutdown());
        service.awaitRunning();
        service.stopAsync();
        terminationLatch.await();
        assertTrue(service.executor().isShutdown());
        assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
      }

      public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected void startUp() throws Exception {
            throw new Exception("Failed");
          }

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void failed(State from, Throwable failure) {
                  failureLatch.countDown();
                }
              }, MoreExecutors.sameThreadExecutor());
            }
            return executorService;
          }

          @Override protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
          }};

        try {
          service.startAsync().awaitRunning();
          fail("Expected service to fail during startup");
        } catch (IllegalStateException expected) {}
        failureLatch.await();
        assertTrue(service.executor().isShutdown());
        assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
      }

      public void testSchedulerOnlyCalledOnce() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        // It should be called once during startup.
        assertEquals(1, service.numberOfTimesSchedulerCalled.get());
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.awaitTerminated();
        // Only called once overall.
        assertEquals(1, service.numberOfTimesSchedulerCalled.get());
      }

      private class TestService extends AbstractScheduledService {
        CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
        CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

        volatile boolean startUpCalled = false;
        volatile boolean shutDownCalled = false;
        AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
        AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
        AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
        volatile Exception runException = null;
        volatile Exception startUpException = null;
        volatile Exception shutDownException = null;

        @Override
        protected void runOneIteration() throws Exception {
          assertTrue(startUpCalled);
          assertFalse(shutDownCalled);
          numberOfTimesRunCalled.incrementAndGet();
          assertEquals(State.RUNNING, state());
          runFirstBarrier.await();
          runSecondBarrier.await();
          if (runException != null) {
            throw runException;
          }
        }

        @Override
        protected void startUp() throws Exception {
          assertFalse(startUpCalled);
          assertFalse(shutDownCalled);
          startUpCalled = true;
          assertEquals(State.STARTING, state());
          if (startUpException != null) {
            throw startUpException;
          }
        }

        @Override
        protected void shutDown() throws Exception {
          assertTrue(startUpCalled);
          assertFalse(shutDownCalled);
          shutDownCalled = true;
          if (shutDownException != null) {
            throw shutDownException;
          }
        }

        @Override
        protected ScheduledExecutorService executor() {
          numberOfTimesExecutorCalled.incrementAndGet();
          return executor;
        }

        @Override
        protected Scheduler scheduler() {
          numberOfTimesSchedulerCalled.incrementAndGet();
          return configuration;
        }
      }

      public static class SchedulerTest extends TestCase {
        // These constants are arbitrary and just used to make sure that the correct method is called
        // with the correct parameters.
        private static final int initialDelay = 10;
        private static final int delay = 20;
        private static final TimeUnit unit = TimeUnit.MILLISECONDS;

        // Unique runnable object used for comparison.
        final Runnable testRunnable = new Runnable() {@Override public void run() {}};
        boolean called = false;

        private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
            long delay, TimeUnit unit) {
          assertFalse(called); // only called once.
          called = true;
          assertEquals(SchedulerTest.initialDelay, initialDelay);
          assertEquals(SchedulerTest.delay, delay);
          assertEquals(SchedulerTest.unit, unit);
          assertEquals(testRunnable, command);
        }

        public void testFixedRateSchedule() {
          Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
          schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
            @Override
            public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                long period, TimeUnit unit) {
              assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
              return null;
            }
          }, testRunnable);
          assertTrue(called);
        }

        public void testFixedDelaySchedule() {
          Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
          schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
            @Override
            public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                long delay, TimeUnit unit) {
              assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
              return null;
            }
          }, testRunnable);
          assertTrue(called);
        }

        private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
          public AtomicInteger scheduleCounter = new AtomicInteger(0);
          @Override
          protected Schedule getNextSchedule() throws Exception {
            scheduleCounter.incrementAndGet();
            return new Schedule(0, TimeUnit.SECONDS);
          }
        }

        public void testCustomSchedule_startStop() throws Exception {
          final CyclicBarrier firstBarrier = new CyclicBarrier(2);
          final CyclicBarrier secondBarrier = new CyclicBarrier(2);
          final AtomicBoolean shouldWait = new AtomicBoolean(true);
          Runnable task = new Runnable() {
            @Override public void run() {
              try {
                if (shouldWait.get()) {
                  firstBarrier.await();
                  secondBarrier.await();
                }
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            }
          };
          TestCustomScheduler scheduler = new TestCustomScheduler();
          Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
          firstBarrier.await();
          assertEquals(1, scheduler.scheduleCounter.get());
          secondBarrier.await();
          firstBarrier.await();
          assertEquals(2, scheduler.scheduleCounter.get());
          shouldWait.set(false);
          secondBarrier.await();
          future.cancel(false);
        }

        public void testCustomSchedulerServiceStop() throws Exception {
          TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
          service.startAsync().awaitRunning();
          service.firstBarrier.await();
          assertEquals(1, service.numIterations.get());
          service.stopAsync();
          service.secondBarrier.await();
          service.awaitTerminated();
          // Sleep for a while just to ensure that our task wasn't called again.
          Thread.sleep(unit.toMillis(3 * delay));
          assertEquals(1, service.numIterations.get());
        }

395    public void testBig() throws Exception {
396     TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
397     @Override protected Scheduler scheduler() {
398     return new AbstractScheduledService.CustomScheduler() {
399     @Override
400     protected Schedule getNextSchedule() throws Exception {
401     // Explicitly yield to increase the probability of a pathological scheduling.
402     Thread.yield();
403     return new Schedule(0, TimeUnit.SECONDS);
404     }
405     };
406     }
407     };
408     service.useBarriers = false;
409     service.startAsync().awaitRunning();
410     Thread.sleep(50);
411     service.useBarriers = true;
412     service.firstBarrier.await();
413     int numIterations = service.numIterations.get();
414     service.stopAsync();
415     service.secondBarrier.await();
416     service.awaitTerminated();
417     assertEquals(numIterations, service.numIterations.get());
418     }
419     
    private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      volatile boolean useBarriers = true;
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        if (useBarriers) {
          firstBarrier.await();
          secondBarrier.await();
        }
      }

      @Override protected ScheduledExecutorService executor() {
        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
        return Executors.newScheduledThreadPool(10);
      }

      @Override protected void startUp() throws Exception {}

      @Override protected void shutDown() throws Exception {}

      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            return new Schedule(delay, unit);
          }};
      }
    }

    public void testCustomSchedulerFailure() throws Exception {
      TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
      service.startAsync().awaitRunning();
      for (int i = 1; i < 4; i++) {
        service.firstBarrier.await();
        assertEquals(i, service.numIterations.get());
        service.secondBarrier.await();
      }
      Thread.sleep(1000);
      try {
        service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
        fail();
      } catch (IllegalStateException e) {
        assertEquals(State.FAILED, service.state());
      }
    }

    private static class TestFailingCustomScheduledService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        firstBarrier.await();
        secondBarrier.await();
      }

      @Override protected ScheduledExecutorService executor() {
        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
        return Executors.newScheduledThreadPool(10);
      }

      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            if (numIterations.get() > 2) {
              throw new IllegalStateException("Failed");
            }
            return new Schedule(delay, unit);
          }};
      }
    }
  }
}
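
The custom-scheduler tests above rely on one idea: after every iteration the service asks for the delay before the next one, rather than running at a fixed rate. The following is a minimal stand-alone sketch of that idea using only the JDK. It is not Guava's implementation; the class SelfReschedulingTask and its methods nextDelayMillis() and runToCompletion() are hypothetical names introduced here for illustration, and nextDelayMillis() merely plays the role that getNextSchedule() plays in the tests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of "compute the next delay after each iteration". */
public class SelfReschedulingTask implements Runnable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger iterations = new AtomicInteger();
  private final CountDownLatch done = new CountDownLatch(1);
  private final int maxIterations;

  SelfReschedulingTask(int maxIterations) {
    this.maxIterations = maxIterations;
  }

  /** Assumed policy: the delay grows by 10 ms with every completed iteration. */
  long nextDelayMillis() {
    return 10L * iterations.get();
  }

  @Override public void run() {
    // One unit of work is modelled by incrementing the counter.
    if (iterations.incrementAndGet() >= maxIterations) {
      executor.shutdown();   // no further iterations are scheduled
      done.countDown();
      return;
    }
    // Ask for the next delay only after this iteration has finished.
    executor.schedule(this, nextDelayMillis(), TimeUnit.MILLISECONDS);
  }

  /** Starts the loop, blocks until the last iteration, and returns the count. */
  int runToCompletion() throws InterruptedException {
    executor.schedule(this, 0, TimeUnit.MILLISECONDS);
    done.await();
    return iterations.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(new SelfReschedulingTask(3).runToCompletion());
  }
}
```

Because each run schedules its successor only once it has finished, iterations in this sketch never overlap, which is the property testBig checks for by comparing the iteration count before and after stopAsync().
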
AbstractServiceTest

/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;

import junit.framework.TestCase;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.GuardedBy;

/**
 * Unit test for {@link AbstractService}.
 *
 * @author Jesse Wilson
 */
public class AbstractServiceTest extends TestCase {

  private Thread executionThread;
  private Throwable thrownByExecutionThread;

  public void testNoOpServiceStartStop() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);

    assertEquals(State.NEW, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.running);

    service.startAsync();
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());
    assertTrue(service.running);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.running);
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStopIdempotence() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();

    service.stopAsync().awaitTerminated();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStartStopAndWaitUninterruptible()
      throws Exception {
    NoOpService service = new NoOpService();

    currentThread().interrupt();
    try {
      service.startAsync().awaitRunning();
      assertEquals(State.RUNNING, service.state());

      service.stopAsync().awaitTerminated();
      assertEquals(State.TERMINATED, service.state());

      assertTrue(currentThread().isInterrupted());
    } finally {
      Thread.interrupted(); // clear interrupt for future tests
    }
  }

  private static class NoOpService extends AbstractService {
    boolean running = false;

    @Override protected void doStart() {
      assertFalse(running);
      running = true;
      notifyStarted();
    }

    @Override protected void doStop() {
      assertTrue(running);
      running = false;
      notifyStopped();
    }
  }

  public void testManualServiceStartStop() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    assertEquals(State.STARTING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStartCalled);

    service.notifyStarted(); // usually this would be invoked by another thread
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());

    service.stopAsync();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStopCalled);

    service.notifyStopped(); // usually this would be invoked by another thread
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    service.notifyStarted();
    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);

    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testManualServiceStopWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    assertEquals(State.STARTING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStartCalled);

    service.stopAsync();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);

    service.notifyStarted();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStopCalled);

    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  /**
   * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
   * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
   * called multiple times.
   */
  public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    final AtomicInteger stoppingCount = new AtomicInteger();
    service.addListener(new Listener() {
      @Override public void stopping(State from) {
        stoppingCount.incrementAndGet();
      }
    }, MoreExecutors.sameThreadExecutor());

    service.startAsync();
    service.stopAsync();
    assertEquals(1, stoppingCount.get());
    // A second stopAsync() must not fire the stopping() callback again.
    service.stopAsync();
    assertEquals(1, stoppingCount.get());
  }

  public void testManualServiceStopWhileNew() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStartCalled);
    assertFalse(service.doStopCalled);
    assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
  }

  public void testManualServiceFailWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
  }

  public void testManualServiceFailWhileRunning() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyStarted();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
        listener.getStateHistory());
  }

  public void testManualServiceFailWhileStopping() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyStarted();
    service.stopAsync();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
        listener.getStateHistory());
  }

  public void testManualServiceUnrequestedStop() {
    ManualSwitchedService service = new ManualSwitchedService();

    service.startAsync();

    service.notifyStarted();
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());
    assertFalse(service.doStopCalled);

    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);
  }

  /**
   * The user of this service should call {@link #notifyStarted} and {@link
   * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
   */
  private static class ManualSwitchedService extends AbstractService {
    boolean doStartCalled = false;
    boolean doStopCalled = false;

    @Override protected void doStart() {
      assertFalse(doStartCalled);
      doStartCalled = true;
    }

    @Override protected void doStop() {
      assertFalse(doStopCalled);
      doStopCalled = true;
    }
  }

  public void testAwaitTerminated() throws Exception {
    final NoOpService service = new NoOpService();
    Thread waiter = new Thread() {
      @Override public void run() {
        service.awaitTerminated();
      }
    };
    waiter.start();
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());
    service.stopAsync();
    waiter.join(100); // ensure that the await in the other thread is triggered
    assertFalse(waiter.isAlive());
  }

  public void testAwaitTerminated_FailedService() throws Exception {
    final ManualSwitchedService service = new ManualSwitchedService();
    final AtomicReference<Throwable> exception = Atomics.newReference();
    Thread waiter = new Thread() {
      @Override public void run() {
        try {
          service.awaitTerminated();
          fail("Expected an IllegalStateException");
        } catch (Throwable t) {
          exception.set(t);
        }
      }
    };
    waiter.start();
    service.startAsync();
    service.notifyStarted();
    assertEquals(State.RUNNING, service.state());
    service.notifyFailed(EXCEPTION);
    assertEquals(State.FAILED, service.state());
    waiter.join(100);
    assertFalse(waiter.isAlive());
    assertTrue(exception.get() instanceof IllegalStateException);
    assertEquals(EXCEPTION, exception.get().getCause());
  }

  public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
    ThreadedService service = new ThreadedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testThreadedServiceStopIdempotence() throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
  }

  public void testThreadedServiceStopIdempotenceAfterWait()
      throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());

    executionThread.join();

    throwIfSet(thrownByExecutionThread);
  }

  public void testThreadedServiceStopIdempotenceDoubleWait()
      throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
  }

  public void testManualServiceFailureIdempotence() {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener.record(service);
    service.startAsync();
    service.notifyFailed(new Exception("1"));
    service.notifyFailed(new Exception("2"));
    assertEquals("1", service.failureCause().getMessage());
    try {
      service.awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals("1", e.getCause().getMessage());
    }
  }

  private class ThreadedService extends AbstractService {
    final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);

    /*
     * The main test thread tries to stop() the service shortly after
     * confirming that it is running. Meanwhile, the service itself is trying
     * to confirm that it is running. If the main thread's stop() call happens
     * before it has the chance, the test will fail. To avoid this, the main
     * thread calls this method, which waits until the service has performed
     * its own "running" check.
     */
    void awaitRunChecks() throws InterruptedException {
      assertTrue("Service thread hasn't finished its checks. "
          + "Exception status (possibly stale): " + thrownByExecutionThread,
          hasConfirmedIsRunning.await(10, SECONDS));
    }

    @Override protected void doStart() {
      assertEquals(State.STARTING, state());
      invokeOnExecutionThreadForTest(new Runnable() {
        @Override public void run() {
          assertEquals(State.STARTING, state());
          notifyStarted();
          assertEquals(State.RUNNING, state());
          hasConfirmedIsRunning.countDown();
        }
      });
    }

    @Override protected void doStop() {
      assertEquals(State.STOPPING, state());
      invokeOnExecutionThreadForTest(new Runnable() {
        @Override public void run() {
          assertEquals(State.STOPPING, state());
          notifyStopped();
          assertEquals(State.TERMINATED, state());
        }
      });
    }
  }

  private void invokeOnExecutionThreadForTest(Runnable runnable) {
    executionThread = new Thread(runnable);
    executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread thread, Throwable e) {
        thrownByExecutionThread = e;
      }
    });
    executionThread.start();
  }

  private static void throwIfSet(Throwable t) throws Throwable {
    if (t != null) {
      throw t;
    }
  }

  public void testStopUnstartedService() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());

    try {
      service.startAsync();
      fail();
    } catch (IllegalStateException expected) {}
    assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
  }

  public void testFailingServiceStartAndWait() throws Exception {
    StartFailingService service = new StartFailingService();
    RecordingListener listener = RecordingListener.record(service);

    try {
      service.startAsync().awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailingServiceStopAndWait_stopFailing() throws Exception {
    StopFailingService service = new StopFailingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailingServiceStopAndWait_runFailing() throws Exception {
    RunFailingService service = new RunFailingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    try {
      service.awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStartAndWait() throws Exception {
    StartThrowingService service = new StartThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    try {
      service.startAsync().awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
    StopThrowingService service = new StopThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
    RunThrowingService service = new RunThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    try {
      service.awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailureCause_throwsIfNotFailed() {
    StopFailingService service = new StopFailingService();
    try {
      service.failureCause();
      fail();
    } catch (IllegalStateException e) {
      // expected
    }
    service.startAsync().awaitRunning();
    try {
      service.failureCause();
      fail();
    } catch (IllegalStateException e) {
      // expected
    }
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
  }

  public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
    final StartFailingService service = new StartFailingService();
    service.startAsync();
    assertEquals(State.FAILED, service.state());
    service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
    Thread thread = new Thread() {
      @Override public void run() {
        // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
        service.stopAsync();
      }
    };
    thread.start();
    thread.join(100);
    assertFalse(thread + " is deadlocked", thread.isAlive());
  }

  public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
    final NoOpThreadedService service = new NoOpThreadedService();
    service.addListener(new Listener() {
      @Override public void running() {
        service.awaitRunning();
      }
    }, MoreExecutors.sameThreadExecutor());
    service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
    service.stopAsync();
  }

  public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
    final NoOpThreadedService service = new NoOpThreadedService();
    service.addListener(new Listener() {
      @Override public void terminated(State from) {
        service.stopAsync().awaitTerminated();
      }
    }, MoreExecutors.sameThreadExecutor());
    service.startAsync().awaitRunning();

    Thread thread = new Thread() {
      @Override public void run() {
        service.stopAsync().awaitTerminated();
      }
    };
    thread.start();
    thread.join(100);
    assertFalse(thread + " is deadlocked", thread.isAlive());
  }

  private static class NoOpThreadedService extends AbstractExecutionThreadService {
    final CountDownLatch latch = new CountDownLatch(1);
    @Override protected void run() throws Exception {
      latch.await();
    }
    @Override protected void triggerShutdown() {
      latch.countDown();
    }
  }

  private static class StartFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyFailed(EXCEPTION);
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class RunFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
      notifyFailed(EXCEPTION);
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class StopFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      notifyFailed(EXCEPTION);
    }
  }

  private static class StartThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      throw exception;
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class RunThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      notifyStarted();
      throw exception;
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class StopThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      throw exception;
    }
  }

812    private static class RecordingListener extends Listener {
813     static RecordingListener record(Service service) {
814     RecordingListener listener = new RecordingListener(service);
815     service.addListener(listener, MoreExecutors.sameThreadExecutor());
816     return listener;
817     }
818     
819    final Service service;
820     
821    RecordingListener(Service service) {
822     this.service = service;
823     }
824     
825    @GuardedBy("this")
826     final List<State> stateHistory = Lists.newArrayList();
827     final CountDownLatch completionLatch = new CountDownLatch(1);
828     
829    ImmutableList<State> getStateHistory() throws Exception {
830     completionLatch.await();
831     synchronized (this) {
832     return ImmutableList.copyOf(stateHistory);
833     }
834     }
835     
836    @Override public synchronized void starting() {
837     assertTrue(stateHistory.isEmpty());
838     assertNotSame(State.NEW, service.state());
839     stateHistory.add(State.STARTING);
840     }
841     
842    @Override public synchronized void running() {
843     assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
844     stateHistory.add(State.RUNNING);
845     service.awaitRunning();
846     assertNotSame(State.STARTING, service.state());
847     }
848     
849    @Override public synchronized void stopping(State from) {
850     assertEquals(from, Iterables.getLast(stateHistory));
851     stateHistory.add(State.STOPPING);
852     if (from == State.STARTING) {
853     try {
854     service.awaitRunning();
855     fail();
856     } catch (IllegalStateException expected) {
857     assertNull(expected.getCause());
858     assertTrue(expected.getMessage().equals(
859     "Expected the service to be RUNNING, but was STOPPING"));
860     }
861     }
862     assertNotSame(from, service.state());
863     }
864     
865    @Override public synchronized void terminated(State from) {
866     assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
867     stateHistory.add(State.TERMINATED);
868     assertEquals(State.TERMINATED, service.state());
869     if (from == State.NEW) {
870     try {
871     service.awaitRunning();
872     fail();
873     } catch (IllegalStateException expected) {
874     assertNull(expected.getCause());
875     assertTrue(expected.getMessage().equals(
876     "Expected the service to be RUNNING, but was TERMINATED"));
877     }
878     }
879     completionLatch.countDown();
880     }
881     
882    @Override public synchronized void failed(State from, Throwable failure) {
883     assertEquals(from, Iterables.getLast(stateHistory));
884     stateHistory.add(State.FAILED);
885     assertEquals(State.FAILED, service.state());
886     assertEquals(failure, service.failureCause());
887     if (from == State.STARTING) {
888     try {
889     service.awaitRunning();
890     fail();
891     } catch (IllegalStateException e) {
892     assertEquals(failure, e.getCause());
893     }
894     }
895     try {
896     service.awaitTerminated();
897     fail();
898     } catch (IllegalStateException e) {
899     assertEquals(failure, e.getCause());
900     }
901     completionLatch.countDown();
902     }
903     }

  public void testNotifyStartedWhenNotStarting() {
    AbstractService service = new DefaultService();
    try {
      service.notifyStarted();
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyStoppedWhenNotRunning() {
    AbstractService service = new DefaultService();
    try {
      service.notifyStopped();
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyFailedWhenNotStarted() {
    AbstractService service = new DefaultService();
    try {
      service.notifyFailed(new Exception());
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyFailedWhenTerminated() {
    NoOpService service = new NoOpService();
    service.startAsync().awaitRunning();
    service.stopAsync().awaitTerminated();
    try {
      service.notifyFailed(new Exception());
      fail();
    } catch (IllegalStateException expected) {}
  }

  private static class DefaultService extends AbstractService {
    @Override protected void doStart() {}
    @Override protected void doStop() {}
  }

  private static final Exception EXCEPTION = new Exception();
}
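
The `testNotify...` cases at the end of the listing above all check one rule: a lifecycle callback invoked from the wrong state must throw `IllegalStateException`. Below is a minimal, JDK-only sketch of that guard pattern. `ToyLifecycle` and every method on it are hypothetical names chosen for illustration; this is not Guava's `AbstractService`, and the set of states each transition accepts is an assumption that merely agrees with the cases those tests exercise.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy lifecycle for illustration only; NOT Guava's AbstractService. */
final class ToyLifecycle {
  enum State { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

  private State state = State.NEW;
  // Every state entered so far, in order (similar in spirit to a recording listener).
  private final List<State> history = new ArrayList<>();

  synchronized State state() {
    return state;
  }

  synchronized List<State> history() {
    return new ArrayList<>(history); // defensive copy
  }

  synchronized void start() {
    transition("start", State.STARTING, State.NEW);
  }

  synchronized void notifyStarted() {
    transition("notifyStarted", State.RUNNING, State.STARTING);
  }

  synchronized void stop() {
    transition("stop", State.STOPPING, State.RUNNING);
  }

  // Assumption: termination is acknowledged only from RUNNING or STOPPING.
  synchronized void notifyStopped() {
    transition("notifyStopped", State.TERMINATED, State.RUNNING, State.STOPPING);
  }

  // Assumption: a failure may be reported from any state that is neither NEW nor terminal.
  synchronized void notifyFailed() {
    transition("notifyFailed", State.FAILED, State.STARTING, State.RUNNING, State.STOPPING);
  }

  // Moves to `next` if the current state is one of `allowed`; otherwise throws.
  private void transition(String action, State next, State... allowed) {
    for (State s : allowed) {
      if (state == s) {
        state = next;
        history.add(next);
        return;
      }
    }
    throw new IllegalStateException("Cannot " + action + "() when the state is " + state);
  }
}
```

Funnelling every transition through one guarded method keeps the legal-state table in a single place, which is roughly the kind of bookkeeping the article recommends delegating to a base class rather than writing by hand.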

ServiceManagerTest

/*
 * Copyright (C) 2012 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.common.util.concurrent;

import static java.util.Arrays.asList;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TestLogHandler;
import com.google.common.util.concurrent.ServiceManager.Listener;

import junit.framework.TestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/**
 * Tests for {@link ServiceManager}.
 *
 * @author Luke Sandberg
 * @author Chris Nokleberg
 */
public class ServiceManagerTest extends TestCase {

  private static class NoOpService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      notifyStopped();
    }
  }

  /*
   * A NoOp service that will delay the startup and shutdown notification for a configurable amount
   * of time.
   */
  private static class NoOpDelayedSerivce extends NoOpService {
    private long delay;

    public NoOpDelayedSerivce(long delay) {
      this.delay = delay;
    }

    @Override protected void doStart() {
      new Thread() {
        @Override public void run() {
          Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
          notifyStarted();
        }
      }.start();
    }

    @Override protected void doStop() {
      new Thread() {
        @Override public void run() {
          Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
          notifyStopped();
        }
      }.start();
    }
  }

  private static class FailStartService extends NoOpService {
    @Override protected void doStart() {
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  private static class FailRunService extends NoOpService {
    @Override protected void doStart() {
      super.doStart();
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  private static class FailStopService extends NoOpService {
    @Override protected void doStop() {
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  public void testServiceStartupTimes() {
    Service a = new NoOpDelayedSerivce(150);
    Service b = new NoOpDelayedSerivce(353);
    ServiceManager serviceManager = new ServiceManager(asList(a, b));
    serviceManager.startAsync().awaitHealthy();
    ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
    assertEquals(2, startupTimes.size());
    assertTrue(startupTimes.get(a) >= 150);
    assertTrue(startupTimes.get(b) >= 353);
  }

  public void testServiceStartStop() {
    Service a = new NoOpService();
    Service b = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b);
    assertFalse(manager.isHealthy());
    manager.startAsync().awaitHealthy();
    assertState(manager, Service.State.RUNNING, a, b);
    assertTrue(manager.isHealthy());
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    manager.stopAsync().awaitStopped();
    assertState(manager, Service.State.TERMINATED, a, b);
    assertFalse(manager.isHealthy());
    assertTrue(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
  }

  public void testFailStart() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStartService();
    Service c = new NoOpService();
    Service d = new FailStartService();
    Service e = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b, c, d, e);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertFalse(listener.healthyCalled);
    assertState(manager, Service.State.RUNNING, a, c, e);
    assertEquals(ImmutableSet.of(b, d), listener.failedServices);
    assertState(manager, Service.State.FAILED, b, d);
    assertFalse(manager.isHealthy());

    manager.stopAsync().awaitStopped();
    assertFalse(manager.isHealthy());
    assertFalse(listener.healthyCalled);
    assertTrue(listener.stoppedCalled);
  }

  public void testFailRun() throws Exception {
    Service a = new NoOpService();
    Service b = new FailRunService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertTrue(listener.healthyCalled);
    assertEquals(ImmutableSet.of(b), listener.failedServices);

    manager.stopAsync().awaitStopped();
    assertState(manager, Service.State.FAILED, b);
    assertState(manager, Service.State.TERMINATED, a);

    assertTrue(listener.stoppedCalled);
  }

  public void testFailStop() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStopService();
    Service c = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b, c));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);

    manager.startAsync().awaitHealthy();
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    manager.stopAsync().awaitStopped();

    assertTrue(listener.stoppedCalled);
    assertEquals(ImmutableSet.of(b), listener.failedServices);
    assertState(manager, Service.State.FAILED, b);
    assertState(manager, Service.State.TERMINATED, a, c);
  }

  public void testToString() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    String toString = manager.toString();
    assertTrue(toString.contains("NoOpService"));
    assertTrue(toString.contains("FailStartService"));
  }

  public void testTimeouts() throws Exception {
    Service a = new NoOpDelayedSerivce(50);
    ServiceManager manager = new ServiceManager(asList(a));
    manager.startAsync();
    try {
      manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
      fail();
    } catch (TimeoutException expected) {
    }
    manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown

    manager.stopAsync();
    try {
      manager.awaitStopped(1, TimeUnit.MILLISECONDS);
      fail();
    } catch (TimeoutException expected) {
    }
    manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
  }

  /**
   * This covers a case where if the last service to stop failed then the stopped callback would
   * never be called.
   */
  public void testSingleFailedServiceCallsStopped() {
    Service a = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertTrue(listener.stoppedCalled);
  }

  /**
   * This covers a bug where listener.healthy would get called when a single service failed during
   * startup (it occurred in more complicated cases also).
   */
  public void testFailStart_singleServiceCallsHealthy() {
    Service a = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertFalse(listener.healthyCalled);
  }

  /**
   * This covers a bug where if a listener was installed that would stop the manager if any service
   * fails and something failed during startup before service.start was called on all the services,
   * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
   * stop the timer(!).
   */
  public void testFailStart_stopOthers() throws TimeoutException {
    Service a = new FailStartService();
    Service b = new NoOpService();
    final ServiceManager manager = new ServiceManager(asList(a, b));
    manager.addListener(new Listener() {
      @Override public void failure(Service service) {
        manager.stopAsync();
      }});
    manager.startAsync();
    manager.awaitStopped(10, TimeUnit.MILLISECONDS);
  }

  private static void assertState(
      ServiceManager manager, Service.State state, Service... services) {
    Collection<Service> managerServices = manager.servicesByState().get(state);
    for (Service service : services) {
      assertEquals(service.toString(), state, service.state());
      assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
      assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
    }
  }

  /**
   * This is for covering a case where the ServiceManager would behave strangely if constructed
   * with no service under management. Listeners would never fire because the ServiceManager was
   * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
   * makes sense.
   */
  public void testEmptyServiceManager() {
    Logger logger = Logger.getLogger(ServiceManager.class.getName());
    logger.setLevel(Level.FINEST);
    TestLogHandler logHandler = new TestLogHandler();
    logger.addHandler(logHandler);
    ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener, MoreExecutors.sameThreadExecutor());
    manager.startAsync().awaitHealthy();
    assertTrue(manager.isHealthy());
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    manager.stopAsync().awaitStopped();
    assertFalse(manager.isHealthy());
    assertTrue(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    // check that our NoOpService is not directly observable via any of the inspection methods or
    // via logging.
    assertEquals("ServiceManager{services=[]}", manager.toString());
    assertTrue(manager.servicesByState().isEmpty());
    assertTrue(manager.startupTimes().isEmpty());
    Formatter logFormatter = new Formatter() {
      @Override public String format(LogRecord record) {
        return formatMessage(record);
      }
    };
    for (LogRecord record : logHandler.getStoredLogRecords()) {
      assertFalse(logFormatter.format(record).contains("NoOpService"));
    }
  }

  /**
   * This is for a case where a long running Listener using the sameThreadExecutor could deadlock
   * another thread calling stopAsync().
   */
  public void testListenerDeadlock() throws InterruptedException {
    final CountDownLatch failEnter = new CountDownLatch(1);
    Service failRunService = new AbstractService() {
      @Override protected void doStart() {
        new Thread() {
          @Override public void run() {
            notifyStarted();
            notifyFailed(new Exception("boom"));
          }
        }.start();
      }
      @Override protected void doStop() {
        notifyStopped();
      }
    };
    final ServiceManager manager = new ServiceManager(
        Arrays.asList(failRunService, new NoOpService()));
    manager.addListener(new ServiceManager.Listener() {
      @Override public void failure(Service service) {
        failEnter.countDown();
        // block forever!
        Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1));
      }
    }, MoreExecutors.sameThreadExecutor());
    // We do not call awaitHealthy because, due to races, that method may throw an exception. But
    // we really just want to wait for the thread to be in the failure callback so we wait for that
    // explicitly instead.
    manager.startAsync();
    failEnter.await();
    assertFalse("State should be updated before calling listeners", manager.isHealthy());
    // now we want to stop the services.
    Thread stoppingThread = new Thread() {
      @Override public void run() {
        manager.stopAsync().awaitStopped();
      }
    };
    stoppingThread.start();
    // this should be super fast since the only non stopped service is a NoOpService
    stoppingThread.join(1000);
    assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
  }

  /**
   * Catches a bug where when constructing a service manager failed, later interactions with the
   * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
   * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
   * the original error (which was not constructing ServiceManager correctly).
   */
  public void testPartiallyConstructedManager() {
    Logger logger = Logger.getLogger("global");
    logger.setLevel(Level.FINEST);
    TestLogHandler logHandler = new TestLogHandler();
    logger.addHandler(logHandler);
    NoOpService service = new NoOpService();
    service.startAsync();
    try {
      new ServiceManager(Arrays.asList(service));
      fail();
    } catch (IllegalArgumentException expected) {}
    service.stopAsync();
    // Nothing was logged!
    assertEquals(0, logHandler.getStoredLogRecords().size());
  }

  public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
    // The implementation of this test is pretty sensitive to the implementation :( but we want to
    // ensure that if weird things happen during construction then we get exceptions.
    final NoOpService service1 = new NoOpService();
    // This service will start service1 when addListener is called. This simulates service1 being
    // started asynchronously.
    Service service2 = new Service() {
      final NoOpService delegate = new NoOpService();
      @Override public final void addListener(Listener listener, Executor executor) {
        service1.startAsync();
        delegate.addListener(listener, executor);
      }
      // Delegates from here on down
      @Override public final Service startAsync() {
        return delegate.startAsync();
      }

      @Override public final Service stopAsync() {
        return delegate.stopAsync();
      }

      @Override public final ListenableFuture<State> start() {
        return delegate.start();
      }

      @Override public final ListenableFuture<State> stop() {
        return delegate.stop();
      }

      @Override public State startAndWait() {
        return delegate.startAndWait();
      }

      @Override public State stopAndWait() {
        return delegate.stopAndWait();
      }

      @Override public final void awaitRunning() {
        delegate.awaitRunning();
      }

      @Override public final void awaitRunning(long timeout, TimeUnit unit)
          throws TimeoutException {
        delegate.awaitRunning(timeout, unit);
      }

      @Override public final void awaitTerminated() {
        delegate.awaitTerminated();
      }

      @Override public final void awaitTerminated(long timeout, TimeUnit unit)
          throws TimeoutException {
        delegate.awaitTerminated(timeout, unit);
      }

      @Override public final boolean isRunning() {
        return delegate.isRunning();
      }

      @Override public final State state() {
        return delegate.state();
      }

      @Override public final Throwable failureCause() {
        return delegate.failureCause();
      }
    };
    try {
      new ServiceManager(Arrays.asList(service1, service2));
      fail();
    } catch (IllegalArgumentException expected) {
      assertTrue(expected.getMessage().contains("started transitioning asynchronously"));
    }
  }

  /**
   * This test is for a case where two Service.Listener callbacks for the same service would call
   * transitionService in the wrong order due to a race. Due to the fact that it is a race this
   * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the
   * race sneaks back in, and in this case flaky means something is definitely wrong.
   *
   * <p>Before the bug was fixed this test would fail at least 30% of the time.
   */
  public void testTransitionRace() throws TimeoutException {
    for (int k = 0; k < 1000; k++) {
      List<Service> services = Lists.newArrayList();
      for (int i = 0; i < 5; i++) {
        services.add(new SnappyShutdownService(i));
      }
      ServiceManager manager = new ServiceManager(services);
      manager.startAsync().awaitHealthy();
      manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS);
    }
  }

  /**
   * This service will shutdown very quickly after stopAsync is called and uses a background thread
   * so that we know that the stopping() listeners will execute on a different thread than the
   * terminated() listeners.
   */
  private static class SnappyShutdownService extends AbstractExecutionThreadService {
    final int index;
    final CountDownLatch latch = new CountDownLatch(1);

    SnappyShutdownService(int index) {
      this.index = index;
    }

    @Override protected void run() throws Exception {
      latch.await();
    }

    @Override protected void triggerShutdown() {
      latch.countDown();
    }

    @Override protected String serviceName() {
      return this.getClass().getSimpleName() + "[" + index + "]";
    }
  }

  public void testNulls() {
    ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
    new NullPointerTester()
        .setDefault(ServiceManager.Listener.class, new RecordingListener())
        .testAllPublicInstanceMethods(manager);
  }

  private static final class RecordingListener extends ServiceManager.Listener {
    volatile boolean healthyCalled;
    volatile boolean stoppedCalled;
    final Set<Service> failedServices = Sets.newConcurrentHashSet();

    @Override public void healthy() {
      healthyCalled = true;
    }

    @Override public void stopped() {
      stoppedCalled = true;
    }

    @Override public void failure(Service service) {
      failedServices.add(service);
    }
  }
}
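
`testListenerDeadlock` in the listing above detects a hang without hanging the test itself: it runs the potentially blocking call on a separate thread, joins with a timeout, and then checks `isAlive()`. The sketch below isolates that technique using only the JDK. `DeadlockProbe` and `finishesWithin` are hypothetical names introduced here; neither is part of Guava.

```java
/** JDK-only sketch of the join-with-timeout check; all names are hypothetical. */
final class DeadlockProbe {
  /**
   * Runs {@code task} on a separate daemon thread and reports whether that
   * thread had finished by the time the wait of {@code timeoutMillis} ended.
   */
  static boolean finishesWithin(Runnable task, long timeoutMillis) {
    if (timeoutMillis <= 0) {
      // Thread.join(0) waits forever, which would defeat the purpose of the probe.
      throw new IllegalArgumentException("timeoutMillis must be positive");
    }
    Thread worker = new Thread(task, "deadlock-probe");
    worker.setDaemon(true); // a stuck task must not keep the JVM alive
    worker.start();
    try {
      worker.join(timeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
    return !worker.isAlive();
  }
}
```

A call such as `DeadlockProbe.finishesWithin(() -> manager.stopAsync().awaitStopped(), 1000)` would express the same check as the test, assuming a `manager` variable is in scope. The timeout is a heuristic: a slow machine can make a healthy call look stuck, so a generous value is usually the safer choice.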
Original source: https://www.cnblogs.com/muxi0407/p/12022079.html