1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertSame;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Random;
33
34 import net.grinder.communication.MessageQueue.ShutdownException;
35
36 import org.junit.Test;
37
38
39
40
41
42
43
44 public class TestMessageQueue {
45
46 private final MessageQueue m_queue = new MessageQueue(false);
47
48 @Test public void testWithOneThread() throws Exception {
49 final Message[] messages = {
50 new SimpleMessage(10),
51 new SimpleMessage(0),
52 new SimpleMessage(999),
53 };
54
55 for (int i=0; i<messages.length; ++i) {
56 m_queue.queue(messages[i]);
57 }
58
59 for (int i=0; i<messages.length; ++i) {
60 assertSame(messages[i], m_queue.dequeue(false));
61 }
62
63 assertNull(m_queue.dequeue(false));
64 }
65
66 @Test public void testWithActiveDequeuer() throws Exception {
67 final Message[] messages = {
68 new SimpleMessage(10),
69 new SimpleMessage(0),
70 new SimpleMessage(999),
71 };
72
73 final DequeuerThread dequeuerThread = new DequeuerThread(messages.length);
74 dequeuerThread.start();
75
76 for (int i=0; i<messages.length; ++i) {
77 m_queue.queue(messages[i]);
78 }
79
80 dequeuerThread.join();
81
82 final List<Message> receivedMessages = dequeuerThread.getMessages();
83
84 assertEquals(messages.length, receivedMessages.size());
85
86 for (int i=0; i<messages.length; ++i) {
87 assertSame(messages[i], receivedMessages.get(i));
88 }
89 }
90
91 @Test public void testShutdownReciever() throws Exception {
92
93 final DequeuerThread dequeuerThread = new DequeuerThread(1);
94 dequeuerThread.start();
95 m_queue.shutdown();
96
97 dequeuerThread.join();
98 assertTrue(dequeuerThread.getException() instanceof ShutdownException);
99
100 try {
101 m_queue.queue(new SimpleMessage(0));
102 fail("Expected a ShutdownException");
103 }
104 catch (ShutdownException e) {
105 }
106
107 try {
108 m_queue.dequeue(true);
109 fail("Expected a ShutdownException");
110 }
111 catch (ShutdownException e) {
112 }
113 }
114
115 @Test public void testManyQueuersAndDequeuers() throws Exception {
116
117 final Thread[] queuers = new Thread[6];
118 final Thread[] dequeuers = new Thread[3];
119 final Random random = new Random();
120
121 for (int i=0; i<queuers.length; ++i) {
122 queuers[i] = new Thread() {
123 public void run() {
124 for (int j=0; j<10; ++j) {
125 try {
126 m_queue.queue(new SimpleMessage(0));
127 }
128 catch (ShutdownException e) {
129 fail("Unexpected ShutdownException");
130 }
131
132 try {
133 Thread.sleep(random.nextInt(10));
134 }
135 catch (InterruptedException e) {
136 }
137 }
138 }
139 };
140
141 queuers[i].start();
142 }
143
144 for (int i=0; i<dequeuers.length; ++i) {
145 dequeuers[i] = new DequeuerThread(20);
146 dequeuers[i].start();
147 }
148
149 for (int i=0; i<dequeuers.length; ++i) {
150 dequeuers[i].join();
151 }
152
153 assertNull(m_queue.dequeue(false));
154 }
155
156 @Test public void testExceptionPropagation() throws Exception {
157
158
159 try {
160 m_queue.queue(new CommunicationException(""));
161 fail("Expected an Assertion");
162 }
163 catch (AssertionError e) {
164 }
165
166 final MessageQueue exceptionQueue = new MessageQueue(true);
167
168 final CommunicationException[] exceptions = {
169 new CommunicationException("Exception 1"),
170 new CommunicationException("Exception 2"),
171 };
172
173 final Message[] messages = {
174 new SimpleMessage(0),
175 new SimpleMessage(999),
176 };
177
178 for (int i=0; i<messages.length; ++i) {
179 exceptionQueue.queue(exceptions[i]);
180 exceptionQueue.queue(messages[i]);
181 }
182
183 for (int i=0; i<messages.length; ++i) {
184 try {
185 exceptionQueue.dequeue(false);
186 fail("Expected a CommunicationException");
187 }
188 catch (CommunicationException e) {
189 assertSame(exceptions[i], e.getCause());
190 }
191
192 assertSame(messages[i], exceptionQueue.dequeue(false));
193 }
194
195 assertNull(m_queue.dequeue(false));
196 }
197
198 private class DequeuerThread extends Thread {
199
200 private List<Message> m_messages = new LinkedList<Message>();
201 private Exception m_exception;
202 private int m_howMany;
203
204 public DequeuerThread(int howMany) {
205 m_howMany = howMany;
206 }
207
208 public List<Message> getMessages() {
209 return m_messages;
210 }
211
212 public Exception getException() {
213 return m_exception;
214 }
215
216 public void run() {
217
218 m_exception = null;
219
220 try {
221 while (m_howMany-- > 0) {
222 m_messages.add(m_queue.dequeue(true));
223 }
224 }
225 catch (Exception e) {
226 m_exception = e;
227 }
228 }
229 }
230 }