Heritrix 3.1.0 源码解析(二十四)

上文中我们提到Recorder httpRecorder = Recorder.getHttpRecorder()对象封装了SOCKET连接的输出流和输入流,我们下面来看看Recorder类是怎么封装SOCKET的输入流和输出流的

Recorder类的重要成员如下,主要包括封装的输入流和输出流以及有序的字符序列(缓存到本地文件)

    private RecordingInputStream ris = null;
    private RecordingOutputStream ros = null;

    /**
     * Backing file basename.
     *
     * Keep it around so can clean up backing files left on disk.
     */
    private String backingFileBasename = null;

    /**
     * Backing file output stream suffix.
     */
    private static final String RECORDING_OUTPUT_STREAM_SUFFIX = ".ros";

   /**
    * Backing file input stream suffix.
    */
    private static final String RECORDING_INPUT_STREAM_SUFFIX = ".ris";

    /**
     * recording-input (ris) content character encoding.
     */
    protected String characterEncoding = null;
    
    /**
     * Charset to use for CharSequence provision. Will be UTF-8 if no
     * encoding ever requested; a Charset matching above characterEncoding
     * if possible; ISO_8859 if above characterEncoding is unsatisfiable. 
     * TODO: unify to UTF-8 for unspecified and bad-specified cases? 
     * (current behavior is for consistency with our prior but perhaps not
     * optimal behavior) 
     */
    protected Charset charset = Charsets.UTF_8; 
    
    /** whether recording-input (ris) message-body is chunked */
    protected boolean inputIsChunked = false; 

    /** recording-input (ris) entity content-encoding (eg gzip, deflate), if any */ 
    protected String contentEncoding = null; 
    
    private ReplayCharSequence replayCharSequence;

RecordingInputStream ris对象和RecordingOutputStream ros对象分别为SOCKET的输入流和输出流的装饰类,就流缓存到本地文件,里面用到了装饰模式,相关方法我就不分析了,不懂的读者可以参考java的输入流和输出流及装饰模式

构造方法用于初始化封装的输入流和输出流对象

/**
     * Create an HttpRecorder.
     *
     * @param tempDir Directory into which we drop backing files for
     * recorded input and output.
     * @param backingFilenameBase Backing filename base to which we'll append
     * suffices <code>ris</code> for recorded input stream and
     * <code>ros</code> for recorded output stream.
     * @param outBufferSize Size of output buffer to use.
     * @param inBufferSize Size of input buffer to use.
     */
    public Recorder(File tempDir, String backingFilenameBase, 
            int outBufferSize, int inBufferSize) {
        this(new File(ensure(tempDir), backingFilenameBase),
                outBufferSize, inBufferSize);
    }
    
    
    private static File ensure(File tempDir) {
        try {
            org.archive.util.FileUtils.ensureWriteableDirectory(tempDir);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        
        return tempDir;
    }
    
    public Recorder(File file, int outBufferSize, int inBufferSize) {
        super();
        this.backingFileBasename = file.getAbsolutePath();
        this.ris = new RecordingInputStream(inBufferSize,
            this.backingFileBasename + RECORDING_INPUT_STREAM_SUFFIX);
        this.ros = new RecordingOutputStream(outBufferSize,
            this.backingFileBasename + RECORDING_OUTPUT_STREAM_SUFFIX);
    }

装饰输入流和输出流的相关方法如下

/**
     * Wrap the provided stream with the internal RecordingInputStream
     *
     * open() throws an exception if RecordingInputStream is already open.
     *
     * @param is InputStream to wrap.
     *
     * @return The input stream wrapper which itself is an input stream.
     * Pass this in place of the passed stream so input can be recorded.
     *
     * @throws IOException
     */
    public InputStream inputWrap(InputStream is) 
    throws IOException {
        logger.fine(Thread.currentThread().getName() + " wrapping input");
        
        // discard any state from previously-recorded input
        this.characterEncoding = null;
        this.inputIsChunked = false;
        this.contentEncoding = null; 
        
        this.ris.open(is);
        return this.ris;
    }

    /**
     * Wrap the provided stream with the internal RecordingOutputStream
     *
     * open() throws an exception if RecordingOutputStream is already open.
     * 
     * @param os The output stream to wrap.
     *
     * @return The output stream wrapper which is itself an output stream.
     * Pass this in place of the passed stream so output can be recorded.
     *
     * @throws IOException
     */
    public OutputStream outputWrap(OutputStream os) 
    throws IOException {
        this.ros.open(os);
        return this.ros;
    }

void close()方法用于关闭流

 /**
     * Close all streams.
     */
    public void close() {
        logger.fine(Thread.currentThread().getName() + " closing");
        try {
            this.ris.close();
        } catch (IOException e) {
            // TODO: Can we not let the exception out of here and report it
            // higher up in the caller?
            DevUtils.logger.log(Level.SEVERE, "close() ris" +
                DevUtils.extraInfo(), e);
        }
        try {
            this.ros.close();
        } catch (IOException e) {
            DevUtils.logger.log(Level.SEVERE, "close() ros" +
                DevUtils.extraInfo(), e);
        }
    }

 下面的成员和方法非常重要,用于在多线程里面设置和获取当前的Recorder对象(HttpClient组件里面的HttpConnection对象就是通过这里获取当前Recorder对象的) 

static ThreadLocal<Recorder> currentRecorder = new ThreadLocal<Recorder>();
    
    public static void setHttpRecorder(Recorder httpRecorder) {
        currentRecorder.set(httpRecorder);
    } 
    
    /**
     * Get the current threads' HttpRecorder.
     *
     * @return This threads' HttpRecorder.  Returns null if can't find a
     * HttpRecorder in current instance.
     */
    public static Recorder getHttpRecorder() {
        return currentRecorder.get(); 
    }

下面的方法围绕着获取ReplayCharSequence对象,均为实现有序的字符集相关,用于内容字符的解析

    static Set<String> SUPPORTED_ENCODINGS = new HashSet<String>();
    static {
        SUPPORTED_ENCODINGS.add("gzip"); 
        SUPPORTED_ENCODINGS.add("x-gzip");
        SUPPORTED_ENCODINGS.add("deflate");
        SUPPORTED_ENCODINGS.add("identity");
        SUPPORTED_ENCODINGS.add("none"); // unofficial but common
    }
    /**
     * @param contentEncoding declared content-encoding of input recording.
     */
    public void setContentEncoding(String contentEncoding) {
        String lowerCoding = contentEncoding.toLowerCase(); 
        if(!SUPPORTED_ENCODINGS.contains(contentEncoding.toLowerCase())) {
            throw new IllegalArgumentException("contentEncoding unsupported: "+contentEncoding); 
        }
        this.contentEncoding = lowerCoding;
    }

    /**
     * @return Returns the characterEncoding.
     */
    public String getContentEncoding() {
        return this.contentEncoding;
    }
/**
     * @return A ReplayCharSequence. Caller may call
     *         {@link ReplayCharSequence#close()} when finished. However, in
     *         heritrix, the ReplayCharSequence is closed automatically when url
     *         processing has finished; in that context it's preferable not
     *         to close, so that processors can reuse the same instance.
     * @throws IOException
     * @see {@link #endReplays()}
     */
    public ReplayCharSequence getContentReplayCharSequence() throws IOException {
        if (replayCharSequence == null || !replayCharSequence.isOpen() 
                || !replayCharSequence.getCharset().equals(charset)) {
            if(replayCharSequence!=null && replayCharSequence.isOpen()) {
                // existing sequence must not have matched now-configured Charset; close
                replayCharSequence.close(); 
            }
            replayCharSequence = getContentReplayCharSequence(this.charset);
        }
        return replayCharSequence;
    }
    
    
    /**
     * @param characterEncoding Encoding of recorded stream.
     * @return A ReplayCharSequence  Will return null if an IOException.  Call
     * close on returned RCS when done.
     * @throws IOException
     */
    public ReplayCharSequence getContentReplayCharSequence(Charset requestedCharset) throws IOException {
        // raw data overflows to disk; use temp file
        InputStream ris = getContentReplayInputStream();
        ReplayCharSequence rcs =  new GenericReplayCharSequence(
                ris,
                calcRecommendedCharBufferSize(this.getRecordedInput()), 
                this.backingFileBasename + RECORDING_OUTPUT_STREAM_SUFFIX,
                requestedCharset);
        ris.close();
        return rcs;
    }
    
    /**
     * Calculate a recommended size for an in-memory decoded-character buffer
     * of this content. We seek a size that is itself no larger (in 2-byte chars)
     * than the memory already used by the RecordingInputStream's internal raw 
     * byte buffer, and also no larger than likely necessary. So, we take the 
     * minimum of the actual recorded byte size and the RecordingInputStream's
     * max buffer size. 
     * 
     * @param inStream
     * @return int length for in-memory decoded-character buffer
     */
    static protected int calcRecommendedCharBufferSize(RecordingInputStream inStream) {
        return (int) Math.min(inStream.getRecordedBufferLength()/2, inStream.getSize());
    }
    
    /**
     * Get a raw replay of all recorded data (including, for example, HTTP 
     * protocol headers)
     * 
     * @return A replay input stream.
     * @throws IOException
     */
    public ReplayInputStream getReplayInputStream() throws IOException {
        return getRecordedInput().getReplayInputStream();
    }
    
    /**
     * Get a raw replay of the 'message-body'. For the common case of 
     * HTTP, this is the raw, possibly chunked-transfer-encoded message 
     * contents not including the leading headers. 
     * 
     * @return A replay input stream.
     * @throws IOException
     */
    public ReplayInputStream getMessageBodyReplayInputStream() throws IOException {
        return getRecordedInput().getMessageBodyReplayInputStream();
    }
    
    /**
     * Get a raw replay of the 'entity'. For the common case of 
     * HTTP, this is the message-body after any (usually-unnecessary)
     * transfer-decoding but before any content-encoding (eg gzip) decoding
     * 
     * @return A replay input stream.
     * @throws IOException
     */
    public InputStream getEntityReplayInputStream() throws IOException {
        if(inputIsChunked) {
            return new ChunkedInputStream(getRecordedInput().getMessageBodyReplayInputStream());
        } else {
            return getRecordedInput().getMessageBodyReplayInputStream();
        }
    }
    
    /**
     * Get a replay cued up for the 'content' (after all leading headers)
     * 
     * @return A replay input stream.
     * @throws IOException
     */
    public InputStream getContentReplayInputStream() throws IOException {
        InputStream entityStream = getEntityReplayInputStream();
        if(StringUtils.isEmpty(contentEncoding)) {
            return entityStream;
        } else if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
            try {
                return new GZIPInputStream(entityStream);
            } catch (IOException ioe) {
                logger.log(Level.WARNING,"gzip problem; using raw entity instead",ioe);
                IOUtils.closeQuietly(entityStream); // close partially-read stream
                return getEntityReplayInputStream(); 
            }
        } else if ("deflate".equalsIgnoreCase(contentEncoding)) {
            return new DeflaterInputStream(entityStream);
        } else if ("identity".equalsIgnoreCase(contentEncoding) || "none".equalsIgnoreCase(contentEncoding)) {
            return entityStream;
        } else {
            // shouldn't be reached given check on setContentEncoding
            logger.log(Level.INFO,"Unknown content-encoding '"+contentEncoding+"' declared; using raw entity instead");
            return entityStream; 
        }
    }

---------------------------------------------------------------------------

本系列Heritrix 3.1.0 源码解析系本人原创

转载请注明出处 博客园 刺猬的温驯

本文链接 http://www.cnblogs.com/chenying99/archive/2013/04/28/3048392.html

原文地址:https://www.cnblogs.com/chenying99/p/3048392.html