okhttp-源码简析

1 Request req = new Request.Builder()
2     .cacheControl(new CacheControl.Builder().noCache().build())
3     .url("http://publicobject.com/helloworld.txt")
4     .build();//GET
5 //同步执行
6 client.newCall(req).execute();
7 //异步执行
8 client.newCall(req).enqueue(callback);

不管同步请求还是异步请求,都需要通过newCall创建一个Call实例

1 public Call newCall(Request request) {
2     //client、originalRequest、forWebSocket
3   return new RealCall(this, request, false /* for web socket */);
4 }

 

同步请求

 1 //RealCall的execute方法
 2 public Response execute() throws IOException {
 3   synchronized (this) {
 4       //RealCall只能执行一次
 5     if (executed) throw new IllegalStateException("Already Executed");
 6     executed = true;
 7   }
 8   captureCallStackTrace();
 9   try {
10       //调用runningSyncCalls.add(call),runningSyncCalls是一个queue,
11       //finally中的finished会调用runningSyncCalls.remove(call),
12       //runningSyncCalls的意义在于client可以统一管理同步call(异步call同样有一个queue),
13       //比如通过runningCalls()得到正在执行的http请求(包括同步、异步),还可以通过cancelAll()取消所有http请求(包括同步、异步)
14     client.dispatcher().executed(this);
15     //getResponseWithInterceptorChain是真正的执行逻辑,从方法名可以看出okhttp将执行逻辑分隔成了一个个拦截器,使用责任链来执行。
16     Response result = getResponseWithInterceptorChain();
17     if (result == null) throw new IOException("Canceled");
18     return result;
19   } finally {
20       //runningSyncCalls.remove(call)
21     client.dispatcher().finished(this);
22   }
23 }
 1 //getResponseWithInterceptorChain
 2 Response getResponseWithInterceptorChain() throws IOException {
 3   // Build a full stack of interceptors.
 4   List<Interceptor> interceptors = new ArrayList<>();
 5   //自定义的interceptor
 6   interceptors.addAll(client.interceptors());
 7   interceptors.add(retryAndFollowUpInterceptor);
 8   interceptors.add(new BridgeInterceptor(client.cookieJar()));
 9   interceptors.add(new CacheInterceptor(client.internalCache()));
10   interceptors.add(new ConnectInterceptor(client));
11   if (!forWebSocket) {
12     interceptors.addAll(client.networkInterceptors());
13   }
14   //拦截链的最后一个拦截器,通常来说最后一个拦截器就是真正的执行逻辑,比如tomcat的filters中最后一个filter其实就是Servlet
15   interceptors.add(new CallServerInterceptor(forWebSocket));
16 
17   Interceptor.Chain chain = new RealInterceptorChain(
18       interceptors, null, null, null, 0, originalRequest);
19   return chain.proceed(originalRequest);//执行拦截链
20 }
CallServerInterceptor的intercept方法
 1 //CallServerInterceptor的intercept,有删减
 2 public Response intercept(Chain chain) throws IOException {
 3   RealInterceptorChain realChain = (RealInterceptorChain) chain;
 4   HttpCodec httpCodec = realChain.httpStream();
 5   StreamAllocation streamAllocation = realChain.streamAllocation();
 6   RealConnection connection = (RealConnection) realChain.connection();
 7   Request request = realChain.request();
 8     //请求时间
 9   long sentRequestMillis = System.currentTimeMillis();
10   //内部会调用writeRequest(request.headers(), requestLine);,writeRquest会先写请求行,再遍历请求头并写出。
11   //这里的“写”操作,只是将字符串写入一个Buffer(后面会提到Buffer),socket还没有write。
12   httpCodec.writeRequestHeaders(request);
13      /** 什么是sink(okio.Sink接口)?sink翻译为水槽,可以理解为水的最终汇聚地,相对的有okio.Source,即水的发源地。
14         * Receives a stream of bytes. Use this interface to write data wherever it's
15         * needed: to the network, storage, or a buffer in memory. Sinks may be layered
16         * to transform received data, such as to compress, encrypt, throttle, or add
17         * protocol framing.
18         */
19   Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
20   //将sink包装一层缓冲区
21   BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
22   //将body字节序列全部写入bufferedRequestBody,注意是全部,这里就有可能出现OOM。
23   //此时body只是写入了buffer,还未写入socket。
24   request.body().writeTo(bufferedRequestBody);
25   //close时会将buffer中的数据写入sink(即requestBodyOut),这里仍旧没有写入socket,
26   //这里我不太理解,httpCodec.createRequestBody返回的sink其实已经是一个buffer了,为什么还要套一层?
27   bufferedRequestBody.close();
28   
29   //这里才真正写出socket,socketOutputStream.write
30     /** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
31   httpCodec.finishRequest();
32   
33     //开始构造response
34     //读响应行、响应头
35   Response.Builder responseBuilder = httpCodec.readResponseHeaders(false);
36 
37   Response response = responseBuilder
38       .request(request)
39       .handshake(streamAllocation.connection().handshake())
40       .sentRequestAtMillis(sentRequestMillis)
41       .receivedResponseAtMillis(System.currentTimeMillis())
42       .build();
43 
44  /**
45       * httpCodec.openResponseBody会返回一个输入流,类似socketInputStream,还未从流中读取数据。
46       * 需要手动关闭
47      * Response.close()
48      * Response.body().close()
49      * Response.body().source().close()
50      * Response.body().charStream().close()
51      * Response.body().byteString().close()
52      * Response.body().bytes()
53      * Response.body().string()
54      */
55   response = response.newBuilder()
56       .body(httpCodec.openResponseBody(response))
57       .build();
58   return response;
59 }

异步请求

 1 //RealCall的enqueue方法
 2 public void enqueue(Callback responseCallback) {
 3   synchronized (this) {
 4     if (executed) throw new IllegalStateException("Already Executed");
 5     executed = true;
 6   }
 7   captureCallStackTrace();
 8   //入队
 9   client.dispatcher().enqueue(new AsyncCall(responseCallback));
10 }

dispatcher()会返回与该okHttpClient关联的Dispatcher实例,dispatcher的enqueue方法

 1 //Dispatcher的enqueue方法
 2 synchronized void enqueue(AsyncCall call) {
 3   if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
 4       //放入异步队列,与之前提到的同步队列意义相同
 5     runningAsyncCalls.add(call);
 6     //异步执行,通常的做法是放入executorService维护的任务队列中,然后由线程池竞争执行队列中的任务。
 7     //executorService的默认构造逻辑:executorService = new java.util.concurrent.ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, 
 8     //new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
 9     //参数对应:corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory。
10     //也可以自定义executorService:
11     //OkHttpClient okHttpClient = new OkHttpClient.Builder().dispatcher(new Dispatcher(Executors.newSingleThreadExecutor())).build();
12 
13     executorService().execute(call);
14   } else {
15     readyAsyncCalls.add(call);
16   }
17 }

AsynCall在executorService.execute中的执行逻辑是AsynCall的execute逻辑

 1 //AsyncCall的执行逻辑
 2 protected void execute() {
 3   boolean signalledCallback = false;
 4   try {
 5       //getResponseWithInterceptorChain与同步调用一样
 6     Response response = getResponseWithInterceptorChain();
 7     if (retryAndFollowUpInterceptor.isCanceled()) {
 8       signalledCallback = true;
 9       //onFailure回调
10       responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
11     } else {
12       signalledCallback = true;
13       //onResponse回调
14       responseCallback.onResponse(RealCall.this, response);
15     }
16   } catch (IOException e) {
17     if (signalledCallback) {
18       // Do not signal the callback twice!
19       Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
20     } else {
21       responseCallback.onFailure(RealCall.this, e);
22     }
23   } finally {
24       //从异步队列中删除该call
25     client.dispatcher().finished(this);
26   }
27 }

