View Javadoc

1   // Copyright (C) 2003, 2004, 2005 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.communication;
23  
24  import java.io.ObjectInputStream;
25  import java.io.PipedInputStream;
26  import java.io.PipedOutputStream;
27  import java.io.StreamCorruptedException;
28  
29  import junit.framework.TestCase;
30  
31  
32  /**
33   *  Unit tests for <code>FanOutStreamSender</code>.
34   *
35   * @author Philip Aston
36   */
37  public class TestFanOutStreamSender extends TestCase {
38  
39    public TestFanOutStreamSender(String name) {
40      super(name);
41    }
42  
43    public void testAddAndSend() throws Exception {
44  
45      final FanOutStreamSender serverSender = new FanOutStreamSender(3);
46  
47      final PipedInputStream[] inputStreams = new PipedInputStream[10];
48      final PipedOutputStream[] outputStreams =
49        new PipedOutputStream[inputStreams.length];
50  
51      for (int i=0; i<outputStreams.length; ++i) {
52        inputStreams[i] = new PipedInputStream();
53        outputStreams[i] = new PipedOutputStream(inputStreams[i]);
54        serverSender.add(outputStreams[i]);
55      }
56  
57      final SimpleMessage message1 = new SimpleMessage();
58      final SimpleMessage message2 = new SimpleMessage();
59  
60      serverSender.send(message1);
61      serverSender.send(message2);
62  
63      for (int i=0; i<outputStreams.length; ++i) {
64        final ObjectInputStream inputStream1 =
65          new ObjectInputStream(inputStreams[i]);
66        final Object o1 = inputStream1.readObject();
67  
68        final ObjectInputStream inputStream2 =
69          new ObjectInputStream(inputStreams[i]);
70        final Object o2 = inputStream2.readObject();
71  
72        assertEquals(message1, o1);
73        assertEquals(message2, o2);
74  
75        assertEquals(0, inputStreams[i].available());
76      }
77  
78      serverSender.shutdown();
79    }
80  
81    public void testShutdown() throws Exception {
82  
83      final FanOutStreamSender serverSender = new FanOutStreamSender(3);
84  
85      final PipedInputStream inputStream = new PipedInputStream();
86      final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
87      serverSender.add(outputStream);
88  
89      final Message message = new SimpleMessage();
90      serverSender.send(message);
91  
92      final ObjectInputStream inputStream1 =
93        new ObjectInputStream(inputStream);
94      final Object o1 = inputStream1.readObject();
95      assertNotNull(o1);
96  
97      serverSender.shutdown();
98  
99      try {
100       serverSender.send(message);
101       fail("Expected CommunicationException");
102     }
103     catch (CommunicationException e) {
104     }
105 
106     try {
107       final ObjectInputStream inputStream2 =
108         new ObjectInputStream(inputStream);
109       final Object o2 = inputStream2.readObject();
110 
111       assertTrue(o2 instanceof CloseCommunicationMessage);
112     }
113     catch (StreamCorruptedException e) {
114       // Occasionally this occurs because the connection is shutdown.
115       // Whatever.
116     }
117   }
118 }