View Javadoc

1   // Copyright (C) 2000 - 2012 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.communication;
23  
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertNull;
26  import static org.junit.Assert.assertTrue;
27  
28  import java.io.IOException;
29  import java.io.PipedInputStream;
30  import java.io.PipedOutputStream;
31  
32  import org.junit.After;
33  import org.junit.Before;
34  import org.junit.Test;
35  
36  
37  /**
38   *  Abstract unit test cases for {@link Sender} and
39   *  {@link Receiver} implementations.
40   *
41   * @author Philip Aston
42   */
43  public abstract class AbstractSenderAndReceiverTests {
44  
45    protected volatile Receiver m_receiver;
46    protected Sender m_sender;
47  
48    private ExecuteThread m_executeThread;
49  
50    protected void initialise(Receiver receiver, Sender sender) {
51      m_receiver = receiver;
52      m_sender = sender;
53    }
54  
55    @Before public void startExecuteThread() throws Exception {
56      m_executeThread = new ExecuteThread();
57      m_executeThread.start();
58    }
59  
60    @After public void stopThreads() throws Exception {
61      m_executeThread.shutdown();
62  
63      m_receiver.shutdown();
64      m_sender.shutdown();
65    }
66  
67    @Test public void testSendSimpleMessage() throws Exception {
68  
69      final SimpleMessage sentMessage = new SimpleMessage();
70      m_sender.send(sentMessage);
71  
72      final Message receivedMessage = m_executeThread.waitForMessage();
73      assertEquals(sentMessage, receivedMessage);
74      assertTrue(sentMessage != receivedMessage);
75    }
76  
77    @Test public void testSendManyMessages() throws Exception {
78  
79      for (int i=1; i<=10; ++i) {
80        final SimpleMessage[] sentMessages = new SimpleMessage[i];
81  
82        for (int j=0; j<i; ++j) {
83          sentMessages[j] = new SimpleMessage(i);
84          m_sender.send(sentMessages[j]);
85        }
86  
87        for (int j=0; j<i; ++j) {
88          final SimpleMessage receivedMessage =
89            (SimpleMessage) m_executeThread.waitForMessage();
90  
91          assertEquals(sentMessages[j], receivedMessage);
92          assertTrue(sentMessages[j] != receivedMessage);
93        }
94      }
95    }
96  
97    @Test public void testSendLargeMessage() throws Exception {
98      // This causes a message size of about 38K. Should be limited by
99      // the buffer size in Receiver.
100     final SimpleMessage sentMessage = new SimpleMessage(8000);
101     m_sender.send(sentMessage);
102 
103     final SimpleMessage receivedMessage =
104       (SimpleMessage) m_executeThread.waitForMessage();
105 
106     assertEquals(sentMessage, receivedMessage);
107     assertTrue(sentMessage != receivedMessage);
108   }
109 
110   @Test public void testShutdownReceiver() throws Exception {
111     m_receiver.shutdown();
112     assertNull(m_executeThread.waitForMessage());
113   }
114 
115   @Test public void testQueueAndFlush() throws Exception {
116 
117     final QueuedSender sender = new QueuedSenderDecorator(m_sender);
118 
119     final SimpleMessage[] messages = new SimpleMessage[25];
120 
121     for (int i=0; i<messages.length; ++i) {
122       messages[i] = new SimpleMessage();
123       sender.send(messages[i]);
124     }
125 
126     sender.flush();
127 
128     for (int i=0; i<messages.length; ++i) {
129       final Message receivedMessage = m_executeThread.waitForMessage();
130 
131       assertEquals(messages[i], receivedMessage);
132       assertTrue(messages[i] != receivedMessage);
133     }
134   }
135 
136   @Test public void testQueueAndSend() throws Exception {
137 
138     final QueuedSender sender = new QueuedSenderDecorator(m_sender);
139 
140     final SimpleMessage[] messages = new SimpleMessage[25];
141 
142     for (int i=0; i<messages.length; ++i) {
143       messages[i] = new SimpleMessage();
144       sender.send(messages[i]);
145     }
146 
147     final SimpleMessage finalMessage = new SimpleMessage();
148     sender.send(finalMessage);
149     sender.flush();
150 
151     for (int i=0; i<messages.length; ++i) {
152       final Message receivedMessage = m_executeThread.waitForMessage();
153 
154       assertEquals(messages[i], receivedMessage);
155       assertTrue(messages[i] != receivedMessage);
156     }
157 
158     final Message receivedFinalMessage = m_executeThread.waitForMessage();
159 
160     assertEquals(finalMessage, receivedFinalMessage);
161     assertTrue(finalMessage != receivedFinalMessage);
162   }
163 
164   /**
165    * Pico-kernel! Need a long running thread because of the half-baked
166    * PipedInputStream/PipedOutputStream thread checking.
167    */
168   private final class ExecuteThread extends Thread {
169 
170     private Action m_action;
171 
172     public ExecuteThread() {
173       super("ExecuteThread");
174     }
175 
176     public synchronized void run() {
177 
178       try {
179         while (true) {
180           while (m_action == null) {
181             wait();
182           }
183 
184           m_action.run();
185           m_action = null;
186 
187           notifyAll();
188         }
189       }
190       catch (InterruptedException e) {
191       }
192     }
193 
194     private synchronized Object execute(Action action) throws Exception {
195 
196       m_action = action;
197       notifyAll();
198 
199       while (!action.getHasRun()) {
200         wait();
201       }
202 
203       return action.getResult();
204     }
205 
206     public Message waitForMessage() throws Exception {
207       return (Message) execute(
208         new Action() {
209           public Object doAction() throws Exception {
210             return m_receiver.waitForMessage();
211           }
212         }
213         );
214     }
215 
216     public void shutdown() throws Exception {
217       execute(
218         new Action() {
219           public Object doAction() throws Exception {
220             throw new InterruptedException();
221           }
222         }
223         );
224     }
225 
226     private abstract class Action {
227 
228       private Object m_result;
229       private Exception m_exception;
230       private boolean m_hasRun = false;
231 
232       public void run() throws InterruptedException {
233         try {
234           m_result = doAction();
235         }
236         catch (InterruptedException e) {
237           throw e;
238         }
239         catch (Exception e) {
240           m_exception = e;
241         }
242         finally {
243           m_hasRun = true;
244         }
245       }
246 
247       public Object getResult() throws Exception {
248         if (m_exception != null) {
249           throw m_exception;
250         }
251 
252         return m_result;
253       }
254 
255       public boolean getHasRun() {
256         return m_hasRun;
257       }
258 
259       protected abstract Object doAction() throws Exception;
260     }
261   }
262 
263   static final class BigBufferPipedInputStream extends PipedInputStream {
264     public BigBufferPipedInputStream(PipedOutputStream src)
265       throws IOException {
266       super(src);
267       // JDK, I laugh at your puny buffer.
268       buffer = new byte[32768];
269     }
270   }
271 }