View Javadoc

1   // Copyright (C) 2004 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.engine.agent;
23  
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertTrue;
27  import static org.mockito.Matchers.eq;
28  import static org.mockito.Matchers.isA;
29  import static org.mockito.Mockito.doThrow;
30  import static org.mockito.Mockito.mock;
31  import static org.mockito.Mockito.timeout;
32  import static org.mockito.Mockito.verify;
33  import static org.mockito.Mockito.verifyNoMoreInteractions;
34  import static org.mockito.Mockito.when;
35  
36  import java.io.OutputStream;
37  import java.io.PrintWriter;
38  import java.util.ArrayList;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.RejectedExecutionException;
41  
42  import net.grinder.common.UncheckedInterruptedException;
43  import net.grinder.common.processidentity.WorkerIdentity;
44  import net.grinder.engine.common.EngineException;
45  import net.grinder.testutility.RedirectStandardStreams;
46  import net.grinder.util.thread.Condition;
47  
48  import org.junit.Before;
49  import org.junit.Test;
50  import org.mockito.Mock;
51  import org.mockito.MockitoAnnotations;
52  import org.slf4j.Logger;
53  
54  
55  /**
56   *  Unit tests for <code>WorkerLauncher</code>.
57   *
58   * @author Philip Aston
59   */
60  public class TestWorkerLauncher {
61  
62    private static final String s_testClasspath =
63      System.getProperty("java.class.path");
64  
65    private final MyCondition m_condition = new MyCondition();
66    @Mock private Logger m_logger;
67  
68    @Before public void setUp() {
69      MockitoAnnotations.initMocks(this);
70    }
71  
72    @Test public void testConstructor() throws Exception {
73      final WorkerLauncher workerLauncher1 =
74        new WorkerLauncher(0, null, null, null);
75  
76      assertTrue(workerLauncher1.allFinished());
77      workerLauncher1.shutdown();
78  
79      final WorkerLauncher workerLauncher2 =
80        new WorkerLauncher(10, null, null, null);
81  
82      assertFalse(workerLauncher2.allFinished());
83  
84      workerLauncher2.destroyAllWorkers();
85      workerLauncher2.shutdown();
86    }
87  
88    @Test public void testStartSomeProcesses() throws Exception {
89  
90      final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
91  
92      final WorkerLauncher workerLauncher =
93        new WorkerLauncher(5, myProcessFactory, m_condition, m_logger);
94  
95      m_condition.waitFor(workerLauncher);
96      assertFalse(m_condition.isFinished());
97  
98      assertEquals(0, myProcessFactory.getNumberOfProcesses());
99  
100     workerLauncher.startSomeWorkers(1);
101 
102     assertEquals(1, myProcessFactory.getNumberOfProcesses());
103 
104     assertFalse(workerLauncher.allFinished());
105     assertEquals(System.out, myProcessFactory.getLastOutputStream());
106     assertEquals(System.err, myProcessFactory.getLastErrorStream());
107 
108     assertEquals(1, myProcessFactory.getChildProcesses().size());
109 
110     verify(m_logger).info("worker process-0 started");
111 
112     workerLauncher.startSomeWorkers(10);
113     assertEquals(5, myProcessFactory.getNumberOfProcesses());
114 
115     verify(m_logger).info("worker process-1 started");
116     verify(m_logger).info("worker process-2 started");
117     verify(m_logger).info("worker process-3 started");
118     verify(m_logger).info("worker process-4 started");
119 
120     assertEquals(5, myProcessFactory.getChildProcesses().size());
121 
122     assertFalse(workerLauncher.allFinished());
123 
124     final Worker[] processes =
125       myProcessFactory.getChildProcesses().toArray(new Worker[0]);
126 
127     sendTerminationMessage(processes[0]);
128     sendTerminationMessage(processes[2]);
129 
130     assertFalse(workerLauncher.allFinished());
131     assertFalse(m_condition.isFinished());
132 
133     sendTerminationMessage(processes[1]);
134     sendTerminationMessage(processes[3]);
135     sendTerminationMessage(processes[4]);
136 
137     // Can't be bothered to add another layer of synchronisation, just
138     // spin.
139     while (!m_condition.isFinished()) {
140       Thread.sleep(20);
141     }
142 
143     assertTrue(workerLauncher.allFinished());
144     workerLauncher.shutdown();
145   }
146 
147   private void sendTerminationMessage(Worker process) {
148     final PrintWriter processStdin =
149       new PrintWriter(process.getCommunicationStream());
150 
151     processStdin.print("Foo\n");
152     processStdin.flush();
153   }
154 
155   @Test public void testStartAllProcesses() throws Exception {
156 
157     final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
158 
159     final WorkerLauncher workerLauncher =
160       new WorkerLauncher(9, myProcessFactory, m_condition, m_logger);
161 
162     m_condition.waitFor(workerLauncher);
163     assertFalse(m_condition.isFinished());
164 
165     assertEquals(0, myProcessFactory.getNumberOfProcesses());
166 
167     workerLauncher.startAllWorkers();
168 
169     assertEquals(9, myProcessFactory.getNumberOfProcesses());
170 
171     assertFalse(workerLauncher.allFinished());
172     assertEquals(System.out, myProcessFactory.getLastOutputStream());
173     assertEquals(System.err, myProcessFactory.getLastErrorStream());
174 
175     assertEquals(9, myProcessFactory.getChildProcesses().size());
176 
177     final Worker[] processes =
178       myProcessFactory.getChildProcesses().toArray(new Worker[0]);
179 
180     sendTerminationMessage(processes[0]);
181     sendTerminationMessage(processes[6]);
182     sendTerminationMessage(processes[5]);
183     sendTerminationMessage(processes[2]);
184     sendTerminationMessage(processes[7]);
185 
186     assertFalse(workerLauncher.allFinished());
187     assertFalse(m_condition.isFinished());
188 
189     sendTerminationMessage(processes[8]);
190     sendTerminationMessage(processes[1]);
191     sendTerminationMessage(processes[3]);
192     sendTerminationMessage(processes[4]);
193 
194     // Can't be bothered to add another layer of synchronisation, just
195     // spin.
196     while (!m_condition.isFinished()) {
197       Thread.sleep(20);
198     }
199 
200     assertTrue(workerLauncher.allFinished());
201     workerLauncher.shutdown();
202   }
203 
204   @Test public void testDestroyAllProcesses() throws Exception {
205 
206     final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
207 
208     final WorkerLauncher workerLauncher =
209       new WorkerLauncher(4, myProcessFactory, m_condition, m_logger);
210 
211     m_condition.waitFor(workerLauncher);
212     assertFalse(m_condition.isFinished());
213 
214     assertEquals(0, myProcessFactory.getNumberOfProcesses());
215 
216     final RedirectStandardStreams redirectStreams =
217       new RedirectStandardStreams() {
218         protected void runWithRedirectedStreams() throws Exception {
219           workerLauncher.startAllWorkers();
220         }
221       };
222 
223     redirectStreams.run();
224 
225     assertEquals(4, myProcessFactory.getNumberOfProcesses());
226 
227     assertFalse(workerLauncher.allFinished());
228     assertEquals(4, myProcessFactory.getChildProcesses().size());
229 
230     final Worker[] processes =
231       myProcessFactory.getChildProcesses().toArray(new Worker[0]);
232 
233     sendTerminationMessage(processes[1]);
234     sendTerminationMessage(processes[3]);
235 
236     assertFalse(workerLauncher.allFinished());
237     assertFalse(m_condition.isFinished());
238 
239     workerLauncher.destroyAllWorkers();
240 
241     // Can't be bothered to add another layer of synchronisation, just
242     // spin.
243     while (!m_condition.isFinished()) {
244       Thread.sleep(20);
245     }
246 
247     assertTrue(workerLauncher.allFinished());
248     workerLauncher.shutdown();
249   }
250 
251   @Test public void testInteruptedWaitFor() throws Exception {
252 
253     final WorkerIdentity workerIdentity =
254       new AgentIdentityImplementation("test").createWorkerIdentity();
255 
256     final WorkerFactory workerFactory = mock(WorkerFactory.class);
257     final Worker worker = mock(Worker.class);
258     when(worker.getIdentity()).thenReturn(workerIdentity);
259 
260     when(workerFactory.create(isA(OutputStream.class),
261                               isA(OutputStream.class))).thenReturn(worker);
262 
263     final WorkerLauncher workerLauncher =
264       new WorkerLauncher(1, workerFactory, m_condition, m_logger);
265 
266     doThrow(new UncheckedInterruptedException(new InterruptedException()))
267       .when(worker).waitFor();
268 
269     workerLauncher.startAllWorkers();
270 
271     verify(worker).getIdentity();
272     verify(worker, timeout(1000)).waitFor();
273     verify(worker, timeout(1000)).destroy();
274 
275     verifyNoMoreInteractions(worker);
276   }
277 
278   @Test public void testRejectedExecution() throws Exception {
279 
280     final WorkerIdentity workerIdentity =
281       new AgentIdentityImplementation("test").createWorkerIdentity();
282 
283     final WorkerFactory workerFactory = mock(WorkerFactory.class);
284     final Worker worker = mock(Worker.class);
285     when(worker.getIdentity()).thenReturn(workerIdentity);
286 
287     when(workerFactory.create(isA(OutputStream.class),
288                               isA(OutputStream.class))).thenReturn(worker);
289 
290     final ExecutorService executor = mock(ExecutorService.class);
291 
292     final WorkerLauncher workerLauncher =
293       new WorkerLauncher(executor, 1, workerFactory, m_condition, m_logger);
294 
295     doThrow(new RejectedExecutionException())
296       .when(executor).execute(isA(Runnable.class));
297 
298     final boolean result = workerLauncher.startSomeWorkers(1);
299     assertFalse(result);
300 
301     verify(m_logger).error(eq("Failed to wait for test-0"),
302                            isA(Exception.class));
303   }
304 
305   private static class MyCondition extends Condition {
306     private boolean m_finished;
307 
308     public synchronized void waitFor(final WorkerLauncher workerLauncher) {
309 
310       m_finished = false;
311 
312       new Thread() {
313         public void run() {
314           try {
315             synchronized (MyCondition.this) {
316               while (!workerLauncher.allFinished()) {
317                 MyCondition.this.wait();
318               }
319             }
320 
321             m_finished = true;
322           }
323           catch (InterruptedException e) {
324           }
325         }
326       }.start();
327     }
328 
329     public boolean isFinished() {
330       return m_finished;
331     }
332   }
333 
334   private static class MyWorkerFactory implements WorkerFactory {
335 
336     private int m_numberOfProcesses = 0;
337     private OutputStream m_lastOutputStream;
338     private OutputStream m_lastErrorStream;
339     private ArrayList<Worker> m_childProcesses = new ArrayList<Worker>();
340     private StubAgentIdentity m_agentIdentity =
341       new StubAgentIdentity("process");
342 
343     public Worker create(OutputStream outputStream, OutputStream errorStream)
344       throws EngineException {
345 
346       m_lastOutputStream = outputStream;
347       m_lastErrorStream = errorStream;
348 
349       final CommandLine commandLine =
350         new MyCommandLine("java",
351                           "-classpath",
352                           s_testClasspath,
353                           EchoClass.class.getName());
354 
355       final Worker childProcess =
356         new ProcessWorker(m_agentIdentity.createWorkerIdentity(),
357                           commandLine,
358                           outputStream,
359                           errorStream);
360 
361       ++m_numberOfProcesses;
362       m_childProcesses.add(childProcess);
363 
364       return childProcess;
365     }
366 
367     public int getNumberOfProcesses() {
368       return m_numberOfProcesses;
369     }
370 
371     public OutputStream getLastOutputStream() {
372       return m_lastOutputStream;
373     }
374 
375     public OutputStream getLastErrorStream() {
376       return m_lastErrorStream;
377     }
378 
379     public ArrayList<Worker> getChildProcesses() {
380       return m_childProcesses;
381     }
382   }
383 }