深入 ThreadPoolExecutor 源码

类结构

mark

这里主要要说的是 ThreadPoolExecutor

线程池状态

打开源码映入眼帘的就是这几个字段和方法,对应的就是线程池的一些运行状态和相关方法

//控制线程池中数量和状态的字段,用 AtomicInteger 保存
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//count bit 顾名思义就是 workerCount 的位数,这里是 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1<<29 -1 == 1111....1111(29 个 1) 线程数 (workerCount) 上限 大约 5 亿
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//高三位存放状态,相应的低 29 位就是 workerCount
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// 拆解 ctl 获取状态和数量
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 拼接状态和数量得到 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

状态转换过程

mark

💡 RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;

💡 SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)

💡 STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态

💡 TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态

💡 TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做。

进入TERMINATED的条件如下:

  • 线程池不是 RUNNING 状态;
  • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
  • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
  • workerCount 为 0;
  • 设置 TIDYING 状态成功。

成员变量

再往下,就会看见一些很重要的成员变量

//任务缓存队列,存放待执行的任务
private final BlockingQueue<Runnable> workQueue;
//可重入锁,线程池主要的锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程集合(线程池的 workers 集合)
private final HashSet<Worker> workers = new HashSet<Worker>();
//对应的条件变量
private final Condition termination = mainLock.newCondition();
//用来记录线程池中曾经出现过的最大线程数,和线程池容量没有关系
private int largestPoolSize;
//用来记录已经执行完毕的任务个数
private long completedTaskCount;
//工厂方法,用来创建线程
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//线程闲置时候的最大存活时间
private volatile long keepAliveTime;
//是否允许核心线程闲置的时候超时
private volatile boolean allowCoreThreadTimeOut;
//核心线程数
private volatile int corePoolSize;
//最大线程数
private volatile int maximumPoolSize;
//默认的拒绝策略:AbortPolicy 直接拒绝并抛异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

execute()

源码分析

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

//获取线程池的状态和线程数量
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //workerCount 小于核心线程数量
//将任务添加到 workers 中,第二个参数代表是否根据 corePoolSize 来添加线程,false 则根据 maxPoolSize
if (addWorker(command, true))
return;
//添加失败,重新获取 ctl
c = ctl.get();
}

//上面添加到 workers 中失败,有可能是核心线程不够用了或者线程池不是运行状态
//如果线程池是 Running 状态 尝试添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
//添加到等待队列成功,重新获取 ctl
int recheck = ctl.get();
//如果线程池不是 Running 状态就从等待队列中 remove 这个任务
if (! isRunning(recheck) && remove(command))
reject(command); //采用拒绝策略拒绝该任务
else if (workerCountOf(recheck) == 0)
//线程池 Running 但是没有线程
//创建一个线程但是不传入 Runnable(已经在阻塞队列中了)
addWorker(null, false);
}
//两种情况
//1. 线程池不是 Running 状态并且 command 不为 null,addWorker 会直接 false 然后拒绝这个任务
//2. 添加到 workQueue(阻塞队列)失败,也就是 queue 满了,可能是需要扩容了
// 所以后面的参数是 false,添加失败也会直接拒绝
else if (!addWorker(command, false))
reject(command);
}

这里要理解 addWorker 的第二个参数,true 则代表当前线程池的上界仍然是 corePoolSize,后面的 addWorker 会根据上界来判断是否增加线程,false 则代表上界是 maximumPoolSize,这一点在后面的分析中会看到。

Executor 大致执行流程

mark

addWorker()

addWorker() 的作用就是创建线程 (Worker) 并且添加到 Workers 集合中,然后启动线程

