1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24
25 import static net.grinder.testutility.SocketUtilities.findFreePort;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.junit.Assert.fail;
32
33 import java.net.InetAddress;
34 import java.net.ServerSocket;
35 import java.net.Socket;
36 import java.util.List;
37
38 import net.grinder.common.UncheckedInterruptedException;
39 import net.grinder.testutility.CallData;
40 import net.grinder.testutility.RandomStubFactory;
41 import net.grinder.util.StandardTimeAuthority;
42 import net.grinder.util.TimeAuthority;
43
44 import org.junit.Test;
45
46
47
48
49
50
51
52 public class TestAcceptor {
53
54 private TimeAuthority m_timeAuthority = new StandardTimeAuthority();
55
56 @Test public void testConstructor() throws Exception {
57
58 final InetAddress[] localAddresses =
59 InetAddress.getAllByName(InetAddress.getLocalHost().getHostName());
60
61 final String[] testAddresses = new String[localAddresses.length + 2];
62
63
64 testAddresses[0] = InetAddress.getByName(null).getHostName();
65
66
67 testAddresses[1] = "";
68
69 for (int i=0; i<localAddresses.length; ++i) {
70 testAddresses[i + 2] = localAddresses[i].getHostName();
71 }
72
73 final int port = findFreePort();
74
75 for (int i=0; i<testAddresses.length; ++i) {
76 final Acceptor acceptor =
77 new Acceptor(testAddresses[i], port, 2, m_timeAuthority);
78 assertEquals(port, acceptor.getPort());
79 assertNull(acceptor.peekPendingException());
80 acceptor.shutdown();
81
82
83 final Acceptor acceptor2 =
84 new Acceptor(testAddresses[i], 0, 2, m_timeAuthority);
85 assertEquals(port, acceptor.getPort());
86 assertNull(acceptor2.peekPendingException());
87 acceptor2.shutdown();
88 }
89
90 final ServerSocket usedSocket = new ServerSocket(0);
91 final int usedPort = usedSocket.getLocalPort();
92
93 for (int i=0; i<testAddresses.length; ++i) {
94 try {
95 new Acceptor(testAddresses[i], usedPort, 1, m_timeAuthority);
96 fail("Expected CommunicationException");
97 }
98 catch (CommunicationException e) {
99 }
100 }
101
102 usedSocket.close();
103 }
104
105 @Test public void testGetSocketSet() throws Exception {
106
107 final Acceptor acceptor = createAcceptor(2);
108
109 assertEquals(0, acceptor.getNumberOfConnections());
110
111 final RandomStubFactory<Acceptor.Listener> listenerStubFactory =
112 RandomStubFactory.create(Acceptor.Listener.class);
113
114 acceptor.addListener(ConnectionType.WORKER, listenerStubFactory.getStub());
115
116 final ResourcePool controlSocketSet =
117 acceptor.getSocketSet(ConnectionType.AGENT);
118
119 assertNotNull(controlSocketSet);
120 assertTrue(controlSocketSet.reserveNext().isSentinel());
121
122 final Connector controlConnector =
123 new Connector("localhost", acceptor.getPort(), ConnectionType.AGENT);
124
125 final Connector reportConnector =
126 new Connector("localhost", acceptor.getPort(), ConnectionType.WORKER);
127
128 controlConnector.connect();
129 controlConnector.connect();
130 reportConnector.connect();
131
132
133
134 for (int i = 0; controlSocketSet.countActive() != 2 && i < 10; ++i) {
135 Thread.sleep(i * i * 10);
136 }
137
138 listenerStubFactory.waitUntilCalled(1000);
139
140 final CallData callData =
141 listenerStubFactory.assertSuccess("connectionAccepted",
142 ConnectionType.class,
143 ConnectionIdentity.class);
144
145 assertEquals(ConnectionType.WORKER, callData.getParameters()[0]);
146
147 listenerStubFactory.assertNoMoreCalls();
148
149 assertSame(controlSocketSet,
150 acceptor.getSocketSet(ConnectionType.AGENT));
151
152 final List<?> controlSocketResources = controlSocketSet.reserveAll();
153 assertEquals(2, controlSocketResources.size());
154
155
156 final ResourcePool reportSocketSet =
157 acceptor.getSocketSet(ConnectionType.WORKER);
158
159 for (int i=0; reportSocketSet.countActive() != 1 && i<10; ++i) {
160 Thread.sleep(i * i * 10);
161 }
162
163 assertEquals(3, acceptor.getNumberOfConnections());
164
165 assertSame(reportSocketSet, acceptor.getSocketSet(ConnectionType.WORKER));
166
167 final List<?> reportSocketResources = reportSocketSet.reserveAll();
168 assertEquals(1, reportSocketResources.size());
169
170 acceptor.shutdown();
171
172 assertEquals(0, acceptor.getNumberOfConnections());
173
174 final CallData callData2 =
175 listenerStubFactory.assertSuccess("connectionClosed",
176 ConnectionType.class,
177 ConnectionIdentity.class);
178
179 assertEquals(callData.getParameters()[1], callData2.getParameters()[1]);
180 }
181
182 private Acceptor createAcceptor(int numberOfThreads) throws Exception {
183
184 final ServerSocket serverSocket = new ServerSocket(0);
185 final int port = serverSocket.getLocalPort();
186 serverSocket.close();
187
188 return new Acceptor("", port, numberOfThreads, m_timeAuthority);
189 }
190
191 @Test public void testShutdown() throws Exception {
192
193 final Acceptor acceptor = createAcceptor(3);
194
195 final ResourcePool socketSet =
196 acceptor.getSocketSet(ConnectionType.AGENT);
197
198 final Connector connector =
199 new Connector("localhost", acceptor.getPort(), ConnectionType.AGENT);
200
201 connector.connect();
202
203
204
205 for (int i = 0; socketSet.countActive() != 1 && i < 10; ++i) {
206 Thread.sleep(i * i * 10);
207 }
208
209 assertNull(acceptor.peekPendingException());
210
211 acceptor.shutdown();
212
213 try {
214 acceptor.getSocketSet(ConnectionType.AGENT);
215 fail("Expected Acceptor.ShutdownException");
216 }
217 catch (Acceptor.ShutdownException e) {
218 }
219
220 assertTrue(socketSet.reserveNext().isSentinel());
221 }
222
223 @Test public void testGetPendingException() throws Exception {
224
225 final Acceptor acceptor = createAcceptor(3);
226
227
228 assertNull(acceptor.peekPendingException());
229
230
231 final Socket socket = new Socket("localhost", acceptor.getPort());
232
233 for (int i = 0; i < 10; ++i) {
234 socket.getOutputStream().write(99);
235 }
236
237 socket.getOutputStream().flush();
238
239 final Socket socket2 = new Socket("localhost", acceptor.getPort());
240
241 for (int i = 0; i < 10; ++i) {
242 socket2.getOutputStream().write(99);
243 }
244
245 socket2.getOutputStream().flush();
246
247
248
249 assertTrue(acceptor.getPendingException()
250 instanceof CommunicationException);
251
252 assertTrue(acceptor.getPendingException()
253 instanceof CommunicationException);
254
255 assertNull(acceptor.peekPendingException());
256
257 acceptor.shutdown();
258
259 assertNull(acceptor.getPendingException());
260 }
261
262 @Test public void testGetPendingExceptionInterrupted() throws Exception {
263 final Acceptor acceptor = createAcceptor(3);
264
265 Thread.currentThread().interrupt();
266
267 try {
268 acceptor.getPendingException();
269 fail("Expected UncheckedInterruptedException");
270 }
271 catch (UncheckedInterruptedException e) {
272 }
273 }
274 }