Java并发编程的艺术-学习笔记-9

Java中的线程池

线程池的实现原理

线程池的工作流程

  1. 线程池是否都在执行任务?如果不是,则创建一个新的工作线程来执行任务。否则,进入下个流程。
  2. 线程池判断工作队列是否已经满?没有满,则将新提交的任务储存在这个工作队列里。
  3. 再次判断线程池是否都处理工作状态,如果没有,创建一个新的工作线程来执行任务。否则,交给饱和策略来执行处理这个任务。

点击加载

ThreadPoolExecutor执行的execute()方法

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行这个步骤需要获取全局锁)。
  2. 如果运行的线程等于或多余corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来完成任务(执行这个步骤需要获取全局锁)。
  4. 如果创建的线程会使得数量大于maximumPoolSize,任务将会被拒绝。

点击加载

ThreadPoolExecutor采取上述的步骤的总体设计思路,是为了在执行execute()方法,尽量避免获取全局锁(严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2(加入阻塞队列),步骤2不需要获取全局锁。

部分源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command){
if(command == null){
throw new NullPointerException();
}
//如果addIfUnderCorePoolSize(command)成功,
//表示 poolSize < corePoolSize, 在corePool线程池未满有线程可供使用
if(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)){
//如果线程大于等于基本线程数,或者线程创建失败, 则将当前任务刚到工作队列中.
if(runState == RUNNING && workQueue.offer(command)){
ensureQueueTaskHandled(command);
}
}
//如果线程池不处于运行状态或任务无法加入队列, 并且当前线程数量小于最大允许的线程数量
//则创建一个线程执行任务
else if(!addIfUnderMaximumPoolSize(command)){
reject(command);
}
}

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,在其执行完任务后,还会循环获取工作队列里的任务来执行。
Work类中的run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run(){
try{
Runnable task = firstTask;
firstTask = null;
//task 如果不是是从线程池中取出的队列
// 那么就从工作队列(阻塞队列)中取出任务
while(task != null || (task = getTask() != null)){
runTask(task);
task = null;
}
}finally {
workerDone(this);
}
}

线程池的使用

线程池的创建

1
2
3
4
5
6
7
new ThreadPoolExecutor(corePoolSize,
runnableTaskQueue,
maximumPoolSize,
keepAliveTime,
TimeUnit,
ThreadFactory,
handler)

corePoolSize:线程池的基本大小。当提交一个任务到线程池,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小就不再创建。如果调用了prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

runnableTaskQueuer:任务(工作)队列。用于保存等待执行的任务的阻塞队列。可以选择以下几个队列。

1.ArrayBlockingQueue
2.LinkedBlockingQueue,吞吐量高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
3 SynchronousQueue,吞吐量一般高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
4 PriorityBlockingQueue

maximumPoolSize:线程池最大数量。如果队列满了,并且已经创建的线程数小于最大线程数,则线程池再创建新的线程执行任务。
如果使用了无界阻塞队列,这个参数其实没啥意义了。

keepAliveTime:线程活动保持时间。线程池的工作线程空闲后,保存存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

TimeUnit:线程活动保持时间的单位。天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)和纳秒(NANOSECONDS)。

ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更具有意义的名字。