1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.engine.agent;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Matchers.eq;
28 import static org.mockito.Matchers.isA;
29 import static org.mockito.Mockito.doThrow;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.timeout;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34 import static org.mockito.Mockito.when;
35
36 import java.io.OutputStream;
37 import java.io.PrintWriter;
38 import java.util.ArrayList;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.RejectedExecutionException;
41
42 import net.grinder.common.UncheckedInterruptedException;
43 import net.grinder.common.processidentity.WorkerIdentity;
44 import net.grinder.engine.common.EngineException;
45 import net.grinder.testutility.RedirectStandardStreams;
46 import net.grinder.util.thread.Condition;
47
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.mockito.Mock;
51 import org.mockito.MockitoAnnotations;
52 import org.slf4j.Logger;
53
54
55
56
57
58
59
60 public class TestWorkerLauncher {
61
62 private static final String s_testClasspath =
63 System.getProperty("java.class.path");
64
65 private final MyCondition m_condition = new MyCondition();
66 @Mock private Logger m_logger;
67
68 @Before public void setUp() {
69 MockitoAnnotations.initMocks(this);
70 }
71
72 @Test public void testConstructor() throws Exception {
73 final WorkerLauncher workerLauncher1 =
74 new WorkerLauncher(0, null, null, null);
75
76 assertTrue(workerLauncher1.allFinished());
77 workerLauncher1.shutdown();
78
79 final WorkerLauncher workerLauncher2 =
80 new WorkerLauncher(10, null, null, null);
81
82 assertFalse(workerLauncher2.allFinished());
83
84 workerLauncher2.destroyAllWorkers();
85 workerLauncher2.shutdown();
86 }
87
88 @Test public void testStartSomeProcesses() throws Exception {
89
90 final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
91
92 final WorkerLauncher workerLauncher =
93 new WorkerLauncher(5, myProcessFactory, m_condition, m_logger);
94
95 m_condition.waitFor(workerLauncher);
96 assertFalse(m_condition.isFinished());
97
98 assertEquals(0, myProcessFactory.getNumberOfProcesses());
99
100 workerLauncher.startSomeWorkers(1);
101
102 assertEquals(1, myProcessFactory.getNumberOfProcesses());
103
104 assertFalse(workerLauncher.allFinished());
105 assertEquals(System.out, myProcessFactory.getLastOutputStream());
106 assertEquals(System.err, myProcessFactory.getLastErrorStream());
107
108 assertEquals(1, myProcessFactory.getChildProcesses().size());
109
110 verify(m_logger).info("worker process-0 started");
111
112 workerLauncher.startSomeWorkers(10);
113 assertEquals(5, myProcessFactory.getNumberOfProcesses());
114
115 verify(m_logger).info("worker process-1 started");
116 verify(m_logger).info("worker process-2 started");
117 verify(m_logger).info("worker process-3 started");
118 verify(m_logger).info("worker process-4 started");
119
120 assertEquals(5, myProcessFactory.getChildProcesses().size());
121
122 assertFalse(workerLauncher.allFinished());
123
124 final Worker[] processes =
125 myProcessFactory.getChildProcesses().toArray(new Worker[0]);
126
127 sendTerminationMessage(processes[0]);
128 sendTerminationMessage(processes[2]);
129
130 assertFalse(workerLauncher.allFinished());
131 assertFalse(m_condition.isFinished());
132
133 sendTerminationMessage(processes[1]);
134 sendTerminationMessage(processes[3]);
135 sendTerminationMessage(processes[4]);
136
137
138
139 while (!m_condition.isFinished()) {
140 Thread.sleep(20);
141 }
142
143 assertTrue(workerLauncher.allFinished());
144 workerLauncher.shutdown();
145 }
146
147 private void sendTerminationMessage(Worker process) {
148 final PrintWriter processStdin =
149 new PrintWriter(process.getCommunicationStream());
150
151 processStdin.print("Foo\n");
152 processStdin.flush();
153 }
154
155 @Test public void testStartAllProcesses() throws Exception {
156
157 final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
158
159 final WorkerLauncher workerLauncher =
160 new WorkerLauncher(9, myProcessFactory, m_condition, m_logger);
161
162 m_condition.waitFor(workerLauncher);
163 assertFalse(m_condition.isFinished());
164
165 assertEquals(0, myProcessFactory.getNumberOfProcesses());
166
167 workerLauncher.startAllWorkers();
168
169 assertEquals(9, myProcessFactory.getNumberOfProcesses());
170
171 assertFalse(workerLauncher.allFinished());
172 assertEquals(System.out, myProcessFactory.getLastOutputStream());
173 assertEquals(System.err, myProcessFactory.getLastErrorStream());
174
175 assertEquals(9, myProcessFactory.getChildProcesses().size());
176
177 final Worker[] processes =
178 myProcessFactory.getChildProcesses().toArray(new Worker[0]);
179
180 sendTerminationMessage(processes[0]);
181 sendTerminationMessage(processes[6]);
182 sendTerminationMessage(processes[5]);
183 sendTerminationMessage(processes[2]);
184 sendTerminationMessage(processes[7]);
185
186 assertFalse(workerLauncher.allFinished());
187 assertFalse(m_condition.isFinished());
188
189 sendTerminationMessage(processes[8]);
190 sendTerminationMessage(processes[1]);
191 sendTerminationMessage(processes[3]);
192 sendTerminationMessage(processes[4]);
193
194
195
196 while (!m_condition.isFinished()) {
197 Thread.sleep(20);
198 }
199
200 assertTrue(workerLauncher.allFinished());
201 workerLauncher.shutdown();
202 }
203
204 @Test public void testDestroyAllProcesses() throws Exception {
205
206 final MyWorkerFactory myProcessFactory = new MyWorkerFactory();
207
208 final WorkerLauncher workerLauncher =
209 new WorkerLauncher(4, myProcessFactory, m_condition, m_logger);
210
211 m_condition.waitFor(workerLauncher);
212 assertFalse(m_condition.isFinished());
213
214 assertEquals(0, myProcessFactory.getNumberOfProcesses());
215
216 final RedirectStandardStreams redirectStreams =
217 new RedirectStandardStreams() {
218 protected void runWithRedirectedStreams() throws Exception {
219 workerLauncher.startAllWorkers();
220 }
221 };
222
223 redirectStreams.run();
224
225 assertEquals(4, myProcessFactory.getNumberOfProcesses());
226
227 assertFalse(workerLauncher.allFinished());
228 assertEquals(4, myProcessFactory.getChildProcesses().size());
229
230 final Worker[] processes =
231 myProcessFactory.getChildProcesses().toArray(new Worker[0]);
232
233 sendTerminationMessage(processes[1]);
234 sendTerminationMessage(processes[3]);
235
236 assertFalse(workerLauncher.allFinished());
237 assertFalse(m_condition.isFinished());
238
239 workerLauncher.destroyAllWorkers();
240
241
242
243 while (!m_condition.isFinished()) {
244 Thread.sleep(20);
245 }
246
247 assertTrue(workerLauncher.allFinished());
248 workerLauncher.shutdown();
249 }
250
251 @Test public void testInteruptedWaitFor() throws Exception {
252
253 final WorkerIdentity workerIdentity =
254 new AgentIdentityImplementation("test").createWorkerIdentity();
255
256 final WorkerFactory workerFactory = mock(WorkerFactory.class);
257 final Worker worker = mock(Worker.class);
258 when(worker.getIdentity()).thenReturn(workerIdentity);
259
260 when(workerFactory.create(isA(OutputStream.class),
261 isA(OutputStream.class))).thenReturn(worker);
262
263 final WorkerLauncher workerLauncher =
264 new WorkerLauncher(1, workerFactory, m_condition, m_logger);
265
266 doThrow(new UncheckedInterruptedException(new InterruptedException()))
267 .when(worker).waitFor();
268
269 workerLauncher.startAllWorkers();
270
271 verify(worker).getIdentity();
272 verify(worker, timeout(1000)).waitFor();
273 verify(worker, timeout(1000)).destroy();
274
275 verifyNoMoreInteractions(worker);
276 }
277
278 @Test public void testRejectedExecution() throws Exception {
279
280 final WorkerIdentity workerIdentity =
281 new AgentIdentityImplementation("test").createWorkerIdentity();
282
283 final WorkerFactory workerFactory = mock(WorkerFactory.class);
284 final Worker worker = mock(Worker.class);
285 when(worker.getIdentity()).thenReturn(workerIdentity);
286
287 when(workerFactory.create(isA(OutputStream.class),
288 isA(OutputStream.class))).thenReturn(worker);
289
290 final ExecutorService executor = mock(ExecutorService.class);
291
292 final WorkerLauncher workerLauncher =
293 new WorkerLauncher(executor, 1, workerFactory, m_condition, m_logger);
294
295 doThrow(new RejectedExecutionException())
296 .when(executor).execute(isA(Runnable.class));
297
298 final boolean result = workerLauncher.startSomeWorkers(1);
299 assertFalse(result);
300
301 verify(m_logger).error(eq("Failed to wait for test-0"),
302 isA(Exception.class));
303 }
304
305 private static class MyCondition extends Condition {
306 private boolean m_finished;
307
308 public synchronized void waitFor(final WorkerLauncher workerLauncher) {
309
310 m_finished = false;
311
312 new Thread() {
313 public void run() {
314 try {
315 synchronized (MyCondition.this) {
316 while (!workerLauncher.allFinished()) {
317 MyCondition.this.wait();
318 }
319 }
320
321 m_finished = true;
322 }
323 catch (InterruptedException e) {
324 }
325 }
326 }.start();
327 }
328
329 public boolean isFinished() {
330 return m_finished;
331 }
332 }
333
334 private static class MyWorkerFactory implements WorkerFactory {
335
336 private int m_numberOfProcesses = 0;
337 private OutputStream m_lastOutputStream;
338 private OutputStream m_lastErrorStream;
339 private ArrayList<Worker> m_childProcesses = new ArrayList<Worker>();
340 private StubAgentIdentity m_agentIdentity =
341 new StubAgentIdentity("process");
342
343 public Worker create(OutputStream outputStream, OutputStream errorStream)
344 throws EngineException {
345
346 m_lastOutputStream = outputStream;
347 m_lastErrorStream = errorStream;
348
349 final CommandLine commandLine =
350 new MyCommandLine("java",
351 "-classpath",
352 s_testClasspath,
353 EchoClass.class.getName());
354
355 final Worker childProcess =
356 new ProcessWorker(m_agentIdentity.createWorkerIdentity(),
357 commandLine,
358 outputStream,
359 errorStream);
360
361 ++m_numberOfProcesses;
362 m_childProcesses.add(childProcess);
363
364 return childProcess;
365 }
366
367 public int getNumberOfProcesses() {
368 return m_numberOfProcesses;
369 }
370
371 public OutputStream getLastOutputStream() {
372 return m_lastOutputStream;
373 }
374
375 public OutputStream getLastErrorStream() {
376 return m_lastErrorStream;
377 }
378
379 public ArrayList<Worker> getChildProcesses() {
380 return m_childProcesses;
381 }
382 }
383 }