View Javadoc

1   // Copyright (C) 2011 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.console.synchronisation;
23  
24  import static net.grinder.testutility.SocketUtilities.findFreePort;
25  import static org.junit.Assert.assertEquals;
26  
27  import java.io.File;
28  import java.util.ArrayList;
29  import java.util.List;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import net.grinder.common.UncheckedInterruptedException;
38  import net.grinder.communication.ClientReceiver;
39  import net.grinder.communication.ClientSender;
40  import net.grinder.communication.ConnectionType;
41  import net.grinder.communication.Connector;
42  import net.grinder.communication.MessageDispatchSender;
43  import net.grinder.communication.MessagePump;
44  import net.grinder.communication.Sender;
45  import net.grinder.console.common.ErrorHandler;
46  import net.grinder.console.common.Resources;
47  import net.grinder.console.communication.ConsoleCommunication;
48  import net.grinder.console.communication.ConsoleCommunicationImplementation;
49  import net.grinder.console.communication.ProcessControl;
50  import net.grinder.console.model.ConsoleProperties;
51  import net.grinder.engine.agent.StubAgentIdentity;
52  import net.grinder.messages.console.AgentAddress;
53  import net.grinder.messages.console.WorkerAddress;
54  import net.grinder.script.Barrier;
55  import net.grinder.synchronisation.BarrierGroups;
56  import net.grinder.synchronisation.BarrierIdentityGenerator;
57  import net.grinder.synchronisation.BarrierImplementation;
58  import net.grinder.synchronisation.ClientBarrierGroups;
59  import net.grinder.synchronisation.messages.BarrierIdentity;
60  import net.grinder.synchronisation.messages.BarrierIdentity.Factory;
61  import net.grinder.util.StandardTimeAuthority;
62  
63  import org.junit.After;
64  import org.junit.Before;
65  import org.junit.Test;
66  import org.mockito.Mock;
67  import org.mockito.MockitoAnnotations;
68  
69  
70  /**
71   * Concurrent tests for distributed barriers.
72   *
73   * @author Philip Aston
74   */
75  public class TestDistributedBarriers {
76  
77    private static final int PROCESSES = 3;
78    private static final int THREADS_PER_PROCESS = 5;
79    private static final int THREADS = PROCESSES * THREADS_PER_PROCESS;
80    private static final int RUNS = 3;
81  
82    @Mock private ErrorHandler m_errorHandler;
83    @Mock private ProcessControl m_processControl;
84    @Mock private Resources m_resources;
85  
86    private ConsoleCommunication m_communication;
87  
88    private final ExecutorService m_exector = Executors.newCachedThreadPool();
89    private int m_port;
90  
91    @Before public void setup() throws Exception {
92      MockitoAnnotations.initMocks(this);
93  
94      final ConsoleProperties properties =
95        new ConsoleProperties(m_resources, new File(""));
96  
97      m_port = findFreePort();
98      properties.setConsolePort(m_port);
99  
100     m_communication =
101       new ConsoleCommunicationImplementation(m_resources,
102                                              properties,
103                                              m_errorHandler,
104                                              new StandardTimeAuthority());
105 
106     new WireDistributedBarriers(m_communication, m_processControl);
107 
108     m_exector.execute(new Runnable() {
109       public void run() {
110         try {
111           while (m_communication.processOneMessage()) { }
112         }
113         catch (UncheckedInterruptedException e) {
114           // Exit.
115         }
116       }
117     });
118   }
119 
120   @After public void shutdown() {
121     m_exector.shutdownNow();
122     m_communication.shutdown();
123   }
124 
125   private final AtomicInteger m_n = new AtomicInteger();
126 
127   private class ClientThread implements Callable<Void> {
128 
129     private final BarrierGroups m_barrierGroups;
130     private final Factory m_identityFactory;
131     private final Barrier m_incrementBarrier;
132     private final Barrier m_assertionBarrier;
133     private final Barrier m_resetBarrier;
134 
135     private ClientThread(BarrierGroups barrierGroups,
136                          Factory identityFactory)
137       throws Exception {
138 
139       m_barrierGroups = barrierGroups;
140       m_identityFactory = identityFactory;
141 
142       m_incrementBarrier =
143         new BarrierImplementation(m_barrierGroups.getGroup("After Increment"),
144                                   m_identityFactory);
145 
146       m_assertionBarrier =
147         new BarrierImplementation(m_barrierGroups.getGroup("After assert"),
148                                   m_identityFactory);
149 
150       m_resetBarrier =
151         new BarrierImplementation(m_barrierGroups.getGroup("After reset"),
152                                   m_identityFactory);
153     }
154 
155     public void run() throws Exception {
156 
157       m_n.incrementAndGet();
158 
159       m_incrementBarrier.await();
160 
161       assertEquals(THREADS, m_n.get());
162 
163       m_assertionBarrier.await();
164 
165       m_n.set(0);
166 
167       m_resetBarrier.await();
168     }
169 
170 
171     public Void call() throws Exception {
172       for (int i = 0; i < RUNS; ++i) {
173         run();
174       }
175 
176       return null;
177     }
178   }
179 
180   private class ClientProcess {
181 
182     private final MessageDispatchSender m_messageDispatcher;
183     private final MessagePump m_messagePump;
184     private final StubAgentIdentity m_agentIdentity;
185     private final BarrierGroups m_barrierGroups;
186     private final BarrierIdentity.Factory m_identityFactory;
187 
188     public ClientProcess(int n) throws Exception {
189 
190       m_agentIdentity = new StubAgentIdentity("agent" + n);
191 
192       final ClientReceiver agentReceiver =
193         ClientReceiver.connect(new Connector("localhost",
194                                              m_port,
195                                              ConnectionType.AGENT),
196                                new AgentAddress(m_agentIdentity));
197 
198       m_messageDispatcher = new MessageDispatchSender();
199 
200       m_messagePump = new MessagePump(agentReceiver, m_messageDispatcher, 1);
201 
202       final WorkerAddress workerAddress =
203         new WorkerAddress(m_agentIdentity.createWorkerIdentity());
204 
205       final Sender workerSender =
206         ClientSender.connect(new Connector("localhost",
207                                            m_port,
208                                            ConnectionType.WORKER),
209                             workerAddress);
210 
211       m_barrierGroups =
212         new ClientBarrierGroups(workerSender, m_messageDispatcher);
213 
214       m_identityFactory =
215         new BarrierIdentityGenerator(workerAddress.getIdentity());
216     }
217 
218     public void start() {
219       m_messagePump.start();
220     }
221 
222     public void stop() {
223       m_messagePump.shutdown();
224     }
225 
226     private ClientThread createWorker() throws Exception {
227 
228       return new ClientThread(m_barrierGroups, m_identityFactory);
229     }
230   }
231 
232   @Test public void testDistributed() throws Throwable {
233 
234     final List<ClientProcess> processes = new ArrayList<ClientProcess>();
235     final List<ClientThread> threads = new ArrayList<ClientThread>();
236 
237     for (int i = 0; i < PROCESSES; ++i) {
238       final ClientProcess agent = new ClientProcess(i);
239       agent.start();
240       processes.add(agent);
241 
242       for (int j = 0; j < THREADS_PER_PROCESS; ++j) {
243         threads.add(agent.createWorker());
244       }
245     }
246 
247     try {
248       for (Future<Void> f : m_exector.invokeAll(threads)) { f.get(); }
249     }
250     catch (ExecutionException e) {
251       throw e.getCause();
252     }
253 
254     for (ClientProcess p : processes) { p.stop(); }
255   }
256 }