飞道的博客

Java线程池

488人阅读  评论(0)

一.简介

线程池(Thread Pool)是一种基于池化思想的管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

优势

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

不足

很多业务依赖同一个线程池,当其中一个业务因为各种不可控的原因消耗了所有的线程,导致线程池全部占满。解决方法进行业务划分,多线程池。

二.原理

线程池解决的核心问题是资源管理问题,运用“池化”思想解决这个问题,最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

三.示例

3.1 分类(jdk1.8)

  • newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照(FIFO, LIFO, 优先级)执行。
  • newWorkStealingPool 并行线程池,parallelism参数传入一个线程并发的数量,这里和之前就有很明显区别,前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题,这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。

3.2 工厂类

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

public class ThreadPool {
   
    public static ThreadFactory threadFactory(String threadName,Boolean isDaemon){
   
        ThreadFactory result = new ThreadFactory(){
   
            AtomicInteger num = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
   
                Thread t = new Thread(r);
                t.setDaemon(isDaemon);
                t.setName(threadName+num.incrementAndGet());
                return t;
            }
        };
        return result;
    }
    /**
     *  corePoolSize : corePoolSize线程池中要保持的数量,即时它们是空闲的除非设置allowCoreThreadTimeOut属性为true
     *  maximumPoolSize:最大允许在池中最大线程数
     *  keepAliveTime:当线程的数量大于核心时,这是空闲线程在终止之前等待新任务的最大时间。(超过核心的线程数的线程如果长时间获得不到新任务就会终止掉)
     *  unit : keepAliveTime的单位 如TimeUnit.SECONDS 代表秒
     *  workQueue:工作队列(实现BlockingQueue接口的阻塞队列)用于在执行任务之前保存任务的队列。此队列只保留execute方法提交的Runnable任务。常使用的阻塞队列LinkedBlockingQueue、SynchronousQueue、ArrayBlockingQueue。使用SynchronousQueue队列表示,没有缓存队列在存储多余的线程。
     *  threadFactory:创建线程时工厂
     *  handler: 处理程序在执行被阻塞时使用的处理程序,因为达到线程边界和队列容量
     *  handler有以下四种策略:
     *
     *  ThreadPoolExecutor.AbortPolicy():丢弃任务并抛出RejectedExecutionException异常。 (ThreadPoolExecutor默认)
     *  ThreadPoolExecutor.CallerRunsPolicy():由调用线程处理该任务 。
     *  ThreadPoolExecutor.DiscardPolicy():也是丢弃任务,但是不抛出异常。 
     *  ThreadPoolExecutor.DiscardOldestPolicy():丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
     *
     * @param threadNum corePoolSize
     * @param threadName  maximumPoolSize
     * @param isDaemon
     * @return
     */
    public  static ThreadPoolExecutor newCachedThreadPool(int threadNum,String threadName,Boolean isDaemon){
   
        ThreadPoolExecutor threadPool =  new ThreadPoolExecutor(threadNum,threadNum,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(10 * threadNum), threadFactory(threadName, isDaemon));
        //核心线程在规定时间内会被回收
        threadPool.allowCoreThreadTimeOut(true);
        return threadPool;
    }
    public  static ThreadPoolExecutor newFixedThreadPool(int threadNum,String threadName,Boolean isDaemon){
   
        return new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                threadFactory(threadName, isDaemon));
    }
    public static ScheduledThreadPoolExecutor getDefaultScheduler(){
   
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(20,threadFactory("BDP-Default-Scheduler-Thread-", false));
        scheduler.setMaximumPoolSize(20);
        scheduler.setKeepAliveTime(5,TimeUnit.MINUTES);
        return scheduler;
    }
}

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

public static void main(String[] args) throws InterruptedException {
   
    ExecutorService pool = ThreadPool.newCachedThreadPool(2,"TestNewCachedThreadPoolPool-",false);
    // 创建线程
    Thread t1 = new MyThread();
    Thread t2 = new MyThread();
    Thread t3 = new MyThread();
    Thread t4 = new MyThread();
    Thread t5 = new MyThread();
    // 将线程放入池中进行执行
    pool.execute(t1);

    pool.execute(t2);
    /**
     * 设置超过keepAliveTime时间就会回收空闲线程
     */
    Thread.sleep(1000);
    pool.execute(t3);
    pool.execute(t4);
    pool.execute(t5);
    // 关闭线程池
    pool.shutdown();
}
class MyThread extends Thread {
   
    @Override
    public void run() {
   
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");
    }
}

newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池最大大小,线程池大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池就会补充一个新的线程。

public class TestNewFixedThreadPool {
   
    public static void main(String[] args) {
   
        ExecutorService pool = ThreadPool.newFixedThreadPool(5,"TestNewFixedThreadPool-",false);
        // 创建线程
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        // 将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        // 关闭线程池
        pool.shutdown();
    }
}
class MyThread extends Thread {
   
    @Override
    public void run() {
   
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");
    }
}

线程池容量5

TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-5正在执行。。。
TestNewFixedThreadPool-3正在执行。。。
TestNewFixedThreadPool-4正在执行。。。
TestNewFixedThreadPool-2正在执行。。。

线程池容量2

TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-2正在执行。。。
TestNewFixedThreadPool-1正在执行。。。
TestNewFixedThreadPool-2正在执行。。。
TestNewFixedThreadPool-1正在执行。。。

newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

pool-1-thread-1正在执行。。。  
pool-1-thread-1正在执行。。。  
pool-1-thread-1正在执行。。。  
pool-1-thread-1正在执行。。。  
pool-1-thread-1正在执行。。。  

newScheduledThreadPool

public static void main(String[] args) {
   
        ScheduledThreadPoolExecutor exec = ThreadPool.getDefaultScheduler();
        exec.scheduleAtFixedRate(new Runnable() {
   
            //每隔一段时间就触发异常
            @Override
            public void run() {
   
                //throw new RuntimeException();
                System.out.println("================");
            }
        }, 1000, 5000, TimeUnit.MILLISECONDS);
        //每隔一段时间打印系统时间,证明两者是互不影响的
        exec.scheduleAtFixedRate(new Runnable() {
   
            @Override
            public void run() {
   
                System.out.println(System.nanoTime());
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
}

参考

https://www.cnblogs.com/dolphin0520/p/3932921.html

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

https://crossoverjie.top/2018/07/29/java-senior/ThreadPool/


转载:https://blog.csdn.net/qq_19968255/article/details/108783510
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场