1 /*
2 * @(#)RespInputStream.java 0.3-3 06/05/2001
3 *
4 * This file is part of the HTTPClient package
5 * Copyright (C) 1996-2001 Ronald Tschalär
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free
19 * Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20 * MA 02111-1307, USA
21 *
22 * For questions, suggestions, bug-reports, enhancement-requests etc.
23 * I may be contacted at:
24 *
25 * ronald@innovation.ch
26 *
27 * The HTTPClient's home page is located at:
28 *
29 * http://www.innovation.ch/java/HTTPClient/
30 *
31 */
32
33 package HTTPClient;
34
35 import java.io.InputStream;
36 import java.io.IOException;
37 import java.io.InterruptedIOException;
38
39 /**
40 * This is the InputStream that gets returned to the user. The extensions
41 * consist of the capability to have the data pushed into a buffer if the
42 * stream demux needs to.
43 *
44 * @version 0.3-3 06/05/2001
45 * @author Ronald Tschalär
46 * @since V0.2
47 */
48 final class RespInputStream extends InputStream implements GlobalConstants
49 {
50 /** Use old behaviour: don't set a timeout when reading the response body */
51 private static boolean dontTimeoutBody = false;
52
53 /** the stream demultiplexor */
54 private StreamDemultiplexor demux = null;
55
56 /** our response handler */
57 private ResponseHandler resph;
58
59 /** signals that the user has closed the stream and will therefore
60 not read any further data */
61 boolean closed = false;
62
63 /** signals that the connection may not be closed prematurely */
64 private boolean dont_truncate = false;
65
66 /** this buffer is used to buffer data that the demux has to get rid of */
67 private byte[] buffer = null;
68
69 /** signals that we were interrupted and that the buffer is not complete */
70 private boolean interrupted = false;
71
72 /** the offset at which the unread data starts in the buffer */
73 private int offset = 0;
74
75 /** the end of the data in the buffer */
76 private int end = 0;
77
78 /** the total number of bytes of entity data read from the demux so far */
79 int count = 0;
80
81 static
82 {
83 try
84 {
85 dontTimeoutBody = Boolean.getBoolean("HTTPClient.dontTimeoutRespBody");
86 if (dontTimeoutBody)
87 Log.write(Log.DEMUX, "RspIS: disabling timeouts when " +
88 "reading response body");
89 }
90 catch (Exception e)
91 { }
92 }
93
94
95 // Constructors
96
97 RespInputStream(StreamDemultiplexor demux, ResponseHandler resph)
98 {
99 this.demux = demux;
100 this.resph = resph;
101 }
102
103
104 // public Methods
105
106 private byte[] ch = new byte[1];
107 /**
108 * Reads a single byte.
109 *
110 * @return the byte read, or -1 if EOF.
111 * @exception IOException if any exception occured on the connection.
112 */
113 public synchronized int read() throws IOException
114 {
115 int rcvd = read(ch, 0, 1);
116 if (rcvd == 1)
117 return ch[0] & 0xff;
118 else
119 return -1;
120 }
121
122
123 /**
124 * Reads <var>len</var> bytes into <var>b</var>, starting at offset
125 * <var>off</var>.
126 *
127 * @return the number of bytes actually read, or -1 if EOF.
128 * @exception IOException if any exception occured on the connection.
129 */
130 public synchronized int read(byte[] b, int off, int len) throws IOException
131 {
132 if (closed)
133 return -1;
134
135 int left = end - offset;
136 if (buffer != null && !(left == 0 && interrupted))
137 {
138 if (left == 0) return -1;
139
140 len = (len > left ? left : len);
141 System.arraycopy(buffer, offset, b, off, len);
142 offset += len;
143
144 return len;
145 }
146 else
147 {
148 if (resph.resp.cd_type != CD_HDRS)
149 Log.write(Log.DEMUX, "RspIS: Reading stream " + this.hashCode());
150
151 int rcvd;
152 if (dontTimeoutBody && resph.resp.cd_type != CD_HDRS)
153 rcvd = demux.read(b, off, len, resph, 0);
154 else
155 rcvd = demux.read(b, off, len, resph, resph.resp.timeout);
156 if (rcvd != -1 && resph.resp.got_headers)
157 count += rcvd;
158
159 return rcvd;
160 }
161 }
162
163
164 /**
165 * skips <var>num</var> bytes.
166 *
167 * @return the number of bytes actually skipped.
168 * @exception IOException if any exception occured on the connection.
169 */
170 public synchronized long skip(long num) throws IOException
171 {
172 if (closed)
173 return 0;
174
175 int left = end - offset;
176 if (buffer != null && !(left == 0 && interrupted))
177 {
178 num = (num > left ? left : num);
179 offset += num;
180 return num;
181 }
182 else
183 {
184 long skpd = demux.skip(num, resph);
185 if (resph.resp.got_headers)
186 count += skpd;
187 return skpd;
188 }
189 }
190
191
192 /**
193 * gets the number of bytes available for reading without blocking.
194 *
195 * @return the number of bytes available.
196 * @exception IOException if any exception occured on the connection.
197 */
198 public synchronized int available() throws IOException
199 {
200 if (closed)
201 return 0;
202
203 if (buffer != null && !(end-offset == 0 && interrupted))
204 return end-offset;
205 else
206 return demux.available(resph);
207 }
208
209
210 /**
211 * closes the stream.
212 *
213 * @exception if any exception occured on the connection before or
214 * during close.
215 */
216 public synchronized void close() throws IOException
217 {
218 if (!closed)
219 {
220 closed = true;
221
222 if (dont_truncate && (buffer == null || interrupted))
223 readAll(resph.resp.timeout);
224
225 Log.write(Log.DEMUX, "RspIS: User closed stream " + hashCode());
226
227 demux.closeSocketIfAllStreamsClosed();
228
229 if (dont_truncate)
230 {
231 try
232 { resph.resp.http_resp.invokeTrailerHandlers(false); }
233 catch (ModuleException me)
234 { throw new IOException(me.toString()); }
235 }
236 }
237 }
238
239
240 /**
241 * A safety net to clean up.
242 */
243 protected void finalize() throws Throwable
244 {
245 try
246 { close(); }
247 finally
248 { super.finalize(); }
249 }
250
251
252 // local Methods
253
254 /**
255 * Reads all remainings data into buffer. This is used to force a read
256 * of upstream responses.
257 *
258 * <P>This is probably the most tricky and buggy method around. It's the
259 * only one that really violates the strict top-down method invocation
260 * from the Response through the ResponseStream to the StreamDemultiplexor.
261 * This means we need to be awfully careful about what is synchronized
262 * and what parameters are passed to whom.
263 *
264 * @param timeout the timeout to use for reading from the demux
265 * @exception IOException If any exception occurs while reading stream.
266 */
267 void readAll(int timeout) throws IOException
268 {
269 Log.write(Log.DEMUX, "RspIS: Read-all on stream " + this.hashCode());
270
271 synchronized (resph.resp)
272 {
273 if (!resph.resp.got_headers) // force headers to be read
274 {
275 int sav_to = resph.resp.timeout;
276 resph.resp.timeout = timeout;
277 resph.resp.getStatusCode();
278 resph.resp.timeout = sav_to;
279 }
280 }
281
282 synchronized (this)
283 {
284 if (buffer != null && !interrupted) return;
285
286 int rcvd = 0;
287 try
288 {
289 if (closed) // throw away
290 {
291 buffer = new byte[10000];
292 do
293 {
294 count += rcvd;
295 rcvd = demux.read(buffer, 0, buffer.length, resph,
296 timeout);
297 } while (rcvd != -1);
298 buffer = null;
299 }
300 else
301 {
302 if (buffer == null)
303 {
304 buffer = new byte[10000];
305 offset = 0;
306 end = 0;
307 }
308
309 do
310 {
311 rcvd = demux.read(buffer, end, buffer.length-end, resph,
312 timeout);
313 if (rcvd < 0) break;
314
315 count += rcvd;
316 end += rcvd;
317 buffer = Util.resizeArray(buffer, end+10000);
318 } while (true);
319 }
320 }
321 catch (InterruptedIOException iioe)
322 {
323 interrupted = true;
324 throw iioe;
325 }
326 catch (IOException ioe)
327 {
328 buffer = null; // force a read on demux for exception
329 }
330
331 interrupted = false;
332 }
333 }
334
335
336 /**
337 * Sometime the full response body must be read, i.e. the connection may
338 * not be closed prematurely (by us). Currently this is needed when the
339 * chunked encoding with trailers is used in a response.
340 */
341 synchronized void dontTruncate()
342 {
343 dont_truncate = true;
344 }
345 }