小言_互联网的博客

玩转Java线程池(2):Tomcat是如何修改创建线程的策略的?

487人阅读  评论(0)

0 线程池创建线程的过程是怎样的?

要知道创建线程策略是如何的,就要从构造函数入手,因为构造函数中有几个核心的参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

例如,我们用构造方法的几个参数构造了这么一个对象。

ThreadPoolExecutor executor = new  ThreadPoolExecutor(5,
                15,
                60,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(100),
                new ThreadFactoryBuilder().setNameFormat("my-%d").build(), // 这里是用了 Google Guava 的 ThreadFactoryBuilder;
                new ThreadPoolExecutor.CallerRunsPolicy());

这时候我们提交100任务,这个时候线程池里的线程是如何创建的?
1 在向线程池不断提提交任务,在线程池里的工作线程的数量还不到 核心线程数量(corePoolSize)的时候,就继续新增工作线程
2 如果工作线程数量达到了 核心线程数量(corePoolSize),还是没有停止提交任务,那么把工作线程还没来得及处理的任务增加到 workQueue 中。
3 如果往 workQueue#offer 任务失败,那么就会继续创建工作线程。
比如在 ArrayBlockingList#offer的代码,返回false的情况就是 count == items.length

if (count == items.length)
   return false;
else {
    enqueue(e);
    return true;
}

说明此时的队列里面任务的数量等于count的时候,也就是队列慢的时候,就会offer 任务。
4 如果此时继续新增任务,而且此时新增工作线程也失败了(也就是工作线程数量达到了最大线程数量),那么就会走拒绝的逻辑,也就是使用 RejectedExecutionHandler

2 Tomcat 做了哪些改造?

在Tomcat的线程改写主要在 https://github.com/apache/tomcat/blob/a801409b37294c3f3dd5590453fb9580d7e33af2/java/org/apache/tomcat/util/threads/
tomcat 在原来的基础上做了哪些的新增的功能?因为在 Tomcat 在修改了原来的创建线程的策略,原来是要在阻塞队列已经满了的情况下,才会继续增加工作线程的数量。但是tomcat是直接吧工作线程增加到了最大的工作线程的数量,然后再往阻塞队列中新增任务,这个过程的好像更加符合我们对“最大的核心线程数”的理解。但是 Tomcat 是紧耦合实现的功能,需要 ThreadPoolExecutor 和 TaskQueue(阻塞队列)配合使用。

2.1 ThreadPoolExecutor

这个线程池的执行器的执行方法,核心还是用了JDK 中的ThreadPoolExecutor,但是在异常处理的阶段做了改造。

/**
 * Executes the given command at some time in the future.  The command
 * may execute in a new thread, in a pooled thread, or in the calling
 * thread, at the discretion of the <code>Executor</code> implementation.
 * If no threads are available, it will be added to the work queue.
 * If the work queue is full, the system will wait for the specified
 * time and it throw a RejectedExecutionException if the queue is still
 * full after that.
 *
 * @param command the runnable task
 * @param timeout A timeout for the completion of the task
 * @param unit The timeout time unit
 * @throws RejectedExecutionException if this task cannot be
 * accepted for execution - the queue is full
 * @throws NullPointerException if command or unit is null
 */
public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
2.1.1 RejectedExecutionException 是来自内部类

下面是 Tomcat 的自定义的一个内部类 RejectHandler

private static class RejectHandler implements RejectedExecutionHandler {
    private RejectHandler() {
    }

    public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
        throw new RejectedExecutionException();
    }
}

如果你在初始化对象的时候,没有指定 RejectedExecutionHandler 的参数,那么Tomcat 就默认指定这个 RejectHandler 。

2.1.2 捕获异常之后的操作

尝试去用 force 任务,如果连 force 都失败了,那么就表示,队列真的满了,这个 force 方法代码如下:

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
    return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}

可以看到,还是使用了阻塞队列的offer的方法。目的还是为了把任务给 offer 进队列中。

2.1.3 小总结

可以看出,Tocmat 的这个线程池的excute的方法其实并没有做太大的改变,还是用了原来 ThreadPoolExecutor 的方法,重点在于出现了拒绝异常之后的操作。而出现了拒绝异常之后,里面的操作的是和 TaskQueue 息息相关的,那么就表示,所以只有知道了 TaskQueue 的操作,才能理解 Tomcat 是如何进行修改了创建策略的,所以说,Tomcat 的实现是『紧耦合』的。

2.3 TaskQueue

其实 TaskQueue 里面最核心的实现是就是 offer() 方法。

2.3.1 offer()

先来看看代码,注意,这里的parent的指的是 ThreadPoolExecutor,某些使用的场景下,有些会去调用 setParent 方法来主动的指定。

public void setParent(ThreadPoolExecutor tp) {
    parent = tp;
}
public boolean offer(Runnable o) {
 	//we can't do any checks 如果没有指定 ThreadPoolExecutor 为 null ,那么就直接调用父类的 offer 方法
    if (parent==null) return super.offer(o);
    // 如果这个时候的线程池的工作线程的数量已经达到了最大的线程数量,那么就 offer 方法,把任务offer进队列中。
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    // 如果线程池中有空闲的线程,那么把任务提交给队列
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //如果此时的线程池里的工作线程的数量是少于线程池的最大的线程数量,那么就返回false
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    // 如果以上条件都没有触发,那么就默认 offer 
    return super.offer(o);
}

这里的 getSubmittedCount 返回的 submittedCount 的值,这里记录是已经提交的,但是还没执行结束的任务的数量。

private final AtomicInteger submittedCount = new AtomicInteger(0);

submittedCount 这个要和 getPoolSize() 进行比较才显得有用

  • submittedCount.get() < getPoolSize(): 就表示当前的在执行中的任务的数量是小于当前线程池里的线程的,说明说有空闲的工作线程
  • submittedCount.get() = getPoolSize(): 就表示当前的线程池的工作线程的书满载的状态。
2.3.2 小总结

offer方法中,判断了当前线程的几种状态,工作线程是否已经达到了最大?是否有空闲线程?如果当前工作现场数量没有达到最大,那么就不进行入队操作。

2.2 总结

tomcat 自己实现的线程要实现的功能就是,只有在工作线程的数量达到最大的时候,而且,才进行入队操作,无论如何,工作线程要先达到最大。

为什么要这样?
  1. 因为Tomcat要尽可能的处理更多的请求,所以就尽可能不让线程进行入队。
  2. 如果只要线程最大,那么把核心线程数量和最大线程数量都设置一样,都设置最大不就行?是的,这样也可以达到相同的目的,但是这样就会一直保持多个工作线程的状态,就不会在空闲的时候降下来了。一直保持多个工作线程是不利于程序的执行的。维持这么多线程本来就会带来不小的开销。
  3. 在目前的CPU的性能已经如此之高的情况下,很多请求的处理瓶颈不是计算瓶颈了,而是进行数据库操作带来的瓶颈,例如数据库的查询,增加,删除等等。因为数据库的数据还是存在硬盘里的,硬盘的IO需要的时间要远远大于计算的时间。

2.3 除了 Tomcat 还有别的项目中有这样的实现吗?

在著名开源项目 Dubbo 中 EagerThreadPoolExecutor 也是类似的实现。
还有一种松耦合的实现,下一篇玩转线程池系列,我也会继续讨论。


水平有限,写的不好的地方欢迎支出,欢迎友好交流。


转载:https://blog.csdn.net/RAYFUXK/article/details/105894254
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场