完整的线程池源码注释见章尾。

一、JDK 中的线程池类图


线程池核心类图

classDiagram

%% ================= 接口与抽象类 =================
class Executor {
    <<interface>>
    +execute(Runnable command) void
}

class ExecutorService {
    <<interface>>
    +shutdown() void
    +shutdownNow() List~Runnable~
    +submit(Callable~T~ task) Future~T~
    +invokeAll(Collection~Callable~T~~ tasks) List~Future~T~~
    ...其他方法...
}

class AbstractExecutorService {
    <<abstract>>
    +submit(Runnable task) Future~?~
    +invokeAny(Collection~Callable~T~~ tasks) T
    ...其他方法...
}

%% ================= 核心实现类 =================
class ThreadPoolExecutor {
    -corePoolSize: int
    -maximumPoolSize: int
    -keepAliveTime: long
    -workQueue: BlockingQueue~Runnable~
    -threadFactory: ThreadFactory
    -handler: RejectedExecutionHandler

    +execute(Runnable command) void
    +shutdown() void
    +shutdownNow() List~Runnable~
    +prestartAllCoreThreads() int
    ...其他方法...
}

class ScheduledThreadPoolExecutor {
    +schedule(Runnable command, long delay, TimeUnit unit) ScheduledFuture~?~
    +scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) ScheduledFuture~?~
    ...其他定时任务方法...
}

%% ================= 依赖组件 =================
class BlockingQueue~Runnable~ {
    <<interface>>
    +offer(Runnable e) boolean
    +poll(long timeout, TimeUnit unit) Runnable
    +take() Runnable
    ...其他方法...
}

class ThreadFactory {
    <<interface>>
    +newThread(Runnable r) Thread
}

class RejectedExecutionHandler {
    <<interface>>
    +rejectedExecution(Runnable r, ThreadPoolExecutor executor) void
}

%% ================= 继承与实现关系 =================
Executor <|-- ExecutorService
ExecutorService <|.. AbstractExecutorService
AbstractExecutorService <|-- ThreadPoolExecutor
ThreadPoolExecutor <|-- ScheduledThreadPoolExecutor

%% ================= 组合关系 =================
ThreadPoolExecutor *-- BlockingQueue~Runnable~   : 任务队列
ThreadPoolExecutor *-- ThreadFactory            : 线程工厂
ThreadPoolExecutor *-- RejectedExecutionHandler   : 拒绝策略

类图说明

1. 接口与抽象类

  • Executor
    最基础的执行器接口,定义任务提交方法 execute()

  • ExecutorService
    扩展 Executor,添加线程池生命周期管理(如 shutdown())和任务批量操作方法(如 submit())。

  • AbstractExecutorService
    提供 ExecutorService 接口的默认实现(如 submit() 的通用逻辑)。