Buffer

okhttp并没有使用jdk的ByteBuffer,而是自己对byte[]进行了池化,这也使得okhttp没有机会使用DirectByteBuffer了,同时由于池化的byte[]被限定为64kb,okhttp也不适合大数据量应用。

 1 //Buffer即是sink,又是source
 2 public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
 3     //双向链表头结点
 4     Segment head;
 5     //......
 6     
 7     public void write(Buffer source, long byteCount) {
 8         // Move bytes from the head of the source buffer to the tail of this buffer
 9         // while balancing two conflicting goals: don't waste CPU and don't waste
10         // memory.
11         //
12         // Don't waste CPU (ie. don't copy data around).
13         //
14         // Copying large amounts of data is expensive. Instead, we prefer to
15         // reassign entire segments from one buffer to the other.
16         //内存间的复制操作是需要cpu参与的,为了避免复制,就将source中的Segment链表接到本Buffer的Segment链表尾部.
17         //
18         // Don't waste memory.
19         //
20         // As an invariant, adjacent pairs of segments in a buffer should be at
21         // least 50% full, except for the head segment and the tail segment.
22         //当Segment的利用率低于50%时,依旧需要使用System.arrayCopy将next的数据复制到pre中,也就是进行compact.
23   }
24   
25   public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
26       //如果调用String.getBytes必然会在内部new byte[],这里的writeUtf8是通过String的getBytes编码string的吗?
27       //writeUtf8并没有使用getBytes,而是自己实现了utf8的编码规则,并将字节编码序列存放在Buffer持有的data字节数组中。
28   }
29 }
 1 final class Segment {
 2   /** The size of all segments in bytes. */
 3   static final int SIZE = 8192;
 4 
 5   /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
 6   static final int SHARE_MINIMUM = 1024;
 7     //字节数组,真正存放数据的地方
 8   final byte[] data;
 9 
10   /** The next byte of application data byte to read in this segment. */
11   int pos;
12 
13   /** The first byte of available data ready to be written to. */
14   int limit;
15 
16   /** True if other segments or byte strings use the same byte array. */
17   boolean shared;
18 
19   /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
20   boolean owner;
21 
22   /** Next segment in a linked or circularly-linked list. */
23   //segment可以组成一个单链表(SegmentPool中)或循环链表(Buffer中)
24   Segment next;
25 
26   /** Previous segment in a circularly-linked list. */
27   //双向链表的前指针
28   Segment prev;
29 
30   Segment() {
31     this.data = new byte[SIZE];
32     this.owner = true;
33     this.shared = false;
34   }
35 
36   Segment(Segment shareFrom) {
37     this(shareFrom.data, shareFrom.pos, shareFrom.limit);
38     shareFrom.shared = true;
39   }
40 
41   Segment(byte[] data, int pos, int limit) {
42     this.data = data;
43     this.pos = pos;
44     this.limit = limit;
45     this.owner = false;
46     this.shared = true;
47   }
48   //......
49 }
 1 /**
 2  * A collection of unused segments, necessary to avoid GC churn and zero-fill.
 3  * This pool is a thread-safe static singleton.
 4  */
 5 //SegmentPool是未使用的Segment的集合,即池化的byte[]
 6 final class SegmentPool {
 7   /** The maximum number of bytes to pool. */
 8   // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
 9   //client最大持有64kb的缓冲池,超出的部分可以申请,但回收时不会被放回池中,看来okio.SegmentPool只适合小数据量应用
10   static final long MAX_SIZE = 64 * 1024; // 64 KiB.
11 
12   /** Singly-linked list of segments. */
13   //单链表
14   static @Nullable Segment next;
15   
16   /** Total bytes in this pool. */
17   static long byteCount;
18 
19   private SegmentPool() {
20   }
21     //从池中获取Segment
22   static Segment take() {
23     synchronized (SegmentPool.class) {
24       if (next != null) {
25         Segment result = next;
26         next = result.next;
27         result.next = null;
28         byteCount -= Segment.SIZE;
29         return result;
30       }
31     }
32     return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
33   }
34     //回收Segment置池中
35   static void recycle(Segment segment) {
36     if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
37     if (segment.shared) return; // This segment cannot be recycled.
38     synchronized (SegmentPool.class) {
39       if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
40       byteCount += Segment.SIZE;
41       segment.next = next;
42       segment.pos = segment.limit = 0;
43       next = segment;
44     }
45   }
46 }

