1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertTrue;
27
28 import java.io.IOException;
29 import java.io.PipedInputStream;
30 import java.io.PipedOutputStream;
31
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35
36
37
38
39
40
41
42
43 public abstract class AbstractSenderAndReceiverTests {
44
45 protected volatile Receiver m_receiver;
46 protected Sender m_sender;
47
48 private ExecuteThread m_executeThread;
49
50 protected void initialise(Receiver receiver, Sender sender) {
51 m_receiver = receiver;
52 m_sender = sender;
53 }
54
55 @Before public void startExecuteThread() throws Exception {
56 m_executeThread = new ExecuteThread();
57 m_executeThread.start();
58 }
59
60 @After public void stopThreads() throws Exception {
61 m_executeThread.shutdown();
62
63 m_receiver.shutdown();
64 m_sender.shutdown();
65 }
66
67 @Test public void testSendSimpleMessage() throws Exception {
68
69 final SimpleMessage sentMessage = new SimpleMessage();
70 m_sender.send(sentMessage);
71
72 final Message receivedMessage = m_executeThread.waitForMessage();
73 assertEquals(sentMessage, receivedMessage);
74 assertTrue(sentMessage != receivedMessage);
75 }
76
77 @Test public void testSendManyMessages() throws Exception {
78
79 for (int i=1; i<=10; ++i) {
80 final SimpleMessage[] sentMessages = new SimpleMessage[i];
81
82 for (int j=0; j<i; ++j) {
83 sentMessages[j] = new SimpleMessage(i);
84 m_sender.send(sentMessages[j]);
85 }
86
87 for (int j=0; j<i; ++j) {
88 final SimpleMessage receivedMessage =
89 (SimpleMessage) m_executeThread.waitForMessage();
90
91 assertEquals(sentMessages[j], receivedMessage);
92 assertTrue(sentMessages[j] != receivedMessage);
93 }
94 }
95 }
96
97 @Test public void testSendLargeMessage() throws Exception {
98
99
100 final SimpleMessage sentMessage = new SimpleMessage(8000);
101 m_sender.send(sentMessage);
102
103 final SimpleMessage receivedMessage =
104 (SimpleMessage) m_executeThread.waitForMessage();
105
106 assertEquals(sentMessage, receivedMessage);
107 assertTrue(sentMessage != receivedMessage);
108 }
109
110 @Test public void testShutdownReceiver() throws Exception {
111 m_receiver.shutdown();
112 assertNull(m_executeThread.waitForMessage());
113 }
114
115 @Test public void testQueueAndFlush() throws Exception {
116
117 final QueuedSender sender = new QueuedSenderDecorator(m_sender);
118
119 final SimpleMessage[] messages = new SimpleMessage[25];
120
121 for (int i=0; i<messages.length; ++i) {
122 messages[i] = new SimpleMessage();
123 sender.send(messages[i]);
124 }
125
126 sender.flush();
127
128 for (int i=0; i<messages.length; ++i) {
129 final Message receivedMessage = m_executeThread.waitForMessage();
130
131 assertEquals(messages[i], receivedMessage);
132 assertTrue(messages[i] != receivedMessage);
133 }
134 }
135
136 @Test public void testQueueAndSend() throws Exception {
137
138 final QueuedSender sender = new QueuedSenderDecorator(m_sender);
139
140 final SimpleMessage[] messages = new SimpleMessage[25];
141
142 for (int i=0; i<messages.length; ++i) {
143 messages[i] = new SimpleMessage();
144 sender.send(messages[i]);
145 }
146
147 final SimpleMessage finalMessage = new SimpleMessage();
148 sender.send(finalMessage);
149 sender.flush();
150
151 for (int i=0; i<messages.length; ++i) {
152 final Message receivedMessage = m_executeThread.waitForMessage();
153
154 assertEquals(messages[i], receivedMessage);
155 assertTrue(messages[i] != receivedMessage);
156 }
157
158 final Message receivedFinalMessage = m_executeThread.waitForMessage();
159
160 assertEquals(finalMessage, receivedFinalMessage);
161 assertTrue(finalMessage != receivedFinalMessage);
162 }
163
164
165
166
167
168 private final class ExecuteThread extends Thread {
169
170 private Action m_action;
171
172 public ExecuteThread() {
173 super("ExecuteThread");
174 }
175
176 public synchronized void run() {
177
178 try {
179 while (true) {
180 while (m_action == null) {
181 wait();
182 }
183
184 m_action.run();
185 m_action = null;
186
187 notifyAll();
188 }
189 }
190 catch (InterruptedException e) {
191 }
192 }
193
194 private synchronized Object execute(Action action) throws Exception {
195
196 m_action = action;
197 notifyAll();
198
199 while (!action.getHasRun()) {
200 wait();
201 }
202
203 return action.getResult();
204 }
205
206 public Message waitForMessage() throws Exception {
207 return (Message) execute(
208 new Action() {
209 public Object doAction() throws Exception {
210 return m_receiver.waitForMessage();
211 }
212 }
213 );
214 }
215
216 public void shutdown() throws Exception {
217 execute(
218 new Action() {
219 public Object doAction() throws Exception {
220 throw new InterruptedException();
221 }
222 }
223 );
224 }
225
226 private abstract class Action {
227
228 private Object m_result;
229 private Exception m_exception;
230 private boolean m_hasRun = false;
231
232 public void run() throws InterruptedException {
233 try {
234 m_result = doAction();
235 }
236 catch (InterruptedException e) {
237 throw e;
238 }
239 catch (Exception e) {
240 m_exception = e;
241 }
242 finally {
243 m_hasRun = true;
244 }
245 }
246
247 public Object getResult() throws Exception {
248 if (m_exception != null) {
249 throw m_exception;
250 }
251
252 return m_result;
253 }
254
255 public boolean getHasRun() {
256 return m_hasRun;
257 }
258
259 protected abstract Object doAction() throws Exception;
260 }
261 }
262
263 static final class BigBufferPipedInputStream extends PipedInputStream {
264 public BigBufferPipedInputStream(PipedOutputStream src)
265 throws IOException {
266 super(src);
267
268 buffer = new byte[32768];
269 }
270 }
271 }