1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.junit.Assert.fail;
28
29 import java.io.InputStream;
30 import java.io.ObjectInputStream;
31 import java.io.StreamCorruptedException;
32 import java.net.InetAddress;
33 import java.net.Socket;
34
35 import net.grinder.util.StandardTimeAuthority;
36 import net.grinder.util.TimeAuthority;
37
38 import org.junit.Test;
39
40
41
42
43
44
45
46 public class TestFanOutServerSender {
47
48 private final TimeAuthority m_timeAuthority = new StandardTimeAuthority();
49
50 @Test public void testConstructor() throws Exception {
51
52 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
53
54 final FanOutServerSender serverSender =
55 new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
56
57 serverSender.shutdown();
58 acceptor.shutdown();
59 }
60
61 @Test public void testSend() throws Exception {
62
63 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
64
65 final FanOutServerSender serverSender =
66 new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
67
68 final Socket[] socket = new Socket[5];
69
70 for (int i=0; i<socket.length; ++i) {
71 socket[i] = new Connector(InetAddress.getByName(null).getHostName(),
72 acceptor.getPort(),
73 ConnectionType.AGENT).connect();
74 }
75
76
77
78 final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
79
80 for (int i=0; socketSet.countActive() != 5 && i<10; ++i) {
81 Thread.sleep(i * i * 10);
82 }
83
84 final SimpleMessage message1 = new SimpleMessage();
85 final SimpleMessage message2 = new SimpleMessage();
86
87 serverSender.send(message1);
88 serverSender.send(message2);
89
90 for (int i=0; i<socket.length; ++i) {
91 final InputStream socketInput = socket[i].getInputStream();
92
93 final Message m1 = readMessage(socketInput);
94 final Message m2 = readMessage(socketInput);
95
96 assertEquals(message1, m1);
97 assertEquals(message2, m2);
98
99 assertEquals(0, socketInput.available());
100
101 socket[i].close();
102 }
103
104 serverSender.shutdown();
105 acceptor.shutdown();
106 }
107
108 private static Message readMessage(final InputStream socketInput)
109 throws Exception {
110
111 for (int i = 0; socketInput.available() == 0 && i < 5; ++i) {
112 Thread.sleep(i * i * 10);
113 }
114
115 if (socketInput.available() == 0) {
116 return null;
117 }
118
119 return (Message)new ObjectInputStream(socketInput).readObject();
120 }
121
122 @Test public void testSendAddressedMessage() throws Exception {
123
124 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
125
126 final FanOutServerSender serverSender =
127 new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
128
129 final Socket[] socket = new Socket[5];
130
131 for (int i = 0; i < socket.length; ++i) {
132 socket[i] =
133 new Connector(InetAddress.getByName(null).getHostName(),
134 acceptor.getPort(),
135 ConnectionType.AGENT)
136 .connect(new StubAddress(new Integer(i)));
137 }
138
139
140
141 final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
142
143 for (int i = 0; socketSet.countActive() != 5 && i < 10; ++i) {
144 Thread.sleep(i * i * 10);
145 }
146
147 final SimpleMessage message1 = new SimpleMessage();
148 final SimpleMessage message2 = new SimpleMessage();
149
150 serverSender.send(new StubAddress(new Integer(1)), message1);
151 serverSender.send(new StubAddress(new Integer(2)), message2);
152
153 for (int i = 0; i < socket.length; ++i) {
154 final InputStream socketInput = socket[i].getInputStream();
155
156 if (i == 1) {
157 final Message m = readMessage(socketInput);
158 assertEquals(message1, m);
159 }
160 else if (i == 2) {
161 final Message m = readMessage(socketInput);
162 assertEquals(message2, m);
163 }
164
165 assertEquals(0, socketInput.available());
166
167 socket[i].close();
168 }
169
170 serverSender.shutdown();
171
172 try {
173 serverSender.send(new Address() {
174 public boolean includes(Address address) { return false; }
175 },
176 message1);
177 fail("Expected CommunicationException");
178 }
179 catch (CommunicationException e) {
180 }
181
182 acceptor.shutdown();
183 }
184
185 @Test public void testShutdown() throws Exception {
186
187 final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
188
189 final FanOutServerSender serverSender =
190 new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
191
192 final Socket socket =
193 new Connector(InetAddress.getByName(null).getHostName(),
194 acceptor.getPort(),
195 ConnectionType.AGENT).connect();
196
197
198
199 final ResourcePool socketSet =
200 acceptor.getSocketSet(ConnectionType.AGENT);
201
202 for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
203 Thread.sleep(i * i * 10);
204 }
205
206 final Message message = new SimpleMessage();
207 serverSender.send(message);
208
209 final InputStream socketStream = socket.getInputStream();
210
211 final Object o1 = readMessage(socketStream);
212 assertNotNull(o1);
213
214 serverSender.shutdown();
215
216 try {
217 serverSender.send(message);
218 fail("Expected CommunicationException");
219 }
220 catch (CommunicationException e) {
221 }
222
223 try {
224 final Object o2 = readMessage(socketStream);
225
226 assertTrue(o2 instanceof CloseCommunicationMessage);
227 }
228 catch (StreamCorruptedException e) {
229
230
231 }
232
233 acceptor.shutdown();
234 }
235 }