2. 核心实现类

  • ThreadPoolExecutor
    标准线程池实现,核心参数:

    • corePoolSize: 核心线程数
    • maximumPoolSize: 最大线程数
    • keepAliveTime: 非核心线程空闲存活时间
    • workQueue: 任务队列(如 LinkedBlockingQueue
    • threadFactory: 线程创建工厂
    • handler: 拒绝策略(如 AbortPolicy
  • ScheduledThreadPoolExecutor
    支持定时任务的线程池(继承自 ThreadPoolExecutor)。

3. 依赖组件

  • BlockingQueue<Runnable>
    任务队列接口,决定线程池的任务调度策略(如无界队列、有界队列、同步移交队列)。

  • ThreadFactory
    线程创建工厂,允许自定义线程名称、优先级、守护状态等。

  • RejectedExecutionHandler
    拒绝策略处理器,定义队列和线程池满时的行为(如抛出异常、直接运行、静默丢弃等)。


类关系说明

关系类型 示例 说明
**继承(< --)** `Executor <
**实现(< ..)** `ExecutorService <
组合(*--) ThreadPoolExecutor *-- BlockingQueue 线程池持有任务队列实例

渲染方法

  1. 在线预览
    将代码粘贴到 Mermaid Live Editor 中查看效果。

  2. 本地工具
    使用支持 Mermaid 的 Markdown 工具(如 VSCode + Mermaid 插件、Typora)。


应用场景

  • 面试复习:快速理解线程池的架构设计
  • 代码评审:分析自定义线程池的参数配置
  • 系统设计:规划线程池与其他组件的交互关系

通过此图可以清晰看到线程池如何通过 组合模式 将任务队列、线程工厂、拒绝策略等组件解耦,实现高度可定制化的并发处理框架。

二、线程池构造函数详解

ThreadPoolExecutor 是 Java 中功能最强大的线程池实现类,其构造函数包含多个核心参数,直接影响线程池的行为和性能。以下是其构造函数及各参数的详细解析:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
)

1. 核心参数解析

参数名 类型 作用 默认值/示例
corePoolSize int 核心线程数:即使线程空闲也保留的线程数(除非设置allowCoreThreadTimeOut 需显式设置(如CPU密集型任务建议设置为CPU核数)
maximumPoolSize int 最大线程数:线程池允许创建的最大线程数 需显式设置(一般为核心数的2倍)
keepAliveTime long 非核心线程的空闲存活时间:超出核心数的线程在空闲时的存活时间 60(秒)
unit TimeUnit 时间单位:keepAliveTime的时间单位(纳秒、毫秒、秒等) TimeUnit.SECONDS
workQueue BlockingQueue 任务队列:用于保存等待执行的任务的阻塞队列 LinkedBlockingQueue
threadFactory ThreadFactory 线程工厂:用于创建新线程(可自定义线程名称、优先级等) Executors.defaultThreadFactory()
handler RejectedExecutionHandler 拒绝策略:当线程池和队列都满时,处理新提交任务的策略 AbortPolicy(抛出异常)

2. 线程池工作流程

当新任务提交时,线程池的处理逻辑如下:

graph TD
    A[提交任务] --> B{核心线程是否已满?}
    B -->|未满| C[创建新线程执行]
    B -->|已满| D{队列是否已满?}
    D -->|未满| E[任务入队等待]
    D -->|已满| F{最大线程数是否已满?}
    F -->|未满| G[创建非核心线程执行]
    F -->|已满| H[触发拒绝策略]

具体步骤

  1. 核心线程优先:任务提交后优先创建核心线程执行。
  2. 任务入队:核心线程满后,任务进入阻塞队列。
  3. 扩容线程:当队列已满且线程数未达最大值时,创建非核心线程。
  4. 拒绝策略:队列和线程池均满时,按策略处理新任务。

3. 关键参数详解

生产环境需要结合实际业务,压测并优化线程数。

(1)线程数配置
  • CPU密集型任务(如计算、压缩):
    corePoolSize = CPU核数Runtime.getRuntime().availableProcessors()
    maximumPoolSize = corePoolSize(避免过多线程竞争)

  • IO密集型任务(如网络请求、数据库操作):
    corePoolSize = CPU核数 * 2
    maximumPoolSize = corePoolSize + 预期并发数


(2)任务队列(workQueue)
队列类型 特点 适用场景
LinkedBlockingQueue 无界队列(默认容量Integer.MAX_VALUE),导致maximumPoolSize无效 任务量平稳且需要保证所有任务被处理
ArrayBlockingQueue 有界队列,需指定固定容量 需要控制队列大小防止内存溢出
SynchronousQueue 不存储任务,直接移交线程执行(需配合较大maximumPoolSize 高并发且任务处理快速的场景
PriorityBlockingQueue 带优先级的无界队列 需要任务按优先级执行

示例

// 创建容量为100的有界队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);

(3)拒绝策略(handler)
策略类 行为 适用场景
AbortPolicy 直接抛出RejectedExecutionException 需要明确感知任务被拒绝(生产环境常用)
CallerRunsPolicy 由提交任务的线程直接执行任务 需要保证任务不丢失(如日志记录)
DiscardPolicy 静默丢弃新任务 可容忍任务丢失(如监控数据上报)
DiscardOldestPolicy 丢弃队列中最旧的任务,然后重试提交 需要优先处理新任务(如实时消息处理)

自定义拒绝策略示例

new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志并降级处理
        logger.warn("Task rejected: {}", r);
        r.run(); // 降级由当前线程执行
    }
}

(4)线程工厂(threadFactory)

用于自定义线程属性,增强可观测性:

ThreadFactory factory = new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(1);
  
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("Worker-" + counter.getAndIncrement());
        t.setPriority(Thread.NORM_PRIORITY);
        t.setDaemon(false); // 非守护线程
        return t;
    }
};

