1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.console.synchronisation;
23
24 import static net.grinder.testutility.SocketUtilities.findFreePort;
25 import static org.junit.Assert.assertEquals;
26
27 import java.io.File;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import net.grinder.common.UncheckedInterruptedException;
38 import net.grinder.communication.ClientReceiver;
39 import net.grinder.communication.ClientSender;
40 import net.grinder.communication.ConnectionType;
41 import net.grinder.communication.Connector;
42 import net.grinder.communication.MessageDispatchSender;
43 import net.grinder.communication.MessagePump;
44 import net.grinder.communication.Sender;
45 import net.grinder.console.common.ErrorHandler;
46 import net.grinder.console.common.Resources;
47 import net.grinder.console.communication.ConsoleCommunication;
48 import net.grinder.console.communication.ConsoleCommunicationImplementation;
49 import net.grinder.console.communication.ProcessControl;
50 import net.grinder.console.model.ConsoleProperties;
51 import net.grinder.engine.agent.StubAgentIdentity;
52 import net.grinder.messages.console.AgentAddress;
53 import net.grinder.messages.console.WorkerAddress;
54 import net.grinder.script.Barrier;
55 import net.grinder.synchronisation.BarrierGroups;
56 import net.grinder.synchronisation.BarrierIdentityGenerator;
57 import net.grinder.synchronisation.BarrierImplementation;
58 import net.grinder.synchronisation.ClientBarrierGroups;
59 import net.grinder.synchronisation.messages.BarrierIdentity;
60 import net.grinder.synchronisation.messages.BarrierIdentity.Factory;
61 import net.grinder.util.StandardTimeAuthority;
62
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.mockito.Mock;
67 import org.mockito.MockitoAnnotations;
68
69
70
71
72
73
74
75 public class TestDistributedBarriers {
76
77 private static final int PROCESSES = 3;
78 private static final int THREADS_PER_PROCESS = 5;
79 private static final int THREADS = PROCESSES * THREADS_PER_PROCESS;
80 private static final int RUNS = 3;
81
82 @Mock private ErrorHandler m_errorHandler;
83 @Mock private ProcessControl m_processControl;
84 @Mock private Resources m_resources;
85
86 private ConsoleCommunication m_communication;
87
88 private final ExecutorService m_exector = Executors.newCachedThreadPool();
89 private int m_port;
90
91 @Before public void setup() throws Exception {
92 MockitoAnnotations.initMocks(this);
93
94 final ConsoleProperties properties =
95 new ConsoleProperties(m_resources, new File(""));
96
97 m_port = findFreePort();
98 properties.setConsolePort(m_port);
99
100 m_communication =
101 new ConsoleCommunicationImplementation(m_resources,
102 properties,
103 m_errorHandler,
104 new StandardTimeAuthority());
105
106 new WireDistributedBarriers(m_communication, m_processControl);
107
108 m_exector.execute(new Runnable() {
109 public void run() {
110 try {
111 while (m_communication.processOneMessage()) { }
112 }
113 catch (UncheckedInterruptedException e) {
114
115 }
116 }
117 });
118 }
119
120 @After public void shutdown() {
121 m_exector.shutdownNow();
122 m_communication.shutdown();
123 }
124
125 private final AtomicInteger m_n = new AtomicInteger();
126
127 private class ClientThread implements Callable<Void> {
128
129 private final BarrierGroups m_barrierGroups;
130 private final Factory m_identityFactory;
131 private final Barrier m_incrementBarrier;
132 private final Barrier m_assertionBarrier;
133 private final Barrier m_resetBarrier;
134
135 private ClientThread(BarrierGroups barrierGroups,
136 Factory identityFactory)
137 throws Exception {
138
139 m_barrierGroups = barrierGroups;
140 m_identityFactory = identityFactory;
141
142 m_incrementBarrier =
143 new BarrierImplementation(m_barrierGroups.getGroup("After Increment"),
144 m_identityFactory);
145
146 m_assertionBarrier =
147 new BarrierImplementation(m_barrierGroups.getGroup("After assert"),
148 m_identityFactory);
149
150 m_resetBarrier =
151 new BarrierImplementation(m_barrierGroups.getGroup("After reset"),
152 m_identityFactory);
153 }
154
155 public void run() throws Exception {
156
157 m_n.incrementAndGet();
158
159 m_incrementBarrier.await();
160
161 assertEquals(THREADS, m_n.get());
162
163 m_assertionBarrier.await();
164
165 m_n.set(0);
166
167 m_resetBarrier.await();
168 }
169
170
171 public Void call() throws Exception {
172 for (int i = 0; i < RUNS; ++i) {
173 run();
174 }
175
176 return null;
177 }
178 }
179
180 private class ClientProcess {
181
182 private final MessageDispatchSender m_messageDispatcher;
183 private final MessagePump m_messagePump;
184 private final StubAgentIdentity m_agentIdentity;
185 private final BarrierGroups m_barrierGroups;
186 private final BarrierIdentity.Factory m_identityFactory;
187
188 public ClientProcess(int n) throws Exception {
189
190 m_agentIdentity = new StubAgentIdentity("agent" + n);
191
192 final ClientReceiver agentReceiver =
193 ClientReceiver.connect(new Connector("localhost",
194 m_port,
195 ConnectionType.AGENT),
196 new AgentAddress(m_agentIdentity));
197
198 m_messageDispatcher = new MessageDispatchSender();
199
200 m_messagePump = new MessagePump(agentReceiver, m_messageDispatcher, 1);
201
202 final WorkerAddress workerAddress =
203 new WorkerAddress(m_agentIdentity.createWorkerIdentity());
204
205 final Sender workerSender =
206 ClientSender.connect(new Connector("localhost",
207 m_port,
208 ConnectionType.WORKER),
209 workerAddress);
210
211 m_barrierGroups =
212 new ClientBarrierGroups(workerSender, m_messageDispatcher);
213
214 m_identityFactory =
215 new BarrierIdentityGenerator(workerAddress.getIdentity());
216 }
217
218 public void start() {
219 m_messagePump.start();
220 }
221
222 public void stop() {
223 m_messagePump.shutdown();
224 }
225
226 private ClientThread createWorker() throws Exception {
227
228 return new ClientThread(m_barrierGroups, m_identityFactory);
229 }
230 }
231
232 @Test public void testDistributed() throws Throwable {
233
234 final List<ClientProcess> processes = new ArrayList<ClientProcess>();
235 final List<ClientThread> threads = new ArrayList<ClientThread>();
236
237 for (int i = 0; i < PROCESSES; ++i) {
238 final ClientProcess agent = new ClientProcess(i);
239 agent.start();
240 processes.add(agent);
241
242 for (int j = 0; j < THREADS_PER_PROCESS; ++j) {
243 threads.add(agent.createWorker());
244 }
245 }
246
247 try {
248 for (Future<Void> f : m_exector.invokeAll(threads)) { f.get(); }
249 }
250 catch (ExecutionException e) {
251 throw e.getCause();
252 }
253
254 for (ClientProcess p : processes) { p.stop(); }
255 }
256 }