1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 package HTTPClient;
34
35 import java.io.InputStream;
36 import java.io.IOException;
37 import java.io.InterruptedIOException;
38
39
40
41
42
43
44
45
46
47
48 final class RespInputStream extends InputStream implements GlobalConstants
49 {
50
51 private static boolean dontTimeoutBody = false;
52
53
54 private StreamDemultiplexor demux = null;
55
56
57 private ResponseHandler resph;
58
59
60
61 boolean closed = false;
62
63
64 private boolean dont_truncate = false;
65
66
67 private byte[] buffer = null;
68
69
70 private boolean interrupted = false;
71
72
73 private int offset = 0;
74
75
76 private int end = 0;
77
78
79 int count = 0;
80
81 static
82 {
83 try
84 {
85 dontTimeoutBody = Boolean.getBoolean("HTTPClient.dontTimeoutRespBody");
86 if (dontTimeoutBody)
87 Log.write(Log.DEMUX, "RspIS: disabling timeouts when " +
88 "reading response body");
89 }
90 catch (Exception e)
91 { }
92 }
93
94
95
96
97 RespInputStream(StreamDemultiplexor demux, ResponseHandler resph)
98 {
99 this.demux = demux;
100 this.resph = resph;
101 }
102
103
104
105
106 private byte[] ch = new byte[1];
107
108
109
110
111
112
113 public synchronized int read() throws IOException
114 {
115 int rcvd = read(ch, 0, 1);
116 if (rcvd == 1)
117 return ch[0] & 0xff;
118 else
119 return -1;
120 }
121
122
123
124
125
126
127
128
129
130 public synchronized int read(byte[] b, int off, int len) throws IOException
131 {
132 if (closed)
133 return -1;
134
135 int left = end - offset;
136 if (buffer != null && !(left == 0 && interrupted))
137 {
138 if (left == 0) return -1;
139
140 len = (len > left ? left : len);
141 System.arraycopy(buffer, offset, b, off, len);
142 offset += len;
143
144 return len;
145 }
146 else
147 {
148 if (resph.resp.cd_type != CD_HDRS)
149 Log.write(Log.DEMUX, "RspIS: Reading stream " + this.hashCode());
150
151 int rcvd;
152 if (dontTimeoutBody && resph.resp.cd_type != CD_HDRS)
153 rcvd = demux.read(b, off, len, resph, 0);
154 else
155 rcvd = demux.read(b, off, len, resph, resph.resp.timeout);
156 if (rcvd != -1 && resph.resp.got_headers)
157 count += rcvd;
158
159 return rcvd;
160 }
161 }
162
163
164
165
166
167
168
169
170 public synchronized long skip(long num) throws IOException
171 {
172 if (closed)
173 return 0;
174
175 int left = end - offset;
176 if (buffer != null && !(left == 0 && interrupted))
177 {
178 num = (num > left ? left : num);
179 offset += num;
180 return num;
181 }
182 else
183 {
184 long skpd = demux.skip(num, resph);
185 if (resph.resp.got_headers)
186 count += skpd;
187 return skpd;
188 }
189 }
190
191
192
193
194
195
196
197
198 public synchronized int available() throws IOException
199 {
200 if (closed)
201 return 0;
202
203 if (buffer != null && !(end-offset == 0 && interrupted))
204 return end-offset;
205 else
206 return demux.available(resph);
207 }
208
209
210
211
212
213
214
215
216 public synchronized void close() throws IOException
217 {
218 if (!closed)
219 {
220 closed = true;
221
222 if (dont_truncate && (buffer == null || interrupted))
223 readAll(resph.resp.timeout);
224
225 Log.write(Log.DEMUX, "RspIS: User closed stream " + hashCode());
226
227 demux.closeSocketIfAllStreamsClosed();
228
229 if (dont_truncate)
230 {
231 try
232 { resph.resp.http_resp.invokeTrailerHandlers(false); }
233 catch (ModuleException me)
234 { throw new IOException(me.toString()); }
235 }
236 }
237 }
238
239
240
241
242
243 protected void finalize() throws Throwable
244 {
245 try
246 { close(); }
247 finally
248 { super.finalize(); }
249 }
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267 void readAll(int timeout) throws IOException
268 {
269 Log.write(Log.DEMUX, "RspIS: Read-all on stream " + this.hashCode());
270
271 synchronized (resph.resp)
272 {
273 if (!resph.resp.got_headers)
274 {
275 int sav_to = resph.resp.timeout;
276 resph.resp.timeout = timeout;
277 resph.resp.getStatusCode();
278 resph.resp.timeout = sav_to;
279 }
280 }
281
282 synchronized (this)
283 {
284 if (buffer != null && !interrupted) return;
285
286 int rcvd = 0;
287 try
288 {
289 if (closed)
290 {
291 buffer = new byte[10000];
292 do
293 {
294 count += rcvd;
295 rcvd = demux.read(buffer, 0, buffer.length, resph,
296 timeout);
297 } while (rcvd != -1);
298 buffer = null;
299 }
300 else
301 {
302 if (buffer == null)
303 {
304 buffer = new byte[10000];
305 offset = 0;
306 end = 0;
307 }
308
309 do
310 {
311 rcvd = demux.read(buffer, end, buffer.length-end, resph,
312 timeout);
313 if (rcvd < 0) break;
314
315 count += rcvd;
316 end += rcvd;
317 buffer = Util.resizeArray(buffer, end+10000);
318 } while (true);
319 }
320 }
321 catch (InterruptedIOException iioe)
322 {
323 interrupted = true;
324 throw iioe;
325 }
326 catch (IOException ioe)
327 {
328 buffer = null;
329 }
330
331 interrupted = false;
332 }
333 }
334
335
336
337
338
339
340
341 synchronized void dontTruncate()
342 {
343 dont_truncate = true;
344 }
345 }