package com.thread.threadpool;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* 线程池
*/
public class ThreadPool extends Thread{
//线程的数量
private int size;
//任务对列的大小
private final int taskQueueSize;
//默认任务对列的大小
private final static int TASK_QUEUE_SIZE = 2000;
//线程从 0 开始起命名
private static volatile int seq = 0;
//给线程的名字加前缀
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
//定义线程组
private final static ThreadGroup GROUP = new ThreadGroup("POOL_GROUP");
//任务队列
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
//存放线程的集合
private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
//定一默认的拒绝策略
private final DiscardPolicy discardPolicy;
//线程池的状态
private volatile boolean destroy = false;
//默认拒绝策略
public final static DiscardPolicy DEFAULT_DISCARD_POLICY = ()->{
throw new DiscardException("over ......");
};
//最小线程数
private int min;
//最小线程数默认值
private final static int MIN_VALUE = 4;
//活跃线程数
private int active;
//活跃线程数默认值
private final static int ACTIVE_VALUE = 8;
//最大线程数
private int max;
//最大线程数默认值
private final static int MAX_VALUE = 12;
public ThreadPool(){
this.min = MIN_VALUE;
this.active = ACTIVE_VALUE;
this.max = MAX_VALUE;
this.taskQueueSize = TASK_QUEUE_SIZE;
this.discardPolicy = DEFAULT_DISCARD_POLICY;
init();
}
public ThreadPool(int min,int active,int max,int taskQueueSize,DiscardPolicy discardPolicy){
this.min = min;
this.active = active;
this.max = max;
this.taskQueueSize = taskQueueSize;
this.discardPolicy = discardPolicy;
init();
}
//重写线程池run()方法
@Override
public void run(){
while (!destroy){
System.out.printf("pool#Min:%d,active:%d,max:%d,ThreadQueueSize:%d,TaskQueueSize:%d\n",
min,active,max,THREAD_QUEUE.size(),TASK_QUEUE.size());
try {
Thread.sleep(5_000);
if (TASK_QUEUE.size()>active && size<active){
for (int i = size ; i <active ; i++) {
creatWorkTask();
}
//给线程池数量赋值
this.size = active;
System.out.println("扩容到 active");
}else if (TASK_QUEUE.size()>max && size<max){
for (int i = active ; i <max ; i++) {
creatWorkTask();
}
//给线程池数量赋值
this.size = max;
System.out.println("扩容到 max");
}
if (TASK_QUEUE.isEmpty() && size>active){
synchronized (THREAD_QUEUE){
System.out.println("开始 release thread");
int releaseSize = size - active;
Iterator<WorkerTask> iterator = THREAD_QUEUE.iterator();
while (iterator.hasNext()){
if (releaseSize<=0){
break;
}
WorkerTask workerTask = iterator.next();
if (workerTask.getTaskStatus()==TaskStatus.BLOCKED){
//先关闭再打断
workerTask.colse();
workerTask.interrupt();
iterator.remove();
releaseSize--;
}
}
size = active;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public int getSize() {
return size;
}
public int getMin() {
return min;
}
public int getActive() {
return active;
}
public int getMax() {
return max;
}
public int getTaskQueueSize() {
return taskQueueSize;
}
//查看线程池是否可用
public boolean isDestroy(){
return this.destroy;
}
//初始化线程池
private void init() {
for (int i = 0; i <min ; i++) {
creatWorkTask();
}
//给线程池数量赋值
this.size = min;
//开启线程池线程
this.start();
}
//添加任务
public void submit(Runnable runnable){
//如果线程池已经被破坏,不能提交任务
if (isDestroy()){
//非法状态异常
throw new IllegalStateException("the thread pool is already destoryed and cant submit!!");
}
//这里是添加任务,下面有消耗任务,但是,你需要等我添加完成之后,你才能消耗,所以得枷锁
synchronized (TASK_QUEUE){
if (TASK_QUEUE.size()>taskQueueSize){
discardPolicy.discar();
}
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
//创建线程
private void creatWorkTask(){
WorkerTask workerTask = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
workerTask.start();
//加入到集合中
THREAD_QUEUE.add(workerTask);
}
//结束线程池
public void shotdown() throws InterruptedException{
while (!TASK_QUEUE.isEmpty()){
Thread.sleep(50);
}
synchronized (THREAD_QUEUE){
int initVal = THREAD_QUEUE.size();
while (initVal>0){
for (WorkerTask workerTask:THREAD_QUEUE){
if (workerTask.getTaskStatus()==TaskStatus.BLOCKED){
workerTask.interrupt();
workerTask.colse();
initVal--;
}else {
Thread.sleep(10);
}
}
}
this.destroy = true;
System.out.println("the thread pool is disposed");
}
}
//用枚举类型定义线程的状态
private enum TaskStatus{
FREE,RUNNING,BLOCKED,DEAD
}
//定义运行时异常
public static class DiscardException extends RuntimeException{
public DiscardException(String message){
super(message);
}
}
//定义拒绝策略的接口
public interface DiscardPolicy{
//jdk1.8开始,接口内的所有方法,默认都是public类型
void discar() throws DiscardException;
}
private static class WorkerTask extends Thread{
//默认状态为free
private volatile TaskStatus taskStatus = TaskStatus.FREE;
//使用父亲的构造器方法
public WorkerTask(ThreadGroup group,String name){
super(group,name);
}
public TaskStatus getTaskStatus() {
return taskStatus;
}
//重写run方法,避免线程执行完就被销毁
@Override
public void run(){
OUTER:
while (taskStatus!=TaskStatus.DEAD){
Runnable runnable;
synchronized (TASK_QUEUE){
while (TASK_QUEUE.isEmpty()){
try {
taskStatus = TaskStatus.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
System.out.println("close!!");
break OUTER;
}
}
runnable = TASK_QUEUE.removeFirst();
//如果把执行的任务放在同步代码块里,执行任务的时候,这个线程,还持有锁,导致任务无法添加
// if (runnable!=null){
// taskStatus = TaskStatus.RUNNING;
// runnable.run();
// taskStatus = TaskStatus.FREE;
}
if (runnable!=null){
taskStatus = TaskStatus.RUNNING;
runnable.run();
taskStatus = TaskStatus.FREE;
}
}
}
public void colse(){
taskStatus = TaskStatus.DEAD;
}
}
//测试一把
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i <40 ; i++) {
threadPool.submit(()->{
System.out.println(Thread.currentThread().getName()+"start");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"结束");
});
}
try {
Thread.sleep(50_000);
threadPool.shotdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
转载:https://blog.csdn.net/qq_40315045/article/details/104682076
查看评论