/* * @(#)StreamDemultiplexor.java 0.3-3 06/05/2001 * * This file is part of the HTTPClient package * Copyright (C) 1996-2001 Ronald Tschalär * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free * Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, * MA 02111-1307, USA * * For questions, suggestions, bug-reports, enhancement-requests etc. * I may be contacted at: * * ronald@innovation.ch * * The HTTPClient's home page is located at: * * http://www.innovation.ch/java/HTTPClient/ * */ package HTTPClient; import java.io.IOException; import java.io.EOFException; import java.io.InterruptedIOException; import java.net.Socket; import java.net.SocketException; /** * This class handles the demultiplexing of input stream. This is needed * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG. * * @version 0.3-3 06/05/2001 * @author Ronald Tschalär */ class StreamDemultiplexor implements GlobalConstants { /** the protocol were handling request for */ private int Protocol; /** the connection we're working for */ private HTTPConnection Connection; /** the input stream to demultiplex */ private BufferedInputStream Stream; /** the socket this hangs off */ private Socket Sock = null; /** signals after the closing of which stream to close the socket */ private ResponseHandler MarkedForClose; /** timer used to close the socket if unused for a given time */ private SocketTimeout.TimeoutEntry Timer = null; /** timer thread which implements the timers */ private static SocketTimeout TimerThread = null; /** cleanup object to stop timer thread when we're gc'd */ private static Object cleanup; /** a Vector to hold the list of response handlers were serving */ private LinkedList RespHandlerList; /** number of unread bytes in current chunk (if transf-enc == chunked) */ private long chunk_len; /** the currently set timeout for the socket */ private int cur_timeout = 0; static { TimerThread = new SocketTimeout(60); TimerThread.start(); /* This is here to clean up the timer thread should the * StreamDemultiplexor class be gc'd. This will not usually happen, * unless the stuff is being run in an Applet or similar environment * where multiple classloaders are used to load the same class * multiple times. However, even in those environments it's not clear * that this here will do us any good, because classes aren't usually * gc'd unless their classloader is, but the timer thread keeps a * reference to the classloader, and hence ought to prevent the * classloader from being gc'd. */ cleanup = new Object() { private final SocketTimeout timer = StreamDemultiplexor.TimerThread; protected void finalize() { timer.kill(); } }; } // Constructors /** * a simple contructor. * * @param protocol the protocol used on this stream. * @param sock the socket which we're to demux. * @param connection the http-connection this socket belongs to. */ StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection) throws IOException { this.Protocol = protocol; this.Connection = connection; RespHandlerList = new LinkedList(); init(sock); } /** * Initializes the demultiplexor with a new socket. * * @param stream the stream to demultiplex */ private void init(Socket sock) throws IOException { Log.write(Log.DEMUX, "Demux: Initializing Stream Demultiplexor (" + this.hashCode() + ")"); this.Sock = sock; this.Stream = new BufferedInputStream(sock.getInputStream()); MarkedForClose = null; chunk_len = -1; // create a timer to close the socket after 60 seconds, but don't // start it yet Timer = TimerThread.setTimeout(this); Timer.hyber(); } // Methods /** * Each Response must register with us. */ void register(Response resp_handler, Request req) throws RetryException { synchronized (RespHandlerList) { if (Sock == null) throw new RetryException(); RespHandlerList.addToEnd( new ResponseHandler(resp_handler, req, this)); } } /** * creates an input stream for the response. * * @param resp the response structure requesting the stream * @return an InputStream */ RespInputStream getStream(Response resp) { ResponseHandler resph; synchronized (RespHandlerList) { for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList.next()) { if (resph.resp == resp) break; } } if (resph != null) return resph.stream; else return null; } /** * Restarts the timer thread that will close an unused socket after * 60 seconds. */ void restartTimer() { if (Timer != null) Timer.reset(); } /** * reads an array of bytes from the master stream. */ int read(byte[] b, int off, int len, ResponseHandler resph, int timeout) throws IOException { if (resph.exception != null) { resph.exception.fillInStackTrace(); throw resph.exception; } if (resph.eof) return -1; // read the headers and data for all responses preceding us. ResponseHandler head; while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null && head != resph) { try { head.stream.readAll(timeout); } catch (IOException ioe) { if (ioe instanceof InterruptedIOException) throw ioe; else { resph.exception.fillInStackTrace(); throw resph.exception; } } } // Now we can read from the stream. synchronized (this) { if (resph.exception != null) { resph.exception.fillInStackTrace(); throw resph.exception; } if (resph.resp.cd_type != CD_HDRS) Log.write(Log.DEMUX, "Demux: Reading for stream " + resph.stream.hashCode()); if (Timer != null) Timer.hyber(); try { int rcvd = -1; if (timeout != cur_timeout) { Log.write(Log.DEMUX, "Demux: Setting timeout to " + timeout + " ms"); Sock.setSoTimeout(timeout); cur_timeout = timeout; } switch (resph.resp.cd_type) { case CD_HDRS: rcvd = Stream.read(b, off, len); if (rcvd == -1) throw new EOFException("Premature EOF encountered"); break; case CD_0: rcvd = -1; close(resph); break; case CD_CLOSE: rcvd = Stream.read(b, off, len); if (rcvd == -1) close(resph); break; case CD_CONTLEN: int cl = resph.resp.ContentLength; if (len > cl - resph.stream.count) len = cl - resph.stream.count; rcvd = Stream.read(b, off, len); if (rcvd == -1) throw new EOFException("Premature EOF encountered"); if (resph.stream.count+rcvd == cl) close(resph); break; case CD_CHUNKED: if (chunk_len == -1) // it's a new chunk chunk_len = Codecs.getChunkLength(Stream); if (chunk_len > 0) // it's data { if (len > chunk_len) len = (int) chunk_len; rcvd = Stream.read(b, off, len); if (rcvd == -1) throw new EOFException("Premature EOF encountered"); chunk_len -= rcvd; if (chunk_len == 0) // got the whole chunk { Stream.read(); // CR Stream.read(); // LF chunk_len = -1; } } else // the footers (trailers) { resph.resp.readTrailers(Stream); rcvd = -1; close(resph); chunk_len = -1; } break; case CD_MP_BR: byte[] endbndry = resph.getEndBoundary(Stream); int[] end_cmp = resph.getEndCompiled(Stream); rcvd = Stream.read(b, off, len); if (rcvd == -1) throw new EOFException("Premature EOF encountered"); int ovf = Stream.pastEnd(endbndry, end_cmp); if (ovf != -1) { rcvd -= ovf; close(resph); } break; default: throw new Error("Internal Error in StreamDemultiplexor: " + "Invalid cd_type " + resph.resp.cd_type); } restartTimer(); return rcvd; } catch (InterruptedIOException ie) // don't intercept this one { restartTimer(); throw ie; } catch (IOException ioe) { Log.write(Log.DEMUX, "Demux: ", ioe); close(ioe, true); throw resph.exception; // set by retry_requests } catch (ParseException pe) { Log.write(Log.DEMUX, "Demux: ", pe); close(new IOException(pe.toString()), true); throw resph.exception; // set by retry_requests } } } /** * skips a number of bytes in the master stream. This is done via a * dummy read, as the socket input stream doesn't like skip()'s. */ synchronized long skip(long num, ResponseHandler resph) throws IOException { if (resph.exception != null) { resph.exception.fillInStackTrace(); throw resph.exception; } if (resph.eof) return 0; byte[] dummy = new byte[(int) num]; int rcvd = read(dummy, 0, (int) num, resph, 0); if (rcvd == -1) return 0; else return rcvd; } /** * Determines the number of available bytes. If resph is null, return * available bytes on the socket stream itself (used by HTTPConnection). */ synchronized int available(ResponseHandler resph) throws IOException { if (resph != null && resph.exception != null) { resph.exception.fillInStackTrace(); throw resph.exception; } if (resph != null && resph.eof) return 0; int avail = Stream.available(); if (resph == null) return avail; switch (resph.resp.cd_type) { case CD_0: return 0; case CD_HDRS: // this is something of a hack; I could return 0, but then // if you were waiting for something on a response that // wasn't first in line (and you didn't try to read the // other response) you'd wait forever. On the other hand, // we might be making a false promise here... return (avail > 0 ? 1 : 0); case CD_CLOSE: return avail; case CD_CONTLEN: int cl = resph.resp.ContentLength; cl -= resph.stream.count; return (avail < cl ? avail : cl); case CD_CHUNKED: return avail; // not perfect... case CD_MP_BR: return avail; // not perfect... default: throw new Error("Internal Error in StreamDemultiplexor: " + "Invalid cd_type " + resph.resp.cd_type); } } /** * Closes the socket and all associated streams. If exception * is not null then all active requests are retried. * *

There are five ways this method may be activated. 1) if an exception * occurs during read or write. 2) if the stream is marked for close but * no responses are outstanding (e.g. due to a timeout). 3) when the * markedForClose response is closed. 4) if all response streams up until * and including the markedForClose response have been closed. 5) if this * demux is finalized. * * @param exception the IOException to be sent to the streams. * @param was_reset if true then the exception is due to a connection * reset; otherwise it means we generated the exception * ourselves and this is a "normal" close. */ synchronized void close(IOException exception, boolean was_reset) { if (Sock == null) // already cleaned up return; Log.write(Log.DEMUX, "Demux: Closing all streams and socket (" + this.hashCode() + ")"); try { Stream.close(); } catch (IOException ioe) { } try { Sock.close(); } catch (IOException ioe) { } Sock = null; if (Timer != null) { Timer.kill(); Timer = null; } Connection.DemuxList.remove(this); // Here comes the tricky part: redo outstanding requests! if (exception != null) synchronized (RespHandlerList) { retry_requests(exception, was_reset); } } /** * Retries outstanding requests. Well, actually the RetryModule does * that. Here we just throw a RetryException for each request so that * the RetryModule can catch and handle them. * * @param exception the exception that led to this call. * @param was_reset this flag is passed to the RetryException and is * used by the RetryModule to distinguish abnormal closes * from expected closes. */ private void retry_requests(IOException exception, boolean was_reset) { RetryException first = null, prev = null; ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate(); while (resph != null) { /* if the application is already reading the data then the * response has already been handled. In this case we must * throw the real exception. */ if (resph.resp.got_headers) { resph.exception = exception; } else { RetryException tmp = new RetryException(exception.getMessage()); if (first == null) first = tmp; tmp.request = resph.request; tmp.response = resph.resp; tmp.exception = exception; tmp.conn_reset = was_reset; tmp.first = first; tmp.addToListAfter(prev); prev = tmp; resph.exception = tmp; } RespHandlerList.remove(resph); resph = (ResponseHandler) RespHandlerList.next(); } } /** * Closes the associated stream. If this one has been markedForClose then * the socket is closed; else closeSocketIfAllStreamsClosed is invoked. */ private void close(ResponseHandler resph) { synchronized (RespHandlerList) { if (resph != (ResponseHandler) RespHandlerList.getFirst()) return; Log.write(Log.DEMUX, "Demux: Closing stream " + resph.stream.hashCode()); resph.eof = true; RespHandlerList.remove(resph); } if (resph == MarkedForClose) close(new IOException("Premature end of Keep-Alive"), false); else closeSocketIfAllStreamsClosed(); } /** * Close the socket if all the streams have been closed. * *

When a stream reaches eof it is removed from the response handler * list, but when somebody close()'s the response stream it is just * marked as such. This means that all responses in the list have either * not been read at all or only partially read, but they might have been * close()'d meaning that nobody is interested in the data. So If all the * response streams up till and including the one markedForClose have * been close()'d then we can remove them from our list and close the * socket. * *

Note: if the response list is emtpy or if no response is * markedForClose then this method does nothing. Specifically it does * not close the socket. We only want to close the socket if we've been * told to do so. * *

Also note that there might still be responses in the list after * the markedForClose one. These are due to us having pipelined more * requests to the server than it's willing to serve on a single * connection. These requests will be retried if possible. */ synchronized void closeSocketIfAllStreamsClosed() { synchronized (RespHandlerList) { ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate(); while (resph != null && resph.stream.closed) { if (resph == MarkedForClose) { // remove all response handlers first ResponseHandler tmp; do { tmp = (ResponseHandler) RespHandlerList.getFirst(); RespHandlerList.remove(tmp); } while (tmp != resph); // close the socket close(new IOException("Premature end of Keep-Alive"), false); return; } resph = (ResponseHandler) RespHandlerList.next(); } } } /** * returns the socket associated with this demux */ synchronized Socket getSocket() { if (MarkedForClose != null) return null; if (Timer != null) Timer.hyber(); return Sock; } /** * Mark this demux to not accept any more request and to close the * stream after this response or all requests have been * processed, or close immediately if no requests are registered. * * @param response the Response after which the connection should * be closed. */ synchronized void markForClose(Response resp) { synchronized (RespHandlerList) { if (RespHandlerList.getFirst() == null) // no active request, { // so close the socket close(new IOException("Premature end of Keep-Alive"), false); return; } if (Timer != null) { Timer.kill(); Timer = null; } ResponseHandler resph, lasth = null; for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList.next()) { if (resph.resp == resp) // new resp precedes any others { MarkedForClose = resph; Log.write(Log.DEMUX, "Demux: stream " + resp.inp_stream.hashCode() + " marked for close"); closeSocketIfAllStreamsClosed(); return; } if (MarkedForClose == resph) return; // already marked for closing after an earlier resp lasth = resph; } if (lasth == null) return; MarkedForClose = lasth; // resp == null, so use last resph closeSocketIfAllStreamsClosed(); Log.write(Log.DEMUX, "Demux: stream " + lasth.stream.hashCode() + " marked for close"); } } /** * Emergency stop. Closes the socket and notifies the responses that * the requests are aborted. * * @since V0.3 */ void abort() { Log.write(Log.DEMUX, "Demux: Aborting socket (" + this.hashCode() + ")"); // notify all responses of abort synchronized (RespHandlerList) { for (ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList.next()) { if (resph.resp.http_resp != null) resph.resp.http_resp.markAborted(); if (resph.exception == null) resph.exception = new IOException("Request aborted by user"); } /* Close the socket. * Note: this duplicates most of close(IOException, boolean). We * do *not* call close() because that is synchronized, but we want * abort() to be asynch. */ if (Sock != null) { try { try { Sock.setSoLinger(false, 0); } catch (SocketException se) { } try { Stream.close(); } catch (IOException ioe) { } try { Sock.close(); } catch (IOException ioe) { } Sock = null; if (Timer != null) { Timer.kill(); Timer = null; } } catch (NullPointerException npe) { } Connection.DemuxList.remove(this); } } } /** * A safety net to close the connection. */ protected void finalize() throws Throwable { close((IOException) null, false); super.finalize(); } /** * produces a string. * @return a string containing the class name and protocol number */ public String toString() { String prot; switch (Protocol) { case HTTP: prot = "HTTP"; break; case HTTPS: prot = "HTTPS"; break; case SHTTP: prot = "SHTTP"; break; case HTTP_NG: prot = "HTTP_NG"; break; default: throw new Error("HTTPClient Internal Error: invalid protocol " + Protocol); } return getClass().getName() + "[Protocol=" + prot + "]"; } } /** * This thread is used to reap idle connections. It is NOT used to timeout * reads or writes on a socket. It keeps a list of timer entries and expires * them after a given time. */ class SocketTimeout extends Thread { private boolean alive = true; /** * This class represents a timer entry. It is used to close an * inactive socket after n seconds. Once running, the timer may be * suspended (hyber()), restarted (reset()), or aborted (kill()). * When the timer expires it invokes markForClose() on the * associated stream demultipexer. */ class TimeoutEntry { boolean restart = false, hyber = false, alive = true; StreamDemultiplexor demux; TimeoutEntry next = null, prev = null; TimeoutEntry(StreamDemultiplexor demux) { this.demux = demux; } void reset() { hyber = false; if (restart) return; restart = true; synchronized (time_list) { if (!alive) return; // remove from current position next.prev = prev; prev.next = next; // and add to end of timeout list next = time_list[current]; prev = time_list[current].prev; prev.next = this; next.prev = this; } } void hyber() { if (alive) hyber = true; } void kill() { alive = false; restart = false; hyber = false; synchronized (time_list) { if (prev == null) return; next.prev = prev; prev.next = next; prev = null; } } } TimeoutEntry[] time_list; // jdk 1.1.x javac bug: these must not int current; // be private! SocketTimeout(int secs) { super("SocketTimeout"); try { setDaemon(true); } catch (SecurityException se) { } // Oh well... setPriority(MAX_PRIORITY); time_list = new TimeoutEntry[secs]; for (int idx=0; idx= time_list.length) current = 0; // remove all expired timers for (TimeoutEntry entry = time_list[current].next; entry != time_list[current]; entry = entry.next) { if (entry.alive && !entry.hyber) { TimeoutEntry prev = entry.prev; entry.kill(); /* put on death row. Note: we must not invoke * markForClose() here because it is synch'd * and can therefore lead to a deadlock if that * thread is trying to do a reset() or kill() */ entry.next = marked; marked = entry; entry = prev; } } } while (marked != null) { marked.demux.markForClose(null); marked = marked.next; } } } /** * Stop the timer thread. */ public void kill() { alive = false; } }