4. 完整配置示例

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    4,                                  // corePoolSize
    8,                                  // maximumPoolSize
    30,                                 // keepAliveTime
    TimeUnit.SECONDS,                   // unit
    new ArrayBlockingQueue<>(100),      // workQueue
    new CustomThreadFactory(),          // threadFactory
    new ThreadPoolExecutor.CallerRunsPolicy() // handler
);

5. 注意事项

  1. 避免使用无界队列:可能导致内存溢出(如LinkedBlockingQueue默认容量极大)。
  2. 合理设置最大线程数:避免过高导致线程竞争,或过低导致任务堆积。
  3. 监控线程池状态:通过getActiveCount()getQueue().size()等API监控运行状态。
  4. 优雅关闭:调用shutdown()平滑关闭,或shutdownNow()强制终止。

6. 常见问题

Q1:核心线程数设为0会怎样?

  • 所有线程都会被视为非核心线程,空闲时会被回收。适合任务量波动大的场景。

Q2:如何预热核心线程?

  • 调用prestartAllCoreThreads()提前创建所有核心线程。

Q3:动态调整参数

  • 通过setCorePoolSize()setMaximumPoolSize()动态调整线程数。

通过合理配置这些参数,可以构建出适应不同业务场景的高效线程池,平衡系统资源利用率和任务处理能力。

三、状态与线程数的原子控制

		/**
     * 用它来计算当前线程池的运行状态:原子变量,高3位保存线程池状态,低29位保存工作线程数
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

在线程池中,线程状态和线程数维护在一个单原子变量中,这样做可以避免多变量同步问题。

    /**
     * 29位
     */
    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    /**
     * 最大线程数:2^29 - 1
     */
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    /**
     * 线程池状态
     * 1. RUNNING:接受新任务并处理阻塞队列中的任务。11100000 00000000 00000000 00000000
     * 2. SHUTDOWN:不接受新任务,但处理阻塞队列中的任务。00000000 00000000 00000000 00000000
     * 3. STOP:不接受新任务,不处理阻塞队列中的任务,中断正在处理的任务。00100000 00000000 00000000 00000000
     * 4. TIDYING:所有任务已终止,workerCount(有效线程数)为0,线程过渡到该状态时,会执行terminated()方法。00100000 00000000 00000000 00000000
     * 5. TERMINATED:terminated()方法完成后,线程池的状态就会变成TERMINATED。01100000 00000000 00000000 00000000
     */
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

 // Packing and unpacking ctl
    /**
     * 计算线程池状态
     * 解释如下:
     * 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
     * 2. ctl的高3位存储线程池状态,低29位存储工作线程数
     * 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
     * 4. ~CAPACITY是对CAPACITY取反,得到的是一个高3位为1,低29位为0的掩码
     * 5. c & ~CAPACITY通过与操作,将ctl的低29位(工作线程数)置为0,只保留高3位的线程池状态
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
    /** 
     * 计算工作线程数
     * 解释如下:
     * 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
     * 2. ctl的高3位存储线程池状态,低29位存储工作线程数
     * 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
     * 4. c & CAPACITY通过与操作,将ctl的高3位(线程池状态)置为0,只保留低29位的工作线程数
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    /**
     * 计算ctl的值
     * 解释如下:
     * 1. rs是线程池状态,wc是工作线程数
     * 2. rs << COUNT_BITS将线程池状态左移29位,得到一个高3位为线程池状态,低29位为0的数字
     * 3. wc & CAPACITY将工作线程数与CAPACITY进行与操作,得到一个低29位为工作线程数,高3位为0的数字
     * 4. rs << COUNT_BITS | wc & CAPACITY将两个数字进行或操作,得到一个高3位为线程池状态,低29位为工作线程数的数字
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */
    /**
     * 线程池状态小于s
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. s是线程池状态
     * 3. c < s通过比较操作,判断ctl的高3位是否小于s,如果小于,则返回true,否则返回false
     */
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    
    /**
     * 线程池状态大于等于s
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. s是线程池状态
     * 3. c >= s通过比较操作,判断ctl的高3位是否大于等于s,如果大于等于,则返回true,否则返回false
     */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    /**
     * 线程池状态是RUNNING
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. c == RUNNING通过比较操作,判断ctl的高3位是否等于RUNNING,如果等于,则返回true,否则返回false
     */
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

