一.简介
线程池(Thread Pool)是一种基于池化思想的管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
优势
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
不足
很多业务依赖同一个线程池,当其中一个业务因为各种不可控的原因消耗了所有的线程,导致线程池全部占满。解决方法进行业务划分,多线程池。
二.原理
线程池解决的核心问题是资源管理问题,运用“池化”思想解决这个问题,最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
三.示例
3.1 分类(jdk1.8)
- newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照(FIFO, LIFO, 优先级)执行。
- newWorkStealingPool 并行线程池,parallelism参数传入一个线程并发的数量,这里和之前就有很明显区别,前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题,这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。
3.2 工厂类
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
public class ThreadPool {
public static ThreadFactory threadFactory(String threadName,Boolean isDaemon){
ThreadFactory result = new ThreadFactory(){
AtomicInteger num = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(isDaemon);
t.setName(threadName+num.incrementAndGet());
return t;
}
};
return result;
}
/**
* corePoolSize : corePoolSize线程池中要保持的数量,即时它们是空闲的除非设置allowCoreThreadTimeOut属性为true
* maximumPoolSize:最大允许在池中最大线程数
* keepAliveTime:当线程的数量大于核心时,这是空闲线程在终止之前等待新任务的最大时间。(超过核心的线程数的线程如果长时间获得不到新任务就会终止掉)
* unit : keepAliveTime的单位 如TimeUnit.SECONDS 代表秒
* workQueue:工作队列(实现BlockingQueue接口的阻塞队列)用于在执行任务之前保存任务的队列。此队列只保留execute方法提交的Runnable任务。常使用的阻塞队列LinkedBlockingQueue、SynchronousQueue、ArrayBlockingQueue。使用SynchronousQueue队列表示,没有缓存队列在存储多余的线程。
* threadFactory:创建线程时工厂
* handler: 处理程序在执行被阻塞时使用的处理程序,因为达到线程边界和队列容量
* handler有以下四种策略:
*
* ThreadPoolExecutor.AbortPolicy():丢弃任务并抛出RejectedExecutionException异常。 (ThreadPoolExecutor默认)
* ThreadPoolExecutor.CallerRunsPolicy():由调用线程处理该任务 。
* ThreadPoolExecutor.DiscardPolicy():也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy():丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
*
* @param threadNum corePoolSize
* @param threadName maximumPoolSize
* @param isDaemon
* @return
*/
public static ThreadPoolExecutor newCachedThreadPool(int threadNum,String threadName,Boolean isDaemon){
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(threadNum,threadNum,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(10 * threadNum), threadFactory(threadName, isDaemon));
//核心线程在规定时间内会被回收
threadPool.allowCoreThreadTimeOut(true);
return threadPool;
}
public static ThreadPoolExecutor newFixedThreadPool(int threadNum,String threadName,Boolean isDaemon){
return new ThreadPoolExecutor(threadNum, threadNum,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory(threadName, isDaemon));
}
public static ScheduledThreadPoolExecutor getDefaultScheduler(){
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(20,threadFactory("BDP-Default-Scheduler-Thread-", false));
scheduler.setMaximumPoolSize(20);
scheduler.setKeepAliveTime(5,TimeUnit.MINUTES);
return scheduler;
}
}
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = ThreadPool.newCachedThreadPool(2,"TestNewCachedThreadPoolPool-",false);
// 创建线程
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
/**
* 设置超过keepAliveTime时间就会回收空闲线程
*/
Thread.sleep(1000);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池最大大小,线程池大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池就会补充一个新的线程。
public class TestNewFixedThreadPool {
public static void main(String[] args) {
ExecutorService pool = ThreadPool.newFixedThreadPool(5,"TestNewFixedThreadPool-",false);
// 创建线程
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
线程池容量5
TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-5正在执行。。。
TestNewFixedThreadPool-3正在执行。。。
TestNewFixedThreadPool-4正在执行。。。
TestNewFixedThreadPool-2正在执行。。。
线程池容量2
TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-2正在执行。。。
TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-2正在执行。。。
TestNewFixedThreadPool-1正在执行。。。
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
newScheduledThreadPool
public static void main(String[] args) {
ScheduledThreadPoolExecutor exec = ThreadPool.getDefaultScheduler();
exec.scheduleAtFixedRate(new Runnable() {
//每隔一段时间就触发异常
@Override
public void run() {
//throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
//每隔一段时间打印系统时间,证明两者是互不影响的
exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
参考
https://www.cnblogs.com/dolphin0520/p/3932921.html
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
https://crossoverjie.top/2018/07/29/java-senior/ThreadPool/
转载:https://blog.csdn.net/qq_19968255/article/details/108783510