View Javadoc

1   /*
2    * @(#)StreamDemultiplexor.java				0.3-3 06/05/2001
3    *
4    *  This file is part of the HTTPClient package
5    *  Copyright (C) 1996-2001 Ronald Tschalär
6    *
7    *  This library is free software; you can redistribute it and/or
8    *  modify it under the terms of the GNU Lesser General Public
9    *  License as published by the Free Software Foundation; either
10   *  version 2 of the License, or (at your option) any later version.
11   *
12   *  This library is distributed in the hope that it will be useful,
13   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   *  Lesser General Public License for more details.
16   *
17   *  You should have received a copy of the GNU Lesser General Public
18   *  License along with this library; if not, write to the Free
19   *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20   *  MA 02111-1307, USA
21   *
22   *  For questions, suggestions, bug-reports, enhancement-requests etc.
23   *  I may be contacted at:
24   *
25   *  ronald@innovation.ch
26   *
27   *  The HTTPClient's home page is located at:
28   *
29   *  http://www.innovation.ch/java/HTTPClient/ 
30   *
31   * This file contains modifications for use with "The Grinder"
32   * (http://grinder.sourceforge.net) under the terms of the LGPL. They
33   * are marked below with the comment "GRINDER MODIFICATION".
34   *
35   */
36  
37  package HTTPClient;
38  
39  import java.io.IOException;
40  import java.io.EOFException;
41  import java.io.InterruptedIOException;
42  import java.net.Socket;
43  import java.net.SocketException;
44  
45  /**
46   * This class handles the demultiplexing of input stream. This is needed
47   * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
48   *
49   * @version	0.3-3  06/05/2001
50   * @author	Ronald Tschalär
51   */
52  class StreamDemultiplexor implements GlobalConstants
53  {
54      /** the protocol we're handling requests for (reported by toString()) */
55      private int                    Protocol;
56  
57      /** the connection we're working for */
58      private HTTPConnection         Connection;
59  
60      /** the input stream to demultiplex */
61      private BufferedInputStream    Stream;
62  
63      /** the socket this hangs off; null once the demux has been closed */
64      private Socket                 Sock = null;
65  
66      /** signals after the closing of which stream to close the socket */
67      private ResponseHandler        MarkedForClose;
68  
69      /** timer used to close the socket if unused for a given time */
70      private SocketTimeout.TimeoutEntry Timer = null;
71  
72      /** timer thread which implements the timers */
73      private static SocketTimeout   TimerThread = null;
74  
75      /** cleanup object to stop timer thread when we're gc'd */
76      private static Object          cleanup;
77  
78      /** list of the response handlers we're serving, in registration order */
79      private LinkedList             RespHandlerList;
80  
81      /** unread bytes in current chunk (chunked encoding); -1 = read new chunk */
82      private long                   chunk_len;
83  
84      /** the currently set timeout for the socket, in milliseconds */
85      private int                    cur_timeout = 0;
86  
87  
static
{
    // One shared reaper thread, created with a 60 slot timer wheel.
    TimerThread = new SocketTimeout(60);
    TimerThread.start();

    /* The object below exists only so that its finalizer can stop the
     * timer thread should the StreamDemultiplexor class itself ever be
     * garbage collected. That normally only happens in Applet-like
     * environments where several classloaders load the same class.
     * Even there it is doubtful this helps: a class is usually only
     * collected together with its classloader, and the running timer
     * thread holds a reference to that classloader.
     */
    final SocketTimeout reaper = TimerThread;
    cleanup = new Object() {
        protected void finalize()
        {
            reaper.kill();
        }
    };
}
112 
113 
114     // Constructors
115 
/**
 * Creates a demultiplexor for the given socket.
 *
 * @param protocol   the protocol used on this stream.
 * @param sock       the socket whose input stream is to be demultiplexed.
 * @param connection the http-connection this socket belongs to.
 * @exception IOException if the socket's input stream can't be obtained.
 */
StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
        throws IOException
{
    Protocol        = protocol;
    Connection      = connection;
    RespHandlerList = new LinkedList();

    init(sock);
}
131 
132 
133     /**
134      * Initializes the demultiplexor with a new socket.
135      *
136      * @param stream   the stream to demultiplex
137      */
138     private void init(Socket sock)  throws IOException
139     {
140 	Log.write(Log.DEMUX, "Demux: Initializing Stream Demultiplexor (" +
141 			     this.hashCode() + ")");
142 
143 	this.Sock       = sock;
144 	this.Stream     = new BufferedInputStream(sock.getInputStream());
145 	MarkedForClose  = null;
146 	chunk_len       = -1;
147 
148 	// create a timer to close the socket after 60 seconds, but don't
149 	// start it yet
150 	Timer = TimerThread.setTimeout(this);
151 	Timer.hyber();
152     }
153 
154 
155     // Methods
156 
157     /**
158      * Each Response must register with us.
159      */
160     void register(Response resp_handler, Request req)  throws RetryException
161     {
162 	synchronized (RespHandlerList)
163 	{
164 	    if (Sock == null)
165 		throw new RetryException();
166 
167 	    RespHandlerList.addToEnd(
168 				new ResponseHandler(resp_handler, req, this));
169 	}
170     }
171 
172     /**
173      * creates an input stream for the response.
174      *
175      * @param resp the response structure requesting the stream
176      * @return an InputStream
177      */
178     RespInputStream getStream(Response resp)
179     {
180 	ResponseHandler resph;
181 	synchronized (RespHandlerList)
182 	{
183 	    for (resph = (ResponseHandler) RespHandlerList.enumerate();
184 		 resph != null;
185 		 resph = (ResponseHandler) RespHandlerList.next())
186 	    {
187 		if (resph.resp == resp)  break;
188 	    }
189 	}
190 
191 	if (resph != null)
192 	    return resph.stream;
193 	else
194 	    return null;
195     }
196 
197 
198     /**
199      * Restarts the timer thread that will close an unused socket after
200      * 60 seconds.
201      */
202     void restartTimer()
203     {
204 	if (Timer != null)  Timer.reset();
205     }
206 
207 
208     /**
209      * reads an array of bytes from the master stream.
210      */
211     int read(byte[] b, int off, int len, ResponseHandler resph, int timeout)
212 	    throws IOException
213     {
214 	if (resph.exception != null)
215 	{
216 	    resph.exception.fillInStackTrace();
217 	    throw resph.exception;
218 	}
219 
220 	if (resph.eof)
221 	    return -1;
222 
223 
224 	// read the headers and data for all responses preceding us.
225 
226 	ResponseHandler head;
227 	while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null  &&
228 		head != resph)
229 	{
230 	    try
231 		{ head.stream.readAll(timeout); }
232 	    catch (IOException ioe)
233 	    {
234 		if (ioe instanceof InterruptedIOException)
235 		    throw ioe;
236 		else
237 		{
238 		    resph.exception.fillInStackTrace();
239 		    throw resph.exception;
240 		}
241 	    }
242 	}
243 
244 
245 	// Now we can read from the stream.
246 
247 	synchronized (this)
248 	{
249 	    if (resph.exception != null)
250 	    {
251 		resph.exception.fillInStackTrace();
252 		throw resph.exception;
253 	    }
254 
255 	    if (resph.resp.cd_type != CD_HDRS)
256 		Log.write(Log.DEMUX, "Demux: Reading for stream " +
257 				     resph.stream.hashCode());
258 
259 	    if (Timer != null)  Timer.hyber();
260 
261 	    try
262 	    {
263 		int rcvd = -1;
264 
265 		if (timeout != cur_timeout)
266 		{
267 		    Log.write(Log.DEMUX, "Demux: Setting timeout to " +
268 					 timeout + " ms");
269 
270 		    Sock.setSoTimeout(timeout);
271 		    cur_timeout = timeout;
272 		}
273 
274 		switch (resph.resp.cd_type)
275 		{
276 		    case CD_HDRS:
277 			rcvd = Stream.read(b, off, len);
278 			if (rcvd == -1)
279 			    throw new EOFException("Premature EOF encountered");
280 			break;
281 
282 		    case CD_0:
283 			rcvd = -1;
284 			close(resph);
285 			break;
286 
287 		    case CD_CLOSE:
288 			rcvd = Stream.read(b, off, len);
289 			if (rcvd == -1)
290 			    close(resph);
291 			break;
292 
293 		    case CD_CONTLEN:
294 			int cl = resph.resp.ContentLength;
295 			if (len > cl - resph.stream.count)
296 			    len = cl - resph.stream.count;
297 
298 			rcvd = Stream.read(b, off, len);
299 			if (rcvd == -1)
300 			    throw new EOFException("Premature EOF encountered");
301 
302 			if (resph.stream.count+rcvd == cl)
303 			    close(resph);
304 
305 			break;
306 
307 		    case CD_CHUNKED:
308 			if (chunk_len == -1)	// it's a new chunk
309 			    chunk_len = Codecs.getChunkLength(Stream);
310 
311 			if (chunk_len > 0)		// it's data
312 			{
313 			    if (len > chunk_len)  len = (int) chunk_len;
314 			    rcvd = Stream.read(b, off, len);
315 			    if (rcvd == -1)
316 				throw new EOFException("Premature EOF encountered");
317 			    chunk_len -= rcvd;
318 			    if (chunk_len == 0)	// got the whole chunk
319 			    {
320 				Stream.read();	// CR
321 				Stream.read();	// LF
322 				chunk_len = -1;
323 			    }
324 			}
325 			else	// the footers (trailers)
326 			{
327 			    resph.resp.readTrailers(Stream);
328 			    rcvd = -1;
329 			    close(resph);
330 			    chunk_len = -1;
331 			}
332 			break;
333 
334 		    case CD_MP_BR:
335 			byte[] endbndry = resph.getEndBoundary(Stream);
336 			int[]  end_cmp  = resph.getEndCompiled(Stream);
337 
338 			rcvd = Stream.read(b, off, len);
339 			if (rcvd == -1)
340 			    throw new EOFException("Premature EOF encountered");
341 
342 			int ovf = Stream.pastEnd(endbndry, end_cmp);
343 			if (ovf != -1)
344 			{
345 			    rcvd -= ovf;
346 			    close(resph);
347 			}
348 
349 			break;
350 
351 		    default:
352 			throw new Error("Internal Error in StreamDemultiplexor: " +
353 					"Invalid cd_type " + resph.resp.cd_type);
354 		}
355 
356 		restartTimer();
357 		return rcvd;
358 
359 	    }
360 	    catch (InterruptedIOException ie)	// don't intercept this one
361 	    {
362 		restartTimer();
363 		throw ie;
364 	    }
365 	    catch (IOException ioe)
366 	    {
367 		Log.write(Log.DEMUX, "Demux: ", ioe);
368 
369 		close(ioe, true);
370 		throw resph.exception;		// set by retry_requests
371 	    }
372 	    catch (ParseException pe)
373 	    {
374 		Log.write(Log.DEMUX, "Demux: ", pe);
375 
376 		close(new IOException(pe.toString()), true);
377 		throw resph.exception;		// set by retry_requests
378 	    }
379 	}
380     }
381 
382     /**
383      * skips a number of bytes in the master stream. This is done via a
384      * dummy read, as the socket input stream doesn't like skip()'s.
385      */
386     synchronized long skip(long num, ResponseHandler resph) throws IOException
387     {
388 	if (resph.exception != null)
389 	{
390 	    resph.exception.fillInStackTrace();
391 	    throw resph.exception;
392 	}
393 
394 	if (resph.eof)
395 	    return 0;
396 
397 	byte[] dummy = new byte[(int) num];
398 	int rcvd = read(dummy, 0, (int) num, resph, 0);
399 	if (rcvd == -1)
400 	    return 0;
401 	else
402 	    return rcvd;
403     }
404 
405     /**
406      * Determines the number of available bytes. If <var>resph</var> is null, return
407      * available bytes on the socket stream itself (used by HTTPConnection).
408      */
409     synchronized int available(ResponseHandler resph) throws IOException
410     {
411 	if (resph != null  &&  resph.exception != null)
412 	{
413 	    resph.exception.fillInStackTrace();
414 	    throw resph.exception;
415 	}
416 
417 	if (resph != null  &&  resph.eof)
418 	    return 0;
419 
420 	int avail = Stream.available();
421 	if (resph == null)
422 	  return avail;
423 
424 	switch (resph.resp.cd_type)
425 	{
426 	    case CD_0:
427 		return 0;
428 	    case CD_HDRS:
429 		// this is something of a hack; I could return 0, but then
430 		// if you were waiting for something on a response that
431 		// wasn't first in line (and you didn't try to read the
432 		// other response) you'd wait forever. On the other hand,
433 		// we might be making a false promise here...
434 		return (avail > 0 ? 1 : 0);
435 	    case CD_CLOSE:
436 		return avail;
437 	    case CD_CONTLEN:
438 		int cl = resph.resp.ContentLength;
439 		cl -= resph.stream.count;
440 		return (avail < cl ? avail : cl);
441 	    case CD_CHUNKED:
442 		return avail;	// not perfect...
443 	    case CD_MP_BR:
444 		return avail;	// not perfect...
445 	    default:
446 		throw new Error("Internal Error in StreamDemultiplexor: " +
447 				"Invalid cd_type " + resph.resp.cd_type);
448 	}
449 
450     }
451 
452 
453     /**
454      * Closes the socket and all associated streams. If <var>exception</var>
455      * is not null then all active requests are retried.
456      *
457      * <P>There are five ways this method may be activated. 1) if an exception
458      * occurs during read or write. 2) if the stream is marked for close but
459      * no responses are outstanding (e.g. due to a timeout). 3) when the
460      * markedForClose response is closed. 4) if all response streams up until
461      * and including the markedForClose response have been closed. 5) if this
462      * demux is finalized.
463      *
464      * @param exception the IOException to be sent to the streams.
465      * @param was_reset if true then the exception is due to a connection
466      *                  reset; otherwise it means we generated the exception
467      *                  ourselves and this is a "normal" close.
468      */
469     synchronized void close(IOException exception, boolean was_reset)
470     {
471 	if (Sock == null)	// already cleaned up
472 	    return;
473 
474 	Log.write(Log.DEMUX, "Demux: Closing all streams and socket (" +
475 			     this.hashCode() + ")");
476 
477 	try
478 	    { Stream.close(); }
479 	catch (IOException ioe) { }
480 	try
481 	    { Sock.close(); }
482 	catch (IOException ioe) { }
483 	Sock = null;
484 
485 	if (Timer != null)
486 	{
487 	    Timer.kill();
488 	    Timer = null;
489 	}
490 
491 	Connection.DemuxList.remove(this);
492 
493 
494 	// Here comes the tricky part: redo outstanding requests!
495 
496 	if (exception != null)
497 	    synchronized (RespHandlerList)
498 		{ retry_requests(exception, was_reset); }
499     }
500 
501 
502     /**
503      * Retries outstanding requests. Well, actually the RetryModule does
504      * that. Here we just throw a RetryException for each request so that
505      * the RetryModule can catch and handle them.
506      *
507      * @param exception the exception that led to this call.
508      * @param was_reset this flag is passed to the RetryException and is
509      *                  used by the RetryModule to distinguish abnormal closes
510      *                  from expected closes.
511      */
512     private void retry_requests(IOException exception, boolean was_reset)
513     {
514 	RetryException  first = null,
515 			prev  = null;
516 	ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
517 
518 	while (resph != null)
519 	{
520 	    /* if the application is already reading the data then the
521 	     * response has already been handled. In this case we must
522 	     * throw the real exception.
523 	     */
524 	    if (resph.resp.got_headers)
525 	    {
526 		resph.exception = exception;
527 	    }
528 	    else
529 	    {
530 		RetryException tmp = new RetryException(exception.getMessage());
531 		if (first == null)  first = tmp;
532 
533 		tmp.request    = resph.request;
534 		tmp.response   = resph.resp;
535 		tmp.exception  = exception;
536 		tmp.conn_reset = was_reset;
537 		tmp.first      = first;
538 		tmp.addToListAfter(prev);
539 
540 		prev = tmp;
541 		resph.exception = tmp;
542 	    }
543 
544 	    RespHandlerList.remove(resph);
545 	    resph = (ResponseHandler) RespHandlerList.next();
546 	}
547     }
548 
549 
550     /**
551      * Closes the associated stream. If this one has been markedForClose then
552      * the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
553      */
554     private void close(ResponseHandler resph)
555     {
556 	synchronized (RespHandlerList)
557 	{
558 	    if (resph != (ResponseHandler) RespHandlerList.getFirst())
559 		return;
560 
561 	    Log.write(Log.DEMUX, "Demux: Closing stream " +
562 				 resph.stream.hashCode());
563 
564 	    resph.eof = true;
565 	    RespHandlerList.remove(resph);
566 	}
567 
568 	if (resph == MarkedForClose)
569 	    close(new IOException("Premature end of Keep-Alive"), false);
570 	else
571 	    closeSocketIfAllStreamsClosed();
572     }
573 
574 
575     /**
576      * Close the socket if all the streams have been closed.
577      *
578      * <P>When a stream reaches eof it is removed from the response handler
579      * list, but when somebody close()'s the response stream it is just
580      * marked as such. This means that all responses in the list have either
581      * not been read at all or only partially read, but they might have been
582      * close()'d meaning that nobody is interested in the data. So If all the
583      * response streams up till and including the one markedForClose have
584      * been close()'d then we can remove them from our list and close the
585      * socket.
586      *
587      * <P>Note: if the response list is emtpy or if no response is
588      * markedForClose then this method does nothing. Specifically it does
589      * not close the socket. We only want to close the socket if we've been
590      * told to do so.
591      *
592      * <P>Also note that there might still be responses in the list after
593      * the markedForClose one. These are due to us having pipelined more
594      * requests to the server than it's willing to serve on a single
595      * connection. These requests will be retried if possible.
596      */
597     synchronized void closeSocketIfAllStreamsClosed()
598     {
599 	synchronized (RespHandlerList)
600 	{
601 	    ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
602 
603 	    while (resph != null  &&  resph.stream.closed)
604 	    {
605 		if (resph == MarkedForClose)
606 		{
607 		    // remove all response handlers first
608 		    ResponseHandler tmp;
609 		    do
610 		    {
611 			tmp = (ResponseHandler) RespHandlerList.getFirst();
612 			RespHandlerList.remove(tmp);
613 		    }
614 		    while (tmp != resph);
615 
616 		    // close the socket
617 		    close(new IOException("Premature end of Keep-Alive"), false);
618 		    return;
619 		}
620 
621 		resph = (ResponseHandler) RespHandlerList.next();
622 	    }
623 	}
624     }
625 
626 
627     /**
628      * returns the socket associated with this demux
629      */
630     synchronized Socket getSocket()
631     {
632 	if (MarkedForClose != null)
633 	    return null;
634 
635 	if (Timer != null)  Timer.hyber();
636 	return Sock;
637     }
638 
639 
640     /**
641      * Mark this demux to not accept any more request and to close the
642      * stream after this <var>resp</var>onse or all requests have been
643      * processed, or close immediately if no requests are registered.
644      *
645      * @param response the Response after which the connection should
646      *                 be closed.
647      */
648     synchronized void markForClose(Response resp)
649     {
650 	synchronized (RespHandlerList)
651 	{
652 	    if (RespHandlerList.getFirst() == null)	// no active request,
653 	    {						// so close the socket
654 		close(new IOException("Premature end of Keep-Alive"), false);
655 		return;
656 	    }
657 
658 	    if (Timer != null)
659 	    {
660 		Timer.kill();
661 		Timer = null;
662 	    }
663 
664 	    ResponseHandler resph, lasth = null;
665 	    for (resph = (ResponseHandler) RespHandlerList.enumerate();
666 		 resph != null;
667 		 resph = (ResponseHandler) RespHandlerList.next())
668 	    {
669 		if (resph.resp == resp)	// new resp precedes any others
670 		{
671 		    MarkedForClose = resph;
672 
673 		    Log.write(Log.DEMUX, "Demux: stream " +
674 					 resp.inp_stream.hashCode() +
675 					 " marked for close");
676 
677 		    closeSocketIfAllStreamsClosed();
678 		    return;
679 		}
680 
681 		if (MarkedForClose == resph)
682 		    return;	// already marked for closing after an earlier resp
683 
684 		lasth = resph;
685 	    }
686 
687 	    if (lasth == null)
688 		return;
689 
690 	    MarkedForClose = lasth;		// resp == null, so use last resph
691 	    closeSocketIfAllStreamsClosed();
692 
693 	    Log.write(Log.DEMUX, "Demux: stream " + lasth.stream.hashCode() +
694 				 " marked for close");
695 	}
696     }
697 
698 
699     /**
700      * Emergency stop. Closes the socket and notifies the responses that
701      * the requests are aborted.
702      *
703      * @since V0.3
704      */
705     void abort()
706     {
707 	Log.write(Log.DEMUX, "Demux: Aborting socket (" + this.hashCode() + ")");
708 
709 
710 	// notify all responses of abort
711 
712 	synchronized (RespHandlerList)
713 	{
714 	    for (ResponseHandler resph =
715 				(ResponseHandler) RespHandlerList.enumerate();
716 		 resph != null;
717 		 resph = (ResponseHandler) RespHandlerList.next())
718 	    {
719 		if (resph.resp.http_resp != null)
720 		    resph.resp.http_resp.markAborted();
721 		if (resph.exception == null)
722 		    resph.exception = new IOException("Request aborted by user");
723 	    }
724 
725 
726 	    /* Close the socket.
727 	     * Note: this duplicates most of close(IOException, boolean). We
728 	     * do *not* call close() because that is synchronized, but we want
729 	     * abort() to be asynch.
730 	     */
731 	    if (Sock != null)
732 	    {
733 		try
734 		{
735 		    try
736                       /** ++GRINDER MODIFICATION **/
737                         // { Sock.setSoLinger(false, 0); }
738 			{ Sock.setSoLinger(true, 0); }
739                       /** --GRINDER MODIFICATION **/
740 		    catch (SocketException se)
741 			{ }
742 
743 		    try
744 			{ Stream.close(); }
745 		    catch (IOException ioe) { }
746 		    try
747 			{ Sock.close(); }
748 		    catch (IOException ioe) { }
749 		    Sock = null;
750 
751 		    if (Timer != null)
752 		    {
753 			Timer.kill();
754 			Timer = null;
755 		    }
756 		}
757 		catch (NullPointerException npe)
758 		    { }
759 
760 		Connection.DemuxList.remove(this);
761 	    }
762 	}
763     }
764 
765 
766     /**
767      * A safety net to close the connection.
768      */
769     protected void finalize() throws Throwable
770     {
771 	close((IOException) null, false);
772 	super.finalize();
773     }
774 
775 
776     /**
777      * produces a string.
778      * @return a string containing the class name and protocol number
779      */
780     public String toString()
781     {
782 	String prot;
783 
784 	switch (Protocol)
785 	{
786 	    case HTTP:
787 		prot = "HTTP"; break;
788 	    case HTTPS:
789 		prot = "HTTPS"; break;
790 	    case SHTTP:
791 		prot = "SHTTP"; break;
792 	    case HTTP_NG:
793 		prot = "HTTP_NG"; break;
794 	    default:
795 		throw new Error("HTTPClient Internal Error: invalid protocol " +
796 				Protocol);
797 	}
798 
799 	return getClass().getName() + "[Protocol=" + prot + "]";
800     }
801 }
802 
803 
804 /**
805  * This thread is used to reap idle connections. It is NOT used to timeout
806  * reads or writes on a socket. It keeps a list of timer entries and expires
807  * them after a given time.
808  */
809 class SocketTimeout extends Thread
810 {
811     private boolean alive = true;
812 
813     /**
814      * This class represents a timer entry. It is used to close an
815      * inactive socket after n seconds. Once running, the timer may be
816      * suspended (hyber()), restarted (reset()), or aborted (kill()).
817      * When the timer expires it invokes markForClose() on the
818      * associated stream demultipexer.
819      */
820     class TimeoutEntry
821     {
822 	boolean restart = false,
823 		hyber   = false,
824 		alive   = true;
825 	StreamDemultiplexor demux;
826 	TimeoutEntry next = null,
827 		     prev = null;
828 
829 	TimeoutEntry(StreamDemultiplexor demux)
830 	{
831 	    this.demux = demux;
832 	}
833 
834 	void reset()
835 	{
836 	    hyber = false;
837 	    if (restart)  return;
838 	    restart = true;
839 
840 	    synchronized (time_list)
841 	    {
842 		if (!alive)  return;
843 
844 		// remove from current position
845 		next.prev = prev;
846 		prev.next = next;
847 
848 		// and add to end of timeout list
849 		next = time_list[current];
850 		prev = time_list[current].prev;
851 		prev.next = this;
852 		next.prev = this; 
853 	    }
854 	}
855 
856 	void hyber()
857 	{
858 	    if (alive)  hyber = true;
859 	}
860 
861 	void kill()
862 	{
863 	    alive   = false;
864 	    restart = false;
865 	    hyber   = false;
866 
867 	    synchronized (time_list)
868 	    {
869 		if (prev == null)  return;
870 		next.prev = prev;
871 		prev.next = next;
872 		prev = null;
873 	    }
874 	}
875     }
876 
877     TimeoutEntry[] time_list;	// jdk 1.1.x javac bug: these must not
878     int		   current;	//   be private!
879 
880 
881     SocketTimeout(int secs)
882     {
883 	super("SocketTimeout");
884 
885 	try { setDaemon(true); }
886 	catch (SecurityException se) { }	// Oh well...
887 	setPriority(MAX_PRIORITY);
888 
889 	time_list = new TimeoutEntry[secs];
890 	for (int idx=0; idx<secs; idx++)
891 	{
892 	    time_list[idx] = new TimeoutEntry(null);
893 	    time_list[idx].next = time_list[idx].prev = time_list[idx];
894 	}
895 	current = 0;
896     }
897 
898 
899     public TimeoutEntry setTimeout(StreamDemultiplexor demux)
900     {
901 	TimeoutEntry entry = new TimeoutEntry(demux);
902 	synchronized (time_list)
903 	{
904 	    entry.next = time_list[current];
905 	    entry.prev = time_list[current].prev;
906 	    entry.prev.next = entry;
907 	    entry.next.prev = entry; 
908 	}
909 
910 	return entry;
911     }
912 
913 
914     /**
915      * This timer is implemented by sleeping for 1 second and then
916      * checking the timer list.
917      */
918     public void run()
919     {
920 	TimeoutEntry marked = null;
921 
922 	while (alive)
923 	{
924 	    try { sleep(1000L); } catch (InterruptedException ie) { }
925 
926 	    synchronized (time_list)
927 	    {
928 		// reset all restart flags
929 		for (TimeoutEntry entry = time_list[current].next;
930 		     entry != time_list[current];
931 		     entry = entry.next)
932 		{
933 		    entry.restart = false;
934 		}
935 
936 		current++;
937 		if (current >= time_list.length)
938 		    current = 0;
939 
940 		// remove all expired timers 
941 		for (TimeoutEntry entry = time_list[current].next;
942 		     entry != time_list[current];
943 		     entry = entry.next)
944 		{
945 		    if (entry.alive  &&  !entry.hyber)
946 		    {
947 			TimeoutEntry prev = entry.prev;
948 			entry.kill();
949 			/* put on death row. Note: we must not invoke
950 			 * markForClose() here because it is synch'd
951 			 * and can therefore lead to a deadlock if that
952 			 * thread is trying to do a reset() or kill()
953 			 */
954 			entry.next = marked;
955 			marked = entry;
956 			entry = prev;
957 		    }
958 		}
959 	    }
960 
961 	    while (marked != null)
962 	    {
963 		marked.demux.markForClose(null);
964 		marked = marked.next;
965 	    }
966 	}
967     }
968 
969     /**
970      * Stop the timer thread.
971      */
972     public void kill() {
973 	alive = false;
974     }
975 }