Cache

http缓存,首先server需要发送带缓存首部的响应(比如Cache-Control:max-age=3600),然后client需要开启缓存功能(浏览器自动会开启,chrome在debug时可以disable cache,okhttp需要在创建okHttpClient时设置cache)。

1 int cacheSize = 10 * 1024 * 1024; // 10 MiB
2 //最终会得到一个okhttp3.internal.cache.DiskLruCache
3 Cache cache = new Cache(cacheDirectory, cacheSize);
4 OkHttpClient client = new OkHttpClient.Builder()
5     .cache(cache)
6     .build();

client缓存响应时会记录一个缓存时间(不是Date,是本地时间),响应头的max-age表明在max-age时间内,该response都是新鲜的,即不需要去server验证;响应头的no-cache表示client可以对response缓存,但client请求时必须先向server验证该缓存的新鲜度。

请求头的max-age表示client只接收max-age之内的缓存,max-stale表示client能接受最大的缓存过期时间,no-cache也是要先向server验证新鲜度,if-only-cached表示只从缓存获取数据。

 1 Request request = new Request.Builder()
 2     //Cache-Control:no-cache
 3     //Forces caches to submit the request to the origin server for validation before releasing a cached copy.
 4     //准确的讲应该叫donot-serve-from-cache-without-revalidation
 5     .cacheControl(new CacheControl.Builder().noCache().build())
 6     .url("http://publicobject.com/helloworld.txt")
 7     .build();
 8 
 9 Request request = new Request.Builder()
10     //Cache-Control:max-age=<seconds>
11     //Specifies the maximum amount of time a resource will be considered fresh. 
12     //Contrary to Expires, this directive is relative to the time of the request.
13     .cacheControl(new CacheControl.Builder()
14         .maxAge(0, TimeUnit.SECONDS)
15         .build())
16     .url("http://publicobject.com/helloworld.txt")
17     .build();
18 
19 Request request = new Request.Builder()
20     //Cache-Control:only-if-cached
21     //Indicates to not retrieve new data. The client only wishes to obtain a cached response, 
22     //and should not contact the origin-server to see if a newer copy exists.
23     .cacheControl(new CacheControl.Builder()
24         .onlyIfCached()
25         .build())
26     .url("http://publicobject.com/helloworld.txt")
27     .build();
28 Response forceCacheResponse = client.newCall(request).execute();
29 if (forceCacheResponse.code() != 504) {
30     // The resource was cached! Show it.
31 } else {
32     // The resource was not cached.
33 }
34 
35 Request request = new Request.Builder()
36     //Cache-Control:max-stale[=<seconds>]
37     //Indicates that the client is willing to accept a response that has exceeded its expiration time. 
38     //Optionally, you can assign a value in seconds, indicating the time the response must not be expired by.
39     //okhttp中max-stale必须赋值?赋0表示max-stale无效?
40     .cacheControl(new CacheControl.Builder()
41         .maxStale(365, TimeUnit.DAYS)
42         .build())
43     .url("http://publicobject.com/helloworld.txt")
44     .build();

如下图,是否过期由响应头max-age决定,如果响应头没有ETag、Last-Modified则直接发送原始请求。

参考:https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control

  https://segmentfault.com/a/1190000006741200

原文地址:https://www.cnblogs.com/holoyong/p/7413403.html