View Javadoc

1   // Copyright (C) 2000 - 2011 Philip Aston
2   // All rights reserved.
3   //
4   // This file is part of The Grinder software distribution. Refer to
5   // the file LICENSE which is part of The Grinder distribution for
6   // licensing details. The Grinder distribution is available on the
7   // Internet at http://grinder.sourceforge.net/
8   //
9   // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
12  // FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
13  // COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
14  // INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
15  // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
16  // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
17  // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
18  // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
19  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
20  // OF THE POSSIBILITY OF SUCH DAMAGE.
21  
22  package net.grinder.communication;
23  
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertNull;
26  import static org.junit.Assert.assertSame;
27  import static org.junit.Assert.assertTrue;
28  import static org.junit.Assert.fail;
29  
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import net.grinder.communication.MessageQueue.ShutdownException;
35  
36  import org.junit.Test;
37  
38  
39  /**
40   *  Unit test case for {@code MessageQueue}.
41   *
42   * @author Philip Aston
43   **/
44  public class TestMessageQueue {
45  
46    private final MessageQueue m_queue = new MessageQueue(false);
47  
48    @Test public void testWithOneThread() throws Exception {
49      final Message[] messages = {
50        new SimpleMessage(10),
51        new SimpleMessage(0),
52        new SimpleMessage(999),
53      };
54  
55      for (int i=0; i<messages.length; ++i) {
56        m_queue.queue(messages[i]);
57      }
58  
59      for (int i=0; i<messages.length; ++i) {
60        assertSame(messages[i], m_queue.dequeue(false));
61      }
62  
63      assertNull(m_queue.dequeue(false));
64    }
65  
66    @Test public void testWithActiveDequeuer() throws Exception {
67      final Message[] messages = {
68        new SimpleMessage(10),
69        new SimpleMessage(0),
70        new SimpleMessage(999),
71      };
72  
73      final DequeuerThread dequeuerThread = new DequeuerThread(messages.length);
74      dequeuerThread.start();
75  
76      for (int i=0; i<messages.length; ++i) {
77        m_queue.queue(messages[i]);
78      }
79  
80      dequeuerThread.join();
81  
82      final List<Message> receivedMessages = dequeuerThread.getMessages();
83  
84      assertEquals(messages.length, receivedMessages.size());
85  
86      for (int i=0; i<messages.length; ++i) {
87        assertSame(messages[i], receivedMessages.get(i));
88      }
89    }
90  
91    @Test public void testShutdownReciever() throws Exception {
92  
93      final DequeuerThread dequeuerThread = new DequeuerThread(1);
94      dequeuerThread.start();
95      m_queue.shutdown();
96  
97      dequeuerThread.join();
98      assertTrue(dequeuerThread.getException() instanceof ShutdownException);
99  
100     try {
101       m_queue.queue(new SimpleMessage(0));
102       fail("Expected a ShutdownException");
103     }
104     catch (ShutdownException e) {
105     }
106 
107     try {
108       m_queue.dequeue(true);
109       fail("Expected a ShutdownException");
110     }
111     catch (ShutdownException e) {
112     }
113   }
114 
115   @Test public void testManyQueuersAndDequeuers() throws Exception {
116 
117     final Thread[] queuers = new Thread[6];
118     final Thread[] dequeuers = new Thread[3];
119     final Random random = new Random();
120 
121     for (int i=0; i<queuers.length; ++i) {
122       queuers[i] = new Thread() {
123           public void run() {
124             for (int j=0; j<10; ++j) {
125               try {
126                 m_queue.queue(new SimpleMessage(0));
127               }
128               catch (ShutdownException e) {
129                 fail("Unexpected ShutdownException");
130               }
131 
132               try {
133                 Thread.sleep(random.nextInt(10));
134               }
135               catch (InterruptedException e) {
136               }
137             }
138           }
139         };
140 
141       queuers[i].start();
142     }
143 
144     for (int i=0; i<dequeuers.length; ++i) {
145       dequeuers[i] = new DequeuerThread(20);
146       dequeuers[i].start();
147     }
148 
149     for (int i=0; i<dequeuers.length; ++i) {
150       dequeuers[i].join();
151     }
152 
153     assertNull(m_queue.dequeue(false));
154   }
155 
156   @Test public void testExceptionPropagation() throws Exception {
157 
158     // m_queue does not allow exceptions to be queued.
159     try {
160       m_queue.queue(new CommunicationException(""));
161       fail("Expected an Assertion");
162     }
163     catch (AssertionError e) {
164     }
165 
166     final MessageQueue exceptionQueue = new MessageQueue(true);
167 
168     final CommunicationException[] exceptions = {
169       new CommunicationException("Exception 1"),
170       new CommunicationException("Exception 2"),
171     };
172 
173     final Message[] messages = {
174       new SimpleMessage(0),
175       new SimpleMessage(999),
176     };
177 
178     for (int i=0; i<messages.length; ++i) {
179       exceptionQueue.queue(exceptions[i]);
180       exceptionQueue.queue(messages[i]);
181     }
182 
183     for (int i=0; i<messages.length; ++i) {
184       try {
185         exceptionQueue.dequeue(false);
186         fail("Expected a CommunicationException");
187       }
188       catch (CommunicationException e) {
189         assertSame(exceptions[i], e.getCause());
190       }
191 
192       assertSame(messages[i], exceptionQueue.dequeue(false));
193     }
194 
195     assertNull(m_queue.dequeue(false));
196   }
197 
198   private class DequeuerThread extends Thread {
199 
200     private List<Message> m_messages = new LinkedList<Message>();
201     private Exception m_exception;
202     private int m_howMany;
203 
204     public DequeuerThread(int howMany) {
205       m_howMany = howMany;
206     }
207 
208     public List<Message> getMessages() {
209       return m_messages;
210     }
211 
212     public Exception getException() {
213       return m_exception;
214     }
215 
216     public void run() {
217 
218       m_exception = null;
219 
220       try {
221         while (m_howMany-- > 0) {
222           m_messages.add(m_queue.dequeue(true));
223         }
224       }
225       catch (Exception e) {
226         m_exception = e;
227       }
228     }
229   }
230 }