四、Worker线程核心逻辑

private final class Worker 
    extends AbstractQueuedSynchronizer 
    implements Runnable 
{
    final Thread thread;      // 实际执行任务的线程对象
    Runnable firstTask;       // 初始任务(可能为null)
    volatile long completedTasks; // 完成的任务计数器

    Worker(Runnable firstTask) {
        setState(-1); // 初始状态-1禁止中断(直到runWorker执行)
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 关键:将Worker自身作为Runnable传给线程
    }

    public void run() {
        runWorker(this); // 实际执行入口
    }

    // 实现简单的不可重入锁
    protected boolean isHeldExclusively() { return getState() != 0; }
    protected boolean tryAcquire(int unused) { 
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
  
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
}

关键设计

  1. Worker本身作为Runnable:通过 thread.start() 触发 runWorker() 执行
  2. AQS锁的作用
    • 防止任务执行期间被外部中断
    • 实现 shutdownNow() 时批量中断空闲线程
  3. 状态-1初始化:避免线程启动前被意外中断

五、任务提交完整流程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
  
    // 阶段1:尝试创建核心线程
    if (workerCountOf(c) < corePoolSize) {       // 条件1:当前线程数 < 核心数
        if (addWorker(command, true))            // 参数true表示核心线程
            return;                              // 创建成功直接返回
        c = ctl.get();                           // 创建失败(并发导致状态变化)重新获取ctl
    }

    // 阶段2:尝试将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) { // 再次检查运行状态并入队
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))  // 二次检查:线程池已关闭?
            reject(command);                        // 触发拒绝策略(任务被移除)
        else if (workerCountOf(recheck) == 0)        // 无存活线程但队列有任务
            addWorker(null, false);                  // 创建救急线程处理队列
    }
  
    // 阶段3:队列已满,尝试创建非核心线程
    else if (!addWorker(command, false))         // 参数false表示非核心线程
        reject(command);                         // 创建失败触发拒绝策略
}

关键设计

  • 核心线程创建:即使队列未满,只要线程数不足立即创建
  • 队列双检锁workQueue.offer() 后必须二次检查线程池状态
  • 救急线程机制:当队列有任务但线程数为零时(例如被回收),创建新线程处理

六、任务执行核心(runWorker() 完整流程)

Worker 类中,执行 run 方法,底层调用了 runWorker方法。在 runWorker 方法中,提供了一些扩展点,比如说 beforeExecute(wt, task)afterExecute(task, thrown)。在自定义线程池中,可以扩展实现这两个方法并自定义逻辑,比如加上耗时时间日志之类的。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 状态置为0,允许中断
  
    boolean completedAbruptly = true;
    try {
        // 循环获取任务:首次执行firstTask,后续从队列获取
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 加锁防止中断
          
            // 处理线程中断信号(STOP状态)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&          // 清除中断标志
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();                    // 重新设置中断标志
          
            try {
                beforeExecute(wt, task);           // 扩展点:执行前回调
                Throwable thrown = null;
                try {
                    task.run();                    // 执行用户任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);    // 扩展点:执行后回调
                }
            } finally {
                task = null;                       // 清空当前任务
                w.completedTasks++;
                w.unlock();                        // 释放锁
            }
        }
        completedAbruptly = false;                // 正常退出循环
    } finally {
        processWorkerExit(w, completedAbruptly);   // 处理线程退出
    }
}

关键机制

  • 锁的释放顺序unlock() 必须在 completedTasks++ 之后,保证计数的可见性
  • 异常处理:捕获所有Throwable但仅记录Error和RuntimeException
  • 扩展点beforeExecute()afterExecute() 可用于监控任务执行

七、任务获取机制(getTask() 源码解析)

private Runnable getTask() {
    boolean timedOut = false; // 上次poll是否超时
  
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 状态检查:当线程池关闭且队列为空时不再获取任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // 减少线程数
            return null;
        }
      
        int wc = workerCountOf(c);
      
        // 是否允许超时回收:当前线程数超过核心数 或 允许核心线程超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      
        // 两个终止条件:
        // 1. (线程数超过最大值 或 超时发生) 且 (线程数>1 或 队列为空)
        // 2. 线程数超过容量限制
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
      
        try {
            // 根据timed决定使用poll或take
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // 标记超时
        } catch (InterruptedException retry) {
            timedOut = false; // 中断重试
        }
    }
}

核心逻辑

  • 超时控制:非核心线程使用 poll(keepAliveTime),核心线程默认使用 take()(除非开启allowCoreThreadTimeOut
  • 优雅终止:当线程池关闭且队列为空时,逐步减少工作线程数
  • 并发安全:通过CAS操作 compareAndDecrementWorkerCount 保证线程数准确

八、线程退出处理(processWorkerExit() 逻辑)

在 任务执行核心runWorker() 方法的最后,会调用 processWorkerExit() 执行线程退出逻辑。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 异常退出时需要手动减少线程数
    if (completedAbruptly) 
        decrementWorkerCount();

    // 统计完成任务数
    completedTaskCount += w.completedTasks;
    workers.remove(w); // 从集合中移除Worker
  
    // 尝试终止线程池(如果状态是SHUTDOWN且工作队列为空)
    tryTerminate();
  
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // 线程池仍在运行
        if (!completedAbruptly) {    // 正常退出
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1; // 至少保留一个线程处理队列任务
            if (workerCountOf(c) >= min)
                return; // 无需补充新线程
        }
        addWorker(null, false); // 创建新Worker处理剩余任务
    }
}

核心逻辑

  • 补偿机制:当非异常退出且当前线程数不足时,补充新线程
  • 保留核心线程:根据 allowCoreThreadTimeOut 决定是否保留核心线程
  • 尝试终止线程池tryTerminate() 会向其他线程发送终止信号

九、线程池终止流程(shutdown() vs shutdownNow())

1. shutdown()

public void shutdown() {
    checkShutdownAccess();
    advanceRunState(SHUTDOWN); // 状态转为SHUTDOWN
    interruptIdleWorkers();    // 仅中断空闲线程
    onShutdown();              // 空方法(扩展点)
}

private void interruptIdleWorkers() {
    for (Worker w : workers) {
        Thread t = w.thread;
        if (!t.isInterrupted() && w.tryLock()) { // 尝试获取Worker锁
            try {
                t.interrupt();
            } finally {
                w.unlock();
            }
        }
    }
}

2. shutdownNow()

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    checkShutdownAccess();
    advanceRunState(STOP);     // 直接进入STOP状态
    interruptWorkers();        // 强制中断所有线程
    tasks = drainQueue();      // 清空任务队列
    return tasks;
}

private void interruptWorkers() {
    for (Worker w : workers) {
        w.interruptIfStarted(); // 不需要获取锁,直接中断
    }
}

关键区别

  • shutdown() 允许执行完队列剩余任务,shutdownNow() 立即停止所有任务
  • 中断策略:shutdown() 只中断空闲线程,shutdownNow() 中断所有线程

十、线程池扩展点/动态修改/监控

凡是可以 set 的成员变量,都可以被动态更新

  • 线程池的核心线程数量 corePoolSize 可以被动态更新。
  • 线程池的最大线程数量 maximumPoolSize 可以被动态更新。
  • 线程池的拒绝策略处理器 RejectedExecutionHandler 可以被动态更新。
  • 线程池核心线程是否允许超时回收的标志 allowCoreThreadTimeOut 可以被动态更新。
  • 线程池线程的最大空闲时间 keepAliveTime 可以被动态更新(之所以没有 threadFactory,是因为一般来说不会动态更新线程工厂)。

凡是可以 get 的成员变量,它们的信息都可以在线程池运行过程中被收集:

  • 线程池的核心线程数 corePoolSize、最大线程数 maximumPoolSize 可以被收集。
  • 线程池线程的空闲时间 keepAliveTime、核心线程是否允许超时回收 allowCoreThreadTimeOut 可以被收集。
  • 线程池的拒绝策略 RejectedExecutionHandler 和任务队列 workQueue 可以被收集。
  • 线程池当前创建的线程数量 poolSize、曾经创建线程的最大数量 largestPoolSize、当前活跃线程数量 activeCount 可以被收集。
  • 线程池的执行的任务总数 taskCount、已经执行完毕的任务总数 completedTaskCount 可以被收集。

