Java并发框架之执行器框架(Executors)源码解析
在Java并发编程中,执行器框架(Executors)是一个重要的组件,它提供了强大的线程池管理功能,帮助我们有效地处理并发任务。在本篇博客中,我们将系统地分析执行器框架的源码,以更深刻地理解其实现原理和设计思想。
一、执行器框架概述
执行器框架主要由以下几个接口和类组成:
Executor
:执行器接口,定义了执行任务的execute(Runnable)
方法。ExecutorService
:执行器服务接口,继承自Executor
接口,提供了线程池管理功能,如任务提交、关闭线程池等。AbstractExecutorService
:抽象执行器服务类,实现了ExecutorService
接口,提供了将Callable
任务转换为Runnable
任务的功能。ThreadPoolExecutor
:线程池执行器类,继承自AbstractExecutorService
,实现了一个基于线程池的执行器。ScheduledExecutorService
:调度执行器服务接口,继承自ExecutorService
,提供了定时任务和周期性任务的调度功能。ScheduledThreadPoolExecutor
:调度线程池执行器类,继承自ThreadPoolExecutor
,实现了ScheduledExecutorService
接口。Executors
:执行器工具类,提供了创建不同类型线程池的工厂方法。
其中最重要的是ThreadPoolExecutor
二、ThreadPoolExecutor源码分析
ThreadPoolExecutor
是执行器框架的核心实现类,它提供了线程池的创建、线程管理、任务提交和执行等功能。我们将重点分析它的源码实现。
1. 继承结构
(Executor → ExecutorService) → AbstractExecutorService ←ThreadPoolExecutor
Executor ->
└─ ExecutorService
└─ java.util.concurrent.AbstractExecutorService
└─ java.util.concurrent.ThreadPoolExecutor
2. 类的属性
ThreadPoolExecutor
类的主要属性如下:
private final BlockingQueue<Runnable> workQueue; // 工作队列,用于存储待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); // 主锁,用于保护线程池状态的修改
private final HashSet<Worker> workers = new HashSet<Worker>(); // 工作线程集合
private final Condition termination = mainLock.newCondition(); // 用于等待线程池终止的条件变量
private int largestPoolSize; // 记录线程池的最大线程数
private long completedTaskCount; // 记录已完成任务的数量
2. 构造方法
ThreadPoolExecutor
提供了多个构造方法,它们都会调用init()
方法进行初始化操作。这里以一个常用的构造方法为例:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这个构造方法接收如下参数:
corePoolSize
:核心线程池大小,即线程池中始终保持活跃的线程数量。maximumPoolSize
:最大线程池大小,即线程池中允许的最大线程数量。keepAliveTime
:非核心线程的空闲存活时间,当线程池中的线程数量超过核心线程池大小时,空闲的非核心线程在等待新任务的最长时间。unit
:空闲存活时间的单位。workQueue
:用于存储待执行任务的阻塞队列。threadFactory
:用于创建新线程池的工厂handler
: 确定当线程满了之后
3. 任务提交与执行
当我们使用execute(Runnable)
方法提交一个任务时,ThreadPoolExecutor
会按照以下逻辑执行任务:
- 如果线程池中的线程数量小于核心线程池大小,则创建一个新的工作线程来执行任务。
- 如果线程池已满,尝试将任务添加到工作队列。
- 如果工作队列已满,尝试创建一个新的工作线程来执行任务。
- 如果线程数量已达到最大线程池大小,执行拒绝策略。
以下是execute(Runnable)
方法的实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
注意到上面的描述,线程池创建时不会有任何新线程被创建,直到有任务提交进入才会有新任务创建 当然如果想要提前创建核心线程,可以提前调用这几个方法。 这几个方法都是快速创建核心线程,实现细节不一样而已
/**
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
4. 工作线程和任务处理
ThreadPoolExecutor
使用内部类Worker
表示工作线程。Worker
实现了Runnable
接口,并持有一个线程和一个任务。当工作线程被执行时,它会首先执行自己持有的任务,然后循环地从工作队列中获取并执行任务。
以下是Worker
的部分实现:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
重点方法 runWorker(Worker)
runWorker(Worker)
方法是ThreadPoolExecutor
中的核心方法,用于处理工作线程的任务执行逻辑。它会循环地从工作队列中获取任务并执行,直到线程被中断或满足关闭条件。
5、线程池状态
线程池的状态主要反映了线程池的运行状态,在ThreadPoolExecutor
中,线程池的状态通过一个整型变量ctl
来表示。ctl
包含两个部分信息:线程池状态(高3位)和线程数量(低29位)。这里,我们将关注线程池的状态以及如何进行状态转换。
5.1 线程池的五种状态
如下所示:
- RUNNING:线程池正在运行,可以接收和执行新任务。
- SHUTDOWN:线程池正在关闭,不再接收新任务,但仍然执行已提交的任务。
- STOP:线程池已停止,不再接收新任务,同时取消尚未开始执行的任务。
- TIDYING:线程池中的所有任务都已完成,工作线程被终止,线程池正在执行
terminated()
方法。 - TERMINATED:线程池已经完全终止,
terminated()
方法执行完毕。
这些状态在ThreadPoolExecutor
中以静态常量的形式定义:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
5.2 状态转换
线程池的状态转换发生在以下几种情况:
- 从RUNNING到SHUTDOWN:当调用
shutdown()
方法时,线程池将不再接收新任务,但会继续处理已提交的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
- 从RUNNING或SHUTDOWN到STOP:当调用
shutdownNow()
方法时,线程池将取消尚未开始执行的任务并尝试中断正在执行的任务。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
-
从STOP到TIDYING:当线程池中的所有任务都已完成,且工作线程数量为0时,线程池进入TIDYING状态。
-
从TIDYING到TERMINATED:当
terminated()
方法执行完毕后,线程池进入TERMINATED状态。
状态转换的核心方法是advanceRunState(int targetState)
,它会将线程池的状态更新为目标
状态或更高的状态。这是为了确保线程池状态只会按照预定的顺序进行转换。以下是advanceRunState()
方法的实现:
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
状态转换过程中,tryTerminate()
方法会被调用,以确保线程池在满足条件时尝试终止。以下是tryTerminate()
方法的实现:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate()
方法会检查线程池的状态和任务队列,如果满足终止条件(如无正在执行的任务,且状态为SHUTDOWN或STOP),则尝试将线程池状态转换为TIDYING,并执行terminated()
方法。完成后,线程池状态将转换为TERMINATED。
总结一下,线程池的状态和状态转换反映了线程池的生命周期,有助于我们理解线程池的运行机制和如何正确地关闭线程池。在使用线程池时,我们需要根据实际需求选择合适的关闭方式,以确保资源得到有效释放,避免潜在的问题。
6 模板方法
三、总结
通过对ThreadPoolExecutor
的源码分析,我们可以深入了解线程池的创建、线程管理、任务提交和执行等过程。这有助于我们更好地理解执行器框架的设计思想和实现细节,并有效地利用它来解决实际问题。
同时,我们可以从源码中学习到很多高效编程和设计模式的应用,例如如何使用阻塞队列来存储任务,如何利用ReentrantLock和条件变量实现线程同步,以及如何使用内部类来表示工作线程等。
在实际应用中,我们可以根据业务需求选择合适的线程池配置,例如核心线程数、最大线程数、空闲存活时间、阻塞队列类型等。此外,我们还可以通过实现自定义的拒绝策略来处理任务提交失败的情况,以提高系统的稳定性和可靠性。
当然,ThreadPoolExecutor只是执行器框架的一个组成部分。要全面了解Java并发框架,还需要学习其他组件的源码,如ScheduledThreadPoolExecutor、FutureTask、ConcurrentHashMap等。希望本篇博客能为大家在深入学习Java并发框架的道路上提供一些帮助。