Java实现Http服务器(四)

(1)HTTPServer的监听启动

sun.net.httpserver.ServerImpl类中启动了Socket监听,ServerImpl的内部类Dispatch类启动了Http服务器的监听

 /* main server listener task */

    class Dispatcher implements Runnable {

        private void handleEvent (Event r) {
            ExchangeImpl t = r.exchange;
            HttpConnection c = t.getConnection();
            try {
                if (r instanceof WriteFinishedEvent) {

                    int exchanges = endExchange();
                    if (terminating && exchanges == 0) {
                        finished = true;
                    }
                    SocketChannel chan = c.getChannel();
                    LeftOverInputStream is = t.getOriginalInputStream();
                    if (!is.isEOF()) {
                        t.close = true;
                    }
                    if (t.close || idleConnections.size() >= MAX_IDLE_CONNECTIONS) {
                        c.close();
                        allConnections.remove (c);
                    } else {
                        if (is.isDataBuffered()) {
                            /* don't re-enable the interestops, just handle it */
                            handle (c.getChannel(), c);
                        } else {
                            /* re-enable interestops */
                            SelectionKey key = c.getSelectionKey();
                            if (key.isValid()) {
                                key.interestOps (
                                    key.interestOps()|SelectionKey.OP_READ
                                );
                            }
                            c.time = getTime() + IDLE_INTERVAL;
                            idleConnections.add (c);
                        }
                    }
                }
            } catch (IOException e) {
                logger.log (
                    Level.FINER, "Dispatcher (1)", e
                );
                c.close();
            }
        }

        public void run() {
            while (!finished) {
                try {

                    /* process the events list first */

                    while (resultSize() > 0) {
                        Event r;
                        synchronized (lolock) {
                            r = events.remove(0);
                            handleEvent (r);
                        }
                    }

                    selector.select(1000);

                    /* process the selected list now  */

                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove ();
                        if (key.equals (listenerKey)) {
                            if (terminating) {
                                continue;
                            }
                            SocketChannel chan = schan.accept();
                            if (chan == null) {
                                continue; /* cancel something ? */
                            }
                            chan.configureBlocking (false);
                            SelectionKey newkey = chan.register (selector, SelectionKey.OP_READ);
                            HttpConnection c = new HttpConnection ();
                            c.selectionKey = newkey;
                            c.setChannel (chan);
                            newkey.attach (c);
                            allConnections.add (c);
                        } else {
                            try {
                                if (key.isReadable()) {
                                    boolean closed;
                                    SocketChannel chan = (SocketChannel)key.channel();
                                    HttpConnection conn = (HttpConnection)key.attachment();
                                    // interestOps will be restored at end of read
                                    key.interestOps (0);
                                    handle (chan, conn);
                                } else {
                                    assert false;
                                }
                            } catch (IOException e) {
                                HttpConnection conn = (HttpConnection)key.attachment();
                                logger.log (
                                    Level.FINER, "Dispatcher (2)", e
                                );
                                conn.close();
                            }
                        }
                    }
                } catch (CancelledKeyException  e) {
                    logger.log (Level.FINER, "Dispatcher (3)", e);
                } catch (IOException e) {
                    logger.log (Level.FINER, "Dispatcher (4)", e);
                } catch (Exception e) {
                    logger.log (Level.FINER, "Dispatcher (7)", e);
                }
            }
        }

        public void handle (SocketChannel chan, HttpConnection conn)
        throws IOException
        {
            try {
                Exchange t = new Exchange (chan, protocol, conn);
                executor.execute (t);
            } catch (HttpError e1) {
                logger.log (Level.FINER, "Dispatcher (5)", e1);
                conn.close();
            } catch (IOException e) {
                logger.log (Level.FINER, "Dispatcher (6)", e);
                conn.close();
            }
        }
    }

    static boolean debug = ServerConfig.debugEnabled ();

    static synchronized void dprint (String s) {
        if (debug) {
            System.out.println (s);
        }
    }

    static synchronized void dprint (Exception e) {
        if (debug) {
            System.out.println (e);
            e.printStackTrace();
        }
    }

    Logger getLogger () {
        return logger;
    }

 

