Java并发编程的艺术-学习笔记-10

Executor 框架

Java的线程是程序的执行机制,也是工作单位。
从JDK 5开始,把工作单元和执行机制分离开。执行机制由Executor提供,而工作单元包括Runnable和Callable。

Executor 框架简介

Executor 框架的两级调度模型

在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程被终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

点击加载

Executor 框架的结构与成员

任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
任务的执行:包括任务的执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。框架主要有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
异步计算的结果:包括接口Futrue和实现接口的FutureTask类。

接口和类的简介

名称 描述
Executor 接口,将任务的提交和任务的执行分离开
ThreadPoolExecutor 线程池核心实现类,用来执行被提交的任务
ScheduledThreadPoolExecutor 实现类,可以在给定的延迟后运行命令,或者定期执行命令
FutureTask 实现Future接口的类,代表异步计算的结果
Runnable和Callable 实现了这两个接口的类,都可以被线程池执行

ThreadPoolExecutor 详解

Executor框架最核心的类ThreadPoolExecutor,主要由4个组件构成

名称 描述
corePool 核心线程池的大小
maximumPool 最大线程池的大小
BlockingQueue 用来暂时保存任务的工作队列
rejectedExecutionHandler 当Excutor框架已经关闭,或者饱和(达到最大线程池大小且工作队列已满),execute()方法将要调用的Handler

通过Executor框架的工具类Executors可以创建3种不同类型的ThreadPoolExecutor。

FixedThreadPool

可重用固定线程数的线程池。

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(int corePoolSize,
int nThreads, //maximumPoolSize
0L, //keepAliveTime
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
}

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建nThread。
当线程池的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待的最长时间,超过这个时间后多余的线程将被终止。
这里把keepAliveTime设置为0L,意味着多余的空余线程会被立刻终止。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列最大容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。

  1. 当线程池线程数等于corePoolSize,新任务将会在无界队列中等待,因此线程中的线程不会超过corePoolSize。
  2. 由于1,使用无界队列maximumPoolSize将是一个无效的参数
  3. 由于1和2,使用无界队列keepAliveTime也是个无效的参数
  4. 不会由于饱和策略而处理handler

SingleThreadExecutor 详解

SingleThreadExecutor是使用单个worker线程的Executor。

1
2
3
4
5
6
7
8
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,
1,
0L,
TimeUnit unit,
new LinkedBlockingQueue<Runnable>());
}

点击加载

工作流程

  1. 如果当前线程池无运行程序,创建一个新线程执行任务
  2. 在线程池预热之后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

CachedThreadPool 详解

一个会根据需要创建新线程的线程池。

1
2
3
4
5
6
7
8
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
keepAliveTime
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

无核心线程池,空闲线程超过keepAliveTime之后会被终止。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但是CachedThreadPool的maximumPool是无边界的(Integer.MAX_VALUE)。这意味着,如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度,CachedThreadPool会不断创建新线程。可能会耗尽CPU。

点击加载

工作流程

  1. 首先执行offer操作,往队列中塞任务, 由于一个offer操作阻塞等待一个poll操作,如果maximumPool有空闲线程正在执行SynchronousQueue.poll(keepAliveTime, TimeUnit unit),那么配对成功,线程执行任务。否则执行下面的步骤2
  2. 当初始maximumPool为空,或者maximumPool中没有空闲的线程,将没有线程执行poll()操作(空闲线程会一直调用超时返回的pool),也就是步骤1失败的情况,此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行成功。
  3. 在步骤2中新创建的线程在任务执行完毕之后,会执行poll(keepAliveTIme, TimeUnit)。这个poll操作会让空闲线程最多等待keepAliveTime,如果时间内主线程提交了一个新任务(offer()操作),那么这个空闲的线程会执行任务;否则,空闲线程被终止。如此一来,长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor 详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

运行机制

工作流程如图

点击加载

DelayQueue是一个无界队列,所以ScheduledThreadPoolExecutor中的maximumPoolSize的大小没有什么效果。
任务的执行主要分成两大部分:

  1. 当调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向队列添加一个是实现了RunableScheduledFuture接口的ScheduledFutureTask。
  2. 线程池中的线程从DelayQueue中获取了ScheduledFutureTask,然后执行任务。

