1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 package HTTPClient;
38
39 import java.io.IOException;
40 import java.io.EOFException;
41 import java.io.InterruptedIOException;
42 import java.net.Socket;
43 import java.net.SocketException;
44
45
46
47
48
49
50
51
52 class StreamDemultiplexor implements GlobalConstants
53 {
54
55 private int Protocol;
56
57
58 private HTTPConnection Connection;
59
60
61 private BufferedInputStream Stream;
62
63
64 private Socket Sock = null;
65
66
67 private ResponseHandler MarkedForClose;
68
69
70 private SocketTimeout.TimeoutEntry Timer = null;
71
72
73 private static SocketTimeout TimerThread = null;
74
75
76 private static Object cleanup;
77
78
79 private LinkedList RespHandlerList;
80
81
82 private long chunk_len;
83
84
85 private int cur_timeout = 0;
86
87
88 static
89 {
90 TimerThread = new SocketTimeout(60);
91 TimerThread.start();
92
93
94
95
96
97
98
99
100
101
102
103 cleanup = new Object() {
104 private final SocketTimeout timer = StreamDemultiplexor.TimerThread;
105
106 protected void finalize()
107 {
108 timer.kill();
109 }
110 };
111 }
112
113
114
115
116
117
118
119
120
121
122
123 StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
124 throws IOException
125 {
126 this.Protocol = protocol;
127 this.Connection = connection;
128 RespHandlerList = new LinkedList();
129 init(sock);
130 }
131
132
133
134
135
136
137
138 private void init(Socket sock) throws IOException
139 {
140 Log.write(Log.DEMUX, "Demux: Initializing Stream Demultiplexor (" +
141 this.hashCode() + ")");
142
143 this.Sock = sock;
144 this.Stream = new BufferedInputStream(sock.getInputStream());
145 MarkedForClose = null;
146 chunk_len = -1;
147
148
149
150 Timer = TimerThread.setTimeout(this);
151 Timer.hyber();
152 }
153
154
155
156
157
158
159
160 void register(Response resp_handler, Request req) throws RetryException
161 {
162 synchronized (RespHandlerList)
163 {
164 if (Sock == null)
165 throw new RetryException();
166
167 RespHandlerList.addToEnd(
168 new ResponseHandler(resp_handler, req, this));
169 }
170 }
171
172
173
174
175
176
177
178 RespInputStream getStream(Response resp)
179 {
180 ResponseHandler resph;
181 synchronized (RespHandlerList)
182 {
183 for (resph = (ResponseHandler) RespHandlerList.enumerate();
184 resph != null;
185 resph = (ResponseHandler) RespHandlerList.next())
186 {
187 if (resph.resp == resp) break;
188 }
189 }
190
191 if (resph != null)
192 return resph.stream;
193 else
194 return null;
195 }
196
197
198
199
200
201
202 void restartTimer()
203 {
204 if (Timer != null) Timer.reset();
205 }
206
207
208
209
210
211 int read(byte[] b, int off, int len, ResponseHandler resph, int timeout)
212 throws IOException
213 {
214 if (resph.exception != null)
215 {
216 resph.exception.fillInStackTrace();
217 throw resph.exception;
218 }
219
220 if (resph.eof)
221 return -1;
222
223
224
225
226 ResponseHandler head;
227 while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null &&
228 head != resph)
229 {
230 try
231 { head.stream.readAll(timeout); }
232 catch (IOException ioe)
233 {
234 if (ioe instanceof InterruptedIOException)
235 throw ioe;
236 else
237 {
238 resph.exception.fillInStackTrace();
239 throw resph.exception;
240 }
241 }
242 }
243
244
245
246
247 synchronized (this)
248 {
249 if (resph.exception != null)
250 {
251 resph.exception.fillInStackTrace();
252 throw resph.exception;
253 }
254
255 if (resph.resp.cd_type != CD_HDRS)
256 Log.write(Log.DEMUX, "Demux: Reading for stream " +
257 resph.stream.hashCode());
258
259 if (Timer != null) Timer.hyber();
260
261 try
262 {
263 int rcvd = -1;
264
265 if (timeout != cur_timeout)
266 {
267 Log.write(Log.DEMUX, "Demux: Setting timeout to " +
268 timeout + " ms");
269
270 Sock.setSoTimeout(timeout);
271 cur_timeout = timeout;
272 }
273
274 switch (resph.resp.cd_type)
275 {
276 case CD_HDRS:
277 rcvd = Stream.read(b, off, len);
278 if (rcvd == -1)
279 throw new EOFException("Premature EOF encountered");
280 break;
281
282 case CD_0:
283 rcvd = -1;
284 close(resph);
285 break;
286
287 case CD_CLOSE:
288 rcvd = Stream.read(b, off, len);
289 if (rcvd == -1)
290 close(resph);
291 break;
292
293 case CD_CONTLEN:
294 int cl = resph.resp.ContentLength;
295 if (len > cl - resph.stream.count)
296 len = cl - resph.stream.count;
297
298 rcvd = Stream.read(b, off, len);
299 if (rcvd == -1)
300 throw new EOFException("Premature EOF encountered");
301
302 if (resph.stream.count+rcvd == cl)
303 close(resph);
304
305 break;
306
307 case CD_CHUNKED:
308 if (chunk_len == -1)
309 chunk_len = Codecs.getChunkLength(Stream);
310
311 if (chunk_len > 0)
312 {
313 if (len > chunk_len) len = (int) chunk_len;
314 rcvd = Stream.read(b, off, len);
315 if (rcvd == -1)
316 throw new EOFException("Premature EOF encountered");
317 chunk_len -= rcvd;
318 if (chunk_len == 0)
319 {
320 Stream.read();
321 Stream.read();
322 chunk_len = -1;
323 }
324 }
325 else
326 {
327 resph.resp.readTrailers(Stream);
328 rcvd = -1;
329 close(resph);
330 chunk_len = -1;
331 }
332 break;
333
334 case CD_MP_BR:
335 byte[] endbndry = resph.getEndBoundary(Stream);
336 int[] end_cmp = resph.getEndCompiled(Stream);
337
338 rcvd = Stream.read(b, off, len);
339 if (rcvd == -1)
340 throw new EOFException("Premature EOF encountered");
341
342 int ovf = Stream.pastEnd(endbndry, end_cmp);
343 if (ovf != -1)
344 {
345 rcvd -= ovf;
346 close(resph);
347 }
348
349 break;
350
351 default:
352 throw new Error("Internal Error in StreamDemultiplexor: " +
353 "Invalid cd_type " + resph.resp.cd_type);
354 }
355
356 restartTimer();
357 return rcvd;
358
359 }
360 catch (InterruptedIOException ie)
361 {
362 restartTimer();
363 throw ie;
364 }
365 catch (IOException ioe)
366 {
367 Log.write(Log.DEMUX, "Demux: ", ioe);
368
369 close(ioe, true);
370 throw resph.exception;
371 }
372 catch (ParseException pe)
373 {
374 Log.write(Log.DEMUX, "Demux: ", pe);
375
376 close(new IOException(pe.toString()), true);
377 throw resph.exception;
378 }
379 }
380 }
381
382
383
384
385
386 synchronized long skip(long num, ResponseHandler resph) throws IOException
387 {
388 if (resph.exception != null)
389 {
390 resph.exception.fillInStackTrace();
391 throw resph.exception;
392 }
393
394 if (resph.eof)
395 return 0;
396
397 byte[] dummy = new byte[(int) num];
398 int rcvd = read(dummy, 0, (int) num, resph, 0);
399 if (rcvd == -1)
400 return 0;
401 else
402 return rcvd;
403 }
404
405
406
407
408
409 synchronized int available(ResponseHandler resph) throws IOException
410 {
411 if (resph != null && resph.exception != null)
412 {
413 resph.exception.fillInStackTrace();
414 throw resph.exception;
415 }
416
417 if (resph != null && resph.eof)
418 return 0;
419
420 int avail = Stream.available();
421 if (resph == null)
422 return avail;
423
424 switch (resph.resp.cd_type)
425 {
426 case CD_0:
427 return 0;
428 case CD_HDRS:
429
430
431
432
433
434 return (avail > 0 ? 1 : 0);
435 case CD_CLOSE:
436 return avail;
437 case CD_CONTLEN:
438 int cl = resph.resp.ContentLength;
439 cl -= resph.stream.count;
440 return (avail < cl ? avail : cl);
441 case CD_CHUNKED:
442 return avail;
443 case CD_MP_BR:
444 return avail;
445 default:
446 throw new Error("Internal Error in StreamDemultiplexor: " +
447 "Invalid cd_type " + resph.resp.cd_type);
448 }
449
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469 synchronized void close(IOException exception, boolean was_reset)
470 {
471 if (Sock == null)
472 return;
473
474 Log.write(Log.DEMUX, "Demux: Closing all streams and socket (" +
475 this.hashCode() + ")");
476
477 try
478 { Stream.close(); }
479 catch (IOException ioe) { }
480 try
481 { Sock.close(); }
482 catch (IOException ioe) { }
483 Sock = null;
484
485 if (Timer != null)
486 {
487 Timer.kill();
488 Timer = null;
489 }
490
491 Connection.DemuxList.remove(this);
492
493
494
495
496 if (exception != null)
497 synchronized (RespHandlerList)
498 { retry_requests(exception, was_reset); }
499 }
500
501
502
503
504
505
506
507
508
509
510
511
512 private void retry_requests(IOException exception, boolean was_reset)
513 {
514 RetryException first = null,
515 prev = null;
516 ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
517
518 while (resph != null)
519 {
520
521
522
523
524 if (resph.resp.got_headers)
525 {
526 resph.exception = exception;
527 }
528 else
529 {
530 RetryException tmp = new RetryException(exception.getMessage());
531 if (first == null) first = tmp;
532
533 tmp.request = resph.request;
534 tmp.response = resph.resp;
535 tmp.exception = exception;
536 tmp.conn_reset = was_reset;
537 tmp.first = first;
538 tmp.addToListAfter(prev);
539
540 prev = tmp;
541 resph.exception = tmp;
542 }
543
544 RespHandlerList.remove(resph);
545 resph = (ResponseHandler) RespHandlerList.next();
546 }
547 }
548
549
550
551
552
553
554 private void close(ResponseHandler resph)
555 {
556 synchronized (RespHandlerList)
557 {
558 if (resph != (ResponseHandler) RespHandlerList.getFirst())
559 return;
560
561 Log.write(Log.DEMUX, "Demux: Closing stream " +
562 resph.stream.hashCode());
563
564 resph.eof = true;
565 RespHandlerList.remove(resph);
566 }
567
568 if (resph == MarkedForClose)
569 close(new IOException("Premature end of Keep-Alive"), false);
570 else
571 closeSocketIfAllStreamsClosed();
572 }
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597 synchronized void closeSocketIfAllStreamsClosed()
598 {
599 synchronized (RespHandlerList)
600 {
601 ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
602
603 while (resph != null && resph.stream.closed)
604 {
605 if (resph == MarkedForClose)
606 {
607
608 ResponseHandler tmp;
609 do
610 {
611 tmp = (ResponseHandler) RespHandlerList.getFirst();
612 RespHandlerList.remove(tmp);
613 }
614 while (tmp != resph);
615
616
617 close(new IOException("Premature end of Keep-Alive"), false);
618 return;
619 }
620
621 resph = (ResponseHandler) RespHandlerList.next();
622 }
623 }
624 }
625
626
627
628
629
630 synchronized Socket getSocket()
631 {
632 if (MarkedForClose != null)
633 return null;
634
635 if (Timer != null) Timer.hyber();
636 return Sock;
637 }
638
639
640
641
642
643
644
645
646
647
648 synchronized void markForClose(Response resp)
649 {
650 synchronized (RespHandlerList)
651 {
652 if (RespHandlerList.getFirst() == null)
653 {
654 close(new IOException("Premature end of Keep-Alive"), false);
655 return;
656 }
657
658 if (Timer != null)
659 {
660 Timer.kill();
661 Timer = null;
662 }
663
664 ResponseHandler resph, lasth = null;
665 for (resph = (ResponseHandler) RespHandlerList.enumerate();
666 resph != null;
667 resph = (ResponseHandler) RespHandlerList.next())
668 {
669 if (resph.resp == resp)
670 {
671 MarkedForClose = resph;
672
673 Log.write(Log.DEMUX, "Demux: stream " +
674 resp.inp_stream.hashCode() +
675 " marked for close");
676
677 closeSocketIfAllStreamsClosed();
678 return;
679 }
680
681 if (MarkedForClose == resph)
682 return;
683
684 lasth = resph;
685 }
686
687 if (lasth == null)
688 return;
689
690 MarkedForClose = lasth;
691 closeSocketIfAllStreamsClosed();
692
693 Log.write(Log.DEMUX, "Demux: stream " + lasth.stream.hashCode() +
694 " marked for close");
695 }
696 }
697
698
699
700
701
702
703
704
705 void abort()
706 {
707 Log.write(Log.DEMUX, "Demux: Aborting socket (" + this.hashCode() + ")");
708
709
710
711
712 synchronized (RespHandlerList)
713 {
714 for (ResponseHandler resph =
715 (ResponseHandler) RespHandlerList.enumerate();
716 resph != null;
717 resph = (ResponseHandler) RespHandlerList.next())
718 {
719 if (resph.resp.http_resp != null)
720 resph.resp.http_resp.markAborted();
721 if (resph.exception == null)
722 resph.exception = new IOException("Request aborted by user");
723 }
724
725
726
727
728
729
730
731 if (Sock != null)
732 {
733 try
734 {
735 try
736
737
738 { Sock.setSoLinger(true, 0); }
739
740 catch (SocketException se)
741 { }
742
743 try
744 { Stream.close(); }
745 catch (IOException ioe) { }
746 try
747 { Sock.close(); }
748 catch (IOException ioe) { }
749 Sock = null;
750
751 if (Timer != null)
752 {
753 Timer.kill();
754 Timer = null;
755 }
756 }
757 catch (NullPointerException npe)
758 { }
759
760 Connection.DemuxList.remove(this);
761 }
762 }
763 }
764
765
766
767
768
769 protected void finalize() throws Throwable
770 {
771 close((IOException) null, false);
772 super.finalize();
773 }
774
775
776
777
778
779
780 public String toString()
781 {
782 String prot;
783
784 switch (Protocol)
785 {
786 case HTTP:
787 prot = "HTTP"; break;
788 case HTTPS:
789 prot = "HTTPS"; break;
790 case SHTTP:
791 prot = "SHTTP"; break;
792 case HTTP_NG:
793 prot = "HTTP_NG"; break;
794 default:
795 throw new Error("HTTPClient Internal Error: invalid protocol " +
796 Protocol);
797 }
798
799 return getClass().getName() + "[Protocol=" + prot + "]";
800 }
801 }
802
803
804
805
806
807
808
809 class SocketTimeout extends Thread
810 {
811 private boolean alive = true;
812
813
814
815
816
817
818
819
820 class TimeoutEntry
821 {
822 boolean restart = false,
823 hyber = false,
824 alive = true;
825 StreamDemultiplexor demux;
826 TimeoutEntry next = null,
827 prev = null;
828
829 TimeoutEntry(StreamDemultiplexor demux)
830 {
831 this.demux = demux;
832 }
833
834 void reset()
835 {
836 hyber = false;
837 if (restart) return;
838 restart = true;
839
840 synchronized (time_list)
841 {
842 if (!alive) return;
843
844
845 next.prev = prev;
846 prev.next = next;
847
848
849 next = time_list[current];
850 prev = time_list[current].prev;
851 prev.next = this;
852 next.prev = this;
853 }
854 }
855
856 void hyber()
857 {
858 if (alive) hyber = true;
859 }
860
861 void kill()
862 {
863 alive = false;
864 restart = false;
865 hyber = false;
866
867 synchronized (time_list)
868 {
869 if (prev == null) return;
870 next.prev = prev;
871 prev.next = next;
872 prev = null;
873 }
874 }
875 }
876
877 TimeoutEntry[] time_list;
878 int current;
879
880
881 SocketTimeout(int secs)
882 {
883 super("SocketTimeout");
884
885 try { setDaemon(true); }
886 catch (SecurityException se) { }
887 setPriority(MAX_PRIORITY);
888
889 time_list = new TimeoutEntry[secs];
890 for (int idx=0; idx<secs; idx++)
891 {
892 time_list[idx] = new TimeoutEntry(null);
893 time_list[idx].next = time_list[idx].prev = time_list[idx];
894 }
895 current = 0;
896 }
897
898
899 public TimeoutEntry setTimeout(StreamDemultiplexor demux)
900 {
901 TimeoutEntry entry = new TimeoutEntry(demux);
902 synchronized (time_list)
903 {
904 entry.next = time_list[current];
905 entry.prev = time_list[current].prev;
906 entry.prev.next = entry;
907 entry.next.prev = entry;
908 }
909
910 return entry;
911 }
912
913
914
915
916
917
918 public void run()
919 {
920 TimeoutEntry marked = null;
921
922 while (alive)
923 {
924 try { sleep(1000L); } catch (InterruptedException ie) { }
925
926 synchronized (time_list)
927 {
928
929 for (TimeoutEntry entry = time_list[current].next;
930 entry != time_list[current];
931 entry = entry.next)
932 {
933 entry.restart = false;
934 }
935
936 current++;
937 if (current >= time_list.length)
938 current = 0;
939
940
941 for (TimeoutEntry entry = time_list[current].next;
942 entry != time_list[current];
943 entry = entry.next)
944 {
945 if (entry.alive && !entry.hyber)
946 {
947 TimeoutEntry prev = entry.prev;
948 entry.kill();
949
950
951
952
953
954 entry.next = marked;
955 marked = entry;
956 entry = prev;
957 }
958 }
959 }
960
961 while (marked != null)
962 {
963 marked.demux.markForClose(null);
964 marked = marked.next;
965 }
966 }
967 }
968
969
970
971
972 public void kill() {
973 alive = false;
974 }
975 }