1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.console.communication;
23
24 import static net.grinder.testutility.FileUtilities.createRandomFile;
25 import static net.grinder.testutility.SocketUtilities.findFreePort;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.Matchers.isA;
29 import static org.mockito.Mockito.doAnswer;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.timeout;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.verifyNoMoreInteractions;
35
36 import java.io.File;
37 import java.io.ObjectInputStream;
38 import java.io.ObjectOutputStream;
39 import java.net.InetAddress;
40 import java.net.ServerSocket;
41 import java.net.Socket;
42 import java.util.TimerTask;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import net.grinder.common.GrinderProperties;
47 import net.grinder.common.UncheckedInterruptedException;
48 import net.grinder.common.processidentity.AgentIdentity;
49 import net.grinder.common.processidentity.ProcessReport;
50 import net.grinder.common.processidentity.ProcessReport.State;
51 import net.grinder.communication.Address;
52 import net.grinder.communication.CommunicationException;
53 import net.grinder.communication.ConnectionType;
54 import net.grinder.communication.Message;
55 import net.grinder.communication.MessageDispatchRegistry.Handler;
56 import net.grinder.communication.SendToEveryoneAddress;
57 import net.grinder.communication.StreamSender;
58 import net.grinder.communication.StubConnector;
59 import net.grinder.console.common.DisplayMessageConsoleException;
60 import net.grinder.console.common.ErrorHandler;
61 import net.grinder.console.common.Resources;
62 import net.grinder.console.common.ResourcesImplementation;
63 import net.grinder.console.communication.ProcessControl.ProcessReports;
64 import net.grinder.console.model.ConsoleProperties;
65 import net.grinder.engine.agent.StubAgentIdentity;
66 import net.grinder.messages.agent.CacheHighWaterMark;
67 import net.grinder.messages.agent.ClearCacheMessage;
68 import net.grinder.messages.agent.DistributeFileMessage;
69 import net.grinder.messages.agent.DistributionCacheCheckpointMessage;
70 import net.grinder.messages.agent.ResetGrinderMessage;
71 import net.grinder.messages.agent.StartGrinderMessage;
72 import net.grinder.messages.agent.StopGrinderMessage;
73 import net.grinder.messages.agent.StubCacheHighWaterMark;
74 import net.grinder.messages.console.AgentAddress;
75 import net.grinder.messages.console.AgentProcessReportMessage;
76 import net.grinder.messages.console.WorkerAddress;
77 import net.grinder.messages.console.WorkerProcessReportMessage;
78 import net.grinder.testutility.AbstractJUnit4FileTestCase;
79 import net.grinder.testutility.StubTimer;
80 import net.grinder.util.FileContents;
81 import net.grinder.util.StandardTimeAuthority;
82 import net.grinder.util.TimeAuthority;
83
84 import org.junit.Before;
85 import org.junit.Test;
86 import org.mockito.Mock;
87 import org.mockito.MockitoAnnotations;
88 import org.mockito.invocation.InvocationOnMock;
89 import org.mockito.stubbing.Answer;
90
91
92
93
94
95
96
97
98
99 public class TestConsoleCommunicationImplementation
100 extends AbstractJUnit4FileTestCase {
101
102 private static final Resources s_resources =
103 new ResourcesImplementation(
104 "net.grinder.console.common.resources.Console");
105
106 private @Mock ErrorHandler m_errorHandler;
107 private @Mock Handler<Message> m_messageHandler;
108 private final TimeAuthority m_timeAuthority = new StandardTimeAuthority();
109
110 private ConsoleCommunicationImplementation m_consoleCommunication;
111 private ConsoleProperties m_properties;
112 private ServerSocket m_usedServerSocket;
113 private final ProcessMessagesThread m_processMessagesThread =
114 new ProcessMessagesThread();
115 private StubTimer m_timer;
116
117 @Before public void setUp() throws Exception {
118 MockitoAnnotations.initMocks(this);
119
120 m_timer = new StubTimer();
121
122 m_usedServerSocket = new ServerSocket(0, 50, InetAddress.getByName(null));
123
124 final File file = new File(getDirectory(), "properties");
125 m_properties = new ConsoleProperties(s_resources, file);
126
127 m_properties.setConsolePort(findFreePort());
128
129 m_consoleCommunication =
130 new ConsoleCommunicationImplementation(s_resources,
131 m_properties,
132 m_errorHandler,
133 m_timeAuthority,
134 10,
135 10000);
136 }
137
138 @Override
139 public void tearDown() throws Exception {
140 super.tearDown();
141
142 m_consoleCommunication.shutdown();
143
144 m_processMessagesThread.interrupt();
145 m_processMessagesThread.join();
146
147 m_timer.cancel();
148
149 m_usedServerSocket.close();
150
151 waitForNumberOfConnections(0);
152 }
153
154 @Test public void testConstruction() throws Exception {
155
156
157 m_processMessagesThread.start();
158
159
160 m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
161
162 final ConsoleCommunicationImplementation consoleCommunication =
163 new ConsoleCommunicationImplementation(s_resources,
164 m_properties,
165 m_errorHandler,
166 m_timeAuthority,
167 500,
168 10000);
169
170 assertEquals(0, consoleCommunication.getNumberOfConnections());
171
172 final ConsoleCommunicationImplementation consoleCommunication2 =
173 new ConsoleCommunicationImplementation(s_resources,
174 m_properties,
175 m_errorHandler,
176 m_timeAuthority);
177
178 assertEquals(0, consoleCommunication2.getNumberOfConnections());
179 }
180
181 @Test public void testShutdown() throws Exception {
182
183 m_processMessagesThread.start();
184
185 new StubConnector(InetAddress.getByName(null).getHostName(),
186 m_properties.getConsolePort(),
187 ConnectionType.AGENT)
188 .connect();
189
190 waitForNumberOfConnections(1);
191
192 m_consoleCommunication.shutdown();
193
194 waitForNumberOfConnections(0);
195 }
196
197 private Message readMessage(Socket socket) throws Exception {
198 final ObjectInputStream objectStream =
199 new ObjectInputStream(socket.getInputStream());
200
201 return (Message)objectStream.readObject();
202 }
203
204 private void sendMessage(Socket socket, Message message) throws Exception {
205 final ObjectOutputStream objectStream =
206 new ObjectOutputStream(socket.getOutputStream());
207
208 objectStream.writeObject(message);
209 objectStream.flush();
210 }
211
212 @Test public void testWithProcessControl() throws Exception {
213
214
215 final AgentIdentity agentIdentity = new StubAgentIdentity("foo");
216
217 final Socket socket =
218 new StubConnector(InetAddress.getByName(null).getHostName(),
219 m_properties.getConsolePort(),
220 ConnectionType.AGENT)
221 .connect(new AgentAddress(agentIdentity));
222
223 waitForNumberOfConnections(1);
224
225 final ProcessControl processControl =
226 new ProcessControlImplementation(m_timer,
227 m_consoleCommunication,
228 s_resources);
229
230 final CacheHighWaterMark cacheHighWaterMark =
231 new StubCacheHighWaterMark("cache", 100);
232
233 final ProcessControl.Listener listener =
234 mock(ProcessControl.Listener.class);
235
236 processControl.addProcessStatusListener(listener);
237 processControl.resetWorkerProcesses();
238 processControl.stopAgentAndWorkerProcesses();
239
240 assertTrue(readMessage(socket) instanceof ResetGrinderMessage);
241 assertTrue(readMessage(socket) instanceof StopGrinderMessage);
242
243 final GrinderProperties properties = new GrinderProperties();
244 properties.setProperty("foo", "bah");
245
246
247 m_processMessagesThread.start();
248
249 final AgentProcessReportMessage message =
250 new AgentProcessReportMessage(ProcessReport.State.RUNNING,
251 cacheHighWaterMark);
252
253 message.setAddress(new AgentAddress(agentIdentity));
254
255 new StreamSender(socket.getOutputStream()).send(message);
256
257 final CountDownLatch listenerCalledLatch = new CountDownLatch(1);
258
259 doAnswer(new Answer<Void>() {
260 @Override
261 public Void answer(InvocationOnMock invocation) {
262 listenerCalledLatch.countDown();
263 return null;
264 }
265 }).when(listener).update(isA(ProcessReports[].class));
266
267
268
269 final TimerTask processStatusUpdateTask = m_timer.getTaskByPeriod(500L);
270
271 do {
272 processStatusUpdateTask.run();
273 }
274 while (!listenerCalledLatch.await(10, TimeUnit.MILLISECONDS));
275
276 verify(listener).update(isA(ProcessReports[].class));
277 verifyNoMoreInteractions(listener);
278
279 processControl.startWorkerProcesses(properties);
280 final StartGrinderMessage startGrinderMessage =
281 (StartGrinderMessage)readMessage(socket);
282
283 assertEquals(properties, startGrinderMessage.getProperties());
284 assertEquals(0, startGrinderMessage.getAgentNumber());
285
286 processControl.startWorkerProcesses(new GrinderProperties());
287 final StartGrinderMessage startGrinderMessage2 =
288 (StartGrinderMessage)readMessage(socket);
289 assertEquals(0, startGrinderMessage2.getProperties().size());
290
291
292
293 m_properties.setIgnoreSampleCount(99);
294
295
296 m_properties.setConsolePort(findFreePort());
297
298
299 waitForNumberOfConnections(0);
300
301 final Socket socket2 =
302 new StubConnector(InetAddress.getByName(null).getHostName(),
303 m_properties.getConsolePort(),
304 ConnectionType.AGENT)
305 .connect();
306
307
308 waitForNumberOfConnections(1);
309
310 processControl.resetWorkerProcesses();
311
312 assertTrue(readMessage(socket2) instanceof ResetGrinderMessage);
313 }
314
315 @Test public void testDistributionControl() throws Exception {
316 final Socket socket =
317 new StubConnector(InetAddress.getByName(null).getHostName(),
318 m_properties.getConsolePort(),
319 ConnectionType.AGENT)
320 .connect();
321
322 final DistributionControl distributionControl =
323 new DistributionControlImplementation(m_consoleCommunication);
324
325 final Socket socket2 =
326 new StubConnector(InetAddress.getByName(null).getHostName(),
327 m_properties.getConsolePort(),
328 ConnectionType.AGENT)
329 .connect();
330
331 waitForNumberOfConnections(2);
332
333 socket2.close();
334
335 final Address address = new SendToEveryoneAddress();
336
337
338
339
340
341
342 int n = 0;
343
344 while (m_consoleCommunication.getNumberOfConnections() != 1) {
345 distributionControl.clearFileCaches(address);
346 ++n;
347 assertTrue(n < 10);
348 }
349
350 for (int i = 0; i < n; ++i) {
351 assertTrue(readMessage(socket) instanceof ClearCacheMessage);
352 }
353
354 final File relativePath = new File("foo");
355 final File fullPath = new File(getDirectory(), relativePath.getPath());
356 createRandomFile(fullPath);
357
358 final FileContents fileContents = new FileContents(getDirectory(),
359 relativePath);
360
361 distributionControl.sendFile(address, fileContents);
362
363 assertTrue(readMessage(socket) instanceof DistributeFileMessage);
364 socket.close();
365
366
367
368 m_processMessagesThread.start();
369
370
371 m_properties.setConsoleHost("localhost");
372
373
374 waitForNumberOfConnections(0);
375
376 final Socket socket3 =
377 new StubConnector(InetAddress.getByName(null).getHostName(),
378 m_properties.getConsolePort(),
379 ConnectionType.AGENT)
380 .connect();
381
382 waitForNumberOfConnections(1);
383
384 distributionControl.clearFileCaches(address);
385 assertTrue(readMessage(socket3) instanceof ClearCacheMessage);
386
387 distributionControl.setHighWaterMark(address,
388 new StubCacheHighWaterMark("", 100));
389 assertTrue(
390 readMessage(socket3) instanceof DistributionCacheCheckpointMessage);
391 }
392
393
394
395
396
397
398 private void waitForNumberOfConnections(int n) throws InterruptedException {
399 for (int retry = 0;
400 m_consoleCommunication.getNumberOfConnections() != n && retry < 200;
401 ++retry) {
402 Thread.sleep(10);
403 }
404
405 assertEquals(n, m_consoleCommunication.getNumberOfConnections());
406 }
407
408 @Test public void testProcessOneMessage() throws Exception {
409 m_consoleCommunication.getMessageDispatchRegistry()
410 .addFallback(m_messageHandler);
411
412 m_processMessagesThread.start();
413
414 final ProcessControl processControl =
415 new ProcessControlImplementation(m_timer,
416 m_consoleCommunication,
417 s_resources);
418
419 assertEquals(0, processControl.getNumberOfLiveAgents());
420
421 final StubAgentIdentity agentIdentity = new StubAgentIdentity("agent");
422
423 final Socket agentSocket =
424 new StubConnector(InetAddress.getByName(null).getHostName(),
425 m_properties.getConsolePort(),
426 ConnectionType.AGENT)
427 .connect(new AgentAddress(agentIdentity));
428
429 final AgentProcessReportMessage agentMessage =
430 new AgentProcessReportMessage(State.STARTED, null);
431
432 sendMessage(agentSocket, agentMessage);
433
434 final WorkerProcessReportMessage message =
435 new WorkerProcessReportMessage(State.STARTED,
436 (short)0,
437 (short)0);
438
439 final Socket workerSocket =
440 new StubConnector(InetAddress.getByName(null).getHostName(),
441 m_properties.getConsolePort(),
442 ConnectionType.WORKER)
443 .connect(new WorkerAddress(agentIdentity.createWorkerIdentity()));
444
445
446 sendMessage(workerSocket, message);
447
448 sendMessage(workerSocket, new MyMessage());
449
450
451 verify(m_messageHandler, timeout(10000)).handle(isA(MyMessage.class));
452
453 assertEquals(1, processControl.getNumberOfLiveAgents());
454
455
456
457
458
459 sendMessage(workerSocket, new StopGrinderMessage());
460
461 verify(m_messageHandler, timeout(10000))
462 .handle(isA(StopGrinderMessage.class));
463
464 verifyNoMoreInteractions(m_messageHandler);
465 }
466
467 @Test public void testSendExceptions() throws Exception {
468
469
470 m_processMessagesThread.start();
471
472
473 m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
474
475 verify(m_errorHandler)
476 .handleException(isA(DisplayMessageConsoleException.class));
477
478 m_consoleCommunication.sendToAddressedAgents(
479 new AgentAddress(new StubAgentIdentity("agent")), new MyMessage());
480
481 verify(m_errorHandler, times(2))
482 .handleException(isA(DisplayMessageConsoleException.class));
483
484 m_consoleCommunication.sendToAgents(new MyMessage());
485 verify(m_errorHandler, times(3))
486 .handleException(isA(DisplayMessageConsoleException.class));
487
488
489 m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
490 final ConsoleCommunication brokenConsoleCommunication =
491 new ConsoleCommunicationImplementation(s_resources,
492 m_properties,
493 m_errorHandler,
494 m_timeAuthority,
495 100,
496 10000);
497
498 verify(m_errorHandler, times(4))
499 .handleException(isA(DisplayMessageConsoleException.class));
500
501
502 brokenConsoleCommunication.sendToAddressedAgents(
503 new AgentAddress(new StubAgentIdentity("agent")), new MyMessage());
504 verify(m_errorHandler).handleErrorMessage(isA(String.class));
505
506 brokenConsoleCommunication.sendToAgents(new MyMessage());
507 verify(m_errorHandler, times(2)).handleErrorMessage(isA(String.class));
508
509 verifyNoMoreInteractions(m_errorHandler);
510 }
511
512 @Test public void testErrorHandling() throws Exception {
513
514
515 m_processMessagesThread.start();
516
517 m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
518
519 verify(m_errorHandler)
520 .handleException(isA(DisplayMessageConsoleException.class));
521
522 final Address address = new SendToEveryoneAddress();
523
524 new DistributionControlImplementation(m_consoleCommunication)
525 .clearFileCaches(address);
526
527 verify(m_errorHandler, times(2))
528 .handleException(isA(DisplayMessageConsoleException.class));
529 verifyNoMoreInteractions(m_errorHandler);
530
531 final ErrorHandler errorHandler2 = mock(ErrorHandler.class);
532
533
534 m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
535 final ConsoleCommunication brokenConsoleCommunication =
536 new ConsoleCommunicationImplementation(s_resources,
537 m_properties,
538 errorHandler2,
539 m_timeAuthority,
540 100,
541 10000);
542
543 verify(errorHandler2)
544 .handleException(isA(DisplayMessageConsoleException.class));
545
546 new DistributionControlImplementation(brokenConsoleCommunication)
547 .clearFileCaches(address);
548
549 verify(errorHandler2).handleErrorMessage(isA(String.class));
550 verifyNoMoreInteractions(errorHandler2);
551 }
552
553 @Test public void testErrorHandlingWithFurtherCommunicationProblems()
554 throws Exception {
555
556 final ServerSocket freeServerSocket = new ServerSocket(0);
557 freeServerSocket.close();
558
559
560
561 m_processMessagesThread.start();
562
563 m_properties.setConsolePort(freeServerSocket.getLocalPort());
564
565 final Socket socket = new Socket(freeServerSocket.getInetAddress(),
566 freeServerSocket.getLocalPort());
567
568 socket.getOutputStream().close();
569
570
571 verify(m_errorHandler, timeout(1000))
572 .handleException(isA(CommunicationException.class));
573
574 final Socket socket2 =
575 new StubConnector(InetAddress.getByName(null).getHostName(),
576 m_properties.getConsolePort(),
577 ConnectionType.AGENT)
578 .connect();
579
580 socket2.getOutputStream().write(new byte[100]);
581
582 verify(m_errorHandler, timeout(1000).times(2))
583 .handleException(isA(CommunicationException.class));
584
585 socket.close();
586 socket2.close();
587
588 verifyNoMoreInteractions(m_errorHandler);
589 }
590
591 private static final class MyMessage implements Message {
592 private static final long serialVersionUID = 1L;
593 }
594
595 private final class ProcessMessagesThread extends Thread {
596 public ProcessMessagesThread() {
597 super("Process messages");
598 }
599
600 @Override
601 public void run() {
602 try {
603 while (m_consoleCommunication.processOneMessage()) { }
604 }
605 catch (final UncheckedInterruptedException e) {
606
607 }
608 }
609 }
610 }