View Javadoc

1   // Copyright (C) 2008 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.console.synchronisation;
23  
24  import static org.junit.Assert.assertSame;
25  import static org.junit.Assert.fail;
26  import static org.mockito.Matchers.eq;
27  import static org.mockito.Mockito.doThrow;
28  import static org.mockito.Mockito.mock;
29  import static org.mockito.Mockito.times;
30  import static org.mockito.Mockito.verify;
31  import static org.mockito.Mockito.when;
32  import net.grinder.common.processidentity.ProcessReport;
33  import net.grinder.common.processidentity.WorkerIdentity;
34  import net.grinder.common.processidentity.WorkerProcessReport;
35  import net.grinder.communication.CommunicationException;
36  import net.grinder.communication.Message;
37  import net.grinder.communication.MessageDispatchRegistry;
38  import net.grinder.communication.MessageDispatchRegistry.Handler;
39  import net.grinder.console.common.processidentity.StubWorkerProcessReport;
40  import net.grinder.console.communication.ConsoleCommunication;
41  import net.grinder.console.communication.ProcessControl;
42  import net.grinder.console.communication.StubProcessReports;
43  import net.grinder.console.communication.ProcessControl.Listener;
44  import net.grinder.console.communication.ProcessControl.ProcessReports;
45  import net.grinder.engine.agent.StubAgentIdentity;
46  import net.grinder.messages.console.WorkerAddress;
47  import net.grinder.synchronisation.BarrierGroup;
48  import net.grinder.synchronisation.BarrierGroups;
49  import net.grinder.synchronisation.messages.AddBarrierMessage;
50  import net.grinder.synchronisation.messages.AddWaiterMessage;
51  import net.grinder.synchronisation.messages.BarrierIdentity;
52  import net.grinder.synchronisation.messages.CancelWaiterMessage;
53  import net.grinder.synchronisation.messages.RemoveBarriersMessage;
54  
55  import org.junit.Before;
56  import org.junit.Test;
57  import org.mockito.ArgumentCaptor;
58  import org.mockito.Captor;
59  import org.mockito.Mock;
60  import org.mockito.MockitoAnnotations;
61  
62  
63  /**
64   * Unit tests for {@link WireDistributedBarriers}.
65   *
66   * @author Philip Aston
67   */
68  public class TestWireDistributedBarriers {
69  
70    @Mock private MessageDispatchRegistry m_messageDispatchRegistry;
71    @Mock private ConsoleCommunication m_consoleCommunication;
72    @Mock private ProcessControl m_processControl;
73    @Mock private BarrierGroups m_barrierGroups;
74    @Mock private BarrierIdentity m_barrierIdentity;
75  
76    @Captor private ArgumentCaptor<Handler<Message>> m_handlerCaptor;
77    @Captor private ArgumentCaptor<Listener> m_processStatusListenerCaptor;
78  
79    @Before public void setUp() {
80      MockitoAnnotations.initMocks(this);
81  
82      when(m_consoleCommunication.getMessageDispatchRegistry())
83        .thenReturn(m_messageDispatchRegistry);
84    }
85  
86    @Test public void testBarrierMessageHandlers() throws Exception {
87  
88      final BarrierGroup barrierGroup = mock(BarrierGroup.class);
89  
90      when(m_barrierGroups.getGroup("hello")).thenReturn(barrierGroup);
91  
92      new WireDistributedBarriers(m_consoleCommunication,
93                                  m_processControl,
94                                  m_barrierGroups);
95  
96      // Add barrier.
97      verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
98                                            m_handlerCaptor.capture());
99  
100     m_handlerCaptor.getValue().handle(new AddBarrierMessage("hello"));
101 
102     verify(barrierGroup).addBarrier();
103 
104     // Add waiter.
105     verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
106                                           m_handlerCaptor.capture());
107 
108     m_handlerCaptor.getValue().handle(new AddWaiterMessage("hello",
109                                                            m_barrierIdentity));
110     verify(barrierGroup).addWaiter(m_barrierIdentity);
111 
112     // Cancel waiter.
113     verify(m_messageDispatchRegistry).set(eq(CancelWaiterMessage.class),
114                                           m_handlerCaptor.capture());
115 
116     m_handlerCaptor.getValue().handle(
117       new CancelWaiterMessage("hello", m_barrierIdentity));
118 
119     verify(barrierGroup).cancelWaiter(m_barrierIdentity);
120 
121     // Remove barriers.
122     verify(m_messageDispatchRegistry).set(eq(RemoveBarriersMessage.class),
123                                           m_handlerCaptor.capture());
124 
125     m_handlerCaptor.getValue().handle(new RemoveBarriersMessage("hello", 1));
126 
127     verify(barrierGroup).removeBarriers(1);
128   }
129 
130   @Test public void testBarriersCleanUp() throws Exception {
131     final BarrierGroup group1 = mock(BarrierGroup.class);
132     final BarrierGroup group2 = mock(BarrierGroup.class);
133 
134     when(m_barrierGroups.getGroup("g1")).thenReturn(group1);
135     when(m_barrierGroups.getGroup("g2")).thenReturn(group2);
136 
137     new WireDistributedBarriers(m_consoleCommunication,
138                                 m_processControl,
139                                 m_barrierGroups);
140 
141     verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
142                                           m_handlerCaptor.capture());
143 
144     final Handler<Message> addBarrierHandler = m_handlerCaptor.getValue();
145 
146     verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
147                                           m_handlerCaptor.capture());
148 
149     final Handler<Message> addWaiterHandler = m_handlerCaptor.getValue();
150 
151 
152     // Create a couple of barrier groups.
153     final StubAgentIdentity agent = new StubAgentIdentity("agent");
154     final WorkerIdentity worker1 = agent.createWorkerIdentity();
155     final WorkerIdentity worker2 = agent.createWorkerIdentity();
156 
157     final AddBarrierMessage message1 = new AddBarrierMessage("g1");
158     message1.setAddress(new WorkerAddress(worker1));
159     addBarrierHandler.handle(message1);
160     addBarrierHandler.handle(message1);
161 
162     final AddBarrierMessage message2 = new AddBarrierMessage("g2");
163     message2.setAddress(new WorkerAddress(worker2));
164     addBarrierHandler.handle(message2);
165 
166     final AddBarrierMessage message3 = new AddBarrierMessage("g1");
167     message3.setAddress(new WorkerAddress(worker2));
168     addBarrierHandler.handle(message3);
169 
170     final AddWaiterMessage message4 = new AddWaiterMessage("g1",
171                                                            m_barrierIdentity);
172     message4.setAddress(new WorkerAddress(worker2));
173     addWaiterHandler.handle(message4);
174 
175     verify(group1, times(3)).addBarrier();
176     verify(group1).addWaiter(m_barrierIdentity);
177 
178     verify(group2).addBarrier();
179 
180     verify(m_processControl).addProcessStatusListener(
181       m_processStatusListenerCaptor.capture());
182 
183     final Listener listener = m_processStatusListenerCaptor.getValue();
184 
185     // Worker 1 has gone away.
186     listener.update(new ProcessReports[] {
187       new StubProcessReports(null,
188         new WorkerProcessReport[] {
189           new StubWorkerProcessReport(worker2,
190                                       ProcessReport.State.RUNNING, 1, 1)
191         }),
192     });
193 
194     verify(group1).removeBarriers(2);
195 
196     // All workers have gone away.
197     listener.update(new ProcessReports[0]);
198 
199     verify(group1).cancelWaiter(m_barrierIdentity);
200     verify(group1).removeBarriers(1);
201 
202     verify(group2).removeBarriers(1);
203   }
204 
205   @Test public void testBarriersCleanUpAssertion() throws Exception {
206 
207     final CommunicationException exception = new CommunicationException("");
208 
209     final BarrierGroup group1 = mock(BarrierGroup.class);
210     doThrow(exception).when(group1).removeBarriers(1);
211 
212     when(m_barrierGroups.getGroup("g1")).thenReturn(group1);
213 
214     new WireDistributedBarriers(m_consoleCommunication,
215                                 m_processControl,
216                                 m_barrierGroups);
217 
218     verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
219                                           m_handlerCaptor.capture());
220 
221     final Handler<Message> addBarrierHandler = m_handlerCaptor.getValue();
222 
223     verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
224                                           m_handlerCaptor.capture());
225 
226 
227     final AddBarrierMessage message1 = new AddBarrierMessage("g1");
228     addBarrierHandler.handle(message1);
229 
230     verify(m_processControl)
231       .addProcessStatusListener(m_processStatusListenerCaptor.capture());
232 
233     final Listener listener = m_processStatusListenerCaptor.getValue();
234 
235     try {
236       listener.update(new ProcessReports[0]);
237       fail("Expected AssertionError");
238     }
239     catch (AssertionError e) {
240       assertSame(exception, e.getCause());
241     }
242   }
243 }