该类的初始化在sun.net.httpserver.ServerImpl的构造方法当中

ServerImpl (HttpServer wrapper, String protocol, InetSocketAddress addr, int backlog) throws IOException {
        this.protocol = protocol;
        this.wrapper = wrapper;
        this.logger = Logger.getLogger ("com.sun.net.httpserver");
        https = protocol.equalsIgnoreCase ("https");
        this.address = addr;
        contexts = new ContextList();
        schan = ServerSocketChannel.open();
        if (addr != null) {
            ServerSocket socket = schan.socket();
            socket.bind (addr, backlog);
            bound = true;
        }
        selector = Selector.open ();
        schan.configureBlocking (false);
        listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
        dispatcher = new Dispatcher();
        idleConnections = Collections.synchronizedSet (new HashSet<HttpConnection>());
        allConnections = Collections.synchronizedSet (new HashSet<HttpConnection>());
        time = System.currentTimeMillis();
        timer = new Timer ("server-timer", true);
        timer.schedule (new ServerTimerTask(), CLOCK_TICK, CLOCK_TICK);
        events = new LinkedList<Event>();
        logger.config ("HttpServer created "+protocol+" "+ addr);
    }

随后在sun.net.httpserver.ServerImpl类的start方法当中,作为线程被启动

public void start () {
        if (!bound || started || finished) {
            throw new IllegalStateException ("server in wrong state");
        }
        if (executor == null) {
            executor = new DefaultExecutor();
        }
        Thread t = new Thread (dispatcher);
        started = true;
        t.start();
    }
import com.sun.net.httpserver.HttpServer;
private HttpServer httpServer = null; public final void init() throws IOException { this.executor = Executors.newCachedThreadPool(); final InetSocketAddress sa = new InetSocketAddress("0.0.0.0", 8080); this.httpServer = HttpServer.create(sa, 0); this.httpServer.setExecutor(this.executor); this.httpServer.createContext("/", new HttpServerHandler()); this.httpServer.start(); }

上面是使用JDK内置HttpServer的方法。

在第二篇文章中介绍过返回的默认对象是

sun.net.httpserver.HttpServerImpl对象,该对象是com.sun.net.httpserver.HttpServer对象的子类

public class HttpServerImpl extends HttpServer {

    ServerImpl server;

    HttpServerImpl () throws IOException {
        this (new InetSocketAddress(80), 0);
    }

    HttpServerImpl (
        InetSocketAddress addr, int backlog
    ) throws IOException {
        server = new ServerImpl (this, "http", addr, backlog);
    }
.............................................
}

该对象又是ServerImpl对象的外观类,提供了HttpServer的方法,封装了ServerImpl自身的各种方法实现

最终在应用中调用this.httpserver.start()

本质上调用的就是sun.net.httpserver.ServerImpl对象的start()方法

---------------------------------------------------------------------------------------------------------------------------------------

下面分析下Dispatcher类中run方法的监听过程