1. 动态参数调整(setCorePoolSize 示例)

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
  
    // 如果当前线程数超过新核心数,尝试中断空闲线程
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    // 如果新核心数更大,可能需要创建新线程
    else if (delta > 0) { 
        int k = Math.min(delta, workQueue.size()); // 需要创建的线程数
        while (k-- > 0) {
            if (addWorker(null, true)) // 创建核心线程处理队列任务
                continue;
            break;
        }
    }
}

设计亮点

  • 动态扩容:当核心数增加时,自动创建新线程处理积压任务
  • 智能缩容:通过 interruptIdleWorkers() 逐步回收多余线程

2. 拒绝策略实现原理(以CallerRunsPolicy为例)

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {          // 线程池未关闭
            r.run();                    // 由调用者线程直接执行任务
        }
    }
}

特点

  • 同步执行可能降低整体吞吐量,但保证任务不会丢失
  • 适用于需要保证任务绝对执行的场景

3. 监控接口实现原理

可以通过继承 ThreadPoolExecutor 并重写钩子方法实现监控:

protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    // 记录任务开始时间
    ((MyTask) r).setStartTime(System.nanoTime());
}

protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // 统计任务耗时
    long cost = System.nanoTime() - ((MyTask) r).getStartTime();
    metrics.recordTaskTime(cost);
}

protected void terminated() {
    super.terminated();
    // 线程池完全终止时触发
    logger.info("ThreadPool terminated");
}

设计哲学总结:

  1. 状态驱动:所有行为由 ctl 的状态变化触发
  2. 锁粒度优化:Worker级别的锁而非全局锁,提高并发度
  3. 资源弹性:动态调整线程数,平衡系统负载
  4. 扩展友好:通过多个protected方法提供扩展点
  5. 失败隔离:单个任务异常不会导致整个线程池崩溃

建议通过调试模式观察以下场景的代码路径:

  1. 核心线程数、队列容量、最大线程数都满时的拒绝流程
  2. keepAliveTime到期时线程回收过程
  3. shutdown() 与队列中剩余任务的交互
  4. 核心线程超时参数开启后的行为变

