View Javadoc

1   // Copyright (C) 2003 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.communication;
23  
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertNotNull;
26  import static org.junit.Assert.assertTrue;
27  import static org.junit.Assert.fail;
28  
29  import java.io.InputStream;
30  import java.io.ObjectInputStream;
31  import java.io.StreamCorruptedException;
32  import java.net.InetAddress;
33  import java.net.Socket;
34  
35  import net.grinder.util.StandardTimeAuthority;
36  import net.grinder.util.TimeAuthority;
37  
38  import org.junit.Test;
39  
40  
41  /**
42   *  Unit tests for {@link FanOutServerSender}.
43   *
44   * @author Philip Aston
45   */
46  public class TestFanOutServerSender {
47  
48    private final TimeAuthority m_timeAuthority = new StandardTimeAuthority();
49  
50    @Test public void testConstructor() throws Exception {
51  
52      final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
53  
54      final FanOutServerSender serverSender =
55        new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
56  
57      serverSender.shutdown();
58      acceptor.shutdown();
59    }
60  
61    @Test public void testSend() throws Exception {
62  
63      final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
64  
65      final FanOutServerSender serverSender =
66        new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
67  
68      final Socket[] socket = new Socket[5];
69  
70      for (int i=0; i<socket.length; ++i) {
71        socket[i] = new Connector(InetAddress.getByName(null).getHostName(),
72                                  acceptor.getPort(),
73                                  ConnectionType.AGENT).connect();
74      }
75  
76      // Sleep until we've accepted all connections. Give up after a few
77      // seconds.
78      final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
79  
80      for (int i=0; socketSet.countActive() != 5 && i<10; ++i) {
81        Thread.sleep(i * i * 10);
82      }
83  
84      final SimpleMessage message1 = new SimpleMessage();
85      final SimpleMessage message2 = new SimpleMessage();
86  
87      serverSender.send(message1);
88      serverSender.send(message2);
89  
90      for (int i=0; i<socket.length; ++i) {
91        final InputStream socketInput = socket[i].getInputStream();
92  
93        final Message m1 = readMessage(socketInput);
94        final Message m2 = readMessage(socketInput);
95  
96        assertEquals(message1, m1);
97        assertEquals(message2, m2);
98  
99        assertEquals(0, socketInput.available());
100 
101       socket[i].close();
102     }
103 
104     serverSender.shutdown();
105     acceptor.shutdown();
106   }
107 
108   private static Message readMessage(final InputStream socketInput)
109     throws Exception {
110 
111     for (int i = 0; socketInput.available() == 0 && i < 5; ++i) {
112       Thread.sleep(i * i * 10);
113     }
114 
115     if (socketInput.available() == 0) {
116       return null;
117     }
118 
119     return (Message)new ObjectInputStream(socketInput).readObject();
120   }
121 
122   @Test public void testSendAddressedMessage() throws Exception {
123 
124     final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
125 
126     final FanOutServerSender serverSender =
127       new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
128 
129     final Socket[] socket = new Socket[5];
130 
131     for (int i = 0; i < socket.length; ++i) {
132       socket[i] =
133         new Connector(InetAddress.getByName(null).getHostName(),
134                       acceptor.getPort(),
135                       ConnectionType.AGENT)
136         .connect(new StubAddress(new Integer(i)));
137     }
138 
139     // Sleep until we've accepted all connections. Give up after a few
140     // seconds.
141     final ResourcePool socketSet = acceptor.getSocketSet(ConnectionType.AGENT);
142 
143     for (int i = 0; socketSet.countActive() != 5 && i < 10; ++i) {
144       Thread.sleep(i * i * 10);
145     }
146 
147     final SimpleMessage message1 = new SimpleMessage();
148     final SimpleMessage message2 = new SimpleMessage();
149 
150     serverSender.send(new StubAddress(new Integer(1)), message1);
151     serverSender.send(new StubAddress(new Integer(2)), message2);
152 
153     for (int i = 0; i < socket.length; ++i) {
154       final InputStream socketInput = socket[i].getInputStream();
155 
156       if (i == 1) {
157         final Message m = readMessage(socketInput);
158         assertEquals(message1, m);
159       }
160       else if (i == 2) {
161         final Message m = readMessage(socketInput);
162         assertEquals(message2, m);
163       }
164 
165       assertEquals(0, socketInput.available());
166 
167       socket[i].close();
168     }
169 
170     serverSender.shutdown();
171 
172     try {
173       serverSender.send(new Address() {
174         public boolean includes(Address address) { return false; }
175         },
176         message1);
177       fail("Expected CommunicationException");
178     }
179     catch (CommunicationException e) {
180     }
181 
182     acceptor.shutdown();
183   }
184 
185   @Test public void testShutdown() throws Exception {
186 
187     final Acceptor acceptor = new Acceptor("localhost", 0, 1, m_timeAuthority);
188 
189     final FanOutServerSender serverSender =
190       new FanOutServerSender(acceptor, ConnectionType.AGENT, 3);
191 
192     final Socket socket =
193       new Connector(InetAddress.getByName(null).getHostName(),
194         acceptor.getPort(),
195         ConnectionType.AGENT).connect();
196 
197     // Sleep until we've accepted the connection. Give up after a few
198     // seconds.
199     final ResourcePool socketSet =
200       acceptor.getSocketSet(ConnectionType.AGENT);
201 
202     for (int i=0; socketSet.countActive() != 1 && i<10; ++i) {
203       Thread.sleep(i * i * 10);
204     }
205 
206     final Message message = new SimpleMessage();
207     serverSender.send(message);
208 
209     final InputStream socketStream = socket.getInputStream();
210 
211     final Object o1 = readMessage(socketStream);
212     assertNotNull(o1);
213 
214     serverSender.shutdown();
215 
216     try {
217       serverSender.send(message);
218       fail("Expected CommunicationException");
219     }
220     catch (CommunicationException e) {
221     }
222 
223     try {
224       final Object o2 = readMessage(socketStream);
225 
226       assertTrue(o2 instanceof CloseCommunicationMessage);
227     }
228     catch (StreamCorruptedException e) {
229       // Occasionally this occurs because the connection is shutdown.
230       // Whatever.
231     }
232 
233     acceptor.shutdown();
234   }
235  }