1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.console.synchronisation;
23
24 import static org.junit.Assert.assertSame;
25 import static org.junit.Assert.fail;
26 import static org.mockito.Matchers.eq;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32 import net.grinder.common.processidentity.ProcessReport;
33 import net.grinder.common.processidentity.WorkerIdentity;
34 import net.grinder.common.processidentity.WorkerProcessReport;
35 import net.grinder.communication.CommunicationException;
36 import net.grinder.communication.Message;
37 import net.grinder.communication.MessageDispatchRegistry;
38 import net.grinder.communication.MessageDispatchRegistry.Handler;
39 import net.grinder.console.common.processidentity.StubWorkerProcessReport;
40 import net.grinder.console.communication.ConsoleCommunication;
41 import net.grinder.console.communication.ProcessControl;
42 import net.grinder.console.communication.StubProcessReports;
43 import net.grinder.console.communication.ProcessControl.Listener;
44 import net.grinder.console.communication.ProcessControl.ProcessReports;
45 import net.grinder.engine.agent.StubAgentIdentity;
46 import net.grinder.messages.console.WorkerAddress;
47 import net.grinder.synchronisation.BarrierGroup;
48 import net.grinder.synchronisation.BarrierGroups;
49 import net.grinder.synchronisation.messages.AddBarrierMessage;
50 import net.grinder.synchronisation.messages.AddWaiterMessage;
51 import net.grinder.synchronisation.messages.BarrierIdentity;
52 import net.grinder.synchronisation.messages.CancelWaiterMessage;
53 import net.grinder.synchronisation.messages.RemoveBarriersMessage;
54
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.mockito.ArgumentCaptor;
58 import org.mockito.Captor;
59 import org.mockito.Mock;
60 import org.mockito.MockitoAnnotations;
61
62
63
64
65
66
67
68 public class TestWireDistributedBarriers {
69
70 @Mock private MessageDispatchRegistry m_messageDispatchRegistry;
71 @Mock private ConsoleCommunication m_consoleCommunication;
72 @Mock private ProcessControl m_processControl;
73 @Mock private BarrierGroups m_barrierGroups;
74 @Mock private BarrierIdentity m_barrierIdentity;
75
76 @Captor private ArgumentCaptor<Handler<Message>> m_handlerCaptor;
77 @Captor private ArgumentCaptor<Listener> m_processStatusListenerCaptor;
78
79 @Before public void setUp() {
80 MockitoAnnotations.initMocks(this);
81
82 when(m_consoleCommunication.getMessageDispatchRegistry())
83 .thenReturn(m_messageDispatchRegistry);
84 }
85
86 @Test public void testBarrierMessageHandlers() throws Exception {
87
88 final BarrierGroup barrierGroup = mock(BarrierGroup.class);
89
90 when(m_barrierGroups.getGroup("hello")).thenReturn(barrierGroup);
91
92 new WireDistributedBarriers(m_consoleCommunication,
93 m_processControl,
94 m_barrierGroups);
95
96
97 verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
98 m_handlerCaptor.capture());
99
100 m_handlerCaptor.getValue().handle(new AddBarrierMessage("hello"));
101
102 verify(barrierGroup).addBarrier();
103
104
105 verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
106 m_handlerCaptor.capture());
107
108 m_handlerCaptor.getValue().handle(new AddWaiterMessage("hello",
109 m_barrierIdentity));
110 verify(barrierGroup).addWaiter(m_barrierIdentity);
111
112
113 verify(m_messageDispatchRegistry).set(eq(CancelWaiterMessage.class),
114 m_handlerCaptor.capture());
115
116 m_handlerCaptor.getValue().handle(
117 new CancelWaiterMessage("hello", m_barrierIdentity));
118
119 verify(barrierGroup).cancelWaiter(m_barrierIdentity);
120
121
122 verify(m_messageDispatchRegistry).set(eq(RemoveBarriersMessage.class),
123 m_handlerCaptor.capture());
124
125 m_handlerCaptor.getValue().handle(new RemoveBarriersMessage("hello", 1));
126
127 verify(barrierGroup).removeBarriers(1);
128 }
129
130 @Test public void testBarriersCleanUp() throws Exception {
131 final BarrierGroup group1 = mock(BarrierGroup.class);
132 final BarrierGroup group2 = mock(BarrierGroup.class);
133
134 when(m_barrierGroups.getGroup("g1")).thenReturn(group1);
135 when(m_barrierGroups.getGroup("g2")).thenReturn(group2);
136
137 new WireDistributedBarriers(m_consoleCommunication,
138 m_processControl,
139 m_barrierGroups);
140
141 verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
142 m_handlerCaptor.capture());
143
144 final Handler<Message> addBarrierHandler = m_handlerCaptor.getValue();
145
146 verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
147 m_handlerCaptor.capture());
148
149 final Handler<Message> addWaiterHandler = m_handlerCaptor.getValue();
150
151
152
153 final StubAgentIdentity agent = new StubAgentIdentity("agent");
154 final WorkerIdentity worker1 = agent.createWorkerIdentity();
155 final WorkerIdentity worker2 = agent.createWorkerIdentity();
156
157 final AddBarrierMessage message1 = new AddBarrierMessage("g1");
158 message1.setAddress(new WorkerAddress(worker1));
159 addBarrierHandler.handle(message1);
160 addBarrierHandler.handle(message1);
161
162 final AddBarrierMessage message2 = new AddBarrierMessage("g2");
163 message2.setAddress(new WorkerAddress(worker2));
164 addBarrierHandler.handle(message2);
165
166 final AddBarrierMessage message3 = new AddBarrierMessage("g1");
167 message3.setAddress(new WorkerAddress(worker2));
168 addBarrierHandler.handle(message3);
169
170 final AddWaiterMessage message4 = new AddWaiterMessage("g1",
171 m_barrierIdentity);
172 message4.setAddress(new WorkerAddress(worker2));
173 addWaiterHandler.handle(message4);
174
175 verify(group1, times(3)).addBarrier();
176 verify(group1).addWaiter(m_barrierIdentity);
177
178 verify(group2).addBarrier();
179
180 verify(m_processControl).addProcessStatusListener(
181 m_processStatusListenerCaptor.capture());
182
183 final Listener listener = m_processStatusListenerCaptor.getValue();
184
185
186 listener.update(new ProcessReports[] {
187 new StubProcessReports(null,
188 new WorkerProcessReport[] {
189 new StubWorkerProcessReport(worker2,
190 ProcessReport.State.RUNNING, 1, 1)
191 }),
192 });
193
194 verify(group1).removeBarriers(2);
195
196
197 listener.update(new ProcessReports[0]);
198
199 verify(group1).cancelWaiter(m_barrierIdentity);
200 verify(group1).removeBarriers(1);
201
202 verify(group2).removeBarriers(1);
203 }
204
205 @Test public void testBarriersCleanUpAssertion() throws Exception {
206
207 final CommunicationException exception = new CommunicationException("");
208
209 final BarrierGroup group1 = mock(BarrierGroup.class);
210 doThrow(exception).when(group1).removeBarriers(1);
211
212 when(m_barrierGroups.getGroup("g1")).thenReturn(group1);
213
214 new WireDistributedBarriers(m_consoleCommunication,
215 m_processControl,
216 m_barrierGroups);
217
218 verify(m_messageDispatchRegistry).set(eq(AddBarrierMessage.class),
219 m_handlerCaptor.capture());
220
221 final Handler<Message> addBarrierHandler = m_handlerCaptor.getValue();
222
223 verify(m_messageDispatchRegistry).set(eq(AddWaiterMessage.class),
224 m_handlerCaptor.capture());
225
226
227 final AddBarrierMessage message1 = new AddBarrierMessage("g1");
228 addBarrierHandler.handle(message1);
229
230 verify(m_processControl)
231 .addProcessStatusListener(m_processStatusListenerCaptor.capture());
232
233 final Listener listener = m_processStatusListenerCaptor.getValue();
234
235 try {
236 listener.update(new ProcessReports[0]);
237 fail("Expected AssertionError");
238 }
239 catch (AssertionError e) {
240 assertSame(exception, e.getCause());
241 }
242 }
243 }