万字长文-线程池源码深入分析
完整的线程池源码注释见章尾。
一、JDK 中的线程池类图
线程池核心类图
classDiagram
%% ================= 接口与抽象类 =================
class Executor {
<<interface>>
+execute(Runnable command) void
}
class ExecutorService {
<<interface>>
+shutdown() void
+shutdownNow() List~Runnable~
+submit(Callable~T~ task) Future~T~
+invokeAll(Collection~Callable~T~~ tasks) List~Future~T~~
...其他方法...
}
class AbstractExecutorService {
<<abstract>>
+submit(Runnable task) Future~?~
+invokeAny(Collection~Callable~T~~ tasks) T
...其他方法...
}
%% ================= 核心实现类 =================
class ThreadPoolExecutor {
-corePoolSize: int
-maximumPoolSize: int
-keepAliveTime: long
-workQueue: BlockingQueue~Runnable~
-threadFactory: ThreadFactory
-handler: RejectedExecutionHandler
+execute(Runnable command) void
+shutdown() void
+shutdownNow() List~Runnable~
+prestartAllCoreThreads() int
...其他方法...
}
class ScheduledThreadPoolExecutor {
+schedule(Runnable command, long delay, TimeUnit unit) ScheduledFuture~?~
+scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) ScheduledFuture~?~
...其他定时任务方法...
}
%% ================= 依赖组件 =================
class BlockingQueue~Runnable~ {
<<interface>>
+offer(Runnable e) boolean
+poll(long timeout, TimeUnit unit) Runnable
+take() Runnable
...其他方法...
}
class ThreadFactory {
<<interface>>
+newThread(Runnable r) Thread
}
class RejectedExecutionHandler {
<<interface>>
+rejectedExecution(Runnable r, ThreadPoolExecutor executor) void
}
%% ================= 继承与实现关系 =================
Executor <|-- ExecutorService
ExecutorService <|.. AbstractExecutorService
AbstractExecutorService <|-- ThreadPoolExecutor
ThreadPoolExecutor <|-- ScheduledThreadPoolExecutor
%% ================= 组合关系 =================
ThreadPoolExecutor *-- BlockingQueue~Runnable~ : 任务队列
ThreadPoolExecutor *-- ThreadFactory : 线程工厂
ThreadPoolExecutor *-- RejectedExecutionHandler : 拒绝策略
类图说明
1. 接口与抽象类
-
Executor
最基础的执行器接口,定义任务提交方法execute()
。 -
ExecutorService
扩展Executor
,添加线程池生命周期管理(如shutdown()
)和任务批量操作方法(如submit()
)。 -
AbstractExecutorService
提供ExecutorService
接口的默认实现(如submit()
的通用逻辑)。
2. 核心实现类
-
ThreadPoolExecutor
标准线程池实现,核心参数:corePoolSize
: 核心线程数maximumPoolSize
: 最大线程数keepAliveTime
: 非核心线程空闲存活时间workQueue
: 任务队列(如LinkedBlockingQueue
)threadFactory
: 线程创建工厂handler
: 拒绝策略(如AbortPolicy
)
-
ScheduledThreadPoolExecutor
支持定时任务的线程池(继承自ThreadPoolExecutor
)。
3. 依赖组件
-
BlockingQueue<Runnable>
任务队列接口,决定线程池的任务调度策略(如无界队列、有界队列、同步移交队列)。 -
ThreadFactory
线程创建工厂,允许自定义线程名称、优先级、守护状态等。 -
RejectedExecutionHandler
拒绝策略处理器,定义队列和线程池满时的行为(如抛出异常、直接运行、静默丢弃等)。
类关系说明
关系类型 | 示例 | 说明 |
---|---|---|
**继承(< | --)** | `Executor < |
**实现(< | ..)** | `ExecutorService < |
组合(*--) | ThreadPoolExecutor *-- BlockingQueue |
线程池持有任务队列实例 |
渲染方法
-
在线预览
将代码粘贴到 Mermaid Live Editor 中查看效果。 -
本地工具
使用支持 Mermaid 的 Markdown 工具(如 VSCode + Mermaid 插件、Typora)。
应用场景
- 面试复习:快速理解线程池的架构设计
- 代码评审:分析自定义线程池的参数配置
- 系统设计:规划线程池与其他组件的交互关系
通过此图可以清晰看到线程池如何通过 组合模式 将任务队列、线程工厂、拒绝策略等组件解耦,实现高度可定制化的并发处理框架。
二、线程池构造函数详解
ThreadPoolExecutor
是 Java 中功能最强大的线程池实现类,其构造函数包含多个核心参数,直接影响线程池的行为和性能。以下是其构造函数及各参数的详细解析:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
1. 核心参数解析
参数名 | 类型 | 作用 | 默认值/示例 |
---|---|---|---|
corePoolSize | int | 核心线程数:即使线程空闲也保留的线程数(除非设置allowCoreThreadTimeOut ) |
需显式设置(如CPU密集型任务建议设置为CPU核数) |
maximumPoolSize | int | 最大线程数:线程池允许创建的最大线程数 | 需显式设置(一般为核心数的2倍) |
keepAliveTime | long | 非核心线程的空闲存活时间:超出核心数的线程在空闲时的存活时间 | 60(秒) |
unit | TimeUnit | 时间单位:keepAliveTime 的时间单位(纳秒、毫秒、秒等) |
TimeUnit.SECONDS |
workQueue | BlockingQueue |
任务队列:用于保存等待执行的任务的阻塞队列 | LinkedBlockingQueue |
threadFactory | ThreadFactory | 线程工厂:用于创建新线程(可自定义线程名称、优先级等) | Executors.defaultThreadFactory() |
handler | RejectedExecutionHandler | 拒绝策略:当线程池和队列都满时,处理新提交任务的策略 | AbortPolicy(抛出异常) |
2. 线程池工作流程
当新任务提交时,线程池的处理逻辑如下:
graph TD
A[提交任务] --> B{核心线程是否已满?}
B -->|未满| C[创建新线程执行]
B -->|已满| D{队列是否已满?}
D -->|未满| E[任务入队等待]
D -->|已满| F{最大线程数是否已满?}
F -->|未满| G[创建非核心线程执行]
F -->|已满| H[触发拒绝策略]
具体步骤:
- 核心线程优先:任务提交后优先创建核心线程执行。
- 任务入队:核心线程满后,任务进入阻塞队列。
- 扩容线程:当队列已满且线程数未达最大值时,创建非核心线程。
- 拒绝策略:队列和线程池均满时,按策略处理新任务。
3. 关键参数详解
生产环境需要结合实际业务,压测并优化线程数。
(1)线程数配置
-
CPU密集型任务(如计算、压缩):
corePoolSize = CPU核数
(Runtime.getRuntime().availableProcessors()
)
maximumPoolSize = corePoolSize
(避免过多线程竞争) -
IO密集型任务(如网络请求、数据库操作):
corePoolSize = CPU核数 * 2
maximumPoolSize = corePoolSize + 预期并发数
(2)任务队列(workQueue)
队列类型 | 特点 | 适用场景 |
---|---|---|
LinkedBlockingQueue | 无界队列(默认容量Integer.MAX_VALUE ),导致maximumPoolSize 无效 |
任务量平稳且需要保证所有任务被处理 |
ArrayBlockingQueue | 有界队列,需指定固定容量 | 需要控制队列大小防止内存溢出 |
SynchronousQueue | 不存储任务,直接移交线程执行(需配合较大maximumPoolSize ) |
高并发且任务处理快速的场景 |
PriorityBlockingQueue | 带优先级的无界队列 | 需要任务按优先级执行 |
示例:
// 创建容量为100的有界队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
(3)拒绝策略(handler)
策略类 | 行为 | 适用场景 |
---|---|---|
AbortPolicy | 直接抛出RejectedExecutionException |
需要明确感知任务被拒绝(生产环境常用) |
CallerRunsPolicy | 由提交任务的线程直接执行任务 | 需要保证任务不丢失(如日志记录) |
DiscardPolicy | 静默丢弃新任务 | 可容忍任务丢失(如监控数据上报) |
DiscardOldestPolicy | 丢弃队列中最旧的任务,然后重试提交 | 需要优先处理新任务(如实时消息处理) |
自定义拒绝策略示例:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志并降级处理
logger.warn("Task rejected: {}", r);
r.run(); // 降级由当前线程执行
}
}
(4)线程工厂(threadFactory)
用于自定义线程属性,增强可观测性:
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Worker-" + counter.getAndIncrement());
t.setPriority(Thread.NORM_PRIORITY);
t.setDaemon(false); // 非守护线程
return t;
}
};
4. 完整配置示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // corePoolSize
8, // maximumPoolSize
30, // keepAliveTime
TimeUnit.SECONDS, // unit
new ArrayBlockingQueue<>(100), // workQueue
new CustomThreadFactory(), // threadFactory
new ThreadPoolExecutor.CallerRunsPolicy() // handler
);
5. 注意事项
- 避免使用无界队列:可能导致内存溢出(如
LinkedBlockingQueue
默认容量极大)。 - 合理设置最大线程数:避免过高导致线程竞争,或过低导致任务堆积。
- 监控线程池状态:通过
getActiveCount()
、getQueue().size()
等API监控运行状态。 - 优雅关闭:调用
shutdown()
平滑关闭,或shutdownNow()
强制终止。
6. 常见问题
Q1:核心线程数设为0会怎样?
- 所有线程都会被视为非核心线程,空闲时会被回收。适合任务量波动大的场景。
Q2:如何预热核心线程?
- 调用
prestartAllCoreThreads()
提前创建所有核心线程。
Q3:动态调整参数
- 通过
setCorePoolSize()
和setMaximumPoolSize()
动态调整线程数。
通过合理配置这些参数,可以构建出适应不同业务场景的高效线程池,平衡系统资源利用率和任务处理能力。
三、状态与线程数的原子控制
/**
* 用它来计算当前线程池的运行状态:原子变量,高3位保存线程池状态,低29位保存工作线程数
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
在线程池中,线程状态和线程数维护在一个单原子变量中,这样做可以避免多变量同步问题。
/**
* 29位
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 最大线程数:2^29 - 1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
/**
* 线程池状态
* 1. RUNNING:接受新任务并处理阻塞队列中的任务。11100000 00000000 00000000 00000000
* 2. SHUTDOWN:不接受新任务,但处理阻塞队列中的任务。00000000 00000000 00000000 00000000
* 3. STOP:不接受新任务,不处理阻塞队列中的任务,中断正在处理的任务。00100000 00000000 00000000 00000000
* 4. TIDYING:所有任务已终止,workerCount(有效线程数)为0,线程过渡到该状态时,会执行terminated()方法。00100000 00000000 00000000 00000000
* 5. TERMINATED:terminated()方法完成后,线程池的状态就会变成TERMINATED。01100000 00000000 00000000 00000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
/**
* 计算线程池状态
* 解释如下:
* 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
* 2. ctl的高3位存储线程池状态,低29位存储工作线程数
* 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
* 4. ~CAPACITY是对CAPACITY取反,得到的是一个高3位为1,低29位为0的掩码
* 5. c & ~CAPACITY通过与操作,将ctl的低29位(工作线程数)置为0,只保留高3位的线程池状态
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* 计算工作线程数
* 解释如下:
* 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
* 2. ctl的高3位存储线程池状态,低29位存储工作线程数
* 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
* 4. c & CAPACITY通过与操作,将ctl的高3位(线程池状态)置为0,只保留低29位的工作线程数
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 计算ctl的值
* 解释如下:
* 1. rs是线程池状态,wc是工作线程数
* 2. rs << COUNT_BITS将线程池状态左移29位,得到一个高3位为线程池状态,低29位为0的数字
* 3. wc & CAPACITY将工作线程数与CAPACITY进行与操作,得到一个低29位为工作线程数,高3位为0的数字
* 4. rs << COUNT_BITS | wc & CAPACITY将两个数字进行或操作,得到一个高3位为线程池状态,低29位为工作线程数的数字
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
/**
* 线程池状态小于s
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. s是线程池状态
* 3. c < s通过比较操作,判断ctl的高3位是否小于s,如果小于,则返回true,否则返回false
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/**
* 线程池状态大于等于s
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. s是线程池状态
* 3. c >= s通过比较操作,判断ctl的高3位是否大于等于s,如果大于等于,则返回true,否则返回false
*/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 线程池状态是RUNNING
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. c == RUNNING通过比较操作,判断ctl的高3位是否等于RUNNING,如果等于,则返回true,否则返回false
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
四、Worker线程核心逻辑
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread; // 实际执行任务的线程对象
Runnable firstTask; // 初始任务(可能为null)
volatile long completedTasks; // 完成的任务计数器
Worker(Runnable firstTask) {
setState(-1); // 初始状态-1禁止中断(直到runWorker执行)
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 关键:将Worker自身作为Runnable传给线程
}
public void run() {
runWorker(this); // 实际执行入口
}
// 实现简单的不可重入锁
protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
}
关键设计:
- Worker本身作为Runnable:通过
thread.start()
触发runWorker()
执行 - AQS锁的作用:
- 防止任务执行期间被外部中断
- 实现
shutdownNow()
时批量中断空闲线程
- 状态-1初始化:避免线程启动前被意外中断
五、任务提交完整流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 阶段1:尝试创建核心线程
if (workerCountOf(c) < corePoolSize) { // 条件1:当前线程数 < 核心数
if (addWorker(command, true)) // 参数true表示核心线程
return; // 创建成功直接返回
c = ctl.get(); // 创建失败(并发导致状态变化)重新获取ctl
}
// 阶段2:尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) { // 再次检查运行状态并入队
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) // 二次检查:线程池已关闭?
reject(command); // 触发拒绝策略(任务被移除)
else if (workerCountOf(recheck) == 0) // 无存活线程但队列有任务
addWorker(null, false); // 创建救急线程处理队列
}
// 阶段3:队列已满,尝试创建非核心线程
else if (!addWorker(command, false)) // 参数false表示非核心线程
reject(command); // 创建失败触发拒绝策略
}
关键设计:
- 核心线程创建:即使队列未满,只要线程数不足立即创建
- 队列双检锁:
workQueue.offer()
后必须二次检查线程池状态 - 救急线程机制:当队列有任务但线程数为零时(例如被回收),创建新线程处理
六、任务执行核心(runWorker() 完整流程)
在Worker
类中,执行 run
方法,底层调用了 runWorker
方法。在 runWorker
方法中,提供了一些扩展点,比如说 beforeExecute(wt, task)
和 afterExecute(task, thrown)
。在自定义线程池中,可以扩展实现这两个方法并自定义逻辑,比如加上耗时时间日志之类的。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 状态置为0,允许中断
boolean completedAbruptly = true;
try {
// 循环获取任务:首次执行firstTask,后续从队列获取
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁防止中断
// 处理线程中断信号(STOP状态)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && // 清除中断标志
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 重新设置中断标志
try {
beforeExecute(wt, task); // 扩展点:执行前回调
Throwable thrown = null;
try {
task.run(); // 执行用户任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 扩展点:执行后回调
}
} finally {
task = null; // 清空当前任务
w.completedTasks++;
w.unlock(); // 释放锁
}
}
completedAbruptly = false; // 正常退出循环
} finally {
processWorkerExit(w, completedAbruptly); // 处理线程退出
}
}
关键机制:
- 锁的释放顺序:
unlock()
必须在completedTasks++
之后,保证计数的可见性 - 异常处理:捕获所有Throwable但仅记录Error和RuntimeException
- 扩展点:
beforeExecute()
和afterExecute()
可用于监控任务执行
七、任务获取机制(getTask() 源码解析)
private Runnable getTask() {
boolean timedOut = false; // 上次poll是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 状态检查:当线程池关闭且队列为空时不再获取任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 减少线程数
return null;
}
int wc = workerCountOf(c);
// 是否允许超时回收:当前线程数超过核心数 或 允许核心线程超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 两个终止条件:
// 1. (线程数超过最大值 或 超时发生) 且 (线程数>1 或 队列为空)
// 2. 线程数超过容量限制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据timed决定使用poll或take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // 标记超时
} catch (InterruptedException retry) {
timedOut = false; // 中断重试
}
}
}
核心逻辑:
- 超时控制:非核心线程使用
poll(keepAliveTime)
,核心线程默认使用take()
(除非开启allowCoreThreadTimeOut
) - 优雅终止:当线程池关闭且队列为空时,逐步减少工作线程数
- 并发安全:通过CAS操作
compareAndDecrementWorkerCount
保证线程数准确
八、线程退出处理(processWorkerExit() 逻辑)
在 任务执行核心runWorker()
方法的最后,会调用 processWorkerExit()
执行线程退出逻辑。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 异常退出时需要手动减少线程数
if (completedAbruptly)
decrementWorkerCount();
// 统计完成任务数
completedTaskCount += w.completedTasks;
workers.remove(w); // 从集合中移除Worker
// 尝试终止线程池(如果状态是SHUTDOWN且工作队列为空)
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 线程池仍在运行
if (!completedAbruptly) { // 正常退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1; // 至少保留一个线程处理队列任务
if (workerCountOf(c) >= min)
return; // 无需补充新线程
}
addWorker(null, false); // 创建新Worker处理剩余任务
}
}
核心逻辑:
- 补偿机制:当非异常退出且当前线程数不足时,补充新线程
- 保留核心线程:根据
allowCoreThreadTimeOut
决定是否保留核心线程 - 尝试终止线程池:
tryTerminate()
会向其他线程发送终止信号
九、线程池终止流程(shutdown() vs shutdownNow())
1. shutdown()
public void shutdown() {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 状态转为SHUTDOWN
interruptIdleWorkers(); // 仅中断空闲线程
onShutdown(); // 空方法(扩展点)
}
private void interruptIdleWorkers() {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 尝试获取Worker锁
try {
t.interrupt();
} finally {
w.unlock();
}
}
}
}
2. shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
checkShutdownAccess();
advanceRunState(STOP); // 直接进入STOP状态
interruptWorkers(); // 强制中断所有线程
tasks = drainQueue(); // 清空任务队列
return tasks;
}
private void interruptWorkers() {
for (Worker w : workers) {
w.interruptIfStarted(); // 不需要获取锁,直接中断
}
}
关键区别:
shutdown()
允许执行完队列剩余任务,shutdownNow()
立即停止所有任务- 中断策略:
shutdown()
只中断空闲线程,shutdownNow()
中断所有线程
十、线程池扩展点/动态修改/监控
凡是可以 set 的成员变量,都可以被动态更新
- 线程池的核心线程数量 corePoolSize 可以被动态更新。
- 线程池的最大线程数量 maximumPoolSize 可以被动态更新。
- 线程池的拒绝策略处理器 RejectedExecutionHandler 可以被动态更新。
- 线程池核心线程是否允许超时回收的标志 allowCoreThreadTimeOut 可以被动态更新。
- 线程池线程的最大空闲时间 keepAliveTime 可以被动态更新(之所以没有 threadFactory,是因为一般来说不会动态更新线程工厂)。
凡是可以 get 的成员变量,它们的信息都可以在线程池运行过程中被收集:
- 线程池的核心线程数 corePoolSize、最大线程数 maximumPoolSize 可以被收集。
- 线程池线程的空闲时间 keepAliveTime、核心线程是否允许超时回收 allowCoreThreadTimeOut 可以被收集。
- 线程池的拒绝策略 RejectedExecutionHandler 和任务队列 workQueue 可以被收集。
- 线程池当前创建的线程数量 poolSize、曾经创建线程的最大数量 largestPoolSize、当前活跃线程数量 activeCount 可以被收集。
- 线程池的执行的任务总数 taskCount、已经执行完毕的任务总数 completedTaskCount 可以被收集。
1. 动态参数调整(setCorePoolSize 示例)
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
// 如果当前线程数超过新核心数,尝试中断空闲线程
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
// 如果新核心数更大,可能需要创建新线程
else if (delta > 0) {
int k = Math.min(delta, workQueue.size()); // 需要创建的线程数
while (k-- > 0) {
if (addWorker(null, true)) // 创建核心线程处理队列任务
continue;
break;
}
}
}
设计亮点:
- 动态扩容:当核心数增加时,自动创建新线程处理积压任务
- 智能缩容:通过
interruptIdleWorkers()
逐步回收多余线程
2. 拒绝策略实现原理(以CallerRunsPolicy为例)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { // 线程池未关闭
r.run(); // 由调用者线程直接执行任务
}
}
}
特点:
- 同步执行可能降低整体吞吐量,但保证任务不会丢失
- 适用于需要保证任务绝对执行的场景
3. 监控接口实现原理
可以通过继承 ThreadPoolExecutor
并重写钩子方法实现监控:
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 记录任务开始时间
((MyTask) r).setStartTime(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// 统计任务耗时
long cost = System.nanoTime() - ((MyTask) r).getStartTime();
metrics.recordTaskTime(cost);
}
protected void terminated() {
super.terminated();
// 线程池完全终止时触发
logger.info("ThreadPool terminated");
}
设计哲学总结:
- 状态驱动:所有行为由
ctl
的状态变化触发 - 锁粒度优化:Worker级别的锁而非全局锁,提高并发度
- 资源弹性:动态调整线程数,平衡系统负载
- 扩展友好:通过多个protected方法提供扩展点
- 失败隔离:单个任务异常不会导致整个线程池崩溃
建议通过调试模式观察以下场景的代码路径:
- 核心线程数、队列容量、最大线程数都满时的拒绝流程
- keepAliveTime到期时线程回收过程
- shutdown() 与队列中剩余任务的交互
- 核心线程超时参数开启后的行为变
十一、ThreadPoolExecutor 类源码注释
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 用它来计算当前线程池的运行状态:原子变量,高3位保存线程池状态,低29位保存工作线程数
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 29位
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 最大线程数:2^29 - 1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
/**
* 线程池状态
* 1. RUNNING:接受新任务并处理阻塞队列中的任务。11100000 00000000 00000000 00000000
* 2. SHUTDOWN:不接受新任务,但处理阻塞队列中的任务。00000000 00000000 00000000 00000000
* 3. STOP:不接受新任务,不处理阻塞队列中的任务,中断正在处理的任务。00100000 00000000 00000000 00000000
* 4. TIDYING:所有任务已终止,workerCount(有效线程数)为0,线程过渡到该状态时,会执行terminated()方法。00100000 00000000 00000000 00000000
* 5. TERMINATED:terminated()方法完成后,线程池的状态就会变成TERMINATED。01100000 00000000 00000000 00000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
/**
* 计算线程池状态
* 解释如下:
* 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
* 2. ctl的高3位存储线程池状态,低29位存储工作线程数
* 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
* 4. ~CAPACITY是对CAPACITY取反,得到的是一个高3位为1,低29位为0的掩码
* 5. c & ~CAPACITY通过与操作,将ctl的低29位(工作线程数)置为0,只保留高3位的线程池状态
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* 计算工作线程数
* 解释如下:
* 1. ctl是一个AtomicInteger类型的变量,它同时存储了线程池的状态和当前工作线程数
* 2. ctl的高3位存储线程池状态,低29位存储工作线程数
* 3. CAPACITY是一个常量,值为(1 << COUNT_BITS) - 1,其中COUNT_BITS是29
* 4. c & CAPACITY通过与操作,将ctl的高3位(线程池状态)置为0,只保留低29位的工作线程数
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 计算ctl的值
* 解释如下:
* 1. rs是线程池状态,wc是工作线程数
* 2. rs << COUNT_BITS将线程池状态左移29位,得到一个高3位为线程池状态,低29位为0的数字
* 3. wc & CAPACITY将工作线程数与CAPACITY进行与操作,得到一个低29位为工作线程数,高3位为0的数字
* 4. rs << COUNT_BITS | wc & CAPACITY将两个数字进行或操作,得到一个高3位为线程池状态,低29位为工作线程数的数字
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
/**
* 线程池状态小于s
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. s是线程池状态
* 3. c < s通过比较操作,判断ctl的高3位是否小于s,如果小于,则返回true,否则返回false
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/**
* 线程池状态大于等于s
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. s是线程池状态
* 3. c >= s通过比较操作,判断ctl的高3位是否大于等于s,如果大于等于,则返回true,否则返回false
*/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 线程池状态是RUNNING
* 解释如下:
* 1. c是ctl的值,它同时存储了线程池的状态和当前工作线程数
* 2. c == RUNNING通过比较操作,判断ctl的高3位是否等于RUNNING,如果等于,则返回true,否则返回false
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 自旋减少工作线程数
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 任务队列
*/
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池核心任务:相当于真正意义上的线程池。
* 当线程池创建时,会创建一个Worker对象,Worker对象会创建一个线程,线程会执行Worker对象的run方法。
*
* 因为Worker中定义了Thread成员变量,可以说一个Worker就对应着一个线程,线程池每创建
* 一个Worker对象就意味着创建了一个新的线程,并且会把新创建的Worker对象添加到workerPool中
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
*
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* 线程池执行完毕的任务的数量
*/
private int largestPoolSize;
/**
* 线程池执行完毕的任务的数量
*/
private long completedTaskCount;
/**
* 创建线程的工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 线程的存活时间,这个一般是针对非核心线程的,如果allowCoreThreadTimeOut设置为true了
* 那么核心线程在经过keepAliveTime空闲时间之后,也会被回收
*/
private volatile long keepAliveTime;
/**
* 这个成员变量就是用来判断是否允许核心线程超时后被回收的标志
* 也就是说,当线程池中没有了任务,那么超过线程存活时间之后,线程池的核心线程也允许被回收
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池的核心线程数量
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数量
*
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* Permission required for callers of shutdown and shutdownNow.
* We additionally require (see checkShutdownAccess) that callers
* have permission to actually interrupt threads in the worker set
* (as governed by Thread.interrupt, which relies on
* ThreadGroup.checkAccess, which in turn relies on
* SecurityManager.checkAccess). Shutdowns are attempted only if
* these checks pass.
*
* All actual invocations of Thread.interrupt (see
* interruptIdleWorkers and interruptWorkers) ignore
* SecurityExceptions, meaning that the attempted interrupts
* silently fail. In the case of shutdown, they should not fail
* unless the SecurityManager has inconsistent policies, sometimes
* allowing access to a thread and sometimes not. In such cases,
* failure to actually interrupt threads may disable or delay full
* termination. Other uses of interruptIdleWorkers are advisory,
* and failure to actually interrupt will merely delay response to
* configuration changes so is not handled exceptionally.
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/*
* Methods for setting control state
*/
/**
* Transitions runState to given target, or leaves it alone if
* already at least the given target.
*
* @param targetState the desired state, either SHUTDOWN or STOP
* (but not TIDYING or TERMINATED -- use tryTerminate for that)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
/*
* Methods for controlling interrupts to worker threads.
*/
/**
* If there is a security manager, makes sure caller has
* permission to shut down threads in general (see shutdownPerm).
* If this passes, additionally makes sure the caller is allowed
* to interrupt each worker thread. This might not be true even if
* first check passed, if the SecurityManager treats some threads
* specially.
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/*
* Misc utilities, most of which are also exported to
* ScheduledThreadPoolExecutor
*/
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* Performs any further cleanup following run state transition on
* invocation of shutdown. A no-op here, but used by
* ScheduledThreadPoolExecutor to cancel delayed tasks.
*/
void onShutdown() {
}
/**
* State check needed by ScheduledThreadPoolExecutor to
* enable running tasks during shutdown.
*
* @param shutdownOK true if should return true if SHUTDOWN
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/*
* Methods for creating, running and cleaning up after workers
*/
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// Public constructors and methods
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
/**
* Returns true if this executor is in the process of terminating
* after {@link #shutdown} or {@link #shutdownNow} but has not
* completely terminated. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory(ThreadFactory)
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public int getCorePoolSize() {
return corePoolSize;
}
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}