Tomcat请求处理源码分析(三)

一、请求数据的读取

1、读取请求行

2、读取请求头

3、读取请求体

综合上面三个序列图,对于请求行,请求头,请求体的读取都最终调用了NioSocketWrapper 对象实例的 fillReadBuffer() 方法。只不过请求行和请求头读取参数传递为 true,请求体读取参数传递为 false。

private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
    int nRead;
    NioChannel socket = getSocket();
    if (socket instanceof ClosedNioChannel) {
        throw new ClosedChannelException();
    }
    if (block) {
        Selector selector = null;
        try {
            selector = pool.get();
        } catch (IOException x) {
            // Ignore
        }
        try {
            nRead = pool.read(to, socket, selector, getReadTimeout());
        } finally {
            if (selector != null) {
                pool.put(selector);
            }
        }
    } else {
        //NioChannel的read方法
        nRead = socket.read(to);
        if (nRead == -1) {
            throw new EOFException();
        }
    }
    return nRead;
}
//NioChannel
protected SocketChannel sc
public int read(ByteBuffer dst) throws IOException {
    return sc.read(dst);
}

请求行和请求头的读取本质是调用 java NIO api SocketChannel 的 read() 方法,该方法为非阻塞方法。如果读不到数据就直接返回,继续由poller 线程监测是否有数据可读。

