源码简介
ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交、线程管理、监控等方法。
下面是ThreadPoolExecutor类的构造方法源码,其他创建线程池的方法最终都会导向这个构造方法,共有7个参数:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。
-
public ThreadPoolExecutor(int corePoolSize,
-
int maximumPoolSize,
-
long keepAliveTime,
-
TimeUnit unit,
-
BlockingQueue<Runnable> workQueue,
-
ThreadFactory threadFactory,
-
RejectedExecutionHandler handler) {
-
if (corePoolSize <
0 ||
-
maximumPoolSize <=
0 ||
-
maximumPoolSize < corePoolSize ||
-
keepAliveTime <
0)
-
throw
new IllegalArgumentException();
-
if (workQueue ==
null || threadFactory ==
null || handler ==
null)
-
throw
new NullPointerException();
-
this.acc = System.getSecurityManager() ==
null ?
-
null :
-
AccessController.getContext();
-
this.corePoolSize = corePoolSize;
-
this.maximumPoolSize = maximumPoolSize;
-
this.workQueue = workQueue;
-
this.keepAliveTime = unit.toNanos(keepAliveTime);
-
this.threadFactory = threadFactory;
-
this.handler = handler;
-
}
这些参数都通过volatile修饰:
-
public
class ThreadPoolExecutor extends AbstractExecutorService {
-
private
final BlockingQueue<Runnable> workQueue;
-
private
volatile ThreadFactory threadFactory;
-
private
volatile RejectedExecutionHandler handler;
-
private
volatile
long keepAliveTime;
-
// 是否允许核心线程被回收
-
private
volatile
boolean allowCoreThreadTimeOut;
-
private
volatile
int corePoolSize;
-
private
volatile
int maximumPoolSize;
-
}
corePoolSize:核心线程数
线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置allowCoreThreadTimeout=true后,空闲的核心线程超过存活时间也会被回收)。
大于核心线程数的线程,在空闲时间超过keepAliveTime后会被回收。
线程池刚创建时,里面没有一个线程,当调用 execute() 方法添加一个任务时,如果正在运行的线程数量小于corePoolSize,则马上创建新线程并运行这个任务。
maximumPoolSize:最大线程数
线程池允许创建的最大线程数量。
当添加一个任务时,核心线程数已满,线程池还没达到最大线程数,并且没有空闲线程,工作队列已满的情况下,创建一个新线程并执行。
keepAliveTime:空闲线程存活时间
当一个可被回收的线程的空闲时间大于keepAliveTime,就会被回收。
可被回收的线程:
- 设置allowCoreThreadTimeout=true的核心线程。
- 大于核心线程数的线程(非核心线程)。
unit:时间单位
keepAliveTime的时间单位:
-
TimeUnit.NANOSECONDS
-
TimeUnit.MICROSECONDS
-
TimeUnit.MILLISECONDS
// 毫秒
-
TimeUnit.SECONDS
-
TimeUnit.MINUTES
-
TimeUnit.HOURS
-
TimeUnit.DAYS
workQueue:工作队列
存放待执行任务的队列:当提交的任务数超过核心线程数大小后,再提交的任务就存放在工作队列,任务调度时再从队列中取出任务。它仅仅用来存放被execute()方法提交的Runnable任务。工作队列实现了BlockingQueue接口。
JDK默认的工作队列有五种:
- ArrayBlockingQueue 数组型阻塞队列:数组结构,初始化时传入大小,有界,FIFO,使用一个重入锁,默认使用非公平锁,入队和出队共用一个锁,互斥。
- LinkedBlockingQueue 链表型阻塞队列:链表结构,默认初始化大小为Integer.MAX_VALUE,有界(近似无解),FIFO,使用两个重入锁分别控制元素的入队和出队,用Condition进行线程间的唤醒和等待。
- SynchronousQueue 同步队列:容量为0,添加任务必须等待取出任务,这个队列相当于通道,不存储元素。
- PriorityBlockingQueue 优先阻塞队列:无界,默认采用元素自然顺序升序排列。
- DelayQueue 延时队列:无界,元素有过期时间,过期的元素才能被取出。
threadFactory:线程工厂
创建线程的工厂,可以设定线程名、线程编号等。
默认线程工厂:
-
/**
-
* The default thread factory
-
*/
-
static
class DefaultThreadFactory implements ThreadFactory {
-
private
static
final AtomicInteger poolNumber =
new AtomicInteger(
1);
-
private
final ThreadGroup group;
-
private
final AtomicInteger threadNumber =
new AtomicInteger(
1);
-
private
final String namePrefix;
-
-
DefaultThreadFactory() {
-
SecurityManager s = System.getSecurityManager();
-
group = (s !=
null) ? s.getThreadGroup() :
-
Thread.currentThread().getThreadGroup();
-
namePrefix =
"pool-" +
-
poolNumber.getAndIncrement() +
-
"-thread-";
-
}
-
-
public Thread newThread(Runnable r) {
-
Thread t =
new Thread(group, r,
-
namePrefix + threadNumber.getAndIncrement(),
-
0);
-
if (t.isDaemon())
-
t.setDaemon(
false);
-
if (t.getPriority() != Thread.NORM_PRIORITY)
-
t.setPriority(Thread.NORM_PRIORITY);
-
return t;
-
}
-
}
handler:拒绝策略
当线程池线程数已满,并且工作队列达到限制,新提交的任务使用拒绝策略处理。可以自定义拒绝策略,拒绝策略需要实现RejectedExecutionHandler接口。
JDK默认的拒绝策略有四种:
- AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- CallerRunsPolicy:由调用线程处理该任务。
默认拒绝策略:
-
/**
-
* The default rejected execution handler
-
*/
-
private
static
final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
-
-
public
static
class AbortPolicy implements RejectedExecutionHandler {
-
/**
-
* Creates an {@code AbortPolicy}.
-
*/
-
public AbortPolicy() { }
-
-
/**
-
* Always throws RejectedExecutionException.
-
*
-
* @param r the runnable task requested to be executed
-
* @param e the executor attempting to execute this task
-
* @throws RejectedExecutionException always
-
*/
-
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
-
throw
new RejectedExecutionException(
"Task " + r.toString() +
-
" rejected from " +
-
e.toString());
-
}
-
}
线程池的执行流程
自定义线程池工具
-
import java.util.concurrent.*;
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
/**
-
* 线程池工厂工具
-
*
-
* @author 向振华
-
* @date 2021/04/11 10:24
-
*/
-
public
class ThreadPoolFactory {
-
-
/**
-
* 生成固定大小的线程池
-
*
-
* @param threadName 线程名称
-
* @return 线程池
-
*/
-
public static ExecutorService createFixedThreadPool(String threadName) {
-
AtomicInteger threadNumber =
new AtomicInteger(
0);
-
return
new ThreadPoolExecutor(
-
// 核心线程数
-
desiredThreadNum(),
-
// 最大线程数
-
desiredThreadNum(),
-
// 空闲线程存活时间
-
60L,
-
// 空闲线程存活时间单位
-
TimeUnit.SECONDS,
-
// 工作队列
-
new ArrayBlockingQueue<>(
1024),
-
// 线程工厂
-
new ThreadFactory() {
-
@Override
-
public Thread newThread(Runnable r) {
-
return
new Thread(r, threadName +
"-" + threadNumber.getAndIncrement());
-
}
-
},
-
// 拒绝策略
-
new RejectedExecutionHandler() {
-
@Override
-
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-
if (!executor.isShutdown()) {
-
try {
-
//尝试阻塞式加入任务队列
-
executor.getQueue().put(r);
-
}
catch (Exception e) {
-
//保持线程的中断状态
-
Thread.currentThread().interrupt();
-
}
-
}
-
}
-
});
-
}
-
-
/**
-
* 理想的线程数,使用 2倍cpu核心数
-
*/
-
public static int desiredThreadNum() {
-
return Runtime.getRuntime().availableProcessors() *
2;
-
}
-
}
转载:https://blog.csdn.net/Anenan/article/details/115603481