实现

ScheduledFutureTask主要包括三个long型成员变量

  1. time,表示这个任务要被执行的具体时间
  2. sequenceNumber,表示这个任务被添加到框架中的序号
  3. period,表示任务执行的间隔周期。

DelayQueue封装了PriorityQueue,这个优先队列会对队列中的任务进行排序。time时间早的任务先被执行,如果相同,再比较sequenceNumber。

take操作和poll操作会阻塞线程,在对立中使用take是为了有task可获取,就把它取出来,然后设置时间,再从新使用add入队,以time排序。

下图是线程1执行某个周期任务的4个步骤

点击加载

工作流程:

  1. 线程1从DelayQueue中获取已经到期的ScheduledFutureTask(DelayQueue.take())。到期任务指的是ScheduledFutureTask的time大于等于当前时间
  2. 线程1执行这个ScheduledFutureTask。
  3. 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
  4. 线程1把这个修改time之后的ScheduledFutureTask放回到DelayQueue中(DelayQueue.add())。

DelayQueue.take()方法的源代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E take() throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 1
try{
for( ; ; ){
E first = q.peek();
if(first == null){
available.await(); // 2.1
}
else{
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if(delay > 0){
long t1 = available.awaitNanos(delay); // 2.2
}
else{
E x = q.poll(); // 2.3.1
assert(x != null);
if(q.size() != 0){
available.signalAll(); // 2.3.2
}
return x;
}
}
}
}finally {
lock.unlock(); // 3
}
}

获取任务的工作流程
1.获取Lock
2.获取周期任务
如果PriorityQueue为空,当前线程到Condition中等待;否则执行2.2
如果PriorityQueue的头元素的time时间比当前时间大,到Conditon中等待到time时间;否则执行下面的2.3
获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。
3.释放Lock

ScheduledThreadPoolExecutor在一个循环中执行步骤2,知道线程从PriorityQueue获取到最后一个元素之后,才会退出循环(结束步骤2)

下面是DelayQueue.add()的源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean offer(E e){
final ReentrantLock lock = this.lock;
lock.lock(); // 1
try{
E first = q.peek();
q.offer(e); // 2.1
if(first == null || e.compareTo(first) < 0){
avaiable.singalAll(); // 2.2
}
return true;
}finally {
lock.unlock(); // 3
}
}

添加任务的工作流程
1.获取Lock
2.添加任务
向PriorityQueue添加任务
如果在在上面2.1中添加的任务是PrioityQueue的头元素,唤醒在Condition中等待的所有线程。
3.释放所有Lock

FutureTask 详解

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接运行(FutureTask.run())。
根据FutureTask.run()方法被执行的时机,FutureTask可以处于3种状态:未启动、已启动和已完成。

点击加载

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

点击加载

FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer。

每一个基于AQS实现的同步器都会包含两种类型的操作:

  1. 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(timeoout, TimeUnit)方法调用。
  2. 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基于”复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。
Sync实现了AQS的tryActuireShared(int)方法,和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的公有方法都直接委托了内部私有的Sync。

FutureTask.get()

FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg),这个方法的执行过程如下。

  1. 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的的条件为:state为执行完成状态RAN或者已取消状态CANCELLED,且runneer不为null。
  2. 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
  3. 当其他线程执行release操作(比如FutureTask.run()或者FutureTask.cancel(…))唤醒当前队列后,当前线程再次执行tryAcquireShared()将返回1,当前线程离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒效果)
  4. 最后返回计算的结果或抛出异常

FutureTask.run()

工作流程:

  1. 执行在构造函数中指定的任务(Callable.call())。
  2. 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect, int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call的返回值,然后调用AQS.releaseShared(int arg)。
  3. AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
  4. 调用FutureTask.don()

当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态CANCELLED,当前执行线程将到AQS的线程等待队列中等待(如下图的线程A、B、C和D)。当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程(如下图线程E唤醒线程A)。

点击加载

假设开始时FutureTask处于未启动状态或者已启动状态,等待队列中已经有三个线程等待(A、B和C),此刻,后续的线程D执行get()方法将导致线程D也到等待队列中等待。
当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复线程A的处理流程。最终,在队列中等待的所有线程都被级联唤醒并且从get()方法返回。