10 JUC的任务调度线程池,Tomcat线程池以及fork/join线程池

1 任务调度线程池的应用(固定时间点执行任务)

需求:每周四18:00开始执行一个任务。

package chapter8;
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class test13 {
    public static void main(String[] args) {
        /*需求:每周四18:00执行任务*/
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        /*距离最近的时间点需要延时多少时间*/
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now);
        // 获取周四的时间
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        // 如果当前的时间已经超过这周的周四时间,那么将时间加一周
        if(now.compareTo(time) > 0){
            time = time.plusWeeks(1);
        }
        long initialDelay = Duration.between(now,time).toMillis();
        System.out.println(time);
        /*相邻的时间点的间隔时间,7天的时间*/
        long period = 1000*60*60*21*7;
        pool.scheduleWithFixedDelay(()->{
            System.out.println("running!");
        },initialDelay,period, TimeUnit.MILLISECONDS);

    }
}

2 Tomcat线程池

2-1 概述

一个Tomcat中只有一个Server,一个Server可以包含多个Service,一个Service只有一个Container,但是可以有多个Connectors(一个服务可以有多个连接,如同时提供Http和Https链接)Tomcat的service主要由 Container 和 Connector 以及相关组件构成:

  • Container用于封装和管理Servlet,以及具体处理Request请求;

  • Connector: tomcat 与外部世界的连接器(用于封装和管理Servlet,以及具体处理Request请求),监听固定端口接收外部请求,传递给 Container,并将 Container 处理的结果返回给外部,Connector最底层使用的是Socket来进行连接的Request和Response是按照HTTP协议来封装的,所以Connector同时需要实现TCP/IP协议和HTTP协议

Tomcat的connector的NIO EndPoint

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】,一旦可读,封装一个任务对象(socketProcessor), 提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

总结:Tomcat线程池用于处理可读的IO事件

2-2 Tomcat线程池与JUC线程池联系

联系:tomcat线程池扩展了JUC的ThreadPoolExecutor。

区别

  • 拒绝策略上,tomcat线程池中当总线程数达到 maximumPoolSize,不会立刻抛 RejectedExecutionException 异常
    而是尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常 。

源码分析

/*Tomcat重写了JUC的execute方法*/
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
    try {
        /*调用父类的execute方法,如果执行过程中触发拒绝策略-抛出异常的话,那么Tomcat会捕获该异常进行处理*/
        super.execute(command);
    } catch (RejectedExecutionException rx) {
       /*调用父类的execute方法,如果执行过程中触发拒绝策略-抛出异常的话,那么Tomcat会捕获该异常进行处理*/
        if (super.getQueue() instanceof TaskQueue) {
            // step1:获取当前线程池的阻塞队列
            final TaskQueue queue = (TaskQueue)super.getQueue();
        try {
            // step2:尝试将任务再次加入阻塞对立,如果失败返回fasle,进入if条件语句,抛出异常
            if (!queue.force(command, timeout, unit)) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.");
            }
        } catch (InterruptedException x) {
            submittedCount.decrementAndGet();
            Thread.interrupted();
            throw new RejectedExecutionException(x);
        }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
   }
}

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws 		       InterruptedException {
	if ( parent.isShutdown() )
		throw new RejectedExecutionException(
			"Executor not running, can't force a command into the queue"
	);
	return super.offer(o,timeout,unit); 
    //forces the item onto the queue, to be used if the taskis rejected
}
  • 尝试将任务放入阻塞队列

2-3 Tomcat线程池的配置

Connector的配置(对照上图中的配置选项)

配置项 默认值 说明
acceptorThreadCount 1 acceptor 线程数量
pollerThreadCount 1 poller 线程数量
minSpareThreads 10 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
executor - Executor 名称,用来引用下面的 Executor,配置时寻找对应名称的配置项,并利用其中的参数。
  • 一般acceptor线程数量为1就足够了
  • poll线程用于监听 socket channel 是否有可读的 I/O 事件,一旦可读,封装一个任务对象(socketProcessor)提交给 Executor 线程池处理。由于采用多路复用机制,一个线程可以监听多个channel的事件。一般情况下也不需要调整

Executor 线程配置

配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否守护线程
minSpareThreads 25 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即 1 分钟(救急线程生存事件
maxQueueSize Integer.MAX_VALUE 队列长度(默认整数最大值,相当于无界队列
prestartminSpareThreads false 核心线程是否在服务器启动时启动(核心线程时是否懒惰初始化
  • tomcat线程池的线程时守护线程,会随着主线程结束而结束
Tomcat的无界队列是否意味着没有机会创建救急线程

通常情况下:

  • 当阻塞队列满了,才会创建救急线程

Tomcat中:

  • 在加入阻塞队列前,先比较提交任务的数量是否大于最大线程的数目,如果大于直接创建救急线程,而不是加入阻塞队列(这种实现使得无界队列情况下也会创建救急线程)

3 JUC中的fork/join线程池(JDK1.7)

3-1 概述

定义:JDK 1.7 加入的新的线程池实现 ,采用分治思想,用于能够进行任务拆分的 cpu 密集型运算,在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率运算

3-2 实例

实例:采用fork/join的方式累计1到5d

package chapter8;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Mytask extends RecursiveTask<Integer>{
    private int n;
    public Mytask(int n){
        this.n = n;
    }
    @Override
    protected Integer compute() {
        if(n == 1)
            return 1;
        Mytask t1 = new Mytask(n-1);
        t1.fork();
        int result = n + t1.join();
        return result;
    }
}

public class test14 {
    public static void main(String[] args) {
        // 默认线程数目等于CPU核数
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new Mytask(5)));
    }
}

执行结果:

15

3-3 实例的问题

  • 上面例子的计算过程类似于递归的思路,只不过采用“任务”去替换“函数的调用,显然上面的实现是有明显问题的,任务之间有着线性的依赖关系。不是并行执行

3-4 实例的改进

package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
@Slf4j(topic = "c.test15")
class AddTask3 extends RecursiveTask<Integer> {
    int begin;
    int end;
    public AddTask3(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }
    @Override
    public String toString() {
        return "{" + begin + "," + end + '}';
    }
    @Override
    protected Integer compute() {
// 5, 5
        if (begin == end) {
            log.warn("join() {}", begin);
            return begin;
        }
// 4, 5
        if (end - begin == 1) {
            log.warn("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }
// 1 5
        int mid = (end + begin) / 2; // 3
        AddTask3 t1 = new AddTask3(begin, mid); // 1,3
        t1.fork();
        AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
        t2.fork();
        int result = t1.join() + t2.join();
        log.warn("join() {} + {} = {}", t1, t2, result);
        return result;
    }
}

public class test15{
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new AddTask3(1, 10)));
    }
}

执行结果

[ForkJoinPool-1-worker-2] WARN c.test15 - join() 1 + 2 = 3
[ForkJoinPool-1-worker-2] WARN c.test15 - join() 3
[ForkJoinPool-1-worker-2] WARN c.test15 - join() {1,2} + {3,3} = 6
[ForkJoinPool-1-worker-1] WARN c.test15 - join() 4 + 5 = 9
[ForkJoinPool-1-worker-1] WARN c.test15 - join() {1,3} + {4,5} = 15
15

总结:任务的合理拆分有利于fork/join框架下任务执行并发度的提高

参考资料

Tomcat组成与工作原理

多线程基础课程

原文地址:https://www.cnblogs.com/kfcuj/p/14629741.html