Executor 框架
Java的线程是程序的执行机制,也是工作单位。
从JDK 5开始,把工作单元和执行机制分离开。执行机制由Executor提供,而工作单元包括Runnable和Callable。
Executor 框架简介
Executor 框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程被终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
Executor 框架的结构与成员
任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
任务的执行:包括任务的执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。框架主要有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
异步计算的结果:包括接口Futrue和实现接口的FutureTask类。
接口和类的简介
名称 | 描述 |
---|---|
Executor | 接口,将任务的提交和任务的执行分离开 |
ThreadPoolExecutor | 线程池核心实现类,用来执行被提交的任务 |
ScheduledThreadPoolExecutor | 实现类,可以在给定的延迟后运行命令,或者定期执行命令 |
FutureTask | 实现Future接口的类,代表异步计算的结果 |
Runnable和Callable | 实现了这两个接口的类,都可以被线程池执行 |
ThreadPoolExecutor 详解
Executor框架最核心的类ThreadPoolExecutor,主要由4个组件构成
名称 | 描述 |
---|---|
corePool | 核心线程池的大小 |
maximumPool | 最大线程池的大小 |
BlockingQueue | 用来暂时保存任务的工作队列 |
rejectedExecutionHandler | 当Excutor框架已经关闭,或者饱和(达到最大线程池大小且工作队列已满),execute()方法将要调用的Handler |
通过Executor框架的工具类Executors可以创建3种不同类型的ThreadPoolExecutor。
FixedThreadPool
可重用固定线程数的线程池。
|
|
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建nThread。
当线程池的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待的最长时间,超过这个时间后多余的线程将被终止。
这里把keepAliveTime设置为0L,意味着多余的空余线程会被立刻终止。
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列最大容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。
- 当线程池线程数等于corePoolSize,新任务将会在无界队列中等待,因此线程中的线程不会超过corePoolSize。
- 由于1,使用无界队列maximumPoolSize将是一个无效的参数
- 由于1和2,使用无界队列keepAliveTime也是个无效的参数
- 不会由于饱和策略而处理handler
SingleThreadExecutor 详解
SingleThreadExecutor是使用单个worker线程的Executor。
|
|
工作流程:
- 如果当前线程池无运行程序,创建一个新线程执行任务
- 在线程池预热之后,将任务加入LinkedBlockingQueue
- 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool 详解
一个会根据需要创建新线程的线程池。
|
|
无核心线程池,空闲线程超过keepAliveTime之后会被终止。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但是CachedThreadPool的maximumPool是无边界的(Integer.MAX_VALUE)。这意味着,如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度,CachedThreadPool会不断创建新线程。可能会耗尽CPU。
工作流程:
- 首先执行offer操作,往队列中塞任务, 由于一个offer操作阻塞等待一个poll操作,如果maximumPool有空闲线程正在执行SynchronousQueue.poll(keepAliveTime, TimeUnit unit),那么配对成功,线程执行任务。否则执行下面的步骤2
- 当初始maximumPool为空,或者maximumPool中没有空闲的线程,将没有线程执行poll()操作(空闲线程会一直调用超时返回的pool),也就是步骤1失败的情况,此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行成功。
- 在步骤2中新创建的线程在任务执行完毕之后,会执行poll(keepAliveTIme, TimeUnit)。这个poll操作会让空闲线程最多等待keepAliveTime,如果时间内主线程提交了一个新任务(offer()操作),那么这个空闲的线程会执行任务;否则,空闲线程被终止。如此一来,长时间保持空闲的CachedThreadPool不会使用任何资源。
ScheduledThreadPoolExecutor 详解
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
运行机制
工作流程如图
DelayQueue是一个无界队列,所以ScheduledThreadPoolExecutor中的maximumPoolSize的大小没有什么效果。
任务的执行主要分成两大部分:
- 当调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向队列添加一个是实现了RunableScheduledFuture接口的ScheduledFutureTask。
- 线程池中的线程从DelayQueue中获取了ScheduledFutureTask,然后执行任务。
实现
ScheduledFutureTask主要包括三个long型成员变量
- time,表示这个任务要被执行的具体时间
- sequenceNumber,表示这个任务被添加到框架中的序号
- period,表示任务执行的间隔周期。
DelayQueue封装了PriorityQueue,这个优先队列会对队列中的任务进行排序。time时间早的任务先被执行,如果相同,再比较sequenceNumber。
take操作和poll操作会阻塞线程,在对立中使用take是为了有task可获取,就把它取出来,然后设置时间,再从新使用add入队,以time排序。
下图是线程1执行某个周期任务的4个步骤
工作流程:
- 线程1从DelayQueue中获取已经到期的ScheduledFutureTask(DelayQueue.take())。到期任务指的是ScheduledFutureTask的time大于等于当前时间
- 线程1执行这个ScheduledFutureTask。
- 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
- 线程1把这个修改time之后的ScheduledFutureTask放回到DelayQueue中(DelayQueue.add())。
DelayQueue.take()方法的源代码实现
|
|
获取任务的工作流程
1.获取Lock
2.获取周期任务
如果PriorityQueue为空,当前线程到Condition中等待;否则执行2.2
如果PriorityQueue的头元素的time时间比当前时间大,到Conditon中等待到time时间;否则执行下面的2.3
获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。
3.释放Lock
ScheduledThreadPoolExecutor在一个循环中执行步骤2,知道线程从PriorityQueue获取到最后一个元素之后,才会退出循环(结束步骤2)
下面是DelayQueue.add()的源码分析
|
|
添加任务的工作流程
1.获取Lock
2.添加任务
向PriorityQueue添加任务
如果在在上面2.1中添加的任务是PrioityQueue的头元素,唤醒在Condition中等待的所有线程。
3.释放所有Lock
FutureTask 详解
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接运行(FutureTask.run())。
根据FutureTask.run()方法被执行的时机,FutureTask可以处于3种状态:未启动、已启动和已完成。
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
FutureTask的实现
FutureTask的实现基于AbstractQueuedSynchronizer。
每一个基于AQS实现的同步器都会包含两种类型的操作:
- 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(timeoout, TimeUnit)方法调用。
- 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
基于”复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。
Sync实现了AQS的tryActuireShared(int)方法,和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。
Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的公有方法都直接委托了内部私有的Sync。
FutureTask.get()
FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg),这个方法的执行过程如下。
- 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的的条件为:state为执行完成状态RAN或者已取消状态CANCELLED,且runneer不为null。
- 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
- 当其他线程执行release操作(比如FutureTask.run()或者FutureTask.cancel(…))唤醒当前队列后,当前线程再次执行tryAcquireShared()将返回1,当前线程离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒效果)
- 最后返回计算的结果或抛出异常
FutureTask.run()
工作流程:
- 执行在构造函数中指定的任务(Callable.call())。
- 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect, int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call的返回值,然后调用AQS.releaseShared(int arg)。
- AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
- 调用FutureTask.don()
当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态CANCELLED,当前执行线程将到AQS的线程等待队列中等待(如下图的线程A、B、C和D)。当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程(如下图线程E唤醒线程A)。
假设开始时FutureTask处于未启动状态或者已启动状态,等待队列中已经有三个线程等待(A、B和C),此刻,后续的线程D执行get()方法将导致线程D也到等待队列中等待。
当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复线程A的处理流程。最终,在队列中等待的所有线程都被级联唤醒并且从get()方法返回。