背景
大多数并发应用程序都是围绕“任务执行(Task Execution
)”来构造的:任务通常是些抽象且离散的工作单元。任务执行时应该选择清晰的任务边界以及明确的任务执行策略。执行策略有如下几种:
- 串行执行
在多任务中执行性能太差,只能一个个顺序处理,无法提高吞吐率或快速响应性 - 显示创建线程
new Thread(task).start()
:显示创建。但是大量创建会有如下问题:线程生命周期开销非常高;资源消耗:尤其是内存,当可运行线程数量多于处理器数量时,会出现大量闲置进程,会占用很多内存,线程竞争CPU
资源时也会产生性能消耗;稳定性; - 使用并发框架
如:Executor
框架
Executor
框架简介
该框架基于生产者-消费者模式,支持多种不同类型的任务执行策略,提供了标准方法将任务的提交和执行解耦,并用 Runnable
来表示任务。提供了对生命周期的支持,以及统计信息收集,程序管理和性能监视等。
接口定义
1 | public interface Executor { |
执行策略
在执行策略中定义了任务执行的“What, Where, When, How
”等方面:
- 在什么(
What
)线程中执行任务? - 任务按照什么(
What
)顺序执行(FIFO, LIFO
,优先级)? - 有多少个(
How Many
)任务能并发执行? - 在队列中有多少个(
How Many
)任务在等待执行? - 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(
Which
)任务?另外,如何(How
)通知应用程序有任务被拒绝?
执行策略是资源管理工具,最佳策略取决于可用的计算资源以及服务质量的需求。通过限制并发任务的数量,可用确保应用程序不会由于资源耗尽而失败,或者由于资源竞争影响性能。
Executor
框架类图结构
Executor
接口定义了一个接收Runnable
对象的方法execute
ExecutorService
比Executor
使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future
的方法AbstractExecutorService
抽象类,ExecutorService
执行方法的默认实现ScheduledExecutorService
接口,可定时调度任务ThreadPoolExecutor
线程池的核心实现类,用来执行被提交的任务ScheduledThreadPoolExecutor
继承ThreadPoolExecutor
并实现ScheduledExecutorService
,一个可定时调度任务的线程池Executors
提供了一系列静态工厂方法和共用方法,创建Executor
框架的相关类:ExecutorService, ScheduledExecuorSevice, ThreadFactor, Callable...
初识线程池
线程池是指管理一组同构工作线程的资源池,其中工作队列(Work Queue
)保存了所有等待执行的任务,工作者线程(Worker Thread
)的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。通过重用线程池中现有线程而不是创建新线程,可以在处理多个请求时分摊线程创建和销毁的巨大开销;另外任务请求到达时,工作线程已经存在,提高了响应性;调整线程池大小,可以防止过多线程相互竞争资源导致耗尽内存
基本接口
Runnable
接口定义
1 | public interface Runnable { |
可以看到 Runnable
是没有返回值的
Callable
接口定义
1 | public interface Callable<V> { |
在多线程并发中,我们需要获取任务的执行结果,显然 Runnable
无法实现。所以重新定义了 Callable
返回任务结果 V
或者抛出异常。
Future
接口定义
1 | public interface Future<V> { |
Future
表示异步执行的结果。可以对具体的 Runnable
或者 Callable
任务进行取消、查询是否取消/完成、获取结果。通过 get
方法获取执行结果,该方法会阻塞直到任务返回结果。
RunnableFuture
接口定义
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
RunnableFuture
可被执行并能异步获取结果。
FutureTask
类
RunnableAdapter
类
1 | static final class RunnableAdapter<T> implements Callable<T> { |
作用:将 Runnable
和 result
封装成 Callable
。
这个适配器很简单,就是简单的实现了 Callable
接口,在 call()
实现中调用Runnable.run()
方法,然后把传入的 result
作为任务的结果返回。
FutureTask
类定义
1 | public class FutureTask<V> implements RunnableFuture<V> { |
- 可以通过构造函数注入
Runnable
或Callable
。如果传入的是Runnable
和result
, 将通过RunnableAdapter
封装成Callable
。 FutureTask
是RunnableFuture
的实现类,既能当做一个Runnable
直接被Thread
执行,也能作为Future
用来得到执行结果(构造函数传入的Callable
的结果)或者抛出异常。通常作为任务提交后的异步返回的结果,也可以用来取消任务等。
任务运行的状态及转换
FutureTask
在任务执行过程中,记录了如下 7 种状态:
NEW
:新建COMPLETING
任务已经执行完成,但是任务的结果还在保存中NORMAL
任务执行完后的结果正常EXCEPTIONAL
任务执行完后的结果异常CANCELLED
:取消
任务还没开始执行或者已经开始执行但是还没有执行完成的时候被取消INTERRUPTING
:中断中INTERRUPTED
:已经被中断
可能出现的状态转换:
1 | NEW -> COMPLETING -> NORMAL |
任务在构造函数中初始化的状态都是 NEW
,所有流程都是以此为起点。
任务执行
任务执行是执行者调用的,不用主动去执行
1 | public void run() { |
可以从源码中得到这几个信息:
- 任务运行前,状态一定是
NEW
- 调用传入的
Callable.call
执行任务 - 正确执行,保存执行结果
Callable
的result
- 抛出异常时,保存异常
Throwable
ExecutorService
生命周期
ExecutorService
的生命周期有三种状态:运行,关闭和终止。
生命周期管理
ExecutorService
提供了管理 Executor
生命周期的方法:
1 | void shutdown(); |
ExecutorService
在初始创建时处于运行状态shutdown
方法将会执行平缓的关闭过程:不再接受新的任务,同时等待提交的任务执行完成(包含那些还未开始执行的任务)shutdownNow
方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,而且不再启动队列中尚未开始执行的任务。
提交任务
1 | <T> Future<T> submit(Callable<T> task); |
不管是 Runnable
还是 Callable
都可以作为任务提交,在执行 submit
时都会将任务封装成 Future
的实现类实例,调用 Executor.execute
来执行,并同时作为结果返回。
线程池
类定义及构造方法
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
参数说明:
corePoolSize
核心池线程数大小maximumPoolSize
线程池允许创建线程数最大值keepAliveTime
空闲线程存活时间:如果工作线程数多于corePoolSize
,则这些多的线程的空闲时间超过keepAliveTime
时将被终止unit
keepAliveTime
参数的时间单位workQueue
缓存任务的阻塞队列threadFactory
使用ThreadFactory
创建新线程,默认使用defaultThreadFactory
创建线程handle
定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy
,任务被拒绝时将抛出RejectExecutorException
几个重要成员变量
1 | // 核心线程池大小 |
线程池的优势
合理利用线程池能够带来三个好处:
- 降低资源消耗
通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 - 提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。 - 提高线程的可管理性
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的大小
一般需要根据任务的类型来配置线程池大小:
CPU
密集型任务
就需要尽量压榨CPU
,参考值可以设为NCPU + 1
IO
密集型任务
参考值可以设置为2*NCPU
可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
线程池计数及状态
定义
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
计数及状态解析
使用 AtomicInteger ctl
一个值,来同时表示当前线程数和线程池的状态。其中:
runState
: 高 3 位,表示线程池状态workerCount
:剩下的 29 位,表示当前线程池的工作线程数COUNT_BITS
:表示线程数计数的位数为 29CAPACITY
:表示最大的工作线程数计数容量ctlOf
:表示将状态和计数组合成一个值runStateOf
:从ctl
中解析状态workerCountOf
:从ctl
中解析线程计数
计数增减
使用 CAS: Compare And Set
算法(无锁算法)实现共享变量 ctl
的自增自减(复合)操作,尽量确保多线程安全:
1 | private boolean compareAndIncrementWorkerCount(int expect) { |
五种状态
RUNNING
接受新任务并处理队列中的任务SHUTDOWN
不接受新任务但仍然处理队列中的任务STOP
不接受新任务,也不处理队列中的任务,并且设置正在进行中任务的中断标志位TIDYING
所有任务都结束了,wokrCount
设置为 0,并调用钩子方法terminated()
TERMINATED
钩子方法terminated()
执行完毕
状态转移及其含义
RUNNING -> SHUTDOWN
调用shutdown()
,可能出现在finalize
中调用(RUNNING or SHUTDOWN) -> STOP
调用shutdownNow()
STOP -> TIDYING
当线程池为空时TIDYING -> TERMINATED
当钩子方法terminated()
执行完毕时
线程池中线程创建及任务执行
Worker
工作线程类
类定义
1 | private final class Worker |
AQS: AbstractQueuedSynchronizer
: 用来做锁控制的,分为:独享功能和共享功能Worker
实现了AQS
独占锁功能,主要维护任务执行线程的中断控制状态Worker
实现了Runnable
并封装了任务(Runnable
)和线程- 线程池的所有线程都是在
Worker
的构造方法中通过线程工厂新建的 Worker.run
直接调用了线程池的runWorker(this)
,可以参考后面的分析
任务提交及线程创建策略
1 | public void execute(Runnable command) { |
任务执行 execute
源码很短,流程也很清晰:异步执行一个指定的任务,可以新开一个线程或者线程池中已有的线程来执行。如果线程池已经处于关闭状态(不是RUNNING
状态,只有该状态才会接受新任务)或者已经达到最大容量(maximumPoolSize
),将会拒绝该任务。源码分析:
- 如果任务为空,直接抛出空指针异常
workerCount/wc = workerCountOf(ctl.get())
:线程池中的当前线程数- 如果
workerCount < corePoolSize
,每次都会新建一个Worker
线程去执行这个任务 - 如果
workerCount >= corePoolSize
,则每来一个任务会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行 - 如果队列添加失败(一般来说是任务缓存队列已满),直接尝试新建线程(前提条件:
workerCount < maximumPoolSize
)执行该任务,如果新建失败拒绝服务
线程创建
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
源码中可以看到,在新建的 Worker
工作线程开始执行前,会反复确认线程池状态(RUNNING
),并且加锁二次判断后,添加到 HashSet
中保存所有的执行线程,添加成功则开始执行任务,同时记录线程池曾经出现的最大线程数 largestPoolSize
。
任务运行
1 | final void runWorker(Worker w) { |
源码可以看出:
Worker
线程先执行当前任务,如果执行完毕会复用该线程,从阻塞队列中获取排队的任务继续执行- 任务执行前会将当前
Worker
加锁,避免在任务执行过程中被线程池中断 - 任务执行前后有钩子方法可以调用
beforeExecute/afterExecute
从缓冲队列中获取任务
1 | private Runnable getTask() { |
源码分析:
- 取任务前先对线程池状态和当前线程数做判断,是否符合要求
- 如果允许空闲线程存活
allowCoreThreadTimeOut || wc > corePoolSize
,则缓存队列最多等待keepAliveTime
时间任务入队;否则队列中阻塞等待取出任务
阻塞队列
在线程池中的作用
队列大小和最大池大小可能需要相互折衷:
- 大型队列 + 小型池
可以最大限度地降低CPU
使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是I/O
边界),则系统可能为超过您许可的更多线程安排时间。 - 小型队列 + 大型池
CPU
使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
常见队列及特性
在前面我们多次提到了任务缓存队列,即 workQueue
,它用来存放等待执行的任务。workQueue
的类型为 BlockingQueue<Runnable>
,通常可以取下面三种类型:
ArrayBlockingQueue
:基于数组的先进先出队列,此队列创建时必须指定大小LinkedBlockingQueue
:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
,是一个无界队列synchronousQueue
:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
拒绝策略
在线程池中的作用
四个策略都在线程池中都是内部类。
常见策略及特性
1 | private static final RejectedExecutionHandler defaultHandler = |
当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize
,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy
丢弃任务并抛出RejectedExecutionException
异常。ThreadPoolExecutor.DiscardPolicy
不能执行的任务被丢弃,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。ThreadPoolExecutor.CallerRunsPolicy
调用线程处理该任务,线程调用运行该任务的execute
本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
使用方式
原理
通信机制
Executors
创建常见线程池
- 固定长度线程池
1
2
3
4
5
6
7
8
9
10
11public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
创建一个固定长度的线程池,每提交一个任务就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(除非某个线程异常结束后会补充一个新线程)。特点:可以重用固定数量线程的线程池,处理一个共享的无边界队列 。任何时间点,最多有 nThreads
个线程会处于活动状态执行任务。如果当所有线程都是活动时,有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行 ExecutorService.shutdown()
关闭。LinkedBlockingQueue
作为线程池的工作队列,是一个无界队列,当线程池的线程数达到 corePoolSize
后,新任务将在无界队列中等待,因此线程池的线程数量不会超过 corePoolSize
,同时 maxiumPoolSize
也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。
- 可缓存无限容量线程池
1
2
3
4
5
6
7
8
9
10
11public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
corePoolSize = 0; maxiumPoolSize = Integer.MAX_VALUE
,即线程池的规模不做限制是无界的。在线程可用时,重用之前构造好的池中线程。这个线程池在执行大量短生命周期的异步任务时(many short-lived asynchronous task
),可以显著提高程序性能。调用 execute
时,可以重用之前已构造的可用线程,如果不存在可用线程,那么会重新创建一个新的线程并将其加入到线程池中。如果线程超过 60
秒还未被使用,就会被中止并从缓存中移除。因此线程池在长时间空闲后不会消耗任何资源。
- 单线程池
1
2
3
4
5
6
7
8
9
10
11
12
13public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
特殊的固定长度线程池,只有一个线程来执行任务,可以确保任务按照队列中的顺序串行执行。使用单个工作线程来执行一个无边界的队列。(如果单个线程在执行过程中因为某些错误中止,新的线程会替代它执行后续线程)。它可以保证认为是按顺序执行的,任何时候都不会有多于一个的任务处于活动状态。和 newFixedThreadPool(1)
的区别在于,如果线程遇到错误中止,它是无法使用替代线程的。
- 延时线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
延迟线程池的构造函数中:maxiumPoolSize = Integer.MAX_VALUE
,表明线程池最大容量是无界的。以延迟或定时的方式来执行任务。
示例
先简单总结下 Runnable, Callable, Future, FutureTask
和 ExecutorService.submit
的关系和概念:
Runnable
:作为submit
的参数,不需要返回值Callable
:作为submit
的参数,并定义返回值类型Future
表示异步执行的返回值。作为submit
的返回值,实际是对Callable
返回值的一个再次封装。FutureTask
作为submit
的参数,但是因为实现了Future
所以同时包含了submit
的返回值。查看源码发现上面的Runnable, Callable
都会先被封装成FutureTask
再去执行submit
。
Runnable
并不关心返回结果。
1 | public class TestRunnable { |
Callable + Future
Callable
作为参数传入,Future
作为异步结果返回。通过 Future.get()
来同步等待获取线程执行的结果。
1 | public class TestCallableFuture { |
Callable
转换为 FutureTask
Callable
先转换为 FutureTask
,作为参数传入,等待执行完毕后能够通过 FutureTask.get()
同步等待获取结果。
1 | public class TestCallableFutureTask { |