View Javadoc

1   /*
2    * @(#)RespInputStream.java				0.3-3 06/05/2001
3    *
4    *  This file is part of the HTTPClient package
5    *  Copyright (C) 1996-2001 Ronald Tschalär
6    *
7    *  This library is free software; you can redistribute it and/or
8    *  modify it under the terms of the GNU Lesser General Public
9    *  License as published by the Free Software Foundation; either
10   *  version 2 of the License, or (at your option) any later version.
11   *
12   *  This library is distributed in the hope that it will be useful,
13   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   *  Lesser General Public License for more details.
16   *
17   *  You should have received a copy of the GNU Lesser General Public
18   *  License along with this library; if not, write to the Free
19   *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20   *  MA 02111-1307, USA
21   *
22   *  For questions, suggestions, bug-reports, enhancement-requests etc.
23   *  I may be contacted at:
24   *
25   *  ronald@innovation.ch
26   *
27   *  The HTTPClient's home page is located at:
28   *
29   *  http://www.innovation.ch/java/HTTPClient/ 
30   *
31   */
32  
33  package HTTPClient;
34  
35  import java.io.InputStream;
36  import java.io.IOException;
37  import java.io.InterruptedIOException;
38  
39  /**
40   * This is the InputStream that gets returned to the user. The extensions
41   * consist of the capability to have the data pushed into a buffer if the
42   * stream demux needs to.
43   *
44   * @version	0.3-3  06/05/2001
45   * @author	Ronald Tschalär
46   * @since	V0.2
47   */
48  final class RespInputStream extends InputStream implements GlobalConstants
49  {
50      /** Use old behaviour: don't set a timeout when reading the response body */
51      private static boolean dontTimeoutBody = false;
52  
53      /** the stream demultiplexor */
54      private StreamDemultiplexor demux = null;
55  
56      /** our response handler */
57      private ResponseHandler     resph;
58  
59      /** signals that the user has closed the stream and will therefore
60  	not read any further data */
61  	    boolean             closed = false;
62  
63      /** signals that the connection may not be closed prematurely */
64      private boolean             dont_truncate = false;
65  
66      /** this buffer is used to buffer data that the demux has to get rid of */
67      private byte[]              buffer = null;
68  
69      /** signals that we were interrupted and that the buffer is not complete */
70      private boolean             interrupted = false;
71  
72      /** the offset at which the unread data starts in the buffer */
73      private int                 offset = 0;
74  
75      /** the end of the data in the buffer */
76      private int                 end = 0;
77  
78      /** the total number of bytes of entity data read from the demux so far */
79              int                 count = 0;
80  
81      static
82      {
83  	try
84  	{
85  	    dontTimeoutBody = Boolean.getBoolean("HTTPClient.dontTimeoutRespBody");
86  	    if (dontTimeoutBody)
87  		Log.write(Log.DEMUX, "RspIS: disabling timeouts when " +
88  				     "reading response body");
89  	}
90  	catch (Exception e)
91  	    { }
92      }
93  
94  
95      // Constructors
96  
97      RespInputStream(StreamDemultiplexor demux, ResponseHandler resph)
98      {
99  	this.demux = demux;
100 	this.resph = resph;
101     }
102 
103 
104     // public Methods
105 
106     private byte[] ch = new byte[1];
107     /**
108      * Reads a single byte.
109      *
110      * @return the byte read, or -1 if EOF.
111      * @exception IOException if any exception occured on the connection.
112      */
113     public synchronized int read() throws IOException
114     {
115 	int rcvd = read(ch, 0, 1);
116 	if (rcvd == 1)
117 	    return ch[0] & 0xff;
118 	else
119 	    return -1;
120     }
121 
122 
123     /**
124      * Reads <var>len</var> bytes into <var>b</var>, starting at offset
125      * <var>off</var>.
126      *
127      * @return the number of bytes actually read, or -1 if EOF.
128      * @exception IOException if any exception occured on the connection.
129      */
130     public synchronized int read(byte[] b, int off, int len) throws IOException
131     {
132 	if (closed)
133 	    return -1;
134 
135 	int left = end - offset;
136 	if (buffer != null  &&  !(left == 0  &&  interrupted))
137 	{
138 	    if (left == 0)  return -1;
139 
140 	    len = (len > left ? left : len);
141 	    System.arraycopy(buffer, offset, b, off, len);
142 	    offset += len;
143 
144 	    return len;
145 	}
146 	else
147 	{
148 	    if (resph.resp.cd_type != CD_HDRS)
149 		Log.write(Log.DEMUX, "RspIS: Reading stream " + this.hashCode());
150 
151 	    int rcvd;
152 	    if (dontTimeoutBody  &&  resph.resp.cd_type != CD_HDRS)
153 		rcvd = demux.read(b, off, len, resph, 0);
154 	    else
155 		rcvd = demux.read(b, off, len, resph, resph.resp.timeout);
156 	    if (rcvd != -1  &&  resph.resp.got_headers)
157 		count += rcvd;
158 
159 	    return rcvd;
160 	}
161     }
162 
163 
164     /**
165      * skips <var>num</var> bytes.
166      *
167      * @return the number of bytes actually skipped.
168      * @exception IOException if any exception occured on the connection.
169      */
170     public synchronized long skip(long num) throws IOException
171     {
172 	if (closed)
173 	    return 0;
174 
175 	int left = end - offset;
176 	if (buffer != null  &&  !(left == 0  &&  interrupted))
177 	{
178 	    num = (num > left ? left : num);
179 	    offset  += num;
180 	    return num;
181 	}
182 	else
183 	{
184 	    long skpd = demux.skip(num, resph);
185 	    if (resph.resp.got_headers)
186 		count += skpd;
187 	    return skpd;
188 	}
189     }
190 
191 
192     /**
193      * gets the number of bytes available for reading without blocking.
194      *
195      * @return the number of bytes available.
196      * @exception IOException if any exception occured on the connection.
197      */
198     public synchronized int available() throws IOException
199     {
200 	if (closed)
201 	    return 0;
202 
203 	if (buffer != null  &&  !(end-offset == 0  &&  interrupted))
204 	    return end-offset;
205 	else
206 	    return demux.available(resph);
207     }
208 
209 
210     /**
211      * closes the stream.
212      *
213      * @exception if any exception occured on the connection before or
214      *            during close.
215      */
216     public synchronized void close()  throws IOException
217     {
218 	if (!closed)
219 	{
220 	    closed = true;
221 
222 	    if (dont_truncate  &&  (buffer == null  ||  interrupted))
223 		readAll(resph.resp.timeout);
224 
225 	    Log.write(Log.DEMUX, "RspIS: User closed stream " + hashCode());
226 
227 	    demux.closeSocketIfAllStreamsClosed();
228 
229 	    if (dont_truncate)
230 	    {
231 		try
232 		    { resph.resp.http_resp.invokeTrailerHandlers(false); }
233 		catch (ModuleException me)
234 		    { throw new IOException(me.toString()); }
235 	    }
236 	}
237     }
238 
239 
240     /**
241      * A safety net to clean up.
242      */
243     protected void finalize()  throws Throwable
244     {
245 	try
246 	    { close(); }
247 	finally
248 	    { super.finalize(); }
249     }
250 
251 
252     // local Methods
253 
254     /**
255      * Reads all remainings data into buffer. This is used to force a read
256      * of upstream responses.
257      *
258      * <P>This is probably the most tricky and buggy method around. It's the
259      * only one that really violates the strict top-down method invocation
260      * from the Response through the ResponseStream to the StreamDemultiplexor.
261      * This means we need to be awfully careful about what is synchronized
262      * and what parameters are passed to whom.
263      *
264      * @param timeout the timeout to use for reading from the demux
265      * @exception IOException If any exception occurs while reading stream.
266      */
267     void readAll(int timeout)  throws IOException
268     {
269 	Log.write(Log.DEMUX, "RspIS: Read-all on stream " + this.hashCode());
270 
271 	synchronized (resph.resp)
272 	{
273 	    if (!resph.resp.got_headers)	// force headers to be read
274 	    {
275 		int sav_to = resph.resp.timeout;
276 		resph.resp.timeout = timeout;
277 		resph.resp.getStatusCode();
278 		resph.resp.timeout = sav_to;
279 	    }
280 	}
281 
282 	synchronized (this)
283 	{
284 	    if (buffer != null  &&  !interrupted)  return;
285 
286 	    int rcvd = 0;
287 	    try
288 	    {
289 		if (closed)			// throw away
290 		{
291 		    buffer = new byte[10000];
292 		    do
293 		    {
294 			count += rcvd;
295 			rcvd   = demux.read(buffer, 0, buffer.length, resph,
296 					    timeout);
297 		    } while (rcvd != -1);
298 		    buffer = null;
299 		}
300 		else
301 		{
302 		    if (buffer == null)
303 		    {
304 			buffer = new byte[10000];
305 			offset = 0;
306 			end    = 0;
307 		    }
308 
309 		    do
310 		    {
311 			rcvd = demux.read(buffer, end, buffer.length-end, resph,
312 					  timeout);
313 			if (rcvd < 0)  break;
314 
315 			count  += rcvd;
316 			end    += rcvd;
317 			buffer  = Util.resizeArray(buffer, end+10000);
318 		    } while (true);
319 		}
320 	    }
321 	    catch (InterruptedIOException iioe)
322 	    {
323 		interrupted = true;
324 		throw iioe;
325 	    }
326 	    catch (IOException ioe)
327 	    {
328 		buffer = null;	// force a read on demux for exception
329 	    }
330 
331 	    interrupted = false;
332 	}
333     }
334 
335 
336     /**
337      * Sometime the full response body must be read, i.e. the connection may
338      * not be closed prematurely (by us). Currently this is needed when the
339      * chunked encoding with trailers is used in a response.
340      */
341     synchronized void dontTruncate()
342     {
343 	dont_truncate = true;
344     }
345 }