小言_互联网的博客

一篇看懂java线程池

394人阅读  评论(0)

这些学习笔记其实早就写好了,只不过都是文档的形式,后续慢慢会整理成博客上传。这篇主要从源码的角度,讲解四个基本的线程池。个人目前水平有限,有些地方可能无法深入或者有所错漏的地方,还望谅解。但是随着日后水平增长,我会补全的。好了进入主题。

一、什么是线程池?

学术的定义是,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。

个人学习的时候,喜欢想象成一个工厂,工厂内部对外封闭,工人们只将需要生产的任务交给工厂,然后工厂内部自动会将这些任务分配给不同的机器,然后完成生产作业。这里我们可以将线程理解为一台台的机器设备。

二、为什么要使用线程池?

那么为什么需要线程池呢?还是刚刚那个例子,如果不是这个大工厂内执行的话,而是在一块空地上一无所有。我每有一个新任务,都需要提交任务的人自带一台机器设备,那也太奢侈了,而且不现实。另外一方面,假设有钱任性,能够满足一每个人带一台设备,那也没有这么大的地方可以放置这么多设备,即使放得下肯定也是都挨着,影响效率。

所以我们可以对应得出线程池的两个优点

1)当出现大量任务的时候,不需要重复创建和销毁线程,因为这样做带来的开销十分巨大。

2)可以控制同一时间内并发执行的线程数,防止竞争带来的额外开销。

三、工作流程

那么接下来看工作流程,继续举例。当一个新任务被提交给工厂时,首先看常驻设备(核心线程)是否已经达到建厂的规划了,没有就再买设备(创建核心线程)。如果已经满了,就提交到这些设备的缓存队列(工作队列)中。这些设备一旦处理完当前任务,就可以从缓存队列中取出任务继续生产。如果缓存队列都满了,说明这段时间提交的任务太多了,需要额外再买设备加班加点生产(可以想象一下双十一),直到这个工厂放不下设备了,就执行拒绝策略,这个之后再谈。

那么计算机里的线程池的工作流程是这样的。

四、源码解读(觉得复杂的可以跳过,直接看后半部分对于ThreadPoolExecutor)

说到源码,相信很多人就头大,因为看不懂。但如果能够看得懂的话,学习起来一方面十分有成就感,另一方面学的十分透彻。首先来看executor的框架,看图。Executor接口是线程池体系的最顶级接口。

其他的好像和今天的主题关系不是很大,因此先不管它,主要看ExecutorService这个子接口。这个接口中只有一个抽象方法execute() 大概就是在将来会执行所给的命令,既有可能在新线程中执行,也可能在线程池的线程执行,或者在调用者线程执行。


  
  1. public interface Executor {
  2. /**
  3. * Executes the given command at some time in the future. The command
  4. * may execute in a new thread, in a pooled thread, or in the calling
  5. * thread, at the discretion of the {@code Executor} implementation.
  6. *
  7. * @param command the runnable task
  8. * @throws RejectedExecutionException if this task cannot be
  9. * accepted for execution
  10. * @throws NullPointerException if command is null
  11. */
  12. void execute(Runnable command);
  13. }

接下来是ExecutorService接口,这个接口中主要是一些对任务的一些操作,比如比如sumbit,shutdown。


  
  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. <T> Future<T> submit(Callable<T> task);
  5. <T> Future<T> submit(Runnable task, T result);
  6. Future<?> submit(Runnable task);
  7. }

然后是一个实现类和一个子接口。


  
  1. /**
  2. * Provides default implementations of {@link ExecutorService}
  3. * execution methods. This class implements the {@code submit},
  4. * {@code invokeAny} and {@code invokeAll} methods using a
  5. * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
  6. * to the {@link FutureTask} class provided in this package. For example,
  7. * the implementation of {@code submit(Runnable)} creates an
  8. * associated {@code RunnableFuture} that is executed and
  9. * returned. Subclasses may override the {@code newTaskFor} methods
  10. * to return {@code RunnableFuture} implementations other than
  11. * {@code FutureTask}.
  12. *
  13. * <p><b>Extension example</b>. Here is a sketch of a class
  14. * that customizes {@link ThreadPoolExecutor} to use
  15. * a {@code CustomTask} class instead of the default {@code FutureTask}:
  16. * <pre> {@code
  17. * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  18. *
  19. * static class CustomTask<V> implements RunnableFuture<V> {...}
  20. *
  21. * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
  22. * return new CustomTask<V>(c);
  23. * }
  24. * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
  25. * return new CustomTask<V>(r, v);
  26. * }
  27. * // ... add constructors, etc.
  28. * }}</pre>
  29. *
  30. * @since 1.5
  31. * @author Doug Lea
  32. */
  33. public abstract class AbstractExecutorService implements ExecutorService {

abstractExecutorService大概可以看出来,就是实现了ExecutorService,对其中的一些方法提供了默认实现。


  
  1. /**
  2. * A wrapper class that exposes only the ExecutorService methods
  3. * of an ExecutorService implementation.
  4. */
  5. static class DelegatedExecutorService extends AbstractExecutorService {

可以看到 DelegatedExecutorService实际上就是一个包装类,就不用管它了。


  
  1. /**
  2. * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  3. * A {@code ForkJoinPool} provides the entry point for submissions
  4. * from non-{@code ForkJoinTask} clients, as well as management and
  5. * monitoring operations.
  6. 省略中间的
  7. public class ForkJoinPool extends AbstractExecutorService {

这个ForkJoinPool呢,我查了一下好像是JDK7加入的新线程池,用来分而划之大任务的。由于时间关系,暂且不提,感兴趣的可以看这一下这篇https://blog.csdn.net/niyuelin1990/article/details/78658251 

然后就是我们今天的重点ThreadPoolExecutor。其他的代码先不管了,等用到的时候再看。首先来看这个最重要的构造方法。

实际上后面所讲的四种的基本线程池,都是调用的这个构造方法。


  
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

以下有几个参数。

corePoolSize 核心线程数   就是前面所说的现有的设备

maxiumPoolSize 最大线程数  就是前面所说工厂里允许存放的最大设备数量

keepAliveTime 存活时间   指线程允许存货的时间

unit 时间单位   存活时间的单位

workQueue 工作队列   规定存储队列的性质

而它的子类ScheduledThreadPoolExecutor,顾名思义则是规定了这些任务在未来的某个时刻进行执行。

五 、四种基本线程池

分别是定长型,定长计划型,单一线程型和缓存型。

先看定长型

六、有关参考:

1、Java并发——Executor框架详解(Executor框架结构与框架成员)

https://blog.csdn.net/tongdanping/article/details/79604637

2、


转载:https://blog.csdn.net/mrkyee/article/details/104411889
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场