对于请求体的读取采用阻塞的方式,调用NioSelectorPool的read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout)方法,在该方法中又会调用 NioBlockingSelector 的 read() 方法。

    protected BlockPoller poller;
    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        if (key == null) {
            throw new IOException("Key no longer registered");
        }
        KeyReference reference = keyReferenceStack.pop();
        if (reference == null) {
            reference = new KeyReference();
        }
        NioSocketWrapper att = (NioSocketWrapper) key.attachment();
        int read = 0;
        boolean timedout = false;
        int keycount = 1; //assume we can read
        long time = System.currentTimeMillis(); //start the timeout timer
        try {
            while (!timedout) {
                if (keycount > 0) {
                    //读取数据
                    read = socket.read(buf);
                    //有数据读到就跳出循环
                    if (read != 0) {
                        break;
                    }
                }
                try {
                    if (att.getReadLatch()==null || att.getReadLatch().getCount()==0) {
                        att.startReadLatch(1);
                    }
                    //没有读到数据则将封装的 OP_READ 事件添加到 BlockPoller 的事件队列
                    poller.add(att,SelectionKey.OP_READ, reference);
                    //调用 NioSocketWrapper 中的 CountDownLatch 类型 readLatch 属性的 await() 方法,使当前线程(一般是tomcat io线程)在 readLatch 上等待。
                    if (readTimeout < 0) {
                        att.awaitReadLatch(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    } else {
                        att.awaitReadLatch(readTimeout, TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException ignore) {
                    // Ignore
                }
                if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
                    //we got interrupted, but we haven't received notification from the poller.
                    keycount = 0;
                }else {
                    //latch countdown has happened
                    keycount = 1;
                    att.resetReadLatch();
                }
                if (readTimeout >= 0 && (keycount == 0)) {
                    timedout = (System.currentTimeMillis() - time) >= readTimeout;
                }
            }
            if (timedout) {
                throw new SocketTimeoutException();
            }
        } finally {
            poller.remove(att,SelectionKey.OP_READ);
            if (timedout && reference.key != null) {
                poller.cancelKey(reference.key);
            }
            reference.key = null;
            keyReferenceStack.push(reference);
        }
        return read;
    }

4、总结

对于请求行,请求头和请求体的读取默认(不开启异步)都在 tomcat io 线程中进行。
对于请求行和请求头的读取是非阻塞读取,即不阻塞 tomcat io 线程,如果没有读取到数据,则由 poll 线程继续监测下次数据的到来。
对于请求体的读取是阻塞的读取,如果发现请求体数据不可读,那么首先注册封装的 OP_READ 事件到 BlockPoller 对象实例的事件队列里,然后利用 NioSocketWrapper 对象中的 readLatch 来阻塞 tomcat io 线程。
对于 tomcat io 线程阻塞时间为读超时,默认不配置为 -1,这时超时时间为 Long.MAX_VALUE 毫秒。
如果超时,则抛出 SocketTimeoutException,并取消上面注册的读事件。
最后将该事件从 selector 中移除(一般是可读事件)。

二、响应数据的写入

 

CoyoteOutputStream 实例对象就是 ServletOutputStream 的实现,我们平时调用 servlet API 向 OutputStream 中写数据的时候就是走的这个调用图,最终调用了 NioSocketWrapper 的 doWrite() 方法。

    public void write(byte[] b, int off, int len) throws IOException {
        //是否阻塞,一般情况下都不会设置writeListener,所以都是返回true
        boolean nonBlocking = checkNonBlockingWrite();
        ob.write(b, off, len);
        if (nonBlocking) {
            checkRegisterForWrite();
        }
    }
    private boolean checkNonBlockingWrite() {
        boolean nonBlocking = !ob.isBlocking();
        if (nonBlocking && !ob.isReady()) {
            throw new IllegalStateException(sm.getString("coyoteOutputStream.nbNotready"));
        }
        return nonBlocking;
    }
    public boolean isBlocking() {
        return coyoteResponse.getWriteListener() == null;
    }

NioSocketWrapper 的 doWrite() 方法核心代码如下:

        //NioSocketWrapper
        protected void doWrite(boolean block, ByteBuffer from) throws IOException {
            long writeTimeout = getWriteTimeout();
            Selector selector = null;
            try {
                selector = pool.get();
            } catch (IOException x) {
                // Ignore
            }
            try {
                pool.write(from, getSocket(), selector, writeTimeout, block);
                if (block) {
                    // Make sure we are flushed
                    do {
                        if (getSocket().flush(true, selector, writeTimeout)) {
                            break;
                        }
                    } while (true);
                }
                updateLastWrite();
            } finally {
                if (selector != null) {
                    pool.put(selector);
                }
            }
            // If there is data left in the buffer the socket will be registered for
            // write further up the stack. This is to ensure the socket is only
            // registered for write once as both container and user code can trigger
            // write registration.
        }
    
    //NioSelectorPool
    public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                     long writeTimeout, boolean block) throws IOException {
        if ( SHARED && block ) {
            return blockingSelector.write(buf,socket,writeTimeout);
        }
        ···
    }
    //NioBlockingSelector 
     public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
            throws IOException {
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        if (key == null) {
            throw new IOException("Key no longer registered");
        }
        KeyReference reference = keyReferenceStack.pop();
        if (reference == null) {
            reference = new KeyReference();
        }
        NioSocketWrapper att = (NioSocketWrapper) key.attachment();
        if (att.previousIOException != null) { 
            throw new IOException(att.previousIOException);
        }
        int written = 0;
        boolean timedout = false;
        int keycount = 1; //assume we can write
        long time = System.currentTimeMillis(); //start the timeout timer
        try {
            //
            while (!timedout && buf.hasRemaining()) {
                if (keycount > 0) { //only write if we were registered for a write
                    int cnt = socket.write(buf); //write the data
                    if (cnt == -1) {
                        throw new EOFException();
                    }
                    written += cnt;
                    if (cnt > 0) {
                        time = System.currentTimeMillis(); //reset our timeout timer
                        continue; //we successfully wrote, try again without a selector
                    }
                }
                try {
                    if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
                        att.startWriteLatch(1);
                    }
                    //如果数据不可写(例如写缓冲已满),则调用 BlockPoller 实例的 add() 方法,
                    //将封装的 OP_WRITE 事件添加到 BlockPoller 的事件队列里。
                    poller.add(att, SelectionKey.OP_WRITE, reference);
                    //调用NioSocketWrapper中的CountDownLatch类型 writeLatch 属性的 await() 方法,
                    //使当前线程(一般是tomcat io线程)在 writeLatch 上等待。
                    //等待时间,默认不配置为 -1,这时超时时间为 Long.MAX_VALUE 毫秒。
                    if (writeTimeout < 0) {
                        att.awaitWriteLatch(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
                    } else {
                        att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException ignore) {
                    // Ignore
                }
                if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
                    //we got interrupted, but we haven't received notification from the poller.
                    keycount = 0;
                } else {
                    //latch countdown has happened
                    keycount = 1;
                    att.resetWriteLatch();
                }

                if (writeTimeout > 0 && (keycount == 0)) {
                    timedout = (System.currentTimeMillis() - time) >= writeTimeout;
                }
            }
            if (timedout) {
                att.previousIOException = new SocketTimeoutException();
                throw att.previousIOException;
            }
        } finally {
            poller.remove(att, SelectionKey.OP_WRITE);
            if (timedout && reference.key != null) {
                poller.cancelKey(reference.key);
            }
            reference.key = null;
            keyReferenceStack.push(reference);
        }
        return written;
    }

总结:

1、响应数据的写入是阻塞的,如果发现数据不可写(例如写缓冲已满),那么首先注册封装的 OP_WRITE 事件到 BlockPoller 的事件队列里。然后会利用 NioSocketWrapper 对象中的 writeLatch 来阻塞当前线程。
2、对于线程阻塞时间为写超时,默认不配置为 -1,这时写时时间为 Long.MAX_VALUE 毫秒。
3、如果超时,则抛出 SocketTimeoutException,并取消上面注册的读事件。
4、最后将该事件从 selector 中移除(一般是可写事件)。