源码分析

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// rs>=SHUTDOWN 说明不是 RUNNING 状态
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//如果大于可允许的最大线程数,或者大于当前的线程池的上界,直接 false
//上面传入的第二个参数的作用体现出来了,为 true 上界则是 corePoolSize
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//利用 CAS 自增 增大 workCount 线程数,成功后就跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS 失败,重新获取 ctl
c = ctl.get(); // Re-read ctl
//判断还是不是 Running 状态(能到这里说明 rs==Running 状态),不是的话就跳出去重新来过
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// CAS 失败并且还是 Running 状态,继续自旋尝试自增
}
}
//线程是否启动,以及线程是否添加
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个 Worker(对线程的封装)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//是 RUNNING 状态 或者 是 SHUTDOWN 状态且没有提交任务 (SHUTDOWN 状态还可以执行阻塞队列的任务)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到工作集中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //记录最大值
workerAdded = true; //添加成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程,执行 Worker 的 run 方法
t.start();
workerStarted = true; //启动成功
}
}
} finally {
if (! workerStarted)
//启动失败,回滚 workers 并尝试关闭线程池
addWorkerFailed(w);
}
return workerStarted;
}

🔸 首先判断当前线程池的状态是否适合继续 addWorker,分析这里的 if 条件,RUNNING 不会 false ,STOP,TIDYING,TERMINATED 直接 false,SHUTDOWN 状态如果 firstTask 为空 阻塞队列中还有任务的时候不会 false,其他情况都 false。

🔸 获取当前的 workerCount 判断是否超过了当前的上界,这里就用到了第二个参数

🔸 然后利用 CAS 自旋增加 workerCount

🔸 创建 Worker 对象,获取 mainLock 并加锁,因为 workers 是 HashSet 并不是线程安全的

🔸 再次获取线程池转台并判断是否合法,合法就添加到 workers 中,然后在 finally 块中释放锁

🔸 根据前面的 workerAdded 判断是否启动线程

🔸 在最终的 finally 块中根据是否启动成功来决定是否回滚

addWorkerFailed()

启动失败,回滚之前的添加操作

private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
if (w != null)
//从 workers 中移除
workers.remove(w);
//减少 workerCount
decrementWorkerCount();
//尝试关闭线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}

Worker 类

封装了线程对象,线程池维护的就是这些 worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask; //传入的任务
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 设置状态为 -1
this.firstTask = firstTask;
//根据工厂方法创建线程
//将 Worker 传递进去,作为 Thread 的参数
//new Thread(Worker worker);
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否获取到了锁
protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
//利用 CAS 设置 state
//很明显这里是个不可重入的独占锁,具体可以对比 ReentrantLock 的实现方法
if (compareAndSetState(0, 1)) {
//继承自 AbstractOwnableSynchronizer
//保存当前的持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

//释放锁设置 state=0
protected boolean tryRelease(int unused) {
//设置独占锁线程为空
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); } //最终会调用 tryAcquire(1);
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); } //最终会调用 tryRelease(1);
public boolean isLocked() { return isHeldExclusively(); }

//打断已经启动的线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

这里可以看到 Worker 继承了 AQS 并且实现了 Runnable 接口,然后借助 AQS 实现了一个独占的不可重入的锁,其实这也是很巧妙的一点(这里我和博客上的理解有点出入)。

到这里大家肯定会有疑问,为什么这里要实现 AQS 然后实现一个锁?既然要又为什么要实现一个不可重入的,而不直接使用ReentrantLock 那不是更加方便么??除此之外还有一个小细节就是构造器里面为什么setState(-1) 这样不就获取不到锁了么??

其实这是为了后面shutdown的时候interruptIdleWorkers能判断出线程是否在工作,从而打断那些空闲的线程。如果使用可重入锁的话就无法通过tryLock() 来判断线程是否在工作。而setState(-1) 则是为了防止在任务没有开始前被打断

runWorker()

在上面AddWorker()最后添加成功后会启动 Worker 线程,而在 worker 线程中 run 方法又会调用一个runWorker()方法,这里就是具体执行任务的地方

final void runWorker(Worker w) {
//当前执行线程
Thread wt = Thread.currentThread();
//拿到任务
Runnable task = w.firstTask;
w.firstTask = null;
//前面构造 worker 的时候设置了 state=-1
//设置 state 为 0,tryRelease(1)
//其实这样是为了在这之前不会被打断(还没有开始执行任务)
w.unlock(); // allow interrupts
boolean completedAbruptly = true; //是否因为异常而退出?
try {
//task 为空就从阻塞队列中拿任务
while (task != null || (task = getTask()) != null) {
//加锁,但是这个锁不会和其他的 Worker 互斥,这个锁只是用来判断 worker 是否在工作
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//打断当前执行线程 wt
wt.interrupt();
try {
//空方法交给子类去实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空方法交给子类去实现
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; //完成任务数++
w.unlock();
}
}
completedAbruptly = false; //下面的收尾工作会根据这个判断是否调整线程数
} finally {
processWorkerExit(w, completedAbruptly); //收尾工作
}
}