public void run() {
//server的关闭标志,在调用httpserver.stop()方法后--->即ServerImpl类的stop()方法,设置finished为true(初始化为false) while (!finished) { try { /* process the events list first */ //由于支持HTTP1.1的原因,在一次发送数据结束之后,并不是立即关闭连接和socket,而是将发送完成作为一个事件传递过来,根据上下文决定是否关闭连接 while (resultSize() > 0) { Event r; synchronized (lolock) { r = events.remove(0); handleEvent (r); } } selector.select(1000); /* process the selected list now */ Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> iter = selected.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove (); if (key.equals (listenerKey)) { if (terminating) { continue; } SocketChannel chan = schan.accept(); if (chan == null) { continue; /* cancel something ? */ } chan.configureBlocking (false); SelectionKey newkey = chan.register (selector, SelectionKey.OP_READ); HttpConnection c = new HttpConnection (); c.selectionKey = newkey; c.setChannel (chan); newkey.attach (c); allConnections.add (c); } else { try { if (key.isReadable()) { boolean closed; SocketChannel chan = (SocketChannel)key.channel(); HttpConnection conn = (HttpConnection)key.attachment(); // interestOps will be restored at end of read key.interestOps (0); handle (chan, conn); } else { assert false; } } catch (IOException e) { HttpConnection conn = (HttpConnection)key.attachment(); logger.log ( Level.FINER, "Dispatcher (2)", e ); conn.close(); } } } } catch (CancelledKeyException e) { logger.log (Level.FINER, "Dispatcher (3)", e); } catch (IOException e) { logger.log (Level.FINER, "Dispatcher (4)", e); } catch (Exception e) { logger.log (Level.FINER, "Dispatcher (7)", e); } } }

设置请求处理器的方法

public HttpContextImpl createContext (String path, HttpHandler handler) {
        return server.createContext (path, handler);
}
public synchronized HttpContextImpl createContext (String path) {
        if (path == null) {
            throw new NullPointerException ("null path parameter");
        }
        HttpContextImpl context = new HttpContextImpl (protocol, path, null, this);
        contexts.add (context);
        logger.config ("context created: " + path);
        return context;
}

HttpContextImpl类的介绍

/**
* HttpContext represents a mapping between a protocol (http or https) together with a root URI path
* to a {@link HttpHandler} which is invoked to handle requests destined
* for the protocol/path on the associated HttpServer.
* <p>
* HttpContext instances are created by {@link HttpServer#createContext(String,String,HttpHandler,Object)}
* <p>
*/

contexts对象是  private ContextList contexts; 

是ServerImpl的成员变量

该类提供了URL和处理该URL的Handler的映射,可以看到,最多支持50个URL的处理。

add(HttpContextImpl ctx)方法增加映射对象

findContext(String protocol,String path) 返回相应请求的HttpContextImpl对象 

下面可以看一下find的过程(坚持最长匹配 比如/guowuxin/hello  /guowuxin 这是两个映射  则当/guowuxin 请求来的时候将使用前者的映射,貌似很没有礼貌)

坚持最长匹配也是没有使用map的原因

(1)判断protocol (HTTP1.0 HTTP1.1)

(2)exact变量要求是否是全匹配亦或者是开头(startsWith) 默认 我们看到run方法中默认的是开头匹配

package sun.net.httpserver;

import java.util.*;
import com.sun.net.httpserver.*;
import com.sun.net.httpserver.spi.*;

class ContextList {

    final static int MAX_CONTEXTS = 50;

    LinkedList<HttpContextImpl> list = new LinkedList<HttpContextImpl>();

    public synchronized void add (HttpContextImpl ctx) {
        assert ctx.getPath() != null;
        list.add (ctx);
    }

    public synchronized int size () {
        return list.size();
    }

   /* initially contexts are located only by protocol:path.
    * Context with longest prefix matches (currently case-sensitive)
    */
    synchronized HttpContextImpl findContext (String protocol, String path) {
        return findContext (protocol, path, false);
    }

    synchronized HttpContextImpl findContext (String protocol, String path, boolean exact) {
        protocol = protocol.toLowerCase();
        String longest = "";
        HttpContextImpl lc = null;
        for (HttpContextImpl ctx: list) {
            if (!ctx.getProtocol().equals(protocol)) {
                continue;
            }
            String cpath = ctx.getPath();
            if (exact && !cpath.equals (path)) {
                continue;
            } else if (!exact && !path.startsWith(cpath)) {
                continue;
            }
            if (cpath.length() > longest.length()) {
                longest = cpath;
                lc = ctx;
            }
        }
        return lc;
    }

    public synchronized void remove (String protocol, String path)
        throws IllegalArgumentException
    {
        HttpContextImpl ctx = findContext (protocol, path, true);
        if (ctx == null) {
            throw new IllegalArgumentException ("cannot remove element from list");
        }
        list.remove (ctx);
    }

    public synchronized void remove (HttpContextImpl context)
        throws IllegalArgumentException
    {
        for (HttpContextImpl ctx: list) {
            if (ctx.equals (context)) {
                list.remove (ctx);
                return;
            }
        }
        throw new IllegalArgumentException ("no such context in list");
    }
}

  

-------------------------------------------------------------------------------------------------------------------------------------------------

下面重点分析下server端的HttpConnection的复用(在HTTP1.1协议下,TCP连接在发送完数据之后不会断开)

 exchange.getResponseBody().close();

这里就要先介绍下,我们的请求处理器HttpHandler接口

package com.sun.net.httpserver;

import java.io.IOException;

/**
 * A handler which is invoked to process HTTP exchanges. Each
 * HTTP exchange is handled by one of these handlers.
 * @since 1.6
 */
public interface HttpHandler {
    /**
     * Handle the given request and generate an appropriate response.
     * See {@link HttpExchange} for a description of the steps
     * involved in handling an exchange.
     * @param exchange the exchange containing the request from the
     *      client and used to send the response
     * @throws NullPointerException if exchange is <code>null</code>
     */
    public abstract void handle (HttpExchange exchange) throws IOException;
}

该接口的实现类用于处理相应的url请求,其中参数HttpExchange类如其名,作用就是接收输入参数,返回输出信息,就是用来交换Http消息的一个工具接口。

下面是它的一个实现类,也是一个外观类,包装了ExchangeImpl类,提供了HttpExchange接口的方法,封装了ExchangeImpl类的方法。

package sun.net.httpserver;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import javax.net.ssl.*;
import java.util.*;
import sun.net.www.MessageHeader;
import com.sun.net.httpserver.*;
import com.sun.net.httpserver.spi.*;

class HttpExchangeImpl extends HttpExchange {

    ExchangeImpl impl;

    HttpExchangeImpl (ExchangeImpl impl) {
        this.impl = impl;
    }

    public Headers getRequestHeaders () {
        return impl.getRequestHeaders();
    }

    public Headers getResponseHeaders () {
        return impl.getResponseHeaders();
    }

    public URI getRequestURI () {
        return impl.getRequestURI();
    }

    public String getRequestMethod (){
        return impl.getRequestMethod();
    }

    public HttpContextImpl getHttpContext (){
        return impl.getHttpContext();
    }

    public void close () {
        impl.close();
    }

    public InputStream getRequestBody () {
        return impl.getRequestBody();
    }

    public int getResponseCode () {
        return impl.getResponseCode();
    }

    public OutputStream getResponseBody () {
        return impl.getResponseBody();
    }


    public void sendResponseHeaders (int rCode, long contentLen)
    throws IOException
    {
        impl.sendResponseHeaders (rCode, contentLen);
    }

    public InetSocketAddress getRemoteAddress (){
        return impl.getRemoteAddress();
    }

    public InetSocketAddress getLocalAddress (){
        return impl.getLocalAddress();
    }

    public String getProtocol (){
        return impl.getProtocol();
    }

    public Object getAttribute (String name) {
        return impl.getAttribute (name);
    }

    public void setAttribute (String name, Object value) {
        impl.setAttribute (name, value);
    }

    public void setStreams (InputStream i, OutputStream o) {
        impl.setStreams (i, o);
    }

    public HttpPrincipal getPrincipal () {
        return impl.getPrincipal();
    }

    ExchangeImpl getExchangeImpl () {
        return impl;
    }
}

下面是sun.net.httpserver包中ExchangeImpl类的实现,该类中保存了TCP连接输入输出流的引用,

package sun.net.httpserver;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import javax.net.ssl.*;
import java.util.*;
import java.text.*;
import sun.net.www.MessageHeader;
import com.sun.net.httpserver.*;
import com.sun.net.httpserver.spi.*;

class ExchangeImpl {

    Headers reqHdrs, rspHdrs;
    Request req;
    String method;
    URI uri;
    HttpConnection connection;
    int reqContentLen;
    long rspContentLen;
    /* raw streams which access the socket directly */
    InputStream ris;
    OutputStream ros;
    Thread thread;
    /* close the underlying connection when this exchange finished */
    boolean close;
    boolean closed;
    boolean http10 = false;

    /* for formatting the Date: header */
    static TimeZone tz;
    static DateFormat df;
    static {
        String pattern = "EEE, dd MMM yyyy HH:mm:ss zzz";
        tz = TimeZone.getTimeZone ("GMT");
        df = new SimpleDateFormat (pattern, Locale.US);
        df.setTimeZone (tz);
    }

    /* streams which take care of the HTTP protocol framing
     * and are passed up to higher layers
     */
    InputStream uis;
    OutputStream uos;
    LeftOverInputStream uis_orig; // uis may have be a user supplied wrapper
    PlaceholderOutputStream uos_orig;

    boolean sentHeaders; /* true after response headers sent */
    Map<String,Object> attributes;
    int rcode = -1;
    HttpPrincipal principal;
    ServerImpl server;

    ExchangeImpl (
        String m, URI u, Request req, int len, HttpConnection connection
    ) throws IOException {
        this.req = req;
        this.reqHdrs = req.headers();
        this.rspHdrs = new Headers();
        this.method = m;
        this.uri = u;
        this.connection = connection;
        this.reqContentLen = len;
        /* ros only used for headers, body written directly to stream */
        this.ros = req.outputStream();
        this.ris = req.inputStream();
        server = getServerImpl();
        server.startExchange();
    }

    public Headers getRequestHeaders () {
        return new UnmodifiableHeaders (reqHdrs);
    }

    public Headers getResponseHeaders () {
        return rspHdrs;
    }

    public URI getRequestURI () {
        return uri;
    }

    public String getRequestMethod (){
        return method;
    }

    public HttpContextImpl getHttpContext (){
        return connection.getHttpContext();
    }

    public void close () {
        if (closed) {
            return;
        }
        closed = true;

        /* close the underlying connection if,
         * a) the streams not set up yet, no response can be sent, or
         * b) if the wrapper output stream is not set up, or
         * c) if the close of the input/outpu stream fails
         */
        try {
            if (uis_orig == null || uos == null) {
                connection.close();
                return;
            }
            if (!uos_orig.isWrapped()) {
                connection.close();
                return;
            }
            if (!uis_orig.isClosed()) {
                uis_orig.close();
            }
            uos.close();
        } catch (IOException e) {
            connection.close();
        }
    }

    public InputStream getRequestBody () {
        if (uis != null) {
            return uis;
        }
        if (reqContentLen == -1) {
            uis_orig = new ChunkedInputStream (this, ris);
            uis = uis_orig;
        } else {
            uis_orig = new FixedLengthInputStream (this, ris, reqContentLen);
            uis = uis_orig;
        }
        return uis;
    }

    LeftOverInputStream getOriginalInputStream () {
        return uis_orig;
    }

    public int getResponseCode () {
        return rcode;
    }

    public OutputStream getResponseBody () {
        /* TODO. Change spec to remove restriction below. Filters
         * cannot work with this restriction
         *
         * if (!sentHeaders) {
         *    throw new IllegalStateException ("headers not sent");
         * }
         */
        if (uos == null) {
            uos_orig = new PlaceholderOutputStream (null);
            uos = uos_orig;
        }
        return uos;
    }


    /* returns the place holder stream, which is the stream
     * returned from the 1st call to getResponseBody()
     * The "real" ouputstream is then placed inside this
     */
    PlaceholderOutputStream getPlaceholderResponseBody () {
        getResponseBody();
        return uos_orig;
    }

    public void sendResponseHeaders (int rCode, long contentLen)
    throws IOException
    {
        if (sentHeaders) {
            throw new IOException ("headers already sent");
        }
        this.rcode = rCode;
        String statusLine = "HTTP/1.1 "+rCode+Code.msg(rCode)+"
";
        OutputStream tmpout = new BufferedOutputStream (ros);
        PlaceholderOutputStream o = getPlaceholderResponseBody();
        tmpout.write (bytes(statusLine, 0), 0, statusLine.length());
        boolean noContentToSend = false; // assume there is content
        rspHdrs.set ("Date", df.format (new Date()));
        if (contentLen == 0) {
            if (http10) {
                o.setWrappedStream (new UndefLengthOutputStream (this, ros));
                close = true;
            } else {
                rspHdrs.set ("Transfer-encoding", "chunked");
                o.setWrappedStream (new ChunkedOutputStream (this, ros));
            }
        } else {
            if (contentLen == -1) {
                noContentToSend = true;
                contentLen = 0;
            }
            /* content len might already be set, eg to implement HEAD resp */
            if (rspHdrs.getFirst ("Content-length") == null) {
                rspHdrs.set ("Content-length", Long.toString(contentLen));
            }
            o.setWrappedStream (new FixedLengthOutputStream (this, ros, contentLen));
        }
        write (rspHdrs, tmpout);
        this.rspContentLen = contentLen;
        tmpout.flush() ;
        tmpout = null;
        sentHeaders = true;
        if (noContentToSend) {
            WriteFinishedEvent e = new WriteFinishedEvent (this);
            server.addEvent (e);
            closed = true;
        }
        server.logReply (rCode, req.requestLine(), null);
    }

    void write (Headers map, OutputStream os) throws IOException {
        Set<Map.Entry<String,List<String>>> entries = map.entrySet();
        for (Map.Entry<String,List<String>> entry : entries) {
            String key = entry.getKey();
            byte[] buf;
            List<String> values = entry.getValue();
            for (String val : values) {
                int i = key.length();
                buf = bytes (key, 2);
                buf[i++] = ':';
                buf[i++] = ' ';
                os.write (buf, 0, i);
                buf = bytes (val, 2);
                i = val.length();
                buf[i++] = '
';
                buf[i++] = '
';
                os.write (buf, 0, i);
            }
        }
        os.write ('
');
        os.write ('
');
    }

    private byte[] rspbuf = new byte [128]; // used by bytes()

    /**
     * convert string to byte[], using rspbuf
     * Make sure that at least "extra" bytes are free at end
     * of rspbuf. Reallocate rspbuf if not big enough.
     * caller must check return value to see if rspbuf moved
     */
    private byte[] bytes (String s, int extra) {
        int slen = s.length();
        if (slen+extra > rspbuf.length) {
            int diff = slen + extra - rspbuf.length;
            rspbuf = new byte [2* (rspbuf.length + diff)];
        }
        char c[] = s.toCharArray();
        for (int i=0; i<c.length; i++) {
            rspbuf[i] = (byte)c[i];
        }
        return rspbuf;
    }

    public InetSocketAddress getRemoteAddress (){
        Socket s = connection.getChannel().socket();
        InetAddress ia = s.getInetAddress();
        int port = s.getPort();
        return new InetSocketAddress (ia, port);
    }

    public InetSocketAddress getLocalAddress (){
        Socket s = connection.getChannel().socket();
        InetAddress ia = s.getLocalAddress();
        int port = s.getLocalPort();
        return new InetSocketAddress (ia, port);
    }

    public String getProtocol (){
        String reqline = req.requestLine();
        int index = reqline.lastIndexOf (' ');
        return reqline.substring (index+1);
    }

    public SSLSession getSSLSession () {
        SSLEngine e = connection.getSSLEngine();
        if (e == null) {
            return null;
        }
        return e.getSession();
    }

    public Object getAttribute (String name) {
        if (name == null) {
            throw new NullPointerException ("null name parameter");
        }
        if (attributes == null) {
            attributes = getHttpContext().getAttributes();
        }
        return attributes.get (name);
    }

    public void setAttribute (String name, Object value) {
        if (name == null) {
            throw new NullPointerException ("null name parameter");
        }
        if (attributes == null) {
            attributes = getHttpContext().getAttributes();
        }
        attributes.put (name, value);
    }

    public void setStreams (InputStream i, OutputStream o) {
        assert uis != null;
        if (i != null) {
            uis = i;
        }
        if (o != null) {
            uos = o;
        }
    }

    /**
     * PP
     */
    HttpConnection getConnection () {
        return connection;
    }

    ServerImpl getServerImpl () {
        return getHttpContext().getServerImpl();
    }

    public HttpPrincipal getPrincipal () {
        return principal;
    }

    void setPrincipal (HttpPrincipal principal) {
        this.principal = principal;
    }

    static ExchangeImpl get (HttpExchange t) {
        if (t instanceof HttpExchangeImpl) {
            return ((HttpExchangeImpl)t).getExchangeImpl();
        } else {
            assert t instanceof HttpsExchangeImpl;
            return ((HttpsExchangeImpl)t).getExchangeImpl();
        }
    }
}

/**
 * An OutputStream which wraps another stream
 * which is supplied either at creation time, or sometime later.
 * If a caller/user tries to write to this stream before
 * the wrapped stream has been provided, then an IOException will
 * be thrown.
 */
class PlaceholderOutputStream extends java.io.OutputStream {

    OutputStream wrapped;

    PlaceholderOutputStream (OutputStream os) {
        wrapped = os;
    }

    void setWrappedStream (OutputStream os) {
        wrapped = os;
    }

    boolean isWrapped () {
        return wrapped != null;
    }

    private void checkWrap () throws IOException {
        if (wrapped == null) {
            throw new IOException ("response headers not sent yet");
        }
    }

    public void write(int b) throws IOException {
        checkWrap();
        wrapped.write (b);
    }

    public void write(byte b[]) throws IOException {
        checkWrap();
        wrapped.write (b);
    }

    public void write(byte b[], int off, int len) throws IOException {
        checkWrap();
        wrapped.write (b, off, len);
    }

    public void flush() throws IOException {
        checkWrap();
        wrapped.flush();
    }

    public void close() throws IOException {
        checkWrap();
        wrapped.close();
    }
}

下面是一个输出流的装饰类

package sun.net.httpserver;

import java.io.*;
import com.sun.net.httpserver.*;
import com.sun.net.httpserver.spi.*;

/**
 * a (filter) input stream which can tell us if bytes are "left over"
 * on the underlying stream which can be read (without blocking)
 * on another instance of this class.
 *
 * The class can also report if all bytes "expected" to be read
 * were read, by the time close() was called. In that case,
 * bytes may be drained to consume them (by calling drain() ).
 *
 * isEOF() returns true, when all expected bytes have been read
 */
abstract class LeftOverInputStream extends FilterInputStream {
    ExchangeImpl t;
    ServerImpl server;
    protected boolean closed = false;
    protected boolean eof = false;
    byte[] one = new byte [1];

    public LeftOverInputStream (ExchangeImpl t, InputStream src) {
        super (src);
        this.t = t;
        this.server = t.getServerImpl();
    }
    /**
     * if bytes are left over buffered on *the UNDERLYING* stream
     */
    public boolean isDataBuffered () throws IOException {
        assert eof;
        return super.available() > 0;
    }

    public void close () throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        if (!eof) {
            eof = drain (ServerConfig.getDrainAmount());
        }
    }

    public boolean isClosed () {
        return closed;
    }

    public boolean isEOF () {
        return eof;
    }

    protected abstract int readImpl (byte[]b, int off, int len) throws IOException;

    public synchronized int read () throws IOException {
        if (closed) {
            throw new IOException ("Stream is closed");
        }
        int c = readImpl (one, 0, 1);
        if (c == -1 || c == 0) {
            return c;
        } else {
            return one[0] & 0xFF;
        }
    }

    public synchronized int read (byte[]b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException ("Stream is closed");
        }
        return readImpl (b, off, len);
    }

    /**
     * read and discard up to l bytes or "eof" occurs,
     * (whichever is first). Then return true if the stream
     * is at eof (ie. all bytes were read) or false if not
     * (still bytes to be read)
     */
    public boolean drain (long l) throws IOException {
        int bufSize = 2048;
        byte[] db = new byte [bufSize];
        while (l > 0) {
            long len = readImpl (db, 0, bufSize);
            if (len == -1) {
                eof = true;
                return true;
            } else {
                l = l - len;
            }
        }
        return false;
    }
}

  

package sun.net.httpserver;

import java.io.*;
import javax.net.ssl.*;
import java.nio.channels.*;
import java.util.logging.Logger;
import com.sun.net.httpserver.*;
import com.sun.net.httpserver.spi.*;

/**
 * encapsulates all the connection specific state for a HTTP/S connection
 * one of these is hung from the selector attachment and is used to locate
 * everything from that.
 */
class HttpConnection {

    HttpContextImpl context;
    SSLEngine engine;
    SSLContext sslContext;
    SSLStreams sslStreams;

    /* high level streams returned to application */
    InputStream i;

    /* low level stream that sits directly over channel */
    InputStream raw;
    OutputStream rawout;

    SocketChannel chan;
    SelectionKey selectionKey;
    String protocol;
    long time;
    int remaining;
    boolean closed = false;
    Logger logger;

    public String toString() {
        String s = null;
        if (chan != null) {
            s = chan.toString();
        }
        return s;
    }

    HttpConnection () {
    }

    void setChannel (SocketChannel c) {
        chan = c;
    }

    void setContext (HttpContextImpl ctx) {
        context = ctx;
    }

    void setParameters (
        InputStream in, OutputStream rawout, SocketChannel chan,
        SSLEngine engine, SSLStreams sslStreams, SSLContext sslContext, String protocol,
        HttpContextImpl context, InputStream raw
    )
    {
        this.context = context;
        this.i = in;
        this.rawout = rawout;
        this.raw = raw;
        this.protocol = protocol;
        this.engine = engine;
        this.chan = chan;
        this.sslContext = sslContext;
        this.sslStreams = sslStreams;
        this.logger = context.getLogger();
    }

    SocketChannel getChannel () {
        return chan;
    }

    synchronized void close () {
        if (closed) {
            return;
        }
        closed = true;
        if (logger != null && chan != null) {
            logger.finest ("Closing connection: " + chan.toString());
        }

        if (!chan.isOpen()) {
            ServerImpl.dprint ("Channel already closed");
            return;
        }
        try {
            /* need to ensure temporary selectors are closed */
            if (raw != null) {
                raw.close();
            }
        } catch (IOException e) {
            ServerImpl.dprint (e);
        }
        try {
            if (rawout != null) {
                rawout.close();
            }
        } catch (IOException e) {
            ServerImpl.dprint (e);
        }
        try {
            if (sslStreams != null) {
                sslStreams.close();
            }
        } catch (IOException e) {
            ServerImpl.dprint (e);
        }
        try {
            chan.close();
        } catch (IOException e) {
            ServerImpl.dprint (e);
        }
    }

    /* remaining is the number of bytes left on the lowest level inputstream
     * after the exchange is finished
     */
    void setRemaining (int r) {
        remaining = r;
    }

    int getRemaining () {
        return remaining;
    }

    SelectionKey getSelectionKey () {
        return selectionKey;
    }

    InputStream getInputStream () {
            return i;
    }

    OutputStream getRawOutputStream () {
            return rawout;
    }

    String getProtocol () {
            return protocol;
    }

    SSLEngine getSSLEngine () {
            return engine;
    }

    SSLContext getSSLContext () {
            return sslContext;
    }

    HttpContextImpl getHttpContext () {
            return context;
    }
}

  

 

原文地址:https://www.cnblogs.com/wuxinliulei/p/4992633.html