1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29
30 import java.io.ObjectOutputStream;
31 import java.net.InetAddress;
32 import java.net.Socket;
33
34 import net.grinder.common.UncheckedInterruptedException;
35 import net.grinder.testutility.IsolatedObjectFactory;
36 import net.grinder.util.StandardTimeAuthority;
37 import net.grinder.util.TimeAuthority;
38
39 import org.junit.Test;
40
41
42
43
44
45
46
47 public class TestServerReceiver {
48
49 private final TimeAuthority m_timeAuthority = new StandardTimeAuthority();
50
51 @Test public void testConstructor() throws Exception {
52
53 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
54
55 final ServerReceiver serverReceiver = new ServerReceiver();
56
57
58 serverReceiver.receiveFrom(acceptor, new ConnectionType[0], 1, 2, 3);
59
60 serverReceiver.receiveFrom(
61 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 1000);
62
63 serverReceiver.shutdown();
64 acceptor.shutdown();
65 }
66
67 @Test public void testWaitForMessage() throws Exception {
68
69 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
70
71 final ServerReceiver serverReceiver = new ServerReceiver();
72 serverReceiver.receiveFrom(
73 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 99);
74
75 final Socket[] socket = new Socket[5];
76
77 for (int i=0; i<socket.length; ++i) {
78 socket[i] =
79 new Connector(InetAddress.getByName(null).getHostName(),
80 acceptor.getPort(),
81 ConnectionType.AGENT)
82 .connect();
83 }
84
85
86
87 final ResourcePool socketSet =
88 acceptor.getSocketSet(ConnectionType.AGENT);
89
90 for (int i=0; socketSet.countActive() != 5 && i<10; ++i) {
91 Thread.sleep(i * i * 10);
92 }
93
94 final SimpleMessage message1 = new SimpleMessage();
95 final SimpleMessage message2 = new SimpleMessage();
96 final SimpleMessage message3 = new SimpleMessage();
97
98 final ObjectOutputStream objectStream1 =
99 new ObjectOutputStream(socket[0].getOutputStream());
100 objectStream1.writeObject(message1);
101 objectStream1.flush();
102
103 final ObjectOutputStream objectStream2 =
104 new ObjectOutputStream(socket[1].getOutputStream());
105 objectStream2.writeObject(message2);
106 objectStream2.flush();
107
108 final ObjectOutputStream objectStream3 =
109 new ObjectOutputStream(socket[0].getOutputStream());
110 objectStream3.writeObject(message3);
111 objectStream3.flush();
112
113 Message receivedMessage1 = serverReceiver.waitForMessage();
114 Message receivedMessage2 = serverReceiver.waitForMessage();
115 Message receivedMessage3 = serverReceiver.waitForMessage();
116
117 assertEquals(
118 UncheckedInterruptedException.class,
119 new BlockingActionThread() {
120 protected void blockingAction() throws CommunicationException {
121 serverReceiver.waitForMessage();
122 }
123 }.getException().getClass());
124
125 if (receivedMessage1.equals(message2)) {
126 final Message temp = receivedMessage2;
127 receivedMessage2 = receivedMessage1;
128 receivedMessage1 = temp;
129 }
130 else if (receivedMessage3.equals(message2)) {
131 final Message temp = receivedMessage3;
132 receivedMessage3 = receivedMessage2;
133 receivedMessage2 = temp;
134 }
135 else {
136 assertEquals(message2, receivedMessage2);
137 }
138
139 assertEquals(message1, receivedMessage1);
140 assertEquals(message2, receivedMessage2);
141 assertEquals(message3, receivedMessage3);
142
143 serverReceiver.shutdown();
144 acceptor.shutdown();
145 }
146
147 @Test public void testWaitForBadMessage() throws Exception {
148
149 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
150
151 final ServerReceiver serverReceiver = new ServerReceiver();
152 serverReceiver.receiveFrom(
153 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 123);
154
155 final Socket socket =
156 new Connector(InetAddress.getByName(null).getHostName(),
157 acceptor.getPort(),
158 ConnectionType.AGENT)
159 .connect();
160
161
162
163 final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
164
165 for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
166 Thread.sleep(i * i * 10);
167 }
168
169
170 final SimpleMessage message = new SimpleMessage();
171 message.setPayload(IsolatedObjectFactory.getIsolatedObject());
172
173 final ObjectOutputStream objectStream =
174 new ObjectOutputStream(socket.getOutputStream());
175 objectStream.writeObject(message);
176 objectStream.flush();
177
178 try {
179 serverReceiver.waitForMessage();
180 fail("Expected CommunicationException");
181 }
182 catch (CommunicationException e) {
183 }
184
185 serverReceiver.shutdown();
186 acceptor.shutdown();
187 }
188
189 @Test public void testShutdown() throws Exception {
190
191 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
192
193 final ServerReceiver serverReceiver = new ServerReceiver();
194 serverReceiver.receiveFrom(
195 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 100);
196
197 final Socket socket =
198 new Connector(InetAddress.getByName(null).getHostName(),
199 acceptor.getPort(),
200 ConnectionType.AGENT)
201 .connect();
202
203
204
205 final ResourcePool socketSet =
206 acceptor.getSocketSet(ConnectionType.AGENT);
207
208 for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
209 Thread.sleep(i * i * 10);
210 }
211
212 final SimpleMessage message = new SimpleMessage();
213
214 final ObjectOutputStream objectStream =
215 new ObjectOutputStream(socket.getOutputStream());
216 objectStream.writeObject(message);
217 objectStream.flush();
218
219 final Message receivedMessage = serverReceiver.waitForMessage();
220 assertNotNull(receivedMessage);
221
222 serverReceiver.shutdown();
223
224 try {
225 serverReceiver.receiveFrom(
226 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 100);
227 fail("Expected a CommunicationException");
228 }
229 catch (CommunicationException e) {
230 }
231
232 assertNull(serverReceiver.waitForMessage());
233
234 acceptor.shutdown();
235 }
236
237 @Test public void testCloseCommunicationMessage() throws Exception {
238
239 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
240
241 final ServerReceiver serverReceiver = new ServerReceiver();
242 serverReceiver.receiveFrom(
243 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 5, 10, 100);
244
245 final Socket socket =
246 new Connector(InetAddress.getByName(null).getHostName(),
247 acceptor.getPort(),
248 ConnectionType.AGENT)
249 .connect();
250
251
252
253 final ResourcePool socketSet =
254 acceptor.getSocketSet(ConnectionType.AGENT);
255
256 for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
257 Thread.sleep(i * i * 10);
258 }
259
260 final SimpleMessage message = new SimpleMessage();
261
262 final ObjectOutputStream objectStream1 =
263 new ObjectOutputStream(socket.getOutputStream());
264 objectStream1.writeObject(message);
265 objectStream1.flush();
266
267 final Message receivedMessage = serverReceiver.waitForMessage();
268 assertNotNull(receivedMessage);
269
270 final Message closeCommunicationMessage = new CloseCommunicationMessage();
271
272 final ObjectOutputStream objectStream2 =
273 new ObjectOutputStream(socket.getOutputStream());
274 objectStream2.writeObject(closeCommunicationMessage);
275 objectStream2.flush();
276
277
278
279 assertEquals(
280 UncheckedInterruptedException.class,
281 new BlockingActionThread() {
282 protected void blockingAction() throws CommunicationException {
283 serverReceiver.waitForMessage();
284 }
285 }.getException().getClass());
286
287 serverReceiver.shutdown();
288
289 acceptor.shutdown();
290 }
291
292 @Test public void testWithResponseSender() throws Exception {
293 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
294
295 final ServerReceiver serverReceiver = new ServerReceiver();
296 serverReceiver.receiveFrom(
297 acceptor, new ConnectionType[] { ConnectionType.AGENT }, 3, 10, 100);
298
299 final Socket socket =
300 new Connector(InetAddress.getByName(null).getHostName(),
301 acceptor.getPort(),
302 ConnectionType.AGENT)
303 .connect();
304
305
306
307 final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
308
309 for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
310 Thread.sleep(i * i * 10);
311 }
312
313
314
315 final SimpleMessage message = new SimpleMessage();
316 final MessageRequiringResponse responseSender = new MessageRequiringResponse(message);
317
318 final ObjectOutputStream objectStream =
319 new ObjectOutputStream(socket.getOutputStream());
320 objectStream.writeObject(responseSender);
321 objectStream.flush();
322
323
324 final Message received = serverReceiver.waitForMessage();
325 assertTrue(received instanceof MessageRequiringResponse);
326 final MessageRequiringResponse receivedResponseSender = (MessageRequiringResponse)received;
327 assertTrue(receivedResponseSender.getMessage() instanceof SimpleMessage);
328
329
330 final SimpleMessage responseMessage = new SimpleMessage();
331 receivedResponseSender.sendResponse(responseMessage);
332
333
334 final StreamReceiver receiver = new StreamReceiver(socket.getInputStream());
335 final Message clientMessage = receiver.waitForMessage();
336 assertEquals(responseMessage, clientMessage);
337
338 serverReceiver.shutdown();
339 acceptor.shutdown();
340 receiver.shutdown();
341 }
342 }