十一、ThreadPoolExecutor 类源码注释

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 用它来计算当前线程池的运行状态:原子变量,高3位保存线程池状态,低29位保存工作线程数
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


    /**
     * 29位
     */
    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    /**
     * 最大线程数:2^29 - 1
     */
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    /**
     * 线程池状态
     * 1. RUNNING:接受新任务并处理阻塞队列中的任务。11100000 00000000 00000000 00000000
     * 2. SHUTDOWN:不接受新任务,但处理阻塞队列中的任务。00000000 00000000 00000000 00000000
     * 3. STOP:不接受新任务,不处理阻塞队列中的任务,中断正在处理的任务。00100000 00000000 00000000 00000000
     * 4. TIDYING:所有任务已终止,workerCount(有效线程数)为0,线程过渡到该状态时,会执行terminated()方法。00100000 00000000 00000000 00000000
     * 5. TERMINATED:terminated()方法完成后,线程池的状态就会变成TERMINATED。01100000 00000000 00000000 00000000
     */
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    /**
     * 计算线程池状态
     * 解释如下:
     * 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
     * 2. ctl的高3位存储线程池状态,低29位存储工作线程数
     * 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
     * 4. ~CAPACITY是对CAPACITY取反,得到的是一个高3位为1,低29位为0的掩码
     * 5. c & ~CAPACITY通过与操作,将ctl的低29位(工作线程数)置为0,只保留高3位的线程池状态
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
    /** 
     * 计算工作线程数
     * 解释如下:
     * 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
     * 2. ctl的高3位存储线程池状态,低29位存储工作线程数
     * 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
     * 4. c & CAPACITY通过与操作,将ctl的高3位(线程池状态)置为0,只保留低29位的工作线程数
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    /**
     * 计算ctl的值
     * 解释如下:
     * 1. rs是线程池状态,wc是工作线程数
     * 2. rs << COUNT_BITS将线程池状态左移29位,得到一个高3位为线程池状态,低29位为0的数字
     * 3. wc & CAPACITY将工作线程数与CAPACITY进行与操作,得到一个低29位为工作线程数,高3位为0的数字
     * 4. rs << COUNT_BITS | wc & CAPACITY将两个数字进行或操作,得到一个高3位为线程池状态,低29位为工作线程数的数字
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */
    /**
     * 线程池状态小于s
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. s是线程池状态
     * 3. c < s通过比较操作,判断ctl的高3位是否小于s,如果小于,则返回true,否则返回false
     */
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    
    /**
     * 线程池状态大于等于s
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. s是线程池状态
     * 3. c >= s通过比较操作,判断ctl的高3位是否大于等于s,如果大于等于,则返回true,否则返回false
     */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    /**
     * 线程池状态是RUNNING
     * 解释如下:
     * 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
     * 2. c == RUNNING通过比较操作,判断ctl的高3位是否等于RUNNING,如果等于,则返回true,否则返回false
     */
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * 自旋减少工作线程数
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * 任务队列
     */
    private final BlockingQueue<Runnable> workQueue;
    
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 线程池核心任务:相当于真正意义上的线程池。
     * 当线程池创建时,会创建一个Worker对象,Worker对象会创建一个线程,线程会执行Worker对象的run方法。
     * 
     * 因为Worker中定义了Thread成员变量,可以说一个Worker就对应着一个线程,线程池每创建
     * 一个Worker对象就意味着创建了一个新的线程,并且会把新创建的Worker对象添加到workerPool中
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * 
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * 线程池执行完毕的任务的数量
     */
    private int largestPoolSize;

    /**
     * 线程池执行完毕的任务的数量
     */
    private long completedTaskCount;

    /**
     * 创建线程的工厂
     */
    private volatile ThreadFactory threadFactory;

    /**
     * 拒绝策略处理器
     */
    private volatile RejectedExecutionHandler handler;

    /**
    * 线程的存活时间,这个一般是针对非核心线程的,如果allowCoreThreadTimeOut设置为true了
     * 那么核心线程在经过keepAliveTime空闲时间之后,也会被回收
     */
    private volatile long keepAliveTime;

    /**
     * 这个成员变量就是用来判断是否允许核心线程超时后被回收的标志
     * 也就是说,当线程池中没有了任务,那么超过线程存活时间之后,线程池的核心线程也允许被回收
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * 线程池的核心线程数量
     */
    private volatile int corePoolSize;

    /**
     * 线程池最大线程数量
     * 
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /* The context to be used when executing the finalizer, or null. */
    private final AccessControlContext acc;

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /*
     * Methods for controlling interrupts to worker threads.
     */

    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /*
     * Misc utilities, most of which are also exported to
     * ScheduledThreadPoolExecutor
     */

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown.  A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     */
    void onShutdown() {
    }

    /**
     * State check needed by ScheduledThreadPoolExecutor to
     * enable running tasks during shutdown.
     *
     * @param shutdownOK true if should return true if SHUTDOWN
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Public constructors and methods

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after {@link #shutdown} or {@link #shutdownNow} but has not
     * completely terminated.  This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Invokes {@code shutdown} when this executor is no longer
     * referenced and it has no threads.
     */
    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory(ThreadFactory)
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

   
    public int getCorePoolSize() {
        return corePoolSize;
    }
    
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

   
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

   
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

    
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

   
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

    
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }

    

    
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

   
    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));
        return super.toString() +
            "[" + rs +
            ", pool size = " + nworkers +
            ", active threads = " + nactive +
            ", queued tasks = " + workQueue.size() +
            ", completed tasks = " + ncompleted +
            "]";
    }

    
    protected void beforeExecute(Thread t, Runnable r) { }

   
    protected void afterExecute(Runnable r, Throwable t) { }

   
    protected void terminated() { }
    
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    
    public static class AbortPolicy implements RejectedExecutionHandler {
        
        public AbortPolicy() { }

       
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    
    public static class DiscardPolicy implements RejectedExecutionHandler {
        
        public DiscardPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        
        public DiscardOldestPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}