三、BlockPoller

  BlockPoller线程主要处理阻塞的读写操作,对于请求体数据读取,一般由 tomcat io线程进行,当数据不可读的时候的时候(例如:客户端数据未发送完毕),会注册封装的OPEN_READ事件到 BlockPoller 线程中,然后阻塞当前线程(一般为tomcat io线程)。
对于响应数据的写入,一般也由 tomcat io 线程进行,当数据不可写的时候(例如:原始socket发送缓冲区满),会注册封装的OPEN_WRITE事件对象到BlockPoller线程中,然后阻塞当前线程。
BlockPoller 实例都有一个 NIO selector 对象,主要用于监测注册在原始 scoket 上的事件是否发生。该实例有事件队列 SynchronizedQueue<PollerEvent>,用来存放发生的事件,一般该事件队列的元素由tomcat io 线程放入(当请求体不可读或者响应数据不可写的时候)。
1、启动

  在tomcat容器启动的时候,其中的一步协议处理器protocolHandler初始化时,就会创建BlockPoller线程,并启动。

    //NioEndpoint类
    public void bind() throws Exception {
        
        ···
        
        selectorPool.open();
    }
    public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }
    }
    //NioBlockingSelector
    public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-"+(threadCounter.getAndIncrement()));
        poller.start();
    }

2、添加事件到队列

protected Selector selector = null;
protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
 
 
public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
        if (key == null) {
            return;
        }
        NioChannel nch = key.getSocket();
        final SocketChannel ch = nch.getIOChannel();
        if (ch == null) {
            return;
        }
        Runnable r = new RunnableAdd(ch, key, ops, ref);
        events.offer(r);
        wakeup();
}
public void wakeup() {
    if (wakeupCounter.addAndGet(1)==0) {
        selector.wakeup();
    }
 }

该方法主要是当请求体不可读或者响应体不可写时,被调用

被添加到SynchronizedQueue<PollerEvent> 队列,事件类型是RunnabledAdd

add方法会调用事件队列的offer方法放入事件,然后调用wakeup方法判断是否要唤醒selector

3、注册事件到selector

  BrockPoller线程的run方法会执行events方法,循环事件队列调用RunnableAdd线程的run方法,往原生selector上注册事件。

        //BrockPoller
        public void run() {
            while (run) {
                try {
                    events();
                    
                    ···
                    
                }catch ( Throwable t ) {
                    log.error("",t);
                }
            }
            events.clear();
            
            ···
            
        }
        //NioBlockingSelector
        public boolean events() {
            Runnable r = null;

            int size = events.size();
            for (int i = 0; i < size && (r = events.poll()) != null; i++) {
                r.run();
            }

            return (size > 0);
        }
        

    //RunnableAdd
    public void run() {
      SelectionKey sk = ch.keyFor(selector);
        try {
              if (sk == null) {
                  sk = ch.register(selector, ops, key);
                  ref.key = sk;
              } else if (!sk.isValid()) {
                  cancel(sk, key, ops);
              } else {
                  sk.interestOps(sk.interestOps() | ops);
              }
          } catch (CancelledKeyException cx) {
              cancel(sk, key, ops);
          } catch (ClosedChannelException cx) {
              cancel(null, key, ops);
          }
    }    

BlockPoller 的 event() 方法会遍历实例中事件队列中的所有 RunnableAdd 事件对象,然后依次调用该对象的 run() 方法。
RunnableAdd 的 run() 方法会把原始 socket 感兴趣的事件注册到 selector 对象中。

4、BrockPoller的核心run方法

 public void run() {
            while (run) {
                try {
                    //对队列中所有原始 socket 注册事件
                    events();
                    int keyCount = 0;
                    try {
                        //监听是否有 event() 方法注册的事件发生
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(1000);
                        }
                        wakeupCounter.set(0);
                        if (!run) {
                            break;
                        }
                    }catch ( NullPointerException x ) {
                        ···
                    }

                    Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;

                    // 如果 selector 有检测到事件发生,并且原始 socket 可读或者可写,则调用 countDown() 方法
                    while (run && iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                        try {
                            iterator.remove();
                            sk.interestOps(sk.interestOps() & (~sk.readyOps()));
                            //调用NioSocketWrapper 对象实例关联的 readLatch 或者 writeLatch 的 countDown() 方法
                            if ( sk.isReadable() ) {
                                countDown(attachment.getReadLatch());
                            }
                            if (sk.isWritable()) {
                                countDown(attachment.getWriteLatch());
                            }
                        }catch (CancelledKeyException ckx) {
                            sk.cancel();
                            countDown(attachment.getReadLatch());
                            countDown(attachment.getWriteLatch());
                        }
                    }//while
                }catch ( Throwable t ) {
                    log.error("",t);
                }
            }
            //事件队列循环结束,清除事件队列
            events.clear();
            if (selector.isOpen()) {
                try {
                    // Cancels all remaining keys
                    selector.selectNow();
                }catch( Exception ignore ) {
                    if (log.isDebugEnabled()) {
                        log.debug("",ignore);
                    }
                }
            }
            try {
                selector.close();
            }catch( Exception ignore ) {
                if (log.isDebugEnabled()) {
                    log.debug("",ignore);
                }
            }
        }
        
        public void countDown(CountDownLatch latch) {
            if ( latch == null ) {
                return;
            }
            latch.countDown();
        }

