JavaConcurrency
Java Concurrency Framework: Analysis of the Executors Framework Source Code

Java并发框架之执行器框架(Executors)源码解析

在Java并发编程中,执行器框架(Executors)是一个重要的组件,它提供了强大的线程池管理功能,帮助我们有效地处理并发任务。在本篇博客中,我们将系统地分析执行器框架的源码,以更深刻地理解其实现原理和设计思想。

一、执行器框架概述

执行器框架主要由以下几个接口和类组成:

  • Executor:执行器接口,定义了执行任务的execute(Runnable)方法。
  • ExecutorService:执行器服务接口,继承自Executor接口,提供了线程池管理功能,如任务提交、关闭线程池等。
  • AbstractExecutorService:抽象执行器服务类,实现了ExecutorService接口,提供了将Callable任务转换为Runnable任务的功能。
  • ThreadPoolExecutor:线程池执行器类,继承自AbstractExecutorService,实现了一个基于线程池的执行器。
  • ScheduledExecutorService:调度执行器服务接口,继承自ExecutorService,提供了定时任务和周期性任务的调度功能。
  • ScheduledThreadPoolExecutor:调度线程池执行器类,继承自ThreadPoolExecutor,实现了ScheduledExecutorService接口。
  • Executors:执行器工具类,提供了创建不同类型线程池的工厂方法。

其中最重要的是ThreadPoolExecutor

二、ThreadPoolExecutor源码分析

ThreadPoolExecutor是执行器框架的核心实现类,它提供了线程池的创建、线程管理、任务提交和执行等功能。我们将重点分析它的源码实现。

1. 继承结构

(Executor → ExecutorService) → AbstractExecutorService ←ThreadPoolExecutor

Executor ->
  └─ ExecutorService
    └─ java.util.concurrent.AbstractExecutorService
        └─ java.util.concurrent.ThreadPoolExecutor

2. 类的属性

ThreadPoolExecutor类的主要属性如下:

private final BlockingQueue<Runnable> workQueue; // 工作队列,用于存储待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); // 主锁,用于保护线程池状态的修改
private final HashSet<Worker> workers = new HashSet<Worker>(); // 工作线程集合
private final Condition termination = mainLock.newCondition(); // 用于等待线程池终止的条件变量
private int largestPoolSize; // 记录线程池的最大线程数
private long completedTaskCount; // 记录已完成任务的数量

2. 构造方法

ThreadPoolExecutor提供了多个构造方法,它们都会调用init()方法进行初始化操作。这里以一个常用的构造方法为例:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

这个构造方法接收如下参数:

  • corePoolSize:核心线程池大小,即线程池中始终保持活跃的线程数量。
  • maximumPoolSize:最大线程池大小,即线程池中允许的最大线程数量。
  • keepAliveTime:非核心线程的空闲存活时间,当线程池中的线程数量超过核心线程池大小时,空闲的非核心线程在等待新任务的最长时间。
  • unit:空闲存活时间的单位。
  • workQueue:用于存储待执行任务的阻塞队列。
  • threadFactory:用于创建新线程池的工厂
  • handler: 确定当线程满了之后

3. 任务提交与执行

当我们使用execute(Runnable)方法提交一个任务时,ThreadPoolExecutor会按照以下逻辑执行任务:

  1. 如果线程池中的线程数量小于核心线程池大小,则创建一个新的工作线程来执行任务。
  2. 如果线程池已满,尝试将任务添加到工作队列。
  3. 如果工作队列已满,尝试创建一个新的工作线程来执行任务。
  4. 如果线程数量已达到最大线程池大小,执行拒绝策略。

以下是execute(Runnable)方法的实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

注意到上面的描述,线程池创建时不会有任何新线程被创建,直到有任务提交进入才会有新任务创建 当然如果想要提前创建核心线程,可以提前调用这几个方法。 这几个方法都是快速创建核心线程,实现细节不一样而已

/**
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}


void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

4. 工作线程和任务处理

ThreadPoolExecutor使用内部类Worker表示工作线程。Worker实现了Runnable接口,并持有一个线程和一个任务。当工作线程被执行时,它会首先执行自己持有的任务,然后循环地从工作队列中获取并执行任务。

以下是Worker的部分实现:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    final Thread thread;
    Runnable firstTask;
 
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
 
    public void run() {
        runWorker(this);
    }
}

重点方法 runWorker(Worker)

runWorker(Worker)方法是ThreadPoolExecutor中的核心方法,用于处理工作线程的任务执行逻辑。它会循环地从工作队列中获取任务并执行,直到线程被中断或满足关闭条件。

5、线程池状态

线程池的状态主要反映了线程池的运行状态,在ThreadPoolExecutor中,线程池的状态通过一个整型变量ctl来表示。ctl包含两个部分信息:线程池状态(高3位)和线程数量(低29位)。这里,我们将关注线程池的状态以及如何进行状态转换。

5.1 线程池的五种状态

如下所示:

  1. RUNNING:线程池正在运行,可以接收和执行新任务。
  2. SHUTDOWN:线程池正在关闭,不再接收新任务,但仍然执行已提交的任务。
  3. STOP:线程池已停止,不再接收新任务,同时取消尚未开始执行的任务。
  4. TIDYING:线程池中的所有任务都已完成,工作线程被终止,线程池正在执行terminated()方法。
  5. TERMINATED:线程池已经完全终止,terminated()方法执行完毕。

这些状态在ThreadPoolExecutor中以静态常量的形式定义:

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

5.2 状态转换

线程池的状态转换发生在以下几种情况:

  1. 从RUNNING到SHUTDOWN:当调用shutdown()方法时,线程池将不再接收新任务,但会继续处理已提交的任务。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
  1. 从RUNNING或SHUTDOWN到STOP:当调用shutdownNow()方法时,线程池将取消尚未开始执行的任务并尝试中断正在执行的任务。
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
  1. 从STOP到TIDYING:当线程池中的所有任务都已完成,且工作线程数量为0时,线程池进入TIDYING状态。

  2. 从TIDYING到TERMINATED:当terminated()方法执行完毕后,线程池进入TERMINATED状态。

状态转换的核心方法是advanceRunState(int targetState),它会将线程池的状态更新为目标 状态或更高的状态。这是为了确保线程池状态只会按照预定的顺序进行转换。以下是advanceRunState()方法的实现:

private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

状态转换过程中,tryTerminate()方法会被调用,以确保线程池在满足条件时尝试终止。以下是tryTerminate()方法的实现:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

tryTerminate()方法会检查线程池的状态和任务队列,如果满足终止条件(如无正在执行的任务,且状态为SHUTDOWN或STOP),则尝试将线程池状态转换为TIDYING,并执行terminated()方法。完成后,线程池状态将转换为TERMINATED。

总结一下,线程池的状态和状态转换反映了线程池的生命周期,有助于我们理解线程池的运行机制和如何正确地关闭线程池。在使用线程池时,我们需要根据实际需求选择合适的关闭方式,以确保资源得到有效释放,避免潜在的问题。

6 模板方法

三、总结

通过对ThreadPoolExecutor的源码分析,我们可以深入了解线程池的创建、线程管理、任务提交和执行等过程。这有助于我们更好地理解执行器框架的设计思想和实现细节,并有效地利用它来解决实际问题。

同时,我们可以从源码中学习到很多高效编程和设计模式的应用,例如如何使用阻塞队列来存储任务,如何利用ReentrantLock和条件变量实现线程同步,以及如何使用内部类来表示工作线程等。

在实际应用中,我们可以根据业务需求选择合适的线程池配置,例如核心线程数、最大线程数、空闲存活时间、阻塞队列类型等。此外,我们还可以通过实现自定义的拒绝策略来处理任务提交失败的情况,以提高系统的稳定性和可靠性。

当然,ThreadPoolExecutor只是执行器框架的一个组成部分。要全面了解Java并发框架,还需要学习其他组件的源码,如ScheduledThreadPoolExecutor、FutureTask、ConcurrentHashMap等。希望本篇博客能为大家在深入学习Java并发框架的道路上提供一些帮助。