1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package net.grinder.communication;
23
24 import java.io.ObjectInputStream;
25 import java.io.PipedInputStream;
26 import java.io.PipedOutputStream;
27 import java.io.StreamCorruptedException;
28
29 import junit.framework.TestCase;
30
31
32
33
34
35
36
37 public class TestFanOutStreamSender extends TestCase {
38
39 public TestFanOutStreamSender(String name) {
40 super(name);
41 }
42
43 public void testAddAndSend() throws Exception {
44
45 final FanOutStreamSender serverSender = new FanOutStreamSender(3);
46
47 final PipedInputStream[] inputStreams = new PipedInputStream[10];
48 final PipedOutputStream[] outputStreams =
49 new PipedOutputStream[inputStreams.length];
50
51 for (int i=0; i<outputStreams.length; ++i) {
52 inputStreams[i] = new PipedInputStream();
53 outputStreams[i] = new PipedOutputStream(inputStreams[i]);
54 serverSender.add(outputStreams[i]);
55 }
56
57 final SimpleMessage message1 = new SimpleMessage();
58 final SimpleMessage message2 = new SimpleMessage();
59
60 serverSender.send(message1);
61 serverSender.send(message2);
62
63 for (int i=0; i<outputStreams.length; ++i) {
64 final ObjectInputStream inputStream1 =
65 new ObjectInputStream(inputStreams[i]);
66 final Object o1 = inputStream1.readObject();
67
68 final ObjectInputStream inputStream2 =
69 new ObjectInputStream(inputStreams[i]);
70 final Object o2 = inputStream2.readObject();
71
72 assertEquals(message1, o1);
73 assertEquals(message2, o2);
74
75 assertEquals(0, inputStreams[i].available());
76 }
77
78 serverSender.shutdown();
79 }
80
81 public void testShutdown() throws Exception {
82
83 final FanOutStreamSender serverSender = new FanOutStreamSender(3);
84
85 final PipedInputStream inputStream = new PipedInputStream();
86 final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
87 serverSender.add(outputStream);
88
89 final Message message = new SimpleMessage();
90 serverSender.send(message);
91
92 final ObjectInputStream inputStream1 =
93 new ObjectInputStream(inputStream);
94 final Object o1 = inputStream1.readObject();
95 assertNotNull(o1);
96
97 serverSender.shutdown();
98
99 try {
100 serverSender.send(message);
101 fail("Expected CommunicationException");
102 }
103 catch (CommunicationException e) {
104 }
105
106 try {
107 final ObjectInputStream inputStream2 =
108 new ObjectInputStream(inputStream);
109 final Object o2 = inputStream2.readObject();
110
111 assertTrue(o2 instanceof CloseCommunicationMessage);
112 }
113 catch (StreamCorruptedException e) {
114
115
116 }
117 }
118 }