View Javadoc

1   // Copyright (C) 2004 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.console.communication;
23  
24  import static net.grinder.testutility.FileUtilities.createRandomFile;
25  import static net.grinder.testutility.SocketUtilities.findFreePort;
26  import static org.junit.Assert.assertEquals;
27  import static org.junit.Assert.assertTrue;
28  import static org.mockito.Matchers.isA;
29  import static org.mockito.Mockito.doAnswer;
30  import static org.mockito.Mockito.mock;
31  import static org.mockito.Mockito.timeout;
32  import static org.mockito.Mockito.times;
33  import static org.mockito.Mockito.verify;
34  import static org.mockito.Mockito.verifyNoMoreInteractions;
35  
36  import java.io.File;
37  import java.io.ObjectInputStream;
38  import java.io.ObjectOutputStream;
39  import java.net.InetAddress;
40  import java.net.ServerSocket;
41  import java.net.Socket;
42  import java.util.TimerTask;
43  import java.util.concurrent.CountDownLatch;
44  import java.util.concurrent.TimeUnit;
45  
46  import net.grinder.common.GrinderProperties;
47  import net.grinder.common.UncheckedInterruptedException;
48  import net.grinder.common.processidentity.AgentIdentity;
49  import net.grinder.common.processidentity.ProcessReport;
50  import net.grinder.common.processidentity.ProcessReport.State;
51  import net.grinder.communication.Address;
52  import net.grinder.communication.CommunicationException;
53  import net.grinder.communication.ConnectionType;
54  import net.grinder.communication.Message;
55  import net.grinder.communication.MessageDispatchRegistry.Handler;
56  import net.grinder.communication.SendToEveryoneAddress;
57  import net.grinder.communication.StreamSender;
58  import net.grinder.communication.StubConnector;
59  import net.grinder.console.common.DisplayMessageConsoleException;
60  import net.grinder.console.common.ErrorHandler;
61  import net.grinder.console.common.Resources;
62  import net.grinder.console.common.ResourcesImplementation;
63  import net.grinder.console.communication.ProcessControl.ProcessReports;
64  import net.grinder.console.model.ConsoleProperties;
65  import net.grinder.engine.agent.StubAgentIdentity;
66  import net.grinder.messages.agent.CacheHighWaterMark;
67  import net.grinder.messages.agent.ClearCacheMessage;
68  import net.grinder.messages.agent.DistributeFileMessage;
69  import net.grinder.messages.agent.DistributionCacheCheckpointMessage;
70  import net.grinder.messages.agent.ResetGrinderMessage;
71  import net.grinder.messages.agent.StartGrinderMessage;
72  import net.grinder.messages.agent.StopGrinderMessage;
73  import net.grinder.messages.agent.StubCacheHighWaterMark;
74  import net.grinder.messages.console.AgentAddress;
75  import net.grinder.messages.console.AgentProcessReportMessage;
76  import net.grinder.messages.console.WorkerAddress;
77  import net.grinder.messages.console.WorkerProcessReportMessage;
78  import net.grinder.testutility.AbstractJUnit4FileTestCase;
79  import net.grinder.testutility.StubTimer;
80  import net.grinder.util.FileContents;
81  import net.grinder.util.StandardTimeAuthority;
82  import net.grinder.util.TimeAuthority;
83  
84  import org.junit.Before;
85  import org.junit.Test;
86  import org.mockito.Mock;
87  import org.mockito.MockitoAnnotations;
88  import org.mockito.invocation.InvocationOnMock;
89  import org.mockito.stubbing.Answer;
90  
91  
92  /**
93   * Unit test case for {@link ConsoleCommunicationImplementation}. Also tests
94   * {@link ProcessControlImplementation} and
95   * {@link DistributionControlImplementation}.
96   *
97   * @author Philip Aston
98   */
99  public class TestConsoleCommunicationImplementation
100   extends AbstractJUnit4FileTestCase {
101 
102   private static final Resources s_resources =
103       new ResourcesImplementation(
104         "net.grinder.console.common.resources.Console");
105 
106   private @Mock ErrorHandler m_errorHandler;
107   private @Mock Handler<Message> m_messageHandler;
108   private final TimeAuthority m_timeAuthority = new StandardTimeAuthority();
109 
110   private ConsoleCommunicationImplementation m_consoleCommunication;
111   private ConsoleProperties m_properties;
112   private ServerSocket m_usedServerSocket;
113   private final ProcessMessagesThread m_processMessagesThread =
114     new ProcessMessagesThread();
115   private StubTimer m_timer;
116 
117   @Before public void setUp() throws Exception {
118     MockitoAnnotations.initMocks(this);
119 
120     m_timer = new StubTimer();
121 
122     m_usedServerSocket = new ServerSocket(0, 50, InetAddress.getByName(null));
123 
124     final File file = new File(getDirectory(), "properties");
125     m_properties = new ConsoleProperties(s_resources, file);
126 
127     m_properties.setConsolePort(findFreePort());
128 
129     m_consoleCommunication =
130       new ConsoleCommunicationImplementation(s_resources,
131                                              m_properties,
132                                              m_errorHandler,
133                                              m_timeAuthority,
134                                              10,
135                                              10000);
136   }
137 
138   @Override
139   public void tearDown() throws Exception {
140     super.tearDown();
141 
142     m_consoleCommunication.shutdown();
143 
144     m_processMessagesThread.interrupt();
145     m_processMessagesThread.join();
146 
147     m_timer.cancel();
148 
149     m_usedServerSocket.close();
150 
151     waitForNumberOfConnections(0);
152   }
153 
154   @Test public void testConstruction() throws Exception {
155     // Need a thread to be attempting to process messages or
156     // ConsoleCommunicationImplementation.reset() will not complete.
157     m_processMessagesThread.start();
158 
159     // Cause the sender to be invalid.
160     m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
161 
162     final ConsoleCommunicationImplementation consoleCommunication =
163       new ConsoleCommunicationImplementation(s_resources,
164                                              m_properties,
165                                              m_errorHandler,
166                                              m_timeAuthority,
167                                              500,
168                                              10000);
169 
170     assertEquals(0, consoleCommunication.getNumberOfConnections());
171 
172     final ConsoleCommunicationImplementation consoleCommunication2 =
173       new ConsoleCommunicationImplementation(s_resources,
174                                              m_properties,
175                                              m_errorHandler,
176                                              m_timeAuthority);
177 
178     assertEquals(0, consoleCommunication2.getNumberOfConnections());
179   }
180 
181   @Test public void testShutdown() throws Exception {
182 
183     m_processMessagesThread.start();
184 
185     new StubConnector(InetAddress.getByName(null).getHostName(),
186                       m_properties.getConsolePort(),
187                       ConnectionType.AGENT)
188       .connect();
189 
190     waitForNumberOfConnections(1);
191 
192     m_consoleCommunication.shutdown();
193 
194     waitForNumberOfConnections(0);
195   }
196 
197   private Message readMessage(Socket socket) throws Exception {
198     final ObjectInputStream objectStream =
199       new ObjectInputStream(socket.getInputStream());
200 
201     return (Message)objectStream.readObject();
202   }
203 
204   private void sendMessage(Socket socket, Message message) throws Exception {
205     final ObjectOutputStream objectStream =
206       new ObjectOutputStream(socket.getOutputStream());
207 
208     objectStream.writeObject(message);
209     objectStream.flush();
210   }
211 
212   @Test public void testWithProcessControl() throws Exception {
213     // We need to associate the agent id with the connection or we'll never
214     // get a start message.
215     final AgentIdentity agentIdentity = new StubAgentIdentity("foo");
216 
217     final Socket socket =
218       new StubConnector(InetAddress.getByName(null).getHostName(),
219                         m_properties.getConsolePort(),
220                         ConnectionType.AGENT)
221       .connect(new AgentAddress(agentIdentity));
222 
223     waitForNumberOfConnections(1);
224 
225     final ProcessControl processControl =
226       new ProcessControlImplementation(m_timer,
227                                        m_consoleCommunication,
228                                        s_resources);
229 
230     final CacheHighWaterMark cacheHighWaterMark =
231       new StubCacheHighWaterMark("cache", 100);
232 
233     final ProcessControl.Listener listener =
234       mock(ProcessControl.Listener.class);
235 
236     processControl.addProcessStatusListener(listener);
237     processControl.resetWorkerProcesses();
238     processControl.stopAgentAndWorkerProcesses();
239 
240     assertTrue(readMessage(socket) instanceof ResetGrinderMessage);
241     assertTrue(readMessage(socket) instanceof StopGrinderMessage);
242 
243     final GrinderProperties properties = new GrinderProperties();
244     properties.setProperty("foo", "bah");
245 
246     // Need a thread to be attempting to process messages.
247     m_processMessagesThread.start();
248 
249     final AgentProcessReportMessage message =
250       new AgentProcessReportMessage(ProcessReport.State.RUNNING,
251                                     cacheHighWaterMark);
252 
253     message.setAddress(new AgentAddress(agentIdentity));
254 
255     new StreamSender(socket.getOutputStream()).send(message);
256 
257     final CountDownLatch listenerCalledLatch = new CountDownLatch(1);
258 
259     doAnswer(new Answer<Void>() {
260         @Override
261         public Void answer(InvocationOnMock invocation) {
262           listenerCalledLatch.countDown();
263           return null;
264         }
265       }).when(listener).update(isA(ProcessReports[].class));
266 
267     // Repeatedly run update task until it notifies listener that the message
268     // has been received.
269     final TimerTask processStatusUpdateTask = m_timer.getTaskByPeriod(500L);
270 
271     do {
272       processStatusUpdateTask.run();
273     }
274     while (!listenerCalledLatch.await(10, TimeUnit.MILLISECONDS));
275 
276     verify(listener).update(isA(ProcessReports[].class));
277     verifyNoMoreInteractions(listener);
278 
279     processControl.startWorkerProcesses(properties);
280     final StartGrinderMessage startGrinderMessage =
281       (StartGrinderMessage)readMessage(socket);
282 
283     assertEquals(properties, startGrinderMessage.getProperties());
284     assertEquals(0, startGrinderMessage.getAgentNumber());
285 
286     processControl.startWorkerProcesses(new GrinderProperties());
287     final StartGrinderMessage startGrinderMessage2 =
288       (StartGrinderMessage)readMessage(socket);
289     assertEquals(0, startGrinderMessage2.getProperties().size());
290 
291     // This shouldn't call reset. If it does, we'll block because
292     // nothing's processing the messages.
293     m_properties.setIgnoreSampleCount(99);
294 
295     // Reset by changing properties and do another test.
296     m_properties.setConsolePort(findFreePort());
297 
298     // Changing the port drops the existing connections.
299     waitForNumberOfConnections(0);
300 
301     final Socket socket2 =
302       new StubConnector(InetAddress.getByName(null).getHostName(),
303                         m_properties.getConsolePort(),
304                         ConnectionType.AGENT)
305       .connect();
306 
307     // Make sure something is listening to our new connection.
308     waitForNumberOfConnections(1);
309 
310     processControl.resetWorkerProcesses();
311 
312     assertTrue(readMessage(socket2) instanceof ResetGrinderMessage);
313   }
314 
315   @Test public void testDistributionControl() throws Exception {
316     final Socket socket =
317       new StubConnector(InetAddress.getByName(null).getHostName(),
318                         m_properties.getConsolePort(),
319                         ConnectionType.AGENT)
320       .connect();
321 
322     final DistributionControl distributionControl =
323       new DistributionControlImplementation(m_consoleCommunication);
324 
325     final Socket socket2 =
326       new StubConnector(InetAddress.getByName(null).getHostName(),
327                         m_properties.getConsolePort(),
328                         ConnectionType.AGENT)
329       .connect();
330 
331     waitForNumberOfConnections(2);
332 
333     socket2.close();
334 
335     final Address address = new SendToEveryoneAddress();
336 
337     // Closing the socket isn't enough for the ConsoleCommunication's Sender to
338     // know we've gone (and so close its end of the connection); we need to send
339     // something too.
340     // Sadly it appears we sometimes need to chuck more than one message the
341     // socket before it figures out the other end is stuffed.
342     int n = 0;
343 
344     while (m_consoleCommunication.getNumberOfConnections() != 1) {
345       distributionControl.clearFileCaches(address);
346       ++n;
347       assertTrue(n < 10);
348     }
349 
350     for (int i = 0; i < n; ++i) {
351       assertTrue(readMessage(socket) instanceof ClearCacheMessage);
352     }
353 
354     final File relativePath = new File("foo");
355     final File fullPath = new File(getDirectory(), relativePath.getPath());
356     createRandomFile(fullPath);
357 
358     final FileContents fileContents = new FileContents(getDirectory(),
359       relativePath);
360 
361     distributionControl.sendFile(address, fileContents);
362 
363     assertTrue(readMessage(socket) instanceof DistributeFileMessage);
364     socket.close();
365 
366     // Need a thread to be attempting to process messages or
367     // ConsoleCommunicationImplementation.reset() will not complete.
368     m_processMessagesThread.start();
369 
370     // Reset by changing properties and do another test.
371     m_properties.setConsoleHost("localhost");
372 
373     // Reseting the properties should ditch the existing connections.
374     waitForNumberOfConnections(0);
375 
376     final Socket socket3 =
377       new StubConnector(InetAddress.getByName(null).getHostName(),
378                         m_properties.getConsolePort(),
379                         ConnectionType.AGENT)
380       .connect();
381 
382     waitForNumberOfConnections(1);
383 
384     distributionControl.clearFileCaches(address);
385     assertTrue(readMessage(socket3) instanceof ClearCacheMessage);
386 
387     distributionControl.setHighWaterMark(address,
388                                          new StubCacheHighWaterMark("", 100));
389     assertTrue(
390       readMessage(socket3) instanceof DistributionCacheCheckpointMessage);
391   }
392 
393   /**
394    * Connections are accepted by separate threads so we need to spin a while.
395    * @param n - Wait until there are this number of accepted connections.
396    * @throws InterruptedException
397    */
398   private void waitForNumberOfConnections(int n) throws InterruptedException {
399     for (int retry = 0;
400          m_consoleCommunication.getNumberOfConnections() != n && retry < 200;
401          ++retry) {
402       Thread.sleep(10);
403     }
404 
405     assertEquals(n, m_consoleCommunication.getNumberOfConnections());
406   }
407 
408   @Test public void testProcessOneMessage() throws Exception {
409     m_consoleCommunication.getMessageDispatchRegistry()
410       .addFallback(m_messageHandler);
411 
412     m_processMessagesThread.start();
413 
414     final ProcessControl processControl =
415       new ProcessControlImplementation(m_timer,
416                                        m_consoleCommunication,
417                                        s_resources);
418 
419     assertEquals(0, processControl.getNumberOfLiveAgents());
420 
421     final StubAgentIdentity agentIdentity = new StubAgentIdentity("agent");
422 
423     final Socket agentSocket =
424       new StubConnector(InetAddress.getByName(null).getHostName(),
425                         m_properties.getConsolePort(),
426                         ConnectionType.AGENT)
427       .connect(new AgentAddress(agentIdentity));
428 
429     final AgentProcessReportMessage agentMessage =
430       new AgentProcessReportMessage(State.STARTED, null);
431 
432     sendMessage(agentSocket, agentMessage);
433 
434     final WorkerProcessReportMessage message =
435       new WorkerProcessReportMessage(State.STARTED,
436                                      (short)0,
437                                      (short)0);
438 
439     final Socket workerSocket =
440       new StubConnector(InetAddress.getByName(null).getHostName(),
441                         m_properties.getConsolePort(),
442                         ConnectionType.WORKER)
443       .connect(new WorkerAddress(agentIdentity.createWorkerIdentity()));
444 
445 
446     sendMessage(workerSocket, message);
447 
448     sendMessage(workerSocket, new MyMessage());
449 
450     // Message instance different due to serialisation.
451     verify(m_messageHandler, timeout(10000)).handle(isA(MyMessage.class));
452 
453     assertEquals(1, processControl.getNumberOfLiveAgents());
454 
455     // ConsoleCommunication should have handled the original
456     // AgentProcessReportMessage and WorkerProcessReportMessage. We check here
457     // so we're sure the've been processed.
458 
459     sendMessage(workerSocket, new StopGrinderMessage());
460 
461     verify(m_messageHandler, timeout(10000))
462       .handle(isA(StopGrinderMessage.class));
463 
464     verifyNoMoreInteractions(m_messageHandler);
465   }
466 
467   @Test public void testSendExceptions() throws Exception {
468     // Need a thread to be attempting to process messages or
469     // ConsoleCommunicationImplementation.reset() will not complete.
470     m_processMessagesThread.start();
471 
472     // Cause the sender to be invalid.
473     m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
474 
475     verify(m_errorHandler)
476       .handleException(isA(DisplayMessageConsoleException.class));
477 
478     m_consoleCommunication.sendToAddressedAgents(
479       new AgentAddress(new StubAgentIdentity("agent")), new MyMessage());
480 
481     verify(m_errorHandler, times(2))
482       .handleException(isA(DisplayMessageConsoleException.class));
483 
484     m_consoleCommunication.sendToAgents(new MyMessage());
485     verify(m_errorHandler, times(3))
486       .handleException(isA(DisplayMessageConsoleException.class));
487 
488 
489     m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
490     final ConsoleCommunication brokenConsoleCommunication =
491       new ConsoleCommunicationImplementation(s_resources,
492                                              m_properties,
493                                              m_errorHandler,
494                                              m_timeAuthority,
495                                              100,
496                                              10000);
497 
498     verify(m_errorHandler, times(4))
499       .handleException(isA(DisplayMessageConsoleException.class));
500 
501 
502     brokenConsoleCommunication.sendToAddressedAgents(
503       new AgentAddress(new StubAgentIdentity("agent")), new MyMessage());
504     verify(m_errorHandler).handleErrorMessage(isA(String.class));
505 
506     brokenConsoleCommunication.sendToAgents(new MyMessage());
507     verify(m_errorHandler, times(2)).handleErrorMessage(isA(String.class));
508 
509     verifyNoMoreInteractions(m_errorHandler);
510   }
511 
512   @Test public void testErrorHandling() throws Exception {
513     // Need a thread to be attempting to process messages or the
514     // receiver will never be shutdown correctly.
515     m_processMessagesThread.start();
516 
517     m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
518 
519     verify(m_errorHandler)
520       .handleException(isA(DisplayMessageConsoleException.class));
521 
522     final Address address = new SendToEveryoneAddress();
523 
524     new DistributionControlImplementation(m_consoleCommunication)
525     .clearFileCaches(address);
526 
527     verify(m_errorHandler, times(2))
528       .handleException(isA(DisplayMessageConsoleException.class));
529     verifyNoMoreInteractions(m_errorHandler);
530 
531     final ErrorHandler errorHandler2 = mock(ErrorHandler.class);
532 
533     // Test a ConsoleCommunication with an invalid Sender.
534     m_properties.setConsolePort(m_usedServerSocket.getLocalPort());
535     final ConsoleCommunication brokenConsoleCommunication =
536       new ConsoleCommunicationImplementation(s_resources,
537                                              m_properties,
538                                              errorHandler2,
539                                              m_timeAuthority,
540                                              100,
541                                              10000);
542 
543     verify(errorHandler2)
544       .handleException(isA(DisplayMessageConsoleException.class));
545 
546     new DistributionControlImplementation(brokenConsoleCommunication)
547     .clearFileCaches(address);
548 
549     verify(errorHandler2).handleErrorMessage(isA(String.class));
550     verifyNoMoreInteractions(errorHandler2);
551   }
552 
553   @Test public void testErrorHandlingWithFurtherCommunicationProblems()
554     throws Exception {
555 
556     final ServerSocket freeServerSocket = new ServerSocket(0);
557     freeServerSocket.close();
558 
559     // Need a thread to be attempting to process messages or
560     // ConsoleCommunicationImplementation.reset() will not complete.
561     m_processMessagesThread.start();
562 
563     m_properties.setConsolePort(freeServerSocket.getLocalPort());
564 
565     final Socket socket = new Socket(freeServerSocket.getInetAddress(),
566                                      freeServerSocket.getLocalPort());
567 
568     socket.getOutputStream().close();
569 
570     // Will be called via the Acceptor problem listener.
571     verify(m_errorHandler, timeout(1000))
572       .handleException(isA(CommunicationException.class));
573 
574     final Socket socket2 =
575       new StubConnector(InetAddress.getByName(null).getHostName(),
576                         m_properties.getConsolePort(),
577                         ConnectionType.AGENT)
578       .connect();
579 
580     socket2.getOutputStream().write(new byte[100]);
581 
582     verify(m_errorHandler, timeout(1000).times(2))
583       .handleException(isA(CommunicationException.class));
584 
585     socket.close();
586     socket2.close();
587 
588     verifyNoMoreInteractions(m_errorHandler);
589   }
590 
591   private static final class MyMessage implements Message {
592     private static final long serialVersionUID = 1L;
593   }
594 
595   private final class ProcessMessagesThread extends Thread {
596     public ProcessMessagesThread() {
597       super("Process messages");
598     }
599 
600     @Override
601     public void run() {
602       try {
603         while (m_consoleCommunication.processOneMessage()) { }
604       }
605       catch (final UncheckedInterruptedException e) {
606         // Time to go.
607       }
608     }
609   }
610 }