HttpClient官方sample代码的深入分析(连接池)

 

前言

  之前一直使用apache的httpclient(4.5.x), 进行http的交互处理. 而httpclient实例则使用了http连接池, 而一旦涉及到连接池, 那会不会在使用上有些隐藏很深的坑. 事实上, 通过分析httpclient源码, 发现它很优雅地解决了这个问题, 同时隐藏所有的连接池细节. 今天这边在这边做下笔记.

官方代码片段

  这是apache httpclient官网提供一段代码片段:

CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpGet = new HttpGet("http://targethost/homepage");
CloseableHttpResponse response1 = httpclient.execute(httpGet);
// 连接对象被response对象持有, 以保证内容通过response对象消费
// 确保在finally代码块添加ClosableHttpResponse#close的调用
// 值得注意的是, 如果连接没有被完全消费干净, 该连接将不能安全复用, 将会被关闭, 被连接池丢弃 
try {
    System.out.println(response1.getStatusLine());
    HttpEntity entity1 = response1.getEntity();
    // do something useful with the response body
    // and ensure it is fully consumed
    EntityUtils.consume(entity1);
} finally {
    response1.close();
}

  简单分析下代码, 非常的简练, 你丝毫看不到任何连接池操作的蛛丝马迹, 它是怎么设计, 又是怎么做到的呢?

常规连接池的注意点

  连接池的使用需要保证如下几点, 尤其对自研的连接池.
  1. Connection的get/release配对.
  2. 保证一次http交互中请求/响应处理完整干净(cleanup).
  比如一次请求交互中, 因某种原因没有消费掉响应内容, 导致该内容还处于socket的缓存中. 继而使得同一个连接下的第二次交互其响应内容为第一次的响应结果, 后果十分可怕. 以前做c++开发的时候, 封装编写redis连接池的时候, 就遇到类似的问题, 印象非常的深刻.

连接封装

  httpclient引入了ConnectionHolder类, 构建了真实连接(HttpCilentConnection)和连接池(HttpClientConnectionManager)的桥梁, 同时维护了该连接的可重用(reusable)和租赁(leased)状态.

class ConnectionHolder implements ConnectionReleaseTrigger, 
        Cancellable, Closeable {
    private final Log log;
    private final HttpClientConnectionManager manager;
    private final HttpClientConnection managedConn;
    private final AtomicBoolean released;  // 连接池租赁状态
    private volatile boolean reusable;     // 连接是否可复用
}

  该类最重要的一个方法为releaseConnection, 后续的执行流程多多少少会涉及到该方法.

private void releaseConnection(boolean reusable) {
    // *) 判断租赁状态, 若已归还连接池, 则不再执行后续的代码
    if(this.released.compareAndSet(false, true)) {
        HttpClientConnection var2 = this.managedConn;
        synchronized(this.managedConn) {
            // *) 根据可重用性分情况处理, 同时归还到连接池中
            if(reusable) {
                this.manager.releaseConnection(this.managedConn, 
                        this.state, this.validDuration, this.tunit);
            } else {
                try {
                    // *) 关闭连接
                    this.managedConn.close();
                    this.log.debug("Connection discarded");
                } catch (IOException var9) {
                    if(this.log.isDebugEnabled()) {
                        this.log.debug(var9.getMessage(), var9);
                    }
                } finally {
                    this.manager.releaseConnection(this.managedConn, 
                        (Object)null, 0L, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

}

  而CloseableHttpResponse又持有ConnectionHolder对象, 它close方法, 本质上就是间接调用了ConnectionHolder的releaseConnection方法.

class HttpResponseProxy implements CloseableHttpResponse {

    public void close() throws IOException {
        if(this.connHolder != null) {
            this.connHolder.close();
        }
    }
}

class ConnectionHolder 
        implements ConnectionReleaseTrigger, Cancellable, Closeable {

    public void close() throws IOException {
        this.releaseConnection(false);
    }

}

  由此可见, 官方sample的推荐做法, 在finally中保证ClosableHttpResponse#close的调用, 能够确保连接池的get/release配对. 若是close前, 连接状态依旧为租赁状态(leased为false), 则该连接明确不被复用.

可重用性判断

  http的长连接复用, 其判定规则主要分两类.
  1. http协议支持+请求/响应header指定
  2. 一次交互处理的完整性(响应内容消费干净)
  对于前者, httpclient引入了ConnectionReuseStrategy来处理, 默认的采用如下的约定:

  • HTTP/1.0通过在Header中添加Connection:Keep-Alive来表示支持长连接.
  • HTTP/1.1默认支持长连接, 除非在Header中显式指定Connection:Close, 才被视为短连接模式.

  在MainClientExec类中相关的代码片段:

var27 = this.requestExecutor.execute(request, managedConn, context);
if(this.reuseStrategy.keepAlive(var27, context)) {
    long entity = this.keepAliveStrategy.getKeepAliveDuration(var27, context);
    if(this.log.isDebugEnabled()) {
        String s;
        if(entity > 0L) {
            s = "for " + entity + " " + TimeUnit.MILLISECONDS;
        } else {
            s = "indefinitely";
        }

        this.log.debug("Connection can be kept alive " + s);
    }

    var25.setValidFor(entity, TimeUnit.MILLISECONDS);
    var25.markReusable();
} else {
    var25.markNonReusable();
}

  具体ReusableStrategy中, 其执行代码如下:

public class DefaultClientConnectionReuseStrategy 
            extends DefaultConnectionReuseStrategy {
    public static final DefaultClientConnectionReuseStrategy INSTANCE 
            = new DefaultClientConnectionReuseStrategy();

    public DefaultClientConnectionReuseStrategy() {
    }

    public boolean keepAlive(HttpResponse response, HttpContext context) {
        HttpRequest request = (HttpRequest)context
              .getAttribute("http.request");
        if(request != null) {
            // *) 寻找Connection:Close
            Header[] connHeaders = request.getHeaders("Connection");
            if(connHeaders.length != 0) {
                BasicTokenIterator ti = new BasicTokenIterator(
                        new BasicHeaderIterator(connHeaders, (String)null)
                    );

                while(ti.hasNext()) {
                    String token = ti.nextToken();
                    if("Close".equalsIgnoreCase(token)) {
                        return false;
                    }
                }
            }
        }

        return super.keepAlive(response, context);
    }
}
 而在父类的keepAlive函数中, 其实现如下:
public class DefaultConnectionReuseStrategy 
        implements ConnectionReuseStrategy {

    public boolean keepAlive(HttpResponse response, HttpContext context) {
        // 省略一段代码
        if(headerIterator1.hasNext()) {
            try {
                BasicTokenIterator px1 = new BasicTokenIterator(headerIterator1);
                boolean keepalive1 = false;

                while(px1.hasNext()) {
                    String token = px1.nextToken();
                    // *) 存在Close Tag, 则不可重用
                    if("Close".equalsIgnoreCase(token)) {
                        return false;
                    }
                    // *) 存在Keep-Alive Tag 则可重用
                    if("Keep-Alive".equalsIgnoreCase(token)) {
                        keepalive1 = true;
                    }
                }

                if(keepalive1) {
                    return true;
                }
            } catch (ParseException var11) {
                return false;
            }
        }
        // 高于HTTP/1.0版本的都复用连接  
        return !ver1.lessEquals(HttpVersion.HTTP_1_0);
    }

}

  总结一下:

  • request首部中包含Connection:Close,不复用
  • response中Content-Length长度设置不正确,不复用
  • response首部包含Connection:Close,不复用
  • reponse首部包含Connection:Keep-Alive,复用
  • 都没命中的情况下,如果HTTP版本高于1.0则复用

  而对于后者(一次交互处理的完整性), 这是怎么判定的呢? 其实很简单, 就是response返回的InputStream(HttpEntity#getContent)明确调用close方法(没有引发socket的close), 即认为消费完整.
  让我们来简单分析一下EntityUtils.consume方法.

public final class EntityUtils {

    public static void consume(HttpEntity entity) throws IOException {
        if(entity != null) {
            if(entity.isStreaming()) {
                InputStream instream = entity.getContent();
                if(instream != null) {
                    instream.close();
                }
            }
        }
    }

} 
  让我们在ConnectionHolder类的releaseConnection方法中添加断点. 
  

  然后具体执行一个http请求, 我们会发现程序运行到该断点时的, 线程调用堆栈如下:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at org.apache.http.impl.execchain.ConnectionHolder.releaseConnection(ConnectionHolder.java:97)
      at org.apache.http.impl.execchain.ConnectionHolder.releaseConnection(ConnectionHolder.java:120)
      at org.apache.http.impl.execchain.ResponseEntityProxy.releaseConnection(ResponseEntityProxy.java:76)
      at org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed(ResponseEntityProxy.java:145)
      at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
      at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
      at org.apache.http.client.entity.LazyDecompressingInputStream.close(LazyDecompressingInputStream.java:97)
      at org.apache.http.util.EntityUtils.consume(EntityUtils.java:90)

  你会发现inputstream#close的调用, 会引发连接的归还, 而此时reusable状态值为true(前提KeepaliveStrategy判断该连接为可复用).
  再额外添加一个Apache HttpClient中定义的ContentLengthInputStream类的close实现, 用于明确close会附带消费完数据, 以此打消最后的疑惑.

public class ContentLengthInputStream extends InputStream {

    // *) 该close会把剩余的字节全部消费, 才设定自己为关闭状态
    public void close() throws IOException {
        if(!this.closed) {
            try {
                if(this.pos < this.contentLength) {
                    byte[] buffer = new byte[2048];

                    while(true) {
                        if(this.read(buffer) >= 0) {
                            continue;
                        }
                    }
                }
            } finally {
                this.closed = true;
            }
        }

    }

}

  

总结

  让我们再回到最初的官方sample代码.

CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpGet = new HttpGet("http://targethost/homepage");
CloseableHttpResponse response1 = httpclient.execute(httpGet);
try {
    System.out.println(response1.getStatusLine());
    HttpEntity entity1 = response1.getEntity();

    // *) 引发releaseConnect()调用, reusable值取决于keepAliveStrategy判定, leased置为true
    EntityUtils.consume(entity1);
} finally {
    // *) 若连接leased为false, 则releaseConnect(false)调用, 明确不可复用, leased置为true
    // *) 若连接leased为true, 则do nothing
    response1.close();
}

  c++会使用RAII模式, 即利用对象的构造/析构函数来自动实现资源申请和释放, java这边的话, 还是需要明确的一个finally中, 添加保证释放的代码, ^_^.
  总的来说, 该段代码, 堪称完美. 对于官方推荐的代码, 放心大胆的使用即可.

 

参考文章

  Http持久连接与HttpClient连接池
  关于HttpClient重试策略的研究

 
原文地址:https://www.cnblogs.com/mumuxinfei/p/9121829.html