四、BrockPoller线程的等待与唤醒

Block poller线程一般会和 tomcat io 线程有交互,即io线程会把事件放到 block poller线程的 SynchronizedQueue事件队列之中。而Block poller 线程会轮询事件队列进行操作,但是不能一直 while(true) 的轮询,这样会占用大量的 cpu 资源,所以会有 block poller 线程的阻塞与唤醒(一般由tomcat io线程注册事件的时候唤醒)。

Block poller 线程的阻塞与唤醒主要涉及 block poller 实例的 selector 属性和 wakeupCounter(AtomicLong类型)属性。

Block poller 的核心逻辑run方法会调用 selector.selectNow() 方法来获取是否有注册在原始 socket 上的事件发生。这个方法是非阻塞方法,即调用之后立即返回,不会阻塞当前 block poller 线程,这个方法会在确定有事件添加到队列的情况下调用,这样尽可能监测到连接是否有可读或可写事件。
Block poller 调用 selector.select(timeout) 方法,这个方法是阻塞方法,调用该方法之后 block poller 线程会一直处于等待状态,一直等待到有事件发生或者超时。这个方法会在没有事件添加到队列的情况下调用,从而让 block poller 线程进入等待状态,避免 cpu 空闲轮询造成使用率过高(极端情况下会导致 java 进程占用 cpu 100% 的现象)。
Block poller 实例会有 wakeupCounter 属性,这个属性为 AtomicLong 类型,初始值为 0,在 tomcat io 线程注册事件的时候,会根据该值是否为 0 来决定是否由 io 线程唤醒 block poller 线程。

protected final AtomicInteger wakeupCounter = new AtomicInteger(0);

while(run){
  events();
  if (wakeupCounter.getAndSet(-1) > 0) {   keyCount = selector.selectNow();   } else { keyCount = selector.select(1000);   }   wakeupCounter.set(0);   if (!run) {   break;   }
}

初始情况下wakeupCounter的值为0,每次调用getAndSet(-1)方式把值设置为-1,返回初始值0,所以会走else分支,select(1000)方法是阻塞的会阻塞1秒钟,之后再把值设置为0,一直循环。

唤醒

BrockPoller线程的唤醒由add()方法实现

protected Selector selector = null;
protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
 
 
public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
        if (key == null) {
            return;
        }
        NioChannel nch = key.getSocket();
        final SocketChannel ch = nch.getIOChannel();
        if (ch == null) {
            return;
        }
        Runnable r = new RunnableAdd(ch, key, ops, ref);
        events.offer(r);
        wakeup();
}
public void wakeup() {
    if (wakeupCounter.addAndGet(1)==0) {
        selector.wakeup();
    }
 }

当Brock Poller线程运行的时候,wakeupCounter 的值可能是0,也可能为 -1,也可能是其他值。当调用add方法往事件队列里添加事件的时候,调用addAndGet(1) 方法+1,并获取+1之后的值进行判断。

如果等于0,说明正在阻塞中,因为之前讲了run方法会调用wakeupCounter.getAndSet(-1)设置值为-1,并阻塞线程1秒钟,这时需要调用 selector.wakeup() 唤醒处于阻塞状态的 block poller 线程。

如果不等于0,说明线程不在阻塞状态,是正在执行状态,此时并不需要唤醒。

如果BrockPoller线程被唤醒,那么调用wakeupCounter.getAndSet(-1)获得的值一定大于0,那么就会调用非阻塞selectNow()方法来获得事件。但是,此时也不一定能获得事件,因为注册的是可读或者可写事件,可读的发生还是靠 client 端把数据发送过来,可写的发生是要求原始 socket 缓冲区可用。之所以要唤醒 block poller 线程,是因为对原始 socket 的读写事件在 events() 方法里注册好了。正常情况下,可读在 client 端在建立好连接之后应该会发送数据发生,可写在发送完上一次的响应数据之后原始 socket 缓冲区就可用。所以就有数据可读可写的可能性,然后马上唤醒 poller 线程,来用 selector 监测是否有可读可写事件发生。

原文链接:https://blog.csdn.net/weixin_46073333/article/details/109712956

原文地址:https://www.cnblogs.com/sglx/p/15440004.html