🔸 首先获取了参数传递进来的 worker 携带的任务

🔸 然后执行了 w.unlock(),实际上这里就对应了上面 Worker 类构造器中的 setState(-1),正因为前面设置了 state 为 -1 所以在 unlock() 之前,获得锁的 CAS 操作肯定都会失败,当然这也是为了在任务启动前不会被打断,所以在这里 unlock() 就又将 state 设置为了 0 也就表示可以通过 CAS 获得锁了,也可以被 shutdown 打断了。

🔸 然后这个线程就会执行 worker 中的任务,如果 worker 中任务为空就会从阻塞队列中获取任务

🔸 获取到任务后进入循环先进行 lock() 操作,这就代表已经开始执行任务了,这个时候 shutdown 就无法发送中断信号中断这个线程执行 (注意这个 lock 并不会和其他的 worker 互斥,因为每个 Worker 都是新 new 出来的,完全不相关的,他们的 state 状态都是独立的)

🔸 runStateAtLeast(ctl.get(), STOP) 返回ctl.get() >= STOP ,判断线程池是否正在关闭如果是就打断该线程,如果不是需要确保线程不是 Interrupt 状态

If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted.

🔸beforeExecute()afterExecute() 都是空方法交给子类去实现的。

🔸 到 finally 块里面就代表这个工作线程已经快要结束了,processWorkerExit() 就是处理一些”后事”的

getTask()

这个方法就是用来从阻塞队列中获取任务的,如果 getTask() 返回 null,在runWorker() 里面接收到null 后就会跳出循环进而执行finally 块里面的processWorkerExit() 。而这也就意味着这个线程执行结束了,不会在执行任务了,剩下的就是等待 JVM 回收这个线程。

private Runnable getTask() {
//是否超时?和 keepAliveTime 相关
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 如果线程池状态为 SHUTDOWN 并且任务队列没任务 或者 线程状态>=STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 是否允许线程超时 允许核心线程超时(主动超时)? 或者 wc 大于了核心线程数(被动超时)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//此时需要回收多余的线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //cas 减少线程数
return null; //返回 null
continue; //cas 失败 继续循环
}

try {
//根据 timed 判断是限时获取还是直接获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); //阻塞的获取
if (r != null)
return r;
//没获取到,超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

🔸 获取线程池状态,如果线程池状态为 SHUTDOWN 并且任务队列没任务 或者 线程状态>=STOP 就通过 CAS 自旋减少线程数,然后返回 null

🔸 判断是否允许当前线程获取任务超时,如果允许核心线程超时就代表所有线程都会超时限制,又或者是当前线程数超过了核心线程数,也就是经过了扩容,所以核心线程之外的线程都是有超时限制的。

🔸 如果 ① wc 超过最大线程数 ②没超过最大线程数,但是超时了并且此时 wc>1(留一个处理任务)③没超过最大线程数,但是超时了并且阻塞队列为空,此时需要回收多余的线程

🔸 根据 timed 选取从阻塞队列中获取任务的方式,要么是限时获取的 poll 或者一直阻塞的 take,获取到了之后返回

processWorkerExit()

看名字就知道是一个收尾的方法,执行线程结束后的一些必要收尾工作。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//如果是因为异常而退出则 getTask() 没有机会去调整线程数所以需要在这里调整
decrementWorkerCount(); // wc--

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//完成任务总数+=该 worker 完成的任务数
completedTaskCount += w.completedTasks;
//移除出线程集 workers
workers.remove(w);
} finally {
mainLock.unlock();
}

//尝试结束线程池
tryTerminate();

//获取 ctl
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//非异常结束
if (!completedAbruptly) {
//最小值根据是否允许核心线程超时来判断
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1; //如果最小为 0 并且任务队列不为空则保证线程池中至少有一个线程执行这些任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1. 异常结束,新增加一个线程到线程池,相当于替换了异常的线程
//2. 非异常结束,工作线程小于核心线程,增加线程,确保在不允许核心线程超时的情况下线程数不小于 corePoolSize
addWorker(null, false);
}
}

runStateLessThan c < STOP 返回 true 就说明是SHUTDOWN 或者RUNNING ,到这里该工作线程(Worker)的生命周期就结束了。

工作线程的生命周期

mark

tryTerminate()

在上面的处理收尾工作的processWorkerExit() 的时候中间调用了一个 tryTerminate() 方法

final void tryTerminate() {
for (;;) {
int c = ctl.get();
//如果是在 RUNNING 状态 直接 return
//如果已经是 TIDYING 或者 TERMINATED 状态 直接 return
//如果是 SHUTDOWN 状态并且阻塞队列中还有任务 直接 return
//return 就说明线程池还不到关闭的时候
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//到这里就说明要么是 STOP 状态,要么是 SHUTDOWN 状态阻塞队列也没任务了
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个空闲的 worker 线程
interruptIdleWorkers(ONLY_ONE);
return;
}

//wc==0 没有线程存活了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//CAS 自旋 修改线程池状态为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); //空方法,交给子类去实现的
} finally {
//设置状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒 termination 上 awiat 的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

这里最后的termination.signalAll() 实际上是唤醒的awaitTermination() 方法阻塞的线程

public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
//限时等待
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

shutdown()

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//CAS 自旋设置线程池状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
//打断 idle(空闲)的 worker
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

这里关键的就是这个打断空闲线程的操作。

interruptIdleWorkers()

打断空闲的 worker,这里的空闲线程其实指的就是在阻塞队列上获取不到任务而阻塞的线程

经过上面的分析我们知道 Worker 会不断的从阻塞队列中去拿任务也就是 getTask() 方法,如果阻塞队列为空就会阻塞住 直到有任务提交到阻塞队列中,或者执行线程被中断。

这里我们是要 SHUTDOWN 那阻塞队列中肯定是不会再有任务提交,所以take() 会阻塞住,所以我们就只能通过打断执行线程的方式来打断take() 操作,否则会一直阻塞,线程池无法关闭。

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加锁,因为 Wokers 是 HashSet 是线程不安全的
try {
for (Worker w : workers) {
Thread t = w.thread;
//没有被打断并且没有在工作(空闲)
if (!t.isInterrupted() && w.tryLock()) {
try {
//打断 Worker 里面的线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//只打断一个就直接 break
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

🔸首先获取了 mainLock 保证了 workers 的线程安全避免产生并发修改异常。

🔸然后遍历 workers,打断那些没有被打断并且没有工作的线程,那这里怎么知道线程是不是在工作呢?

别忘了前面提到的 Worker 类借助 AQS 实现了一个不可重入lock 方法,而在 worker 执行任务的时候会执行 lock 加锁,所以在这里tryLock() 如果返回true则说明 并没有在工作可以打断,反之如果正在工作tryLock() 不可重入,无法获取到自己持有的锁返回 false,所以线程肯定是在工作状态所以不应该打断,这些线程会在执行完任务后自行了断,因为线程池状态已经设置为SHUTDOWN 当然前提是这些任务是可终止的

🔸打断后释放tryLock()获取到的锁

🔸如果只打断一个就直接 break,否则就继续下一轮循环

🔸释放 mianLock

瞎猜:这里可以通过 getState 判断线程状态但是有可能在执行任务的过中阻塞

shutdownNow()

和上面的 shutdown() 一样都是用来关闭线程池的但是看方法名就知道这个比较粗暴,shutdownNow 立刻马上关闭,一点情面都不给😂,虽然说是打断所有的线程但是毕竟使用的是Interrupt ,也许别人正在执行的线程根本就不会理你😂,所以在提交任务的时候要对任务进行正确的 interrupt 响应,或者确保线程不会一直阻塞否则线程池就无法正常关闭

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//CAS 自旋设置状态为 STOP
advanceRunState(STOP);
//打断所有的线程包括正在工作的线程
interruptWorkers();
//排干阻塞队列,因为已经 STOP 了不会再执行队列里面的任务了
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试关闭线程池
tryTerminate();
return tasks;
}

interruptWorkers()

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//打断所有启动的线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

打断了所有的启动的线程,即使他们可能不会响应这个 Interrupted 信号,但是由于线程池状态已经变为了 STOP,所以他们也活不长了(当然前提是执行的任务是可以结束的),在下一次获取任务的时候就会直接 return 。

细节是魔鬼

在群里面看见的问题,为什么线程池要用 线程不安全的 HashSet 然后设置了一个 mainLock 控制并发,而不是直接使用线程安全的并发集合?

这一点其实在源码的注释中已经说的很清楚了,之前一直没有注意到

/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();

一开始还挺抗拒的,感觉有的地方有的看不懂(英语渣渣留下眼泪)然后去 stackoverflower 上看了一下找到了一个答案 Using ReentrantLock in ThreadPoolExecutor to ensure thread-safe workers

大概总结一下就是两点

  1. 避免 “中断风暴” ,如果是用的显式锁那么如果有 10 个线程同时去执行 shutdown 方法,那么 10 个线程会排队去执行interruptIdleWorkers ,如果使用并发安全的队列的话,在 10 个线程同时去执行 shutdown 方法,那么肯定会同时去 interruptIdleWorkers 也就是所谓的中断风暴

    中断风暴(维基百科) 中文几乎搜不到相关资料,其实就是字面意思,而太多的中断请求会的影响系统的整体性能,不得不说大师就是大师,细节之处则更能体现水平,我等也只能膜拜了

  2. 另一点就是为了方便统计线程池的一些信息比如 largestPoolSize 等,这点也好理解,使用并发的 Set 只能保这个 set 的并发安全,而对于其他的一些线程池的相关的信息统计起来就比较麻烦,可能又需要另外的加锁,所以索性就直接搞一个全局的锁,一举两得

ThreadPoolExecutor 使用

构造方法

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

其实从中也可以看出对各个参数的一些限制。

corePoolSize:核心池的大小,这个参数与后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中

maximumPoolSize:线程池最大线程数,它表示在线程池中最多允许创建多少个线程

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize:即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0,这些内容其实在上面的深入源码中都有过分析。

unit:参数 keepAliveTime 的时间单位

workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响

threadFactory:线程工厂,主要用来创建线程(根据传进来的 Runnable/Callable)

handler:表示当拒绝处理任务时的策略,有以下四种取值

  • ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常。
  • ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

线程池的关闭

shutdown() shutdownNow() ,上面已经分析过了,就不再过多介绍了。

工厂方法

上面的构造器中一共有 7 个参数,可见要构造一个线程池并非那么容易,所以 jdk 在Executors 类中为我们提供了一些工厂方法,可以直接构造一些特定的线程池。

newCachedThreadPool()

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

可以看到core为 0,最大值为Integer.MAX_VALUE,任务队列使用的SynchronousQueue ,这个队列是一个很奇葩的阻塞队列,实际上它不是一个真正的队列,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作,只有两个都准备好的时候才不会阻塞,所以它内部不会为队列元素维护空间,也就是说并不会缓存任务,一旦提交了 (put) 任务,要么就由空闲线程去执行 (take),要么创建一条新线程去执行 (take)。

newFixedThreadPool()

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

可以看到,最大值和 core 都是 nThread ,也就是最多nThread个线程,阻塞队列采用 LinkedBlockQueue

newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

等价于newFixedThreadPool(1) ,但是这里的返回值经过了一层包装 返回的不再是ThreadPoolExecutor 也就是不会再有那些扩展的 monitor 方法

A wrapper class that exposes only the ExecutorService methods of an ExecutorService implementation.

类似的方法其实还有一些,像newWorkStealingPool 等,感兴趣可以自己去查一查。

其实这里阿里巴巴 Java 开发规范并不建议使用工厂方法创建线程

mark

所以建议还是通过构造器的方式去创建线程,这样也更加灵活更加可控。