推荐《Java并发编程实战》
[TOC]
什么是线程池
线程池,从字面含义来看,是指管理一组同构工作线程的资源池,是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL
线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务**(而不是销毁)。这样就实现了线程的重用
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用
使用线程池的好处:
- 降低资源消耗: 通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗
- 提高响应速度: 任务到达时,无需等待线程创建即可立即执行
- 提高线程的可管理性: 线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控
- 提供更多更强大的功能: 线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行
线程池解决的问题
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险
- 系统无法合理管理内部的资源分布,会降低系统的稳定性
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想
在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:
- 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片
- 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销
- 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗
线程池核心设计与实现
Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。首先来看一下ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系
java.util.concurrent.Executor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runable来表示任务。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分
Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)
ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
ThreadPoolExecutor运行机制:
通过上图可以看出线程池的生产者-消费者模式,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程
线程池的运行主要分成两部分:任务管理、线程管理
任务管理部分 充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
(1)直接申请线程执行该任务 (2)缓冲到队列中等待线程执行 (3)拒绝该任务
线程管理部分 是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收
生命周期管理
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示
java.util.concurrent.ThreadPoolExecutor
1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分的信息:
ps:int 4个字节,32位
- 线程池的运行状态 (runState) 高3位保存runState
- 线程池内有效线程的数量 (workerCount) 低29位保存workerCount
两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Packing and unpacking ctl
// 计算当前运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 计算当前线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor的运行状态有5种,分别为:
1
2
3
4
5
6
7
8
9
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
|运行状态 | 状态描述 | |:——-|:———–| |RUNNING |高3位为111,能接收新提交的任务,并且也能处理阻塞队列中的任务| |SHUTDOWN |高3位为000,关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务| |STOP |高3位为001,不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程| |TIDYING |高3位为010,所有的任务都已终止了,workerCount(有效线程数)为0| |TERMINATED|高3位为011,在terminated()方法执行完后进入该状态|
生命周期转换:
任务执行机制
任务调度
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
// 核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
// 最大池大小,不得超过CAPACITY
private volatile int maximumPoolSize;
//线程池的最大容量,其值的二进制为:00011111111111111111111111111111(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常
提交任务(任务调度补充)
线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
submit()方法是在ThreadPoolExecutor的父类AbstractExecutorService类实现的,最终还是调用的ThreadPoolExecutor类的execute()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池控制状态ctl的值
int c = ctl.get();
//worker数量小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 创建worker,addWorker方法boolean参数用来判断是否创建核心线程
// true比较的是 corePoolSize
// false比较的是 maximumPoolSize
if (addWorker(command, true))
// 成功则返回
return;
// 失败则再次获取线程池控制状态
c = ctl.get();
}
// 线程池处于RUNNING状态且线程数大于核心线程数
// 将任务加入workQueue任务缓存队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态
// 防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
int recheck = ctl.get();
//线程池不处于RUNNING状态,且将任务从workQueue移除成功
if (! isRunning(recheck) && remove(command))
//采取任务拒绝策略
reject(command);
//worker数量等于0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
执行流程文字总结请见任务调度
任务缓冲(阻塞队列)
任务缓冲模块是线程池能够管理任务的核心部分
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的
阻塞队列缓存任务,工作线程从阻塞队列中获取任务
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空
- 当队列满时,存储元素的线程会等待队列可用
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素
1
private final BlockingQueue<Runnable> workQueue;
java.util.concurrent.BlockingQueue的实现类
使用不同的队列可以实现不一样的任务存取策略,阻塞队列的成员:
名称 | 描述 |
---|---|
ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此列表按照先进先出(FIFO)的顺序对元素进行排序,支持公平和非公平锁 |
LinkedBlockingQueue | 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序,此队列的默认长度为Integer_MAX_VALUE,所以默认创建的该队列有容量危险 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半 |
PriorityBlockingQueue | 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证优先级元素的顺序 |
DelayQueue | 一个实现比PriorityBlockingQueue多实现了延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素,只有延时期满后才能从队列中获取元素 |
SynchronousQueue | 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平和非公平锁。SynchronousQueue的一个使用场景是在线程池里,Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其他队列,LinkedTransferQueue队列多出了transfer和tryTransfer方法 |
任务申请
由上文的任务分配部分可知,任务的执行有两种可能:
- 一种是任务直接由新创建的线程执行
- 另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行
第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private Runnable getTask() {
// 标识当前线程是否超时未能获取到task对象
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 获取线程池的控制状态
int c = ctl.get();
// 获取线程池的运行状态
int rs = runStateOf(c);
// 如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空
// 线程池工作线程数量递减,方法返回null,回收线程
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取worker数量
int wc = workerCountOf(c);
// 标识当前线程在空闲时,是否应该超时回收
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize()
// 导致worker数量大于maximumPoolSize)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果允许超时回收,则调用阻塞队列的poll()
// 只在keepAliveTime时间内等待获取任务,一旦超过则返回null
// 否则调用take(),如果队列为空,线程进入阻塞状态
// 无限时等待任务,直到队列中有可取任务或者响应中断信号退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 若task不为null,则返回成功获取的task对象
if (r != null)
return r;
// 若返回task为null,表示线程空闲时间超时,则设置timeOut为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果此worker发生了中断,采取的方案是重试,没有超时
// 在哪些情况下会发生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
timedOut = false;
}
}
}
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收
任务拒绝(策略)
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池
java.util.concurrent.RejectedExecutionHandler
1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
拒绝策略是一个接口,用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略(ThreadPoolExecutor类中),其特点如下:
名称 | 描述 |
---|---|
AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现 |
DiscardPolicy | 丢弃任务,但是不抛出异常。使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略 |
DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量 |
CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕 |
Worker线程管理
Workder线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
// Worker持有的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 初始化的任务,可以为null
Runnable firstTask;
/** Per-thread task counter */
// 线程任务计数器(当前线程完成的任务数)
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
this.thread = getThreadFactory().newThread(this);
}
}
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask
thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务
firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况
如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态
ThreadPoolExecutor类的内部类Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 是否持有独占锁,重写AQS方法
@Override
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取锁,重写AQS方法
@Override
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁,重写AQS方法
@Override
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 获取锁
public void lock() { acquire(1); }
// 尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
// 释放锁
public void unlock() { release(1); }
// 是否持有锁
public boolean isLocked() { return isHeldExclusively(); }
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中
- 如果正在执行任务,则不应该中断线程
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
Worker线程增加
增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果
addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker线程回收
线程池中线程的销毁依赖JVM自动的回收
线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收
当线程池决定哪些线程需要回收时,只需要将其引用消除即可
Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务
当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
//获取w的firstTask
Runnable task = w.firstTask;
//设置w的firstTask为null
w.firstTask = null;
// 释放锁,设置AQS的state为0,允许中断
w.unlock(); // allow interrupts
//用于标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑
boolean completedAbruptly = true;
try {
// 自旋,循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
while (task != null || (task = getTask()) != null) {
//进入循环内部,代表已经获取到可执行的任务
// 则对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// 若线程池状态大于等于STOP,那么意味着该线程要中断
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && // 线程被中断
runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
!wt.isInterrupted()) //确保该线程未被中断
// 发出中断请求
wt.interrupt();
try {
// 开始执行任务前的Hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到这里正式开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后的Hook方法
afterExecute(task, thrown);
}
} finally {
// 置空task,准备通过getTask()获取下一个任务
task = null;
// 计数+1
w.completedTasks++;
// 释放掉worker持有的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 获取不到任务时,主动回收自己
// 线程执行结束可能有两种情况:
// 1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
// 2.任务执行过程中发生了异常
// 第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
// 第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
processWorkerExit(w, completedAbruptly);
}
}
Worker类的run()方法的实现,最终调用了ThreadPoolExecutor类的runWorker()方法
runWorker() 方法是线程池的核心,实现了线程池中的线程复用机制
runWorker()方法工作:
- 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行
- 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁
- 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法(钩子方法)
- 执行通过getTask()方法获取到的任务
- 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作
线程回收的工作是在processWorkerExit方法完成的
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果用户任务执行过程中发生了异常,则需要递减workerCount
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
// 将worker完成任务的数量累加到总的完成任务数中
completedTaskCount += w.completedTasks;
// 从workers集合中移除该worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 获取线程池控制状态
int c = ctl.get();
// 线程池运行状态小于STOP
if (runStateLessThan(c, STOP)) {
// 如果用户任务执行过程中发生了异常,则直接调用addWorker()方法创建线程
if (!completedAbruptly) {
// 是否允许核心线程超时
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 允许核心超时并且workQueue阻塞队列不为空,那线程池中至少有一个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果工作线程数量workerCount大于等于核心池大小corePoolSize,
// 或者允许核心超时并且workQueue阻塞队列不为空时
// 线程池中至少有一个工作线程,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 不满足上述条件,则调用addWorker()方法创建线程
// 创建新的线程取代当前线程
addWorker(null, false);
}
}
在这个方法中,将线程引用移出线程池(workers.remove(w))就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程
processWorkerExit()方法中主要调用了tryTerminate()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || // 线程池的运行状态为RUNNING
runStateAtLeast(c, TIDYING) || // 线程池的运行状态大于等于TIDYING
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池的运行状态为SHUTDOWN且阻塞队列不为空
return;
// 只有当线程池的运行状态为STOP
// 或线程池运行状态为SHUTDOWN且阻塞队列为空时,可以执行到这里
// 如果线程池工作线程的数量不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
// 仅仅中断一个空闲的worker
interruptIdleWorkers(ONLY_ONE);
return;
}
// 只有当线程池工作线程的数量为0时可以执行到这里
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// CAS操作设置线程池运行状态为TIDYING,工作线程数量为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行terminated()钩子方法
terminated();
} finally {
// 设置线程池运行状态为TERMINATED,工作线程数量为0
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒在termination条件上等待的所有线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
// 若CAS操作失败则重试
}
}
tryTerminate()方法的作用是尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空;其次,工作线程数量为0
满足了上述两个条件之后,tryTerminate()方法获取全局锁,设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED
至此,线程池运行状态变为TERMINATED,工作线程数量为0,workers已清空,且workQueue也已清空,所有线程都执行结束,线程池的生命周期到此结束
Worker线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
源码见上面Worker线程回收中的runWorker
- while循环不断地通过getTask()方法获取任务
- getTask()方法从阻塞队列中取任务
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态
- 执行任务
- 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程
关闭线程池
关闭线程池有两个方法,shutdown()和shutdownNow()
shutdown()方法 将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查shutdown权限
checkShutdownAccess();
// 设置线程池运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有空闲worker
interruptIdleWorkers();
// 用onShutdown()钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池
shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下:
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// onlyOne标识是否只中断一个线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历workers工作线程集合
for (Worker w : workers) {
Thread t = w.thread;
// 线程未被中断且成功获得锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 发出中断请求
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放锁
w.unlock();
}
}
// 若只中断一个线程,则跳出循环
if (onlyOne)
break;
}
} finally {
// 释放锁
mainLock.unlock();
}
}
shutdownNow()方法 将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
java.util.concurrent.ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查shutdown权限
checkShutdownAccess();
// 设置线程池运行状态为STOP
advanceRunState(STOP);
// 中断所有worker
interruptWorkers();
// 将任务缓存队列中等待执行的任务取出并放到list中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回任务缓存队列中等待执行的任务列表
return tasks;
}
shutdownNow()方法与shutdown()方法相似,不同之处在于
shutdownNow()设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表
shutdown()设置线程池的运行状态为SHUTDOWN
shutdownNow()方法调用了interruptWorkers()方法(shutdown()调用的interruptIdleWorkers)中断所有的worker(并非只是空闲的worker)
1
2
3
4
5
6
7
8
9
10
11
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 调用Worker类的interruptIfStarted()方法中断线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
ThreadPoolExecutor构造方法(核心参数)
ThreadPoolExecutor一共四个构造方法,前三个基于第四个实现,共有7个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* The default rejected execution handler
* 默认的拒绝策略 AbortPolicy 丢弃任务并抛出RejectedExecutionException异常
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// 5个参数,两个默认参数
// Executors.defaultThreadFactory()默认的线程工厂
// defaultHandler 默认的拒绝策略 AbortPolicy
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 6个参数,一个默认参数
// defaultHandler 默认的拒绝策略 AbortPolicy
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 6个参数,一个默认参数
// Executors.defaultThreadFactory() 默认的线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 7个参数,前三个构造方法最终都调用该构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数合法性校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 参数合法性校验
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//初始化对应的属性
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1. corePoolSize: 线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行
2. maximumPoolSize: 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
3. keepAliveTime: 线程空闲时的存活时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0
4. unit: keepAliveTime参数的时间单位
5. workQueue: 任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行
一般有三个BlockingQueue选择:
-
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。
-
LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。
-
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务
6. threadFactory: 线程工厂,创建新线程时使用的线程工厂
7. handler: 任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略(详细内容参考任务拒绝(策略)):
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy