一、Java 中的线程池
1. 线程池状态
ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
- 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//去掉前三位保存线程状态的位数,剩下的用于保存线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS次方,表示可以保存的最大线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
获取线程池状态、线程数量以及合并两个值的操作
// Packing and unpacking ctl
// 传入 ctl 值 获取运行状态 该操作会让除高3位以外的数全部变为0
private static int runStateOf(int c) {
return c & ~CAPACITY; }
// 传入 ctl 值 获取运行线程数 该操作会让高3位为0
private static int workerCountOf(int c) {
return c & CAPACITY; }
// 传入 rs 运行状态 wc 线程数量 计算ctl新值
private static int ctlOf(int rs, int wc) {
return rs | wc; }
2. 线程池主要属性参数
//阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;
// 全局锁,解决创建销毁线程等线程安全问题
private final ReentrantLock mainLock = new ReentrantLock();
// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素
private final HashSet<Worker> workers = new HashSet<Worker>();
//线程工厂,给线程取名字
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器 处理拒绝策略
private volatile RejectedExecutionHandler handler;
// 救急线程(或者核心线程)空闲时的最大生存时间
private volatile long keepAliveTime;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
// 最大线程数 - 核心线程数 = 九级线程数
private volatile int maximumPoolSize;
// 默认拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
- corePoolSize : (核心线程数量),如果调用了线程池的 prestartAllCoreThreads( ) 方法,线程池会提前创建并启动所有基本线程,否则是懒惰创建
- workQueue:用于保存等待执行的任务的阻塞队列。可以选择以下几个具体实现
- ① ArrayBlockingQueue:是一个基于数组的有界阻塞队列,按FIFO(先进先出原则)排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。
- ② LinkedBlockingQuene:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。吞吐量通常要高于ArrayBlockingQueue 。静态工厂方法 Executors.newFixedThreadPool( ) 使用了该队列
- ③ SynchronousQuene 是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene ,静态工厂方法 Executors.newCachedThreadPool() 用的是此队列
- ④ PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
- maximumPoolSize:线程池最大线程数量,包括了核心线程数量和救急线程数量
- threadFactory:线程工厂,可以给线程设置名字等
- handler:拒绝执行处理器 处理拒绝策略 在处理过程中具体讲解
- keepAliveTime:线程活动保持时间,线程池工作线程空闲后,保持存活的时间,所以,如果任务很多,并且每个任务执行时间很多,可以调大存活时间,提高线程利用率
- unit:空闲线程存活时间单位
3. 线程池的实现原理
3.1 ThreadPoolExecutor 线程池主要处理流程
- 使用者 发布任务
- 如果当前运行的线程少于 核心线程数(corePoolSize),则创建新线程来执行任务(这一步需要获得全局锁,不然会引发线程安全问题)
- 如果运行的线程等于或者大于corePoolSize 则将任务加入阻塞队列(BlockQueue)
- 如果BlockQueue 已满,无法将任务加入队列,则创建新线程来处理任务(这同样需要获得全局锁)
- 此处就用到救急线程,其数量就是最大线程数减去核心线程数的数量
- 如果创建新线程使当前运行的线程超出maximumPoolSize 任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法
-
拒绝策略 jdk 提供了 4 种实现
-
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
-
CallerRunsPolicy:让调用者运行任务
-
DiscardPolicy:放弃本次任务
-
DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
-
明白了上述内容 我们就可以看看源码是如何实现的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数小于核心线程数 就 调用addWorker 创建新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池还在运行,并且可以加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新检查线程池是否还在运行 或者有的线程已经死了 必要时回滚队列并且采用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池仍在运行 采用救急线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果创建线程失败,则采用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
3.2 线程池方法解析
① 添加线程的addWorker( ) 方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
// 获取表示状态和线程数的原子整数
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 如果线程池状态不是 RUNNING 或者 阻塞队列中有任务 则创建线程失败
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
// 获取线程个数
int wc = workerCountOf(c);
// 如果线程数大于容量 或者 大于核心线程数或者最大线程数(用哪个绑定取决于传入的core)则创建线程失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 在多线程情况下 如果CAS创建线程 修改 原子整数 失败 则回滚到retry 重新循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取 表示状态和线程个数的原子整数
c = ctl.get();
// 如果 运行状态和当初不同,则回滚重新循环
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新线程处理任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 创建新线程需要获得全局锁
mainLock.lock();
try {
//加锁的同时再次检测 避免在释放锁之前调用了shut down
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 再次确认线程存活
if (t.isAlive())
throw new IllegalThreadStateException();
// 将该线程 加入到 HashSet集合中(线程池)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 更新标志位
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果工作线程成功添加,开始线程开始工作 并更新标志位
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程启动失败
// 调用addWorkerFailed(w)方法: 删除该工作线程 工作线程数减一,并且尝试终止线程池
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
② 向线程池提交任务
- execute() : 上面已经分析过,用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
- submit() :用于提交需要返回值的任务,线程池会返回一个 future 对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future 的get () 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,也可以使用带超时时间的get() ,这里着重分析该方法
使用
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 通过submit执行Callable中的call方法
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "OKK";
}
});
try {
// 通过future 来获得返回值
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
③ 关闭线程池的方法
shutdown() 将线程池的状态设置成 SHUTDOWN 中断没有执行任务的线程,其他线程执行完任务,自己消亡
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 通过安全管理器看是否有权关闭线程池
checkShutdownAccess();
// 将线程池状态设置为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 打断 空闲的工作线程
interruptIdleWorkers();
// 给子类提供一些扩展方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结线程池
tryTerminate();
}
final void tryTerminate() {
for (; ; ) {
// 获取存储状态和线程数量的 原子整数
int c = ctl.get();
// 如果存在以下三种情况,尝试终结线程池失败
// 1、线程池状态为RUNNING
// 2、线程池状态为 RUNNING SHUTDOWN STOP (状态值大于TIDYING)
// 3、线程池状态为SHUTDOWN,但阻塞队列中还有任务等待执行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 如果存活线程数不为0 打断空闲的线程
if (workerCountOf(c) != 0) {
// Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 尝试使用CAS将线程池状态改为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 通过CAS将线程池状态改为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow() 首先将线程池状态设置为STOP,然后尝试停止所有的正在执行或暂停人物的线程,并返回等待执行的任务的列表
public List<Runnable> shutdownNow() {
// 返回的任务列表
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 通过安全管理器看是否有权关闭线程池
checkShutdownAccess();
// 将线程池状态设置为STOP
advanceRunState(STOP);
// 遍历打断所有线程
interruptWorkers();
// 将未执行的任务从队列中移除,然后返回给调用者
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
4. 合理地配置线程池
可以从以下角度分析:
- 任务的性质:CPU密集型任务、IO密集型任务、混合型任务
- CPU密集型应该配置尽可能小的线程,通常采用cpu核数+1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费
- IO密集型应该配置尽可能多的线程,因为CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行IO操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
- 任务的优先级:高、中和低
- 任务的执行时间:长、中和短
- 任务的依赖性:是否依赖其他系统资源,如数据库连接
注意:
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
有一次,我们系统里后台任务线程的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现的问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里,如果当时我们设置成无界队列,那么线程池的队列就会越来越多,最多可能会撑满内存,OutOfMemory,导致整个系统不可用
5. 线程池的监控
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,快速定位问题。可以通过线程池提供的参数进行监控:
- largestPoolSize:线程池里曾经创建过的最大线程数量,可以判断知道线程池是否满过
- completedTaskCount:线程池已完成的任务数量
- getPoolSize( ):线程池的线程数量
- getActiveCount():获取活动的线程数
也可以通过扩展线程池进行监控,通过继承线程池来自定义线程池,重写线程池的下列方法,在一些特定的时间段进行一些监控
- protected void beforeExecute(Thread t, Runnable r) { }
- protected void afterExecute(Runnable r, Throwable t) { }
- protected void terminated() { }
二、手写线程池
1. 实现阻塞队列
主要字段
- 任务队列queue 用于存放发布的任务
- ReentrantLock加锁保证取放任务的线程安全
- fullWaitSet 和 emptyWaitSet 作为任务队列满或者空 时的等待队列
主要方法
- 线程池获取任务 T take()
- 重载方法,分别用于没有时间限制的获取任务以及带超时的获取任务
- 若任务队列为空 进入等待队列
- 不为空则选取第一个
- 主线程 添加任务 void put(T task)
- 获取 任务队列的任务个数 size ()
/**
* 阻塞队列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任务队列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任务队列满后 生产者进入等待队列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任务队列空时,消费者进入等待队列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 从任务队列获取任务
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超时时间的获取任务
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout 转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//返回剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任务方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 获取任务队列任务数
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}
2. 实现线程池
主要方法
- void execute (Runnable task)
- 如果 任务数量小于 线程池中的线程数,则创建Worker对象(实现Thread类)来执行任务
- 如果 大于线程数,则先放入阻塞队列中存放
- Worker 的 run 方法
- 如果传入的任务不为空 则执行传入的任务
- 执行完成之后 继续执行任务队列中的任务
- 全部结束之后删除该线程
/**
* 线程池
*/
class ThreadPool {
/**
* 任务队列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 线程集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心线程数
*/
private int coreSize;
/**
* 获取任务超时时间
*/
private long timeout;
/**
* 时间工具
*/
private TimeUnit timeUnit;
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else {
//超过核心线程数 就加入任务队列暂存
System.out.println("线程数满,将任务加入任务队列" + task);
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
}
/**
* 真正执行任务的线程
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行任务"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("删除"+this+"线程");
workers.remove(this);
}
}
}
}
3. 测试
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
System.out.println("任务"+j + "执行完成");
});
}
}
- 设置线程数为2个,执行代码 前两个任务进入时 创建两个线程执行任务,之后任务则无法执行进入任务队列
- 等两个线程执行完任务继续 获取任务队列中的任务执行,如果超过任务获取等待时间,退出执行任务循环,没有任务后删除线程。
以上实现了基本的线程池,但是如果任务数量庞大,并且执行任务比较缓慢,任务队列满后,迟迟等不到解决,并且有新的任务来,会一直处于等待状态,所以要添加拒绝策略
4. 拒绝策略
4.1 带超时的添加任务
/**
* 待超时的添加任务
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
4.2 拒绝策略(策略模式)
队列满后,新的任务可以选择继续死等,带超时的等待,放弃执行任务,抛出异常等等很多解决策略,所以应该把选择权交给工程师,提高可扩展性,把所有操作抽象成接口,让使用者自己实现
/**
* 拒绝策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
给 ThreadPool 线程池加入拒绝策略属性,并在构造方法中初始化
/**
* 拒绝策略
*/
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
将执行任务中 加入任务队列的方法改成 tryPut() 尝试加入队列
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else {
//超过核心线程数 就加入任务队列暂存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
并且在阻塞队列中实现该方法
- 上锁,保证队列的线程安全
- 如果队列满了,则调用自己实现的拒绝策略处理
- 没有满则直接加入任务队列
/**
* 尝试加入任务队列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
4.3 测试利用带超时时间的拒绝策略
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务"+j + "执行完成");
});
}
}
- 设置阻塞队列大小为 1, 线程池线程数为1,执行第一个任务时, 第二个任务进入任务队列,第三个任务等待进入队列,0.5秒后第一个任务没有完成,任务队列还是满的,所以第三个任务放弃加入任务队列,所以最后只完成了两个任务。
5. 完整代码
package threadPool_Test;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description: 自定义线程池
* @Author: Aiguodala
* @CreateDate: 2021/4/11 14:59
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务"+j + "执行完成");
});
}
}
}
/**
* 拒绝策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
/**
* 线程池
*/
class ThreadPool {
/**
* 任务队列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 线程集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心线程数
*/
private int coreSize;
/**
* 获取任务超时时间
*/
private long timeout;
/**
* 时间工具
*/
private TimeUnit timeUnit;
/**
* 拒绝策略
*/
private RejectPolicy<Runnable> rejectPolicy;
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else {
//超过核心线程数 就加入任务队列暂存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 真正执行任务的线程
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行任务"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("删除"+this+"线程");
workers.remove(this);
}
}
}
}
/**
* 阻塞队列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任务队列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任务队列满后 生产者进入等待队列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任务队列空时,消费者进入等待队列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 从任务队列获取任务
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超时时间的获取任务
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout 转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//返回剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任务方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println("等待加入任务队列"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 待超时的添加任务
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
System.out.println("等待加入任务队列");
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
/**
* 获取任务队列任务数
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
/**
* 尝试加入任务队列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
三、 Executor 框架
1、概述
对于线程池的一些参数、实现原理或者实现过程等内容可以看我另一篇博客 点击此处
在HotSpot 虚拟机的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时,会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有与线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常会把应该分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
2、FIxedThreadPool 详解
FIxedThreadPool 被称为可重用固定线程数的线程池,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器。
静态工厂的实现:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize(核心线程数)和 maximumPoolSize(最大线程数)都是指定的nThreads,所以,不存在救急线程
- keepAliveTime 设置为0,说明一旦线程没有任务执行就会被立即终止
- 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)造成以下影响:
- 线程池线程数等于核心线程数(corePoolSize)时,进入队列等待,队列中可以一直添加任务,所以maximumPoolSize 是无效参数
- 所以运行中的FIxedThreadPool 如果未执行shutdown() 或 shutdownNow() 方法,则不会采用拒绝策略
3、SingleThreadExecutor 详解
SingleThreadExecutor 是使用单个worker 线程的Executor,适用于需要保证顺序的执行各个任务;并且在任意时间点,不会有多个线程的应用场景
静态工厂的实现:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 大体配置和FIxedThreadPool 类似,只是将 核心线程数和最大线程数 都设置为了 1,为了保证任务有顺序的进行
4、CachedThreadPool 详解
CachedThreadPool 是一个会根据需要创建新线程的线程池,大小无界,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器
静态工厂的实现:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程数(corePoolSize)设置为0,最大线程数(maximumPoolSize)设置为Intger.MAX_VALUE表示无界,keepAliveTime 为 60L 表示空闲线程最长等待新任务的时间为60秒
- 使用没有容量的SynchronousQueue 作为工作队列,每个插入操作必须等待另一个线程的对应移除操作
执行过程
- 首先执行SynchronousQueue.offer()操作,如果当前有空闲线程正在执行SynchronousQueue.poll() 操作获取任务,那么配对成功,主线程把任务交给空闲线程执行
- 如果 初始maximumPool 为空或者没有空闲线程,则会创建新线程执行任务
- 如果空闲线程等待时间超过60秒,则线程会被终止,所以保持空闲的CachedThreadPool 不会占用任何资源
5、ScheduledThreadPoolExecutor 详解
- ScheduledThreadPoolExecutor :包含若干个线程,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景
- SingleThreadScheduledExecutor:只包含一个线程,适用于需要单个后台线程执行周期任务,同时需要保证顺序的执行各个任务的应用场景
- 都使用了DelayQueue 作为任务队列
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
1. 运行机制以及实现
1. 概述
当调用 ScheduledThreadPoolExecutor 的 scheduleWithFixedDelay() 或者 scheduleAtFixedRate() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 中添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask,DelayQueue 封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask 排序,time 小的会排在前面更早执行,time相同则比较sequenceNumber 小的早执行(相当于早入队的早执行)
ScheduledFutureTask主要包含三个成员变量
/** 表示这个任务将要被添加到ScheduledThreadPoolExecutor中的序号 */
private final long sequenceNumber;
/** 将要被执行的具体时间 */
private long time;
/** 表示任务执行的间隔周期 */
private final long period;
内部维护的优先队列的具体实现
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
2. 执行周期任务的过程
1) 工作线程从DelayQueue 中获取time 大于等于当前时间的ScheduledFutureTask (DelayQueue.take() )
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取全局锁
lock.lockInterruptibly();
try {
for (;;) {
// 获取到期的任务
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空 当前线程就进入Condition中等待
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果已经超过时间,则结束poll该任务并移除任务
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 如果前导线程不为空,则当前线程就进入Condition中等待
// 让前导线程先获取任务
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待到执行该任务的时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 前导线程为空并且队列不空,则唤醒所有线程
if (leader == null && queue[0] != null)
available.signal();
// 释放锁
lock.unlock();
}
}
2) 工作线程执行这个 ScheduledFutureTask
3) 工作线程修改ScheduledFutureTask 的 time 为下次将要被执行的时间
4) 把这个修改好的 ScheduledFutureTask 加入到DelayQueue (DelayQueue.add() )
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 判断队列容量是否足够,不够则扩容
if (i >= queue.length)
grow();
size = i + 1;
// 添加任务
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
// 如果添加的是任务队列中的一个元素,则唤醒所有线程
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
6、FutureTask
FutureTask 除了实现 Future 接口外,还间接实现了 Runnable 接口。因此,FutureTask可以交给 Executor 执行,也可以由调用线程直接执行。
FutureTask 有以下 3 种状态
- 未启动:在调用FutureTask.run( ) 之前,FutureTask 处于未启动状态
- 已启动:调用了FutureTask.run( ) 方法,任务执行中,处于已启动状态
- 已完成:FutureTask.run() 方法执行完正常结束,或被取消FutureTask.cancel(…),或抛出异常而结束,都属于已完成状态
当FutureTask处于未启动或者已启动状态,执行FutureTask.get() 方法将导致调用线程阻塞,当FutureTask处于完成状态时,执行FutureTask.get() 方法,将导致调用线程立即返回结果或者抛出异常
当FutureTask 处于未启动状态,执行FutureTask.cancel() 方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false) 方法将不会对正在执行此任务的线程产生影响;当FutureTask 处于已完成状态时,FutureTask.cancel(…)方法会返回false
四、Fork / Join 框架
Fork / Join 框架是Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务的结果得到大任务的框架
1. 使用 Fork / Join 框架
package threadPool_Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @Description: 举例从 1 加到 100 拆分成小任务
* @Author: Aiguodala
* @CreateDate: 2021/4/16 12:13
*/
public class ForkJoinTest {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ContTask task = new ContTask(1,100);
// 用ForkJoinPool 执行任务
ForkJoinTask<Integer> res = forkJoinPool.submit(task);
try {
System.out.println(res.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class ContTask extends RecursiveTask<Integer> {
/**
* 执行加操作的阈值
*/
private static final int THRESHOLD = 2;
private int start;
private int end;
public ContTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int res = 0;
boolean canCompute = (end - start) <= THRESHOLD;
// 如果子任务分割的足够小,可以开始计算
if (canCompute) {
System.out.println(Thread.currentThread().getName() + "计算");
for (int i = start; i <= end; i++) {
res += i;
}
}else {
// 如果任务大于阈值 就分裂成两个子任务
int mid = (start + end) / 2;
System.out.println("创建分任务");
ContTask task1 = new ContTask(start, mid);
ContTask task2 = new ContTask(mid + 1, end);
// 执行子任务
task1.fork();
task2.fork();
// 获取子任务结果
int task1Res = task1.join();
int task2Res = task2.join();
// 合并结果
res = task1Res + task2Res;
}
return res;
}
}
Fork / Join 框架主要有两步
- 步骤一:分割任务。首先需要把大任务分割成子任务,可能子任务还是很大,可以继续递归分割,出口取决于自己对逻辑的实现
- 步骤二:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列中获取任务执行,子任务执行完的结果统一放在一个队列里,启动一个线程对结果进行合并
Fork / Join 通过两个类实现以上过程
- ForkJoinTask:我们要使用Fork/Join框架,必须首先创建一个ForkJoin 任务,它提供了在任务中执行fork() 和 join() ,通常我们不需要直接继承ForkJoinTask,只需要继承它的子类
- RecursiveTask 用于有返回值的任务
- RecursiveAction 用于没有返回值的任务
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列,进入队列的头部,当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)
2. Fork / Join 框架 实现原理
WorkQueue 主要字段
// 使用此注解的对象或字段的前后各增加 128 字节大小的 padding(空白),
// 从而让 CPU 将对象预读至缓存时占用不同的缓存行
@sun.misc.Contended
// 提交的任务数组
ForkJoinTask<?>[] array;
// 该工作队列的拥有线程
final ForkJoinWorkerThread owner;
ForkJoinTask 的fork 方法
如果当前线程是ForkJoinWorkerThread 就加入到任务队列
public final ForkJoinTask<V> fork() {
Thread t;
// 如果当前线程是ForkJoinWorkerThread 就加入到任务队列
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
把任务放到工作队列之后唤醒或者创建线程执行它
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) {
// ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
// 加入到工作队列
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
// 唤醒线程执行任务
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
ForkJoinTask 的 Join() 方法
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
通过doJoin() 来判断任务状态,任务状态有四种:
- 已完成(NORMAL):如果是已完成,则直接返回结果
- 被取消(CANCELLED):任务状态是被取消则抛出CancellationException
- 信号(SIGNAL)
- 出现异常(EXCEPTIONAL):则直接抛出相应异常
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
参考《Java 并发编程的艺术》
转载:https://blog.csdn.net/weixin_48922154/article/details/115681974