飞道的博客

Java线程池七个参数详解:核心线程数、最大线程数、空闲线程存活时间、时间单位、工作队列、线程工厂、拒绝策略

659人阅读  评论(0)

源码简介

ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交、线程管理、监控等方法。

下面是ThreadPoolExecutor类的构造方法源码,其他创建线程池的方法最终都会导向这个构造方法,共有7个参数:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。


  
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

这些参数都通过volatile修饰:


  
  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. private final BlockingQueue<Runnable> workQueue;
  3. private volatile ThreadFactory threadFactory;
  4. private volatile RejectedExecutionHandler handler;
  5. private volatile long keepAliveTime;
  6. // 是否允许核心线程被回收
  7. private volatile boolean allowCoreThreadTimeOut;
  8. private volatile int corePoolSize;
  9. private volatile int maximumPoolSize;
  10. }

corePoolSize:核心线程数

线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置allowCoreThreadTimeout=true后,空闲的核心线程超过存活时间也会被回收)。

大于核心线程数的线程,在空闲时间超过keepAliveTime后会被回收。

线程池刚创建时,里面没有一个线程,当调用 execute() 方法添加一个任务时,如果正在运行的线程数量小于corePoolSize,则马上创建新线程并运行这个任务。

maximumPoolSize:最大线程数

线程池允许创建的最大线程数量。

当添加一个任务时,核心线程数已满,线程池还没达到最大线程数,并且没有空闲线程,工作队列已满的情况下,创建一个新线程并执行。

keepAliveTime:空闲线程存活时间

当一个可被回收的线程的空闲时间大于keepAliveTime,就会被回收。

可被回收的线程:

  1. 设置allowCoreThreadTimeout=true的核心线程。
  2. 大于核心线程数的线程(非核心线程)。

unit:时间单位

keepAliveTime的时间单位:


  
  1. TimeUnit.NANOSECONDS
  2. TimeUnit.MICROSECONDS
  3. TimeUnit.MILLISECONDS // 毫秒
  4. TimeUnit.SECONDS
  5. TimeUnit.MINUTES
  6. TimeUnit.HOURS
  7. TimeUnit.DAYS

workQueue:工作队列

存放待执行任务的队列:当提交的任务数超过核心线程数大小后,再提交的任务就存放在工作队列,任务调度时再从队列中取出任务。它仅仅用来存放被execute()方法提交的Runnable任务。工作队列实现了BlockingQueue接口。

JDK默认的工作队列有五种:

  1. ArrayBlockingQueue 数组型阻塞队列:数组结构,初始化时传入大小,有界,FIFO,使用一个重入锁,默认使用非公平锁,入队和出队共用一个锁,互斥。
  2. LinkedBlockingQueue 链表型阻塞队列:链表结构,默认初始化大小为Integer.MAX_VALUE,有界(近似无解),FIFO,使用两个重入锁分别控制元素的入队和出队,用Condition进行线程间的唤醒和等待。
  3. SynchronousQueue 同步队列:容量为0,添加任务必须等待取出任务,这个队列相当于通道,不存储元素。
  4. PriorityBlockingQueue 优先阻塞队列:无界,默认采用元素自然顺序升序排列。
  5. DelayQueue 延时队列:无界,元素有过期时间,过期的元素才能被取出。

threadFactory:线程工厂

创建线程的工厂,可以设定线程名、线程编号等。

默认线程工厂:


  
  1. /**
  2. * The default thread factory
  3. */
  4. static class DefaultThreadFactory implements ThreadFactory {
  5. private static final AtomicInteger poolNumber = new AtomicInteger( 1);
  6. private final ThreadGroup group;
  7. private final AtomicInteger threadNumber = new AtomicInteger( 1);
  8. private final String namePrefix;
  9. DefaultThreadFactory() {
  10. SecurityManager s = System.getSecurityManager();
  11. group = (s != null) ? s.getThreadGroup() :
  12. Thread.currentThread().getThreadGroup();
  13. namePrefix = "pool-" +
  14. poolNumber.getAndIncrement() +
  15. "-thread-";
  16. }
  17. public Thread newThread(Runnable r) {
  18. Thread t = new Thread(group, r,
  19. namePrefix + threadNumber.getAndIncrement(),
  20. 0);
  21. if (t.isDaemon())
  22. t.setDaemon( false);
  23. if (t.getPriority() != Thread.NORM_PRIORITY)
  24. t.setPriority(Thread.NORM_PRIORITY);
  25. return t;
  26. }
  27. }

handler:拒绝策略

当线程池线程数已满,并且工作队列达到限制,新提交的任务使用拒绝策略处理。可以自定义拒绝策略,拒绝策略需要实现RejectedExecutionHandler接口。

JDK默认的拒绝策略有四种:

  1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  2. DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态。
  3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  4. CallerRunsPolicy:由调用线程处理该任务。

默认拒绝策略:


  
  1. /**
  2. * The default rejected execution handler
  3. */
  4. private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  5. public static class AbortPolicy implements RejectedExecutionHandler {
  6. /**
  7. * Creates an {@code AbortPolicy}.
  8. */
  9. public AbortPolicy() { }
  10. /**
  11. * Always throws RejectedExecutionException.
  12. *
  13. * @param r the runnable task requested to be executed
  14. * @param e the executor attempting to execute this task
  15. * @throws RejectedExecutionException always
  16. */
  17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  18. throw new RejectedExecutionException( "Task " + r.toString() +
  19. " rejected from " +
  20. e.toString());
  21. }
  22. }

线程池的执行流程

自定义线程池工具


  
  1. import java.util.concurrent.*;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * 线程池工厂工具
  5. *
  6. * @author 向振华
  7. * @date 2021/04/11 10:24
  8. */
  9. public class ThreadPoolFactory {
  10. /**
  11. * 生成固定大小的线程池
  12. *
  13. * @param threadName 线程名称
  14. * @return 线程池
  15. */
  16. public static ExecutorService createFixedThreadPool(String threadName) {
  17. AtomicInteger threadNumber = new AtomicInteger( 0);
  18. return new ThreadPoolExecutor(
  19. // 核心线程数
  20. desiredThreadNum(),
  21. // 最大线程数
  22. desiredThreadNum(),
  23. // 空闲线程存活时间
  24. 60L,
  25. // 空闲线程存活时间单位
  26. TimeUnit.SECONDS,
  27. // 工作队列
  28. new ArrayBlockingQueue<>( 1024),
  29. // 线程工厂
  30. new ThreadFactory() {
  31. @Override
  32. public Thread newThread(Runnable r) {
  33. return new Thread(r, threadName + "-" + threadNumber.getAndIncrement());
  34. }
  35. },
  36. // 拒绝策略
  37. new RejectedExecutionHandler() {
  38. @Override
  39. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  40. if (!executor.isShutdown()) {
  41. try {
  42. //尝试阻塞式加入任务队列
  43. executor.getQueue().put(r);
  44. } catch (Exception e) {
  45. //保持线程的中断状态
  46. Thread.currentThread().interrupt();
  47. }
  48. }
  49. }
  50. });
  51. }
  52. /**
  53. * 理想的线程数,使用 2倍cpu核心数
  54. */
  55. public static int desiredThreadNum() {
  56. return Runtime.getRuntime().availableProcessors() * 2;
  57. }
  58. }

 


转载:https://blog.csdn.net/Anenan/article/details/115603481
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场