10000小时


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

Java并发编程的艺术-学习笔记-10

发表于 2017-12-22 | 分类于 并发

Executor 框架

Java的线程是程序的执行机制,也是工作单位。
从JDK 5开始,把工作单元和执行机制分离开。执行机制由Executor提供,而工作单元包括Runnable和Callable。

Executor 框架简介

Executor 框架的两级调度模型

在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程被终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

点击加载

Executor 框架的结构与成员

任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
任务的执行:包括任务的执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。框架主要有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
异步计算的结果:包括接口Futrue和实现接口的FutureTask类。

接口和类的简介

名称 描述
Executor 接口,将任务的提交和任务的执行分离开
ThreadPoolExecutor 线程池核心实现类,用来执行被提交的任务
ScheduledThreadPoolExecutor 实现类,可以在给定的延迟后运行命令,或者定期执行命令
FutureTask 实现Future接口的类,代表异步计算的结果
Runnable和Callable 实现了这两个接口的类,都可以被线程池执行

ThreadPoolExecutor 详解

Executor框架最核心的类ThreadPoolExecutor,主要由4个组件构成

名称 描述
corePool 核心线程池的大小
maximumPool 最大线程池的大小
BlockingQueue 用来暂时保存任务的工作队列
rejectedExecutionHandler 当Excutor框架已经关闭,或者饱和(达到最大线程池大小且工作队列已满),execute()方法将要调用的Handler

通过Executor框架的工具类Executors可以创建3种不同类型的ThreadPoolExecutor。

FixedThreadPool

可重用固定线程数的线程池。

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(int corePoolSize,
int nThreads, //maximumPoolSize
0L, //keepAliveTime
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
}

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建nThread。
当线程池的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待的最长时间,超过这个时间后多余的线程将被终止。
这里把keepAliveTime设置为0L,意味着多余的空余线程会被立刻终止。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列最大容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。

  1. 当线程池线程数等于corePoolSize,新任务将会在无界队列中等待,因此线程中的线程不会超过corePoolSize。
  2. 由于1,使用无界队列maximumPoolSize将是一个无效的参数
  3. 由于1和2,使用无界队列keepAliveTime也是个无效的参数
  4. 不会由于饱和策略而处理handler

SingleThreadExecutor 详解

SingleThreadExecutor是使用单个worker线程的Executor。

1
2
3
4
5
6
7
8
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,
1,
0L,
TimeUnit unit,
new LinkedBlockingQueue<Runnable>());
}

点击加载

工作流程:

  1. 如果当前线程池无运行程序,创建一个新线程执行任务
  2. 在线程池预热之后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

CachedThreadPool 详解

一个会根据需要创建新线程的线程池。

1
2
3
4
5
6
7
8
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
keepAliveTime
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

无核心线程池,空闲线程超过keepAliveTime之后会被终止。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但是CachedThreadPool的maximumPool是无边界的(Integer.MAX_VALUE)。这意味着,如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度,CachedThreadPool会不断创建新线程。可能会耗尽CPU。

点击加载

工作流程:

  1. 首先执行offer操作,往队列中塞任务, 由于一个offer操作阻塞等待一个poll操作,如果maximumPool有空闲线程正在执行SynchronousQueue.poll(keepAliveTime, TimeUnit unit),那么配对成功,线程执行任务。否则执行下面的步骤2
  2. 当初始maximumPool为空,或者maximumPool中没有空闲的线程,将没有线程执行poll()操作(空闲线程会一直调用超时返回的pool),也就是步骤1失败的情况,此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行成功。
  3. 在步骤2中新创建的线程在任务执行完毕之后,会执行poll(keepAliveTIme, TimeUnit)。这个poll操作会让空闲线程最多等待keepAliveTime,如果时间内主线程提交了一个新任务(offer()操作),那么这个空闲的线程会执行任务;否则,空闲线程被终止。如此一来,长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor 详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

运行机制

工作流程如图

点击加载

DelayQueue是一个无界队列,所以ScheduledThreadPoolExecutor中的maximumPoolSize的大小没有什么效果。
任务的执行主要分成两大部分:

  1. 当调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向队列添加一个是实现了RunableScheduledFuture接口的ScheduledFutureTask。
  2. 线程池中的线程从DelayQueue中获取了ScheduledFutureTask,然后执行任务。

实现

ScheduledFutureTask主要包括三个long型成员变量

  1. time,表示这个任务要被执行的具体时间
  2. sequenceNumber,表示这个任务被添加到框架中的序号
  3. period,表示任务执行的间隔周期。

DelayQueue封装了PriorityQueue,这个优先队列会对队列中的任务进行排序。time时间早的任务先被执行,如果相同,再比较sequenceNumber。

take操作和poll操作会阻塞线程,在对立中使用take是为了有task可获取,就把它取出来,然后设置时间,再从新使用add入队,以time排序。

下图是线程1执行某个周期任务的4个步骤

点击加载

工作流程:

  1. 线程1从DelayQueue中获取已经到期的ScheduledFutureTask(DelayQueue.take())。到期任务指的是ScheduledFutureTask的time大于等于当前时间
  2. 线程1执行这个ScheduledFutureTask。
  3. 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
  4. 线程1把这个修改time之后的ScheduledFutureTask放回到DelayQueue中(DelayQueue.add())。

DelayQueue.take()方法的源代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E take() throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 1
try{
for( ; ; ){
E first = q.peek();
if(first == null){
available.await(); // 2.1
}
else{
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if(delay > 0){
long t1 = available.awaitNanos(delay); // 2.2
}
else{
E x = q.poll(); // 2.3.1
assert(x != null);
if(q.size() != 0){
available.signalAll(); // 2.3.2
}
return x;
}
}
}
}finally {
lock.unlock(); // 3
}
}

获取任务的工作流程
1.获取Lock
2.获取周期任务
如果PriorityQueue为空,当前线程到Condition中等待;否则执行2.2
如果PriorityQueue的头元素的time时间比当前时间大,到Conditon中等待到time时间;否则执行下面的2.3
获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。
3.释放Lock

ScheduledThreadPoolExecutor在一个循环中执行步骤2,知道线程从PriorityQueue获取到最后一个元素之后,才会退出循环(结束步骤2)

下面是DelayQueue.add()的源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean offer(E e){
final ReentrantLock lock = this.lock;
lock.lock(); // 1
try{
E first = q.peek();
q.offer(e); // 2.1
if(first == null || e.compareTo(first) < 0){
avaiable.singalAll(); // 2.2
}
return true;
}finally {
lock.unlock(); // 3
}
}

添加任务的工作流程
1.获取Lock
2.添加任务
向PriorityQueue添加任务
如果在在上面2.1中添加的任务是PrioityQueue的头元素,唤醒在Condition中等待的所有线程。
3.释放所有Lock

FutureTask 详解

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接运行(FutureTask.run())。
根据FutureTask.run()方法被执行的时机,FutureTask可以处于3种状态:未启动、已启动和已完成。

点击加载

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

点击加载

FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer。

每一个基于AQS实现的同步器都会包含两种类型的操作:

  1. 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(timeoout, TimeUnit)方法调用。
  2. 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基于”复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。
Sync实现了AQS的tryActuireShared(int)方法,和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的公有方法都直接委托了内部私有的Sync。

FutureTask.get()

FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg),这个方法的执行过程如下。

  1. 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的的条件为:state为执行完成状态RAN或者已取消状态CANCELLED,且runneer不为null。
  2. 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
  3. 当其他线程执行release操作(比如FutureTask.run()或者FutureTask.cancel(…))唤醒当前队列后,当前线程再次执行tryAcquireShared()将返回1,当前线程离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒效果)
  4. 最后返回计算的结果或抛出异常

FutureTask.run()

工作流程:

  1. 执行在构造函数中指定的任务(Callable.call())。
  2. 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect, int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call的返回值,然后调用AQS.releaseShared(int arg)。
  3. AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
  4. 调用FutureTask.don()

当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态CANCELLED,当前执行线程将到AQS的线程等待队列中等待(如下图的线程A、B、C和D)。当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程(如下图线程E唤醒线程A)。

点击加载

假设开始时FutureTask处于未启动状态或者已启动状态,等待队列中已经有三个线程等待(A、B和C),此刻,后续的线程D执行get()方法将导致线程D也到等待队列中等待。
当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复线程A的处理流程。最终,在队列中等待的所有线程都被级联唤醒并且从get()方法返回。

Java并发编程的艺术-学习笔记-9

发表于 2017-12-22 | 分类于 并发

Java中的线程池

线程池的实现原理

线程池的工作流程

  1. 线程池是否都在执行任务?如果不是,则创建一个新的工作线程来执行任务。否则,进入下个流程。
  2. 线程池判断工作队列是否已经满?没有满,则将新提交的任务储存在这个工作队列里。
  3. 再次判断线程池是否都处理工作状态,如果没有,创建一个新的工作线程来执行任务。否则,交给饱和策略来执行处理这个任务。

点击加载

ThreadPoolExecutor执行的execute()方法

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行这个步骤需要获取全局锁)。
  2. 如果运行的线程等于或多余corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来完成任务(执行这个步骤需要获取全局锁)。
  4. 如果创建的线程会使得数量大于maximumPoolSize,任务将会被拒绝。

点击加载

ThreadPoolExecutor采取上述的步骤的总体设计思路,是为了在执行execute()方法,尽量避免获取全局锁(严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2(加入阻塞队列),步骤2不需要获取全局锁。

部分源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command){
if(command == null){
throw new NullPointerException();
}
//如果addIfUnderCorePoolSize(command)成功,
//表示 poolSize < corePoolSize, 在corePool线程池未满有线程可供使用
if(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)){
//如果线程大于等于基本线程数,或者线程创建失败, 则将当前任务刚到工作队列中.
if(runState == RUNNING && workQueue.offer(command)){
ensureQueueTaskHandled(command);
}
}
//如果线程池不处于运行状态或任务无法加入队列, 并且当前线程数量小于最大允许的线程数量
//则创建一个线程执行任务
else if(!addIfUnderMaximumPoolSize(command)){
reject(command);
}
}

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,在其执行完任务后,还会循环获取工作队列里的任务来执行。
Work类中的run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run(){
try{
Runnable task = firstTask;
firstTask = null;
//task 如果不是是从线程池中取出的队列
// 那么就从工作队列(阻塞队列)中取出任务
while(task != null || (task = getTask() != null)){
runTask(task);
task = null;
}
}finally {
workerDone(this);
}
}

线程池的使用

线程池的创建

1
2
3
4
5
6
7
new ThreadPoolExecutor(corePoolSize,
runnableTaskQueue,
maximumPoolSize,
keepAliveTime,
TimeUnit,
ThreadFactory,
handler)

corePoolSize:线程池的基本大小。当提交一个任务到线程池,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小就不再创建。如果调用了prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

runnableTaskQueuer:任务(工作)队列。用于保存等待执行的任务的阻塞队列。可以选择以下几个队列。

1.ArrayBlockingQueue
2.LinkedBlockingQueue,吞吐量高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
3 SynchronousQueue,吞吐量一般高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
4 PriorityBlockingQueue

maximumPoolSize:线程池最大数量。如果队列满了,并且已经创建的线程数小于最大线程数,则线程池再创建新的线程执行任务。
如果使用了无界阻塞队列,这个参数其实没啥意义了。

keepAliveTime:线程活动保持时间。线程池的工作线程空闲后,保存存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

TimeUnit:线程活动保持时间的单位。天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)和纳秒(NANOSECONDS)。

ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更具有意义的名字。

Java并发编程的艺术-学习笔记-8

发表于 2017-12-19 | 分类于 并发

Java中的并发工具类

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

假设有如下需要:在一个Excel表有多个sheet数据,每一个线程处理一个sheet的数据。待所有数据处理完毕,主线程发出结束消息。

使用join方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class EasyJoinTest {
static volatile int Count = 0;
static class WorkThread implements Runnable{
int i = ++Count;
@Override
public void run() {
try {
System.out.println("Thread: " + i + " start");
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
} finally {
System.out.println("Thread: " + i +" finished");
}
}
}
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(new WorkThread());
Thread t2 = new Thread(new WorkThread());
t1.start();
t2.start();
//主线程等待t1, t2线程退出才能退出
t1.join();
t2.join();
System.out.println("all done");
}
}

使用更加强大的CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CountDownLatchTest {
static volatile int Count = 0;
static CountDownLatch countDownLatch = new CountDownLatch(2);
static class WorkThread implements Runnable{
int i = ++Count;
@Override
public void run() {
try {
System.out.println("Thread: " + i + " start");
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
} finally {
System.out.println("Thread: " + i +" finished");
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(new WorkThread());
Thread t2 = new Thread(new WorkThread());
Thread t3 = new Thread(new WorkThread());
t1.start();
t2.start();
t3.start();
//由于countDownLatch在初始化是等待2个countDown()方法的调用
//所以可能并不会等待线程3完成 而是先输出 all done, 再输出Thread: 3 finished
countDownLatch.await();
System.out.println("all done");
}

await可以设置超时等待,在主线程超时放弃等待countDown的计数到0

同步屏障CyclicBarrier

让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才能继续运行。

运行的线程会被阻塞在 barrier.await() 处。

构造函数:

1
2
3
4
5
CyclicBarrier(int parties); //等待parties个线程到达
//在有线程到达屏障的时候执行barrierAction线程
//barrier线程在其他线程到达屏障的时候执行
CyclicBarrier(int parties, Runnable barrierAction);

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CyclicBarrierTest {
static CyclicBarrier barrier = new CyclicBarrier(2, new PriorityThread());
static class PriorityThread implements Runnable{
@Override
public void run() {
System.out.println("this is priority thread");
}
}
static class NormalThread implements Runnable{
static volatile int count = 0;
@Override
public void run(){
int c = ++count;
System.out.println("thread: " + c + " is started and sleep 1s");
try{
TimeUnit.SECONDS.sleep(1);
barrier.await(); //线程会被阻塞在这里
}catch (Exception e){
}finally {
System.out.println("thread " + c + " is over");
}
}
}
public static void main(String[] args){
Thread t1 = new Thread(new NormalThread());
Thread t2 = new Thread(new NormalThread());
t1.start();
t2.start();
}
}

CyclicBarrier和CountDownLatch的区别

CountDwonLatch的计数器只能使用一次,而CyclicBarrier的计数器可是使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier还提供以下方法

方法 描述
void reset() 重置计数器,不是初始值,而是剩余的计数值
int getNumberWaiting() 返回CyclicBarrier阻塞的线程的线程数量
boolean isBroken() 了解阻塞的线程是否被中断

控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。加入有一个需求,需要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储都数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2);
static ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
static volatile int cnt = 0;
static class MyThread implements Runnable{
private int c = ++cnt;
@Override
public void run(){
try{
semaphore.acquire();
System.out.println("task " + c + " is running, the number of waiting queue: " + semaphore.getQueueLength());
TimeUnit.SECONDS.sleep(3);
semaphore.release();
}catch (Exception e){
}
}
}
public static void main(String[] args) throws InterruptedException{
for(int i = 0; i < 20; ++i){
Thread t = new Thread(new MyThread());
pool.execute(t);
}
pool.shutdown();
}
}
方法 描述
public Semaphore(int permits) 最大并发数为permits
void acquire() 获取一个许可证, 没有获取到就阻塞
void release() 使用完之后记得归回许可证
int availablePermits() 返回当前可以的许可证
int getQueueLength() 返回正在等待获取许可证的线程数
boolean hasQueuedThreads() 是否有线程正在等待获取许可证

线程间交换数据的Exchanger

Exchanger是一个用于线程间协作的工具类,可以用来进行线程间的数据交换。它提供一个同步点,在这个同步点,两个可以交换彼此的数据。如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产的数据传递给对方。

运用场景

Exchanger可以用于校对工作。

下面代码展示两个线程一直工作直至取到相同的随机数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ExchangerTest {
static Exchanger exchanger = new Exchanger();
static Random random = new Random(47);
static class MyThread implements Runnable{
@Override
public void run(){
try {
for( ; ;) {
int exchangerNumber = random.nextInt(10);
int i = (Integer) exchanger.exchange(exchangerNumber);
System.out.println("my number: " + exchangerNumber + ", your number: " + i);
if( i == exchangerNumber){
break;
}
}
}catch (Exception e){
}
}
}
public static void main(String[] args){
int i = 0;
Thread t1 = new Thread(new MyThread());
Thread t2 = new Thread(new MyThread());
t1.start();
t2.start();
}
}

如果两个线程一直有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(Object o, long timeout, TimeUnit unit)设置最大等待时长。

Java并发编程的艺术-学习笔记-7

发表于 2017-12-19 | 分类于 并发

Java中的原子操作类

原子更新基本类型

包括以下3个类:
AtomicBoolean、AtomicInteger和AtomicLong

以上三个类提供的方法几乎一模一样,以AtomicInteger为例展示。

方法 描述
int addAndGet(int delta) 以原子的方式将输入的数值与AtomicInteger里的value相加,并返回结果
boolean compareAndSet(int expect, int update) 如果输入数值等于预期,则以原子方式更新数值,并返回true;否则不更新并返回false
int getAndIncrement() 以原子方式将当前值增加1,返回自增前的旧值
int getAndSet(int newValue) 以原子方式设置为newValue的值,并返回旧值
void LazySet(int newValue) 最终会被设置为newValue,但是其他线程可能在一段时间内还是访问到旧值

getAndIncrement()的实现源码其实就是CAS

1
2
3
4
5
6
7
8
9
10
11
12
13
public final int getAndIncrement(){
for( ; ; ){
int current = get();
int next = current + 1;
if(compareAndSet(current, next)){
return current;
}
}
}
public final boolean compareAndSet(int expect, int update){
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Unsafe.java的CAS方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final native boolean compareAndSwapObject(Object o,
long offset,
Object expected,
Object update);
public final native boolean compareAndSwapInt(Object o,
long offset,
int expected,
int update);
public final native boolean compareAndSwapLong(Object o,
long offset,
long expected,
long update);

在Unsafe中只提供了三种CAS方法,对于char、float和double变量我们可以先将其转换成整形或者长整形,再进行CAS。
其实AtomicBoolean也是先将Boolean转成整形,再使用compareAndSwapInt进行CAS。

原子更新数组

包括以下3个类:
AtomicIntegerArray:原子更新整形数组里的元素
AtomicLongArray:原子更新长整形数组里的元素
AtomicReferencey:原子更新引用类型数组里的元素

方法 描述
int addAndGet(int i, int delta) 以原子方式将输入值与数组索引i的元素相加,并返回旧值
boolean compareAndSet(int i, int expect, int update) CAS设置数组索引i的元素

使用示例:

1
2
3
4
5
6
7
8
9
10
static int []value = new int[]{1, 2,};
//AtomicIntegerArray内部的数组是value数组的拷贝
static AtomicIntegerArray array = new AtomicIntegerArray(value);
public static void main(String[] args){
System.out.println(array.getAndSet(0, 11)); //输出1
System.out.println(array.get(0)); //输入11
System.out.println(value[0]); //输出1
}

原子更新引用类型

包括以下3个类:
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型的字段
AtomicMarkableReference:原子更新带有标记位的引用类型。构造方法是AtomicMrkableReference(V initialRef, booleanMark)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static AtomicReference<User> reference1 = new AtomicReference<>();
static class User{
boolean flag;
int i;
String s;
public User(boolean flag, int i, String s){
this.flag = flag;
this.i = i;
this.s = s;
}
public boolean getFlag(){ return flag; }
public int getInt(){ return i; }
public String getSring(){ return s; }
public String toString(){return "flag: " + flag + ", i: " + i + ", s: " + s;}
}
public static void main(String[] args){
User user = new User(true, 11, "Irving");
User newUser = new User(false, 2, "Kawhi");
reference1.set(user);
reference1.compareAndSet(user, newUser);
System.out.println(reference1.toString()); //输出: flag: false, i: 2, s: Kawhi

原子更新字段

包括以下3个类:
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
AtomicLongFieldUpdater:原子更新长整型字段的更新器
AtomicStampedReference:原子更新带有版本好的引用类型,该类型将整数值与引用关联起来,可用于原子更新数据及其版本号,可以解决CAS进行原子更新出现的ABA问题。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Player{
public volatile int number; //变量number必须是 public属性 和 volatile
public Player(int number){ this.number = number; }
public int getNumber() { return number; }
}
static AtomicIntegerFieldUpdater<Player> updater = AtomicIntegerFieldUpdater.newUpdater(Player.class, "number");
public static void main(String[] args){
Player kawhi = new Player(1);
System.out.println(updater.getAndIncrement(kawhi)); //返回旧值1
System.out.println(updater.get(kawhi)); //返回新值2
}

Java并发编程的艺术-学习笔记-6

发表于 2017-12-18 | 分类于 并发

Java并发容器和框架

CurrentHashMap的实现原理和使用

ConcurrentHashMap是线程安全又高效的HashMap

线程不安全的HashMap:
在多线程的环境下,使用HashMap进行put操作可能导致死循环。
由于哈希桶的存在,在多线程竞争的条件下,可能导致Entry链表形成环形的数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

效率低下的HashTable:
HashTable容器使用synchronized来保证线程安全,当一个线程访问HashTable的同步方法,其他线程也要访问同步方法的时候,会进入阻塞或轮询状态。

锁分段技术提高并发访问效率:
HashTabe在高并发访问时的效率低下的原因是,所有访问HashTable的线程都必须竞争同一把锁。
锁分段技术:首先将数据分成一段段的存储,然后把每一段数据分配一把锁,当一个线程占用锁访问其中它管理的数据段,其他段的数据可以被其他线程访问。

ConcurrentHashMap的结构

一个Segment其实就是一个类Hash Table的数据结构,Segment内部维护了一个链表数组,也就是Segment里面有哈希桶,桶里的元素使用链表排序。

点击加载

ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在链表的头部。因此这种结构带来的副作用是Hash过程比普通的HashMap要长,但是带来的好处是写操作的时候只对元素所在的Segment进行加锁即可,不会影响到其他的Segment。这样在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(这些写操作刚好被平均分布在所有Segment上)。

ConcurrentHashMap的初始化

初始化segments数组

1
2
3
4
5
6
7
8
9
10
11
12
13
if(concurrencyLevel > MAX_SEGMENTS){
concurrencyLevel = MAX_SEGMENTS; //最大值为65535
}
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel){
++sshift;
ssize <<= 1; //最大值为65536
}
segmentShift = 32 - sshift; //段偏移量
segmentMask = ssize -1; //段掩码
this.segments = Segment.newArray(ssize);

初始化segmentShift和segmentMask

segmentShift和segmentMask在定位Segment的散列算法中用到。
shift等于ssize从1往左移位的次数,在默认情况下concurrencyLevel等于16,1往左移4次,所以sshift等于4,故segmentShift = 32 -4 = 28。

定位Segment

ConcurrentHashMap使用对元素的hashCode进行散列操作,定位到Segment,接着再散列,目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,提高并发效率。

初始化每个segment

输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadFoctor是每个segment的负载因子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if(initialCapacity > MAXIMUM_CAPACITY){
initialCapacity = MAXIMUM_CAPACITY;
}
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity){
++c;
}
int cap = 1;
//cap就是segment里HashEntry数组的长度
//默认情况下 initialCapacity == 16, loadFactor == 0.75, cap == 1
while(cap < c){
cap <<= 1;
}
//初始化segments[] 数组
for(int i = 0; i < this.segments.length; ++i){
this.segments[i] = new Segment<K, V>(cap, loadFactor);
}

ConcurrentHashMap的操作

get操作

get操作不用加锁(除非读到的值是空才会加锁重读)是高效的原因之一,实现原理如下:
在ConcurrentHashMap的get()方法中,它将要使用到的共享变量都定义为volatile变量。它能保证变量在线程之间的可见性,能被多个线程同时读,但只能被单线程写(有一种情况可以被多线程写, 写入的值不依赖原值)。在get()方法中,只需要读而不需要写共享变量count和value,所以可以不用加锁。

1
2
transient volatile int count;
volatile V value;

put操作

由于put操作里需要对共享变量进行写操作,为了线程安全必须加锁。
put()方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组的链表中。

扩容:
插入元素之前会先判断Segment里的HashEntry数组是否超过容量,再进行扩容;
而HashTable是在插入元素之后,再判断是否达到扩容标准,如果之后再也没有元素插入到该哈希桶,造成HashTable进行一次无效的扩容。

ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

ConcurrentLinkedQueue

线程安全的队列
阻塞算法:用一个锁(入队和出队共一把锁)或者两个锁(出入队锁不同)
非阻塞算法:循环CAS方式

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用FIFO的规则对节点进行排序。

ConcurreentLinkedQueue的结构

队列有一个头节点和一个尾节点,头节点不存线程相关信息,只有下一个节点next的指针。默认情况下,head节点存储的元素为空,tail节点等于head节点。在队列非空的情况下,队列的尾节点就是tail。

1
private transient volatile Node<E> tail = head;

入队

入队的过程:

点击加载

入队主要做两件事情,第一是将入队节点设置为当前队列尾节点的一个节点;第二是更新tail节点,如果tail节点的next不为空,则将入队节点设置为tail节点,如果tail节点的next节点为空,则将入队节点设置为tail的next节点,所以tail节点不总是尾节点。

入队方法永远返回true,所以不要通过返回值判断入队是否成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static final int HOPS = 1;
public boolean offer(E e){
if(e == null){
throw new NullPointerException();
}
//创建准备入队的节点
Node<E> node = new Node<E>(e);
retry:
//死循环, 入队不成功反复入队
for( ; ; ){
Node<E> t = tail; //创建一个指向tail节点的引用
Node<E> p = t; //p用来表示队列的尾节点, 默认情况下等于tail
for(int hops = 0; ; hops++){
Node<E> next = succ(p);
// next != null说明p不是尾节点, 需要更新p后, 在将它指向next节点
if(next != null){
//默认情况下 HOPS == 1
//循环了两次及其以上, 并且当前节点还是不等于尾节点
if(hops > HOPS && t != null){
continue retry;
}
p = next;
}
//如果p是尾节点, 则设置p的next节点为入队节点
//
else if(p.casNext(null, n)){
if(hops >= HOPS){
casTail(t, n);
}
return true;
}
else{
//p有next节点, 表示p的next节点有可能是尾节点
p = succ(p);
}
}
}
}

定位尾节点:

1
2
3
4
5
6
//找p节点下一节点
//当 p == next 成立, 说明队列为空, 返回头节点即可
final Node<E> succ(Node<E> p){
Node<E> next = p.getNext();
return (p == next) ? head : next;
}

由于tail节点并不总是队列尾节点,所以每次入队都必须先通过tail节点来找到尾节点。

设置入队节点为尾节点:

1
p.casNext(null, n)

用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不是null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

HOPS的设计意图

下面是入队操作的简单理解版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean easyUnderstandOffer(E e){
if(e == null){
throw new NullPointerException();
}
Node<E> node = new Node<E>(e);
for( ; ; ){
Node<E> t = tail;
//每次都假设tail作为队列尾节点
//判断失败就是有其他线程竞争, 该线程重新入队即可
if(t.casNext(null, node) && casTail(t, node)){
return true;
}
}
}

这种做法代码量少,而且逻辑非常清晰和易懂。但是它的缺点是每次都要使用循环更新CAS更新tail节点。如果能够减少CAS更新tail节点的次数,就能提高入队效率。
使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新为尾节点,而是当tail和尾节点的距离大于等于HOPS(默认等于1)时才更新tail节点,tail和尾节点距离越长,使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点。
但是这样依然可以提高入队效率,本质上来看它通过增加对volatile的读操作来减少对volatile变量的写操作,而写操作的开销要远大于读操作。

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll(){
Node<E> h = head;
// p表示头节点, 需要出队的节点
Node<E> p = h;
for(int hops = 0; ; ++ hops){
E item = p.getItem();
//如果p节点的元素不为空, 使用CAS设置p节点引用的元素为null
//如果成功返回p节点的元素
if(item != null && p.casItem(item, null)){
if(hops >= HOPS){
//将p节点一个节点设置为head节点
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
//如果头节点元素为空或者头节点发生了变化, 说明头节点已经被另外一个线程修改了
//那么获取p节点的一个节点
Node<E> next = succ(p);
//如果p的下一个节点也为空, 说为这个队列已经空了
if(next == null){
//更新头节点
updateHead(h, p);
break;
}
//如果下一个元素不为空, 则将头节点的一个节点设置为头节点
p = next;
}
return null;
}

Java中的阻塞队列

概念介绍

阻塞队列是一个支持两个附加操作的队列

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞的移除方法:意思是当队列空时,获取元素的线程会等待队列变为非空。
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时等待
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove(e) poll(e) take() poll(time. unit)
检查方法 element() peek() 不可用 不可用

抛出异常:IllegalStateException(“Queue Full”)/NoSuchElementException
返回特殊值:插入成功返回true/取出失败返回null
一直阻塞:可响应中断退出

在无界阻塞队列中,队列永远不会满,所以所有使用put和offer方法永远不会被阻塞,而且使用offer方法永远返回true。

7种阻塞队列

1. ArrayBlockingQueue

一个用数组实现的有界阻塞队列。按照FIFO的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格。
为了保证公平性,通常会降低吞吐量。

当传递fair参数为true创建一个公平的阻塞队列

1
2
3
4
5
6
7
8
9
10
//访问的公平性是使用可重入锁实现的
public ArrayBlockingQueue(int capacity, boolean fair){
if(capacity <= 0){
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newConditon();
}

2. LinkedBlockingQueue

链表实现的有界阻塞队列。默认和最大的长度为Integer.MAX_VALUE。队列元素使用FIFO进行排序。

3. PriorityBlockingQueue

支持优先级别的无界阻塞队列。默认情况元素采用自然顺序生序。也可以自定类实现compareTo()方法指定排序规则,或者初始化PriorityBlockQueue时,指定构造参数Comparator来对元素进行排序。不能保证同优先级别元素的顺序。

4. DelayQueue

支持延时获取元素的无界阻塞队列。队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

运用场景

1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DealyQueue,一旦能从DealyQueue中获取元素,表示缓存有效期到了。

2.定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就可以执行。比如TimerQueue就是使用DelayQueue实现的。

DelayQueue的运用实例

以下内容参考博客孙振超
设计一个具有过期时间的缓存

要求:
1.当向缓存中添加key-value对时,如果这个key在缓存中,还没有过期,需要用这key对应的更新时间。
2.为了能够让DelayQueue将保存的过期键删除,需要重写实现Delayed接口添加到DelayQueue的实例类的hashCode函数和equals函数
3.当缓存关闭,监控时间线程也应该关闭,因而监控线程应当作为守护线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
class DelayedItem<T> implements Delayed {
private T t;
private long liveTime; //存活的时间
private long removeTime; //设置过期时间
public DelayedItem(T t, long liveTime){
this.t = t;
this.liveTime = liveTime;
this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
}
//返回值大于0表示还在时间以内
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.nanoTime(), unit);
}
@Override
@SuppressWarnings("uncheck")
public int compareTo(Delayed o) {
if(o == null){
return 1;
}
if(o == this){
return 0;
}
if(o instanceof DelayedItem){
DelayedItem<T> delayedItem = (DelayedItem<T>)o;
if(liveTime > delayedItem.liveTime){
return 1;
}
else if (liveTime < delayedItem.liveTime){
return -1;
}
else{
return 0;
}
}
long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return diff > 0 ? 1 : ((diff < 0) ? -1 : 0);
}
@Override
public int hashCode(){
return t.hashCode();
}
@Override
public boolean equals(Object o){
if(o instanceof DelayedItem){
return o.hashCode() == this.hashCode();
}
return false;
}
public T getT(){
return this.t;
}
}
class Cache<K, V>{
public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
public DelayQueue<DelayedItem<K>> queue = new DelayQueue<>();
public Cache(K k, V v, long liveTime){
Thread t = new Thread(){
@Override
public void run(){
}
};
t.setDaemon(true);
t.start();
}
public void dameonCheckOverdueKey(){
for( ; ; ){
DelayedItem<K> delayedItem = queue.poll();
if(delayedItem != null){
map.remove(delayedItem.getT());
System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
}
try{
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.getStackTrace();
}
}
}
public void put(K k, V v, long liveTime){
//put()返回旧值
V v2 = map.put(k, v);
DelayedItem<K> tmpItem = new DelayedItem<>(k, liveTime);
if(v2 != null){
queue.remove(v2);
}
queue.put(tmpItem);
}
}

5. SynchronousQueue

一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。

6. LinkedTransferQueue

一个链表结构组成的无界阻塞队列。
transfer()方法
如果当前有消费者正在等待接受元素(消费者使用take()方法或者带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻传输给消费者。如果没有消费者等待接受元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

tryTransfer()方法

试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,立刻返回false。
tryTransfer还有接受时间限制的参数。

7. LinkedBlockingDeque

链表构成的双向阻塞队列。
在初始化队列的同时可以设置容量防止其过度膨胀。双向阻塞队列可以运用在”工作窃取”模式中。

阻塞队列的实现原理

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者可以添加元素。

下面是ArrayBlockingQeueu的部分源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair){
//省略其他代码
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
public void put(E e) throws InterruptedException{
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
while(count == items.length){
notFull.await();
}
insert(e); //插入值, 并且通知notEmpty
}finally {
lock.unlock();
}
}
public E take() throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
while(count == 0){
notEmpty.await();
}
return dequeue(); //返回值, 并且通知notFull
}finally {
lock.unlock();
}
}

Fork/Join框架

Fork/Join的运行流程图

点击加载

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

优点:充分利用线程进行并行计算,减少了线程之间的竞争
缺点:在某些情况下还是存在竞争,比如双端队列中只有一个任务时,并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join框架的设计

步骤1. 分割任务。首先我们需要有一个fork类来把大任务分割成子任务,直到足够小。
步骤2. 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列中获取任务执行。子任务执行完成的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两个事情。
1.ForkJoinTask:首先要创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。通常情况下继承ForkJoinTask的子类。
RecursiveAction:用于没有返回结果的任务
RecursiveTask:用于有返回结果的任务

2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时候,它会随机从其他工作线程的队列尾部获取一个任务。

使用Fork/Join框架的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.myConcurrent.Chapter6;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer>{
private static final int THRESHOLD = 2; //阈值
private int start;
private int end;
public CountTask(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if(canCompute){
for(int i = start; i <= end; ++i){
sum += i;
}
}
else{
//任务大于设定的阈值, 分割任务
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完毕, 得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args){
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask countTask = new CountTask(1, 4);
Future<Integer> result = forkJoinPool.submit(countTask);
try{
System.out.println(result.get());
}catch (InterruptedException ie){
}catch (ExecutionException ee){
}
}
}

设计模式-模板模式

发表于 2017-12-14 | 分类于 设计模式

定义

模板方法模式是类的行为模式。准备一个抽象类,将部分逻辑以具体方法以及具体构造函数的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类可以以不同的方式实现这些抽象方法,从而剩余的逻辑有不同的实现。

模式中的角色

抽象类(AbstractClass):实现模板方法,定义了算法的骨架
具体类(ConcreteClass):实现抽象类中的抽象方法,已完成完整的算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
abstract class CarFactory{
public void showCar(){
this.name();
this.location();
}
protected abstract void name();
protected abstract void location();
}
class Benzi extends CarFactory{
@Override
protected void name(){
System.out.println("benzi");
}
@Override
protected void location(){
System.out.println("tatou");
}
}
class Bmw extends CarFactory{
@Override
protected void name(){
System.out.println("BMW");
}
@Override
protected void location(){
System.out.println("touta");
}
}
public class Demo {
public static void main(String[] args){
CarFactory carFactory = new Benzi();
carFactory.showCar();
Bmw bmw = new Bmw();
bmw.showCar();
}
}

输出如下:

1
2
3
4
benzi
tatou
BMW
touta

评价

优点:模板方法模式通过把不变的行为搬移到超类,去除了子类中重复的代码。子类实现算法的某些细节,有助于算法的扩展。通过一个父类调用子类实现的操作,通过子类扩展增加新的行为,符合“开放-封闭原则”。
缺点:每个不同的实现都需要定义一个子类,这会导致类的个数增加,设计更加抽象。

Java并发编程的艺术-学习笔记-5

发表于 2017-12-14 | 分类于 并发

Java中的锁

Lock接口

在Java SE 5 之后,并发包新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显示地获取和释放锁。虽然它缺少了(通过synchronized快或方法所提供的)隐式获取和释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。

Lock使用的基本方式

1
2
3
4
5
6
7
8
9
Lock lock = new ReentrantLock();
lock.lock();
try{
}catch(Exception e){
}finally{
lock.unlock();
}

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

不要将获取锁的过程中写在try块中,因为如果获取锁(自定义锁的实现)发生了异常,异常抛出的时候,也会导致锁无故释放。

Lock是一个接口,它定义了锁获取和释放的基本操作

Lock接口提供的synchronized关键字不具备的主要特性

特性 描述
尝试非阻塞获取锁 当前线程尝试获取锁
能够中断地获取锁 与synchronized不同,获取锁的线程能够响应中断,当获取到锁的线程被中断,中断异常会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果超时了还未获取锁,返回

Lock接口的API

方法名称 描述
void lock() 获取锁,调用该方法当前线程会获取锁,一旦获取,立刻返回
void lockInterruptibly() throws IE 可中断地获取锁,在锁的获取中可以中断当前线程
boolean tryLock() 尝试非阻塞获取锁,调用该方法后立刻返回
boolean tryLock(time, unit) throws IE 超时的获取锁,可能情况:1. 获取锁 2. 超时返回false 3. 时间内被中断
void unlock() 释放锁

注:throws IE == throws InterruptedExcption

队列同步器

队列同步器AbstractQueuedSynchronized(AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

AQS的主要使用方法是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,需要AQS提供的三个方法 getState()、setState(int state)和compareAndSetState(int expect, int update),它们能够保证状态的改变是安全的。子类被推荐定义为自定义同步组件的静态内部类。

同步器是是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

队列同步器的接口和示例

重写同步器的方法,需要使用AQS提供的三个方法来访问或修改同步状态。

方法名称 描述
getState() 获取当前同步状态
setState(int newState) 设置当前同步状态
comparAndSetState(int expect, int update) 使用原子性的CAS保证安全地设置状态

可重写的五个同步器方法(在AQS是protected属性),所以锁的调用者不能直接调用AQS的以下方法

这个五个方法在AQS中的样子,不重写无法正常使用

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//独占式获取同步状态, 实现该方法使用CAS方法
protected boolean tryAcquire(int arg)
//独占式释放同步状态
protected boolean tryRelease(int arg)
//共享式获取同步状态, 返回大于等于0的值, 表示获取成功, 反之失败
protected int tryAcquireShared(int arg)
//共享式释放同步状态
protected boolean tryReleaseShared(int arg)
//表示是否被当前线程所独占
protected boolean isHeldExclusively()

同步器提供的可用模板方法(在AQS中是public属性),可以供锁的使用者调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//独占式获取同步状态,
//若当前线程获取成功,返回;
//否则线程进入同步等待队列, 调用可重写的tryAcquire(int)
void acquire(int arg)
//与acquire(int arg)一样, 但是该方法响应中断
//线程未获取锁, 进入同步等待队列, 若被中断, 方法抛出InterruptedException并返回
void acquireInterruptibly(int arg)
//在acquireInterruptibly(int arg)基础上加上超时限制
//线程在时间内获取到同步状态, 返回true, 否则返回false
boolean tryAcquireNanos(int arg, long nanos)
//共享式的获取同步状态
//与独占式的获取主要区别就是同一个时刻可以有多个线程获取到同步状态
void acquireShared(int arg)
//与acquireShared(int)一样, 并且响应中断
void acquireSharedInterruptibly(int arg)
//独占式释放同步状态
//该方法在释放同步状态之后, 还会将同步队列中的第一个节点包含的线程唤醒
boolean release(int arg)
//共享式的释放同步状态
boolean releaseShared(int arg)
//获取等待在同步队列上的线程集合
Collection<Thread> getQueueThreads()

同步器提供的模板方法基本上分为3类,独占式获取和释放同步状态、共享式获取和释放同步状态和查询同步队列中的等待线程。

独占锁就是在同一个时刻只能有一个线程可以获取到锁,其他获取不到的线程只能处于同步等待队列之中,只有获取了锁的线程释放了锁,后继的线程才能有机会获取到锁。

自定义实现的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class MyMutex implements Lock {
//静态内部类继承AQS
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean isHeldExclusively(){
return getState() == 1;
}
@Override
public boolean tryAcquire(int acquire){
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int release){
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition(){
return new ConditionObject();
}
}//结束静态内部类
//仅需将操作代理到Sync上既可以
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); }
}

如上所式,独占锁MyMutex是自定义的一个同步组件,它在同一个时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并且实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法,如果经过CAS设置成功(同步状态为1),则表示获取了同步状态,而在tryRelease(int release)方法中只是将同步状态重置为0。
用户使用MyMutex并不会直接和内部同步器实现打交道,而是调用MyMutex提供的方法。这样便可以大大降低一个可靠自定实现的同步组件的门槛。

队列同步器的实现分析

包括4个部分

  1. 同步队列
  2. 独占式同步状态的获取与释放
  3. 共享式同步状态获取与释放
  4. 超时获取同步状态

同步队列

同步器依赖内部实现的同步队列(一个FIFO双向队列)来完成同步状态管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点

int WaitStatus等待状态 描述
CANCELEED 值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
SIGNAL 值为-1,后继节点的线程等于等待状态,而当前节点的线程如果释放了同步状态或者取消,将会通知后继节点运行
CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列
PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件传播下去???
INITIAL 值为0,初始状态

Node nextWaiter: 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARRED常量,即节点类型(独占和共享)和等待队列中的后继节点共用同一个字段

同步队列的CAS:

当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成节点并加入同步队列中,而这个加入队列的过程必须要保证是线程安全的。
同步器提供一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点和当前节点。
设置首节点并不需要CAS来保证,由于只有一个线程能够成功获取到同步状态。

Node的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//head和tail是不存线程信息的节点(只用于标识)
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
static final Node SHARED = new Node(); //共享式同步状态, 使用同一个节点
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
//获取当前节点的前驱节点
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

独占式同步状态获取和释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(Node.EXCLUSIVE, arg)){
selfInterrupt();
}
}
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode);
//快速尝试在尾部添加节点
//满足两个条件: 1.尾节点已经存在 2.没有线程竞争出现导致的问题
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred, tail)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//返回的并非新建尾节点, 而是旧的尾节点
//通过"死循环", 完成线程安全的正确设置尾节点
private Node enq(final Node node){
for( ; ;){
Node t = tail;
if(t == null){
if(compareAndSetTail(new Node())){
tail = node;
}
}
else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
//每个节点(线程)在同步队列中不断地自省观察
//条件满足, 获取同步状态; 否则, 自旋并阻塞节点(线程)
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try{
boolean interrupted = false;
for( ; ; ){
//获取当前node的前一节点p
//如果前驱节点是头节点, 并且满足自定义的tryAcquire条件
//那么获取同步状态成功, 否则继续自旋, 自旋的同时判断是否有阻塞和中断信
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire()方法
//Checks and updates status for a node that failed to acquire.
//Returns true if thread should block. This is the main signal
//control in all acquire loops.
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
//不能从自旋中获取同步状态
if(failed){
//Cancels an ongoing attempt to acquire.
//取消尝试获取同步锁
cancelAcquire(node);
}
}
}
public final boolean release(int arg){
//从自定义的释放锁中返回
if(tryRealease(arg)){
Node h = head;
//waitStatus == 0 标识初始状态.
if(h != null && h.waitStatus != 0){
unparkSuccessor(h); //Wakes up node's successor, if one exists.
}
return true;
}
return false;
}

共享式同步状态获取和释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//共享式同步状态的获取和释放
public final void acqurireShared(int arg){
//要求自定义的tryAcquireShared
//第一次获取不到同步状态, 进入死循环获取同步状态
if(tryAcqureShared(arg) < 0){
doAcquireShared(arg);
}
}
/*
* tryAcquireShared在返回 >= 0时, 表示能够获取到同步状态
* 自旋过程中, 如果当前节点的前驱节点为头节点, 尝试获取同步状态, 返回值 >= 0, 成功
*/
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try{
boolean interrupted = false;
for( ; ; ){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
if(interrupted){
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
/*
* 该方法在释放同步状态之后, 将会唤醒后续处于等待状态的节点.
* tryReleaseShared方法必须保证同步状态线程安全地释放, 一般通过循环和CAS保证
* 因为释放共享同步状态的操作会来自多个线程
*/
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}

独占式超时获取同步状态

响应中断的同步状态获取过程:

在Java5之前,当一个线程获取不到锁被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized,等待着获取锁。在Java5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
超时获取同步状态过程可以被视为响应中断获取同步状态过程的”增强版”,doAcquireNanos(int arg, long nanosTimeout)方法在支持响应中断基础上,增加了超时获取的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try{
for( ; ; ){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return true;
}
//超时返回false
if(nanosTimeout <= 0){
return false;
}
//超时等待时间 > threshold(1000ns), 进入超时等待状态
//否则, 按照进入快速自旋过程
if(shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold){
LockSupport.parkNanos(this, nanosTimeout);
}
//计算nanosTimeout剩余的睡眠时间
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if(Thread.interrupted()){
throw new InterruptedException();
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}

可重入锁

ReentrantLock,支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平选择。
Synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在执行时,执行线程在获取了锁之后仍然连续多次地获取该锁。
ReentrantLock在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

锁的公平性问题:公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说是锁获取是顺序的。ReentrantLock提供一个构造函数,能够控制锁是否公平。

实现重进入

重进入是指任意线程在获取到锁之后,该线程能够再次获取该锁而不会被锁给阻塞。需要解决以下两个特性:

  1. 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
  2. 锁的最终释放。线程重复了n次获取锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,计数为0表示锁已经成功释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//ReentrantLock获取锁的非公平方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //获取当前锁的状态, [计数值]
//该锁第一次被获取, 只要CAS成功即可获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断是否, 当前线程已经获取锁, 并再次想获取该锁
//如果锁已被其他线程锁占有, 当前线程并不能获取锁资源, 返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
//公平方法, 线程对于同步状态的获取是基于FIFO的
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0){
//hasQueuedPredecessors()方法, 同步队列中当前节点是否有前驱节点的判断
//返回true, 表示有线程比该线程很早请求锁, 没有遵守FIFO规则
//因此需要等待前面的线程获取并释放锁, 该线程才能获取
if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if(current == Thread.currentThread()){
int nextc = c + acquires;
if(nextc < 0){
throw new Error("maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
//ReentrantLock的tryRelease方法
protected final boolean tryRealease(int releases){
int c = getState() - releases;
//当前线程并没有占有ReentrantLock, 但是尝试释放该锁, 会抛出异常
if(Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
//当锁计数为0的, 没有线程占有该锁了
if(c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

在ReentrantLock中,默认设置是非公平地获取同步状态。
非公平锁可能会造成线程”饥饿”,但是极少的线程切换,开销更小,保证了更大的吞吐量。

读写锁

读写锁的特点:在同一个时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他线程均被阻塞。读写锁通过维护一个读锁和一个写锁,使得并发相比一般的排他锁有了很大的提升。特别是在读多于写的情况下。

读写锁的设计实现

读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是同步器的同步状态。
同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。在一个整型变量上维护多种状态,使用”按位切割使用”这个变量,如下图所示。

点击加载

当前图的同步状态:表示一个线程已经获取了写锁,并且重进入了2次;并且连续两次获取了读锁。

写锁的获取和释放

写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态;如果当前线程在获取写锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* 当 读写锁中有读锁, 那么写锁不能被获取
* 原因: 读写锁要确保写锁的操作对读锁可见,
* 只有当所有的读线程释放了锁, 写锁才可以被当前线程获取
* 一旦写锁被获取, 其他读写线程的后续访问都被阻塞
*/
protected final boolean tryAcqure(int acquires){
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); //返回值为写锁的状态
if(c != 0){
//c != 0 && w == 0 写锁不存在, 但存在读锁; 或者出现了线程竞争
if(w == 0 || current != Thread.currentThread()){
return false;
}
if(w + exclusiveCount(acquires) > MAX_COUNT){
throw new Error("Maximum lock count exceeded");
}
setState(c + acquires);
return true;
}
//出现了需要阻塞写线程的情况
if(writerShouldBlock() || !compareAndSetState(c, c+acquires)){
return false;
}
//c == 0, 读写锁未被占有
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases){
int c = getState() - releases;
if(Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
// c == 0 返回 true
// 表示没有写锁(并且已知没有读锁), 所以目前读写锁没有被获取
if(c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

读锁的获取和释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取。
如果当前线程已经获取了读锁,增加读状态;如果当前线程在获取读锁的时,写锁已经被其他线程获取,则进入等待状态。

Java5到6变得复杂许多,由于是新增了一些功能。比如getReadHoldCount(),返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数(可重入)的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLock中,使得获取读锁状态变得复杂。

下面的代码是删减版的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final int tryAcqureShared(int unused){
for( ; ; ){
int c = getState();
int nextc = c + (1 << 16);
if(nextc < c){
throw new Error("Maximum lock count exceeded");
}
//写锁存在或者出现线程竞争
if(exclusiveCount(c) != 0 || owner != Thread.currentThread()){
return -1;
}
if(compareAndSetState(c, nextc)){
return 1;
}
}
}

锁降级

锁降级指的是写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称为锁降级。
锁降级指的是,把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

下面看一个锁降级的示例。因为数据不常变化,所以要多个线程可以并发的进行数据读取,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他线程被阻塞,直到当前线程完成数据的准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void processData(){
readLock.lock(); //这里的readLock和writeLock应该要是同一把锁,分开是易于标识
// update 是一个volatile修饰的boolean变量
// 此时所有访问processData()方法的线程都能够感知到变化,
// 但是只有一个线程能够获取到写锁, 其他线程的读写状态获取均被阻塞
if(!update){
//必须先释放读锁, 锁降级从写锁的获取开始
readLock.unlock(); // 1
writeLock.lock(); // 2 . 1和2操作可以设置为原子操作
try{
if(!update){
//准备数据的流程(略)
//现在数据被更新了
update = true;
}
// 获取读锁, 但肯定会被阻塞
// 目的在于在释放写锁的那一刹那, 不让其他线程获取到写锁
readLock.lock();
}finally {
writeLock.unlock();
}
}
try{
//使用数据的流程(略)
}finally {
readLock.unlock();
}
}

读锁第二次上锁的必要性:
主要是为了保证数据的可见性,如果当前线程不获取读锁,而是直接释放写锁,假设此刻另一个线程(记住线程T)获取了写锁并修改了数据,那么当前的线程是无法感知到线程T的数据更新。

LockSupport工具

当需要阻塞或者唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。

方法名称 描述
void park() 阻塞当前线程,如果调用unpark(Thread t)方法或者被当前线程被中断,才能从park()返回
void parkNanos(long nanos) 超时返回的阻塞线程方法
void parkUntil(long deadline) 阻塞当前线程,直到deadline(从1970到deadline的时间的毫秒数)
void unpark(Thread t) 唤醒处于阻塞状态的线程t

Condition接口

任一Java对象,都拥有一组监视器方法,主要包括wait()、notify()等方法,与synchronized关键字组合,可以实现等待/通知模式。
Condition接口提供了类似Object的方法,与Lock配合可以实现等待/通知模式。

Object监视器方法和Condition接口的比较

对比项 Object Monitor Methods Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition()获取Condition对象
调用方式 直接调用如:object.wait() 直接调用如:condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态是否必须响应中断 必须响应 可以不响应中断
当前线程释放锁并进入超时等待 支持 支持
当前线程释放锁,并且超时等待直到deadline 不支持 支持
唤醒等待队列中的一个或全部线程 支持 支持

Condition接口方法

Condition的获取是依赖锁的。

Condition的实现分析

Object监视器模型上,一个对象拥有一个同步队列和等待队列。而并发包中的Lock同步器中拥有一个同步队列和多个等待队列。

点击加载

等待

调用了等待方法的线程是成功获取锁的线程,也就是同步队列中的首节点,该方法会将当前线程的节点,构造成一个新的等待节点加入等待队列,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

从等待方法返回的线程,一定是获取了Condition相关锁的线程。

点击加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void await() throws InterruptedException{
if(Thread.interrupted()){
throw new InterruptedException();
}
//当前线程加入等待队列
Node node = addConditionWaiter();
//当前线程从同步队列中释放, 即当前线程释放了锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue 当线程在不在同步队列中, 返回false,循环
// 一旦返回true(线程从等待队列移到同步队列), 或者被打断
// 线程从等待队列中移除
while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
//线程被唤醒后, 进入同步队列, 加入到获取同步状态的竞争
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters(); // Unlinks cancelled waiter nodes from condition queue
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}

通知

点击加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//被唤醒的线程可以从await()返回, 此时该线程已经成功地获取了锁
//调用signal方法会唤醒等待队列中等待时间最长的节点(首节点)
//在唤醒节点之前, 会将节点移到同步队列的队尾
public final void signal(){
//节点从等待队列移到同步队列的前置条件是
//当前线程必须获取到了锁
if(!isHeldExclusively){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first);
}
}

Java并发编程的艺术-学习笔记-4

发表于 2017-12-13 | 分类于 并发

Java并发编程的基础

现代操作系调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、栈和局部变量,并且能够访问共享变量。
处理器在这些线程上高速切换。
执行main()方法是一个名称为main的线程。

线程简介

线程优先级

现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待者下次分配。线程分配到的时间片多少决定了线程使用处理器资源的多少。

Java线程中,通过控制整形变量priority来控制优先级。
优先级高的线程分配的时间片数量要多于优先级低的线程。
频繁阻塞(睡眠或者I/O操作)的线程设蛰较高的优先级,而偏重计算(需要较多的CPU时间或者偏运算)的线程设置较低的优先级,确保处理器不会被独占。

线程优先级不能作为程序正确性的依赖,因为操作系统可以完全不用理会Java线程对于优先级的设定。

线程的状态

状态名称 说明
NEW 线程初始状态,刚被创建没有调用start()方法
RUNNABLE 运行状态,包括操作系统中的”就绪”和”运行”两种状态
BLOCKED 阻塞状态,表示线程阻塞与锁
WAITING 等待状态,进入该状态的线程需要等待其他线程做一些特定动作(通知或者中断)
TIME_WAITING 超时等待状态,在等待时间结束返回
TERMINATED 终止状态,表示当前线程执行完毕

线程在自身的生命周期中,并不固定于某个状态,而是随着代码的执行在不同的状态之间进行切换。

点击加载

从图中可以看出,线程创建之后,调用start()方法开始运行。当线程执行wait()方法之后,线程进入等待状态。进入等待状态的线程需要依赖其他线程的通知才能够返回到运行状态,而超时等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间一过,就会返回到运行状态。当线程调用同步方法时候,在没有获取到锁的状态下,线程将会进入到阻塞状态。线程在执行Runable的run()方法之后,将会进入到终止状态。

Daemon线程

守护线程是一种支持型线程,因为它主要被用作在后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在非Deamon线程的时候,Java虚拟机将会退出。可以调用Thread.setDaemon(true)将线程设置为Daemon线程。

Daemon属性需要在启动线程之前设置,不能在启动线程之后设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.TimeUnit;
public class Daemon {
static class DaemonRunner implements Runnable{
@Override
public void run(){
try{
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
}finally {
System.out.println("finally in run()");
}
}
}
public static void main(String[] args) throws Exception{
Thread t = new Thread(new DaemonRunner(), "...");
//将main线程作为自己的守护线程
t.setDaemon(true);
t.start();
//main线程退出, java虚拟中没有非Daemon线程, 虚拟机退出
//DaemonRunner中的finally没有被执行.
//下面t.join()可让finnally块执行
//在main线程调用了t的join, 那么main线程就要等待t结束, 才能执行后续代码
// t.join();
// System.out.println("shit");
}
}

运行上述程序,可以看到在终端或者命令提示符上没有任何输出。main线程(非Daemon线程)在启动了线程DaemonRunner之后随着main方法执行完毕而终止,而此时Java虚拟机已经没有非Daemon线程,虚拟机退出。所有线程终止,所以DaemonRunner中的finnally块没有执行。
在构建Daemon线程时,不能依靠finnally块来保证资源的释放

启动和终止线程

构造线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc){
if(name == null){
throw new NullPointerException("name connot be null");
}
//当前线程就是新建线程的父线程
Thread parent = Thread.currentThread();
this.group = g;
this.target = target;
tid = nextThreadID(); //分配一个线程ID
//将daemon, priority属性设置为父线程的对应属性
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
}

启动线程

上述的构造线程完成后,一个能够运行的线程对象就初始化了,在堆内存中等待着运行。

此时,调用start()方法就可以启动这个线程。线程starrt()方法含义是:当前线程(即parent线程)同步告知虚拟机,只要线程规划器空闲,应该立即调用start()方法的线程。

理解中断

中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。中断好比其他线程对该线程打了个招呼,其他线程通过调用线程的interrupt()方法对其进行中断操作。

线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位。如果该线程已经处于终结状态,即使该线程被中断过,在调用该对象的isInterrupted()方法时,依旧会返回false。

过期的suspend()、resume()和stop()

不建议使用三个过期的函数的原因:
以suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占着有资源进入睡眠状态,这样容易引发死锁问题。同样滴,stop()方法在终结一个线程时不会保证资源的正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定的状态。

安全地终止线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.TimeUnit;
public class ShutDown {
private static class Runner implements Runnable{
private long i;
private volatile boolean on = true;
public void cancel(){
on = false;
}
@Override
public void run(){
while( on && !Thread.currentThread().isInterrupted()){
++i;
}
System.out.println("Count i: " + i);
}
}
public static void main(String[] args) throws Exception{
Runner r1 = new Runner();
Runner r2 = new Runner();
Thread thread1 = new Thread(r1, "countThread1");
Thread thread2 = new Thread(r2, "countThread2");
//main线程睡眠1秒, 然后对thread1进行中断, 使得thread1可以感知中断而结束
thread1.start();
TimeUnit.SECONDS.sleep(1);
thread1.interrupt();
//main线程睡眠1秒, 然后thread2调用cancel(), 使得thread2可以感知 on == false而结束
thread2.start();
TimeUnit.SECONDS.sleep(1);
r2.cancel();
}
}

示例在执行过程中,main线程通过中断操作和cancel()方法都可以让Thread线程终止。这种通过标识位或者中断操作方式让线程在终止的时候可以有机会去清理资源,而不是直接武断地终止线程。

线程间的通信

volatile和synchronized关键字

volitale

volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量的访问的可见性。

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Pikaqiu{
public synchronized void n(){
}
}
public class SynchronizedLock {
private static Pikaqiu pikaqiu = new Pikaqiu();
public static void main(String []args){
//普通同步方法, 锁的对象是实例对象
pikaqiu.n();
//静态同步方法, 锁的对象是 SynchronizedLock Class类
m();
//同步方法块, 锁的对象是括号里的 Pikaqiu Class类
synchronized (Pikaqiu.class){
}
//同步方法块, 锁的对象是括号里的实例对象
synchronized (pikaqiu){
}
}
public static synchronized void m(){
}
}

任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能够进入同步块或同步方法,而没有获取到监视器的线程将会被阻塞在同步块和同步方法的入口处,进入阻塞状态。

任意线程对Object(Object由synchronized保护)的访问,首先要获取Object的监视器,如果获取失败,线程进入同步队列,线程状态变为阻塞。
当访问前驱(获取了锁的线程)释放了锁,该释放操作唤醒阻塞在同步队列中的线程,使其尝试获取监视器。

等待/通知机制

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象的O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进行后续的操作。
上述的两个线程通过对象O来完成交互,而对象上的wait()和notify()的关系就如开关信号一样,用来完成等待方和通知方之间的交互工作。

调用上述方法的细节:

  1. 使用wait()、notify()和notifyAll()需要先对调用对象加锁
  2. 调用wait()方法后,将当前线程放置到对象的等待队列。
  3. notify()/notifyAll()调用后,等待线程依旧不会从wait()返回,需要调用notfify()/notifyAll()的线程释放锁之后,等待线程才有机会从wait()返回。
  4. notify()将等待队列中的一个等待线程从等待队列移到同步队列,而notifyAll()方法将等待队列中的所有线程全部移到同步队列。
  5. 从wait()方法中返回的前提是获得了对象的锁。

等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()返回时能够感知到通知线程对变量做出的改变。

如上图,WaitThread首先获取了对象的锁,然后调用了对象的wait()方法,从而放弃了锁并进入了等待队列WaitQueue中,进入等待状态。由于WaitThread释放了锁,NotifyThread随后获取了对象的锁,在业务逻辑处理后,调用了对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread状态变为阻塞状态。NotifyThread释放了锁之后,从SynchronizedQueue取出一个线程来获取锁,这个WaitThread再次获取锁,并从wait()方法返回继续执行。

等待/通知的经典范式

等待方(消费者)、通知方(生产者)

等待方遵循的规则

  1. 获取对象的锁
  2. 如果条件不满足,调用对象的wait()方法,被通知后再次检查条件
  3. 条件满足执行后续操作
1
2
3
4
5
6
synchronized(对象){
while(条件不满足){
对象.wait();
}
后续的逻辑处理;
}

通知方的规则

  1. 获取对象的锁
  2. 改变条件
  3. 通知所有等待在对象上的线程
1
2
3
4
synchronized(对象){
改变条件;
对象.notifyAll();
}

管道输入/输出流

管道输入/输出流与普通文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。

PipedReader和PipedWriterr面向字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
public class Piped {
public static void main(String []args) throws Exception{
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
//将输出和输入的流进行连接, 否则在使用时会抛出IOException
out.connect(in);
Thread thread = new Thread(new Print(in), "PrintThread");
thread.start();
int receive = 0;
try{
while((receive = System.in.read()) != -1){
out.write(receive);
}
}catch (IOException ioe){
}finally {
out.close();
}
}
static class Print implements Runnable{
private PipedReader in;
public Print(PipedReader in){
this.in = in;
}
@Override
public void run(){
int receive = 0;
try{
while((receive = in.read()) != -1){
System.out.print((char)receive);
}
}catch (IOException ioe){
}
}
}
}

ThreadLocal的使用

ThreadLock:线程本地变量,是一个以ThreadLock对象为键,任意对象为值的存储结构。这个结果被附带在线程上,也就是说一个线程可以根据一个ThreadLock对象查询到绑定到一个线程上的值。
可以通过get()和set()方法来获取和设置值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.TimeUnit;
public class Profiler {
private static ThreadLocal<Long> threadLocal = new ThreadLocal<>();
public static void begin(){
threadLocal.set(System.currentTimeMillis());
}
public static Long end(){
return System.currentTimeMillis() - threadLocal.get();
}
public static void main(String[] args) throws Exception{
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("cost: " + Profiler.end() + "mills");
}
}

若使用上述的Profiler的好处有,两个方法的调用不用在同一个方法或者类中。
例如在AOP中,可以在方法的调用前的切入点执行begin()方法,而在方法的调用之后执行切入点end()方法。

Thread.join()的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.TimeUnit;
public class Join {
static class Catalina implements Runnable{
private Thread thread;
public Catalina(Thread thread){
this.thread = thread;
}
@Override
public void run(){
try{
thread.join();
}catch (InterruptedException e){
e.getMessage();
}
System.out.println(Thread.currentThread().getName() + "terminate.");
}
}
public static void main(String[] args) throws Exception{
System.out.println("Start main");
Thread previous = Thread.currentThread();
for(int i = 0; i < 10; ++i){
//每个线程拥有上一个线程的一个引用, 需要等待上一个线程的终止, 才能从等待中返回
Thread thread = new Thread(new Catalina(previous), "Thread(" + i + ")");
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread: " + Thread.currentThread().getName());
}
}

每个线程的终止的前提是,前驱线程的终止,每个线程等待前驱线程终止后,才从join()返回,这里涉及了等待/通知机制(等待前驱线程终止,接受前驱线程结束通知)。
当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。

比如有两个线程,mainThread和ThreadA,在线程ThreadA中获取mainThread线程,并且调用mainThread.join(),那么线程A的终止前提条件是,mainThread终止。

Thread.yield()方法的使用

参考博客:点击跳转

使当前线程从执行状态(运行状态)变为可执行态(就绪状态)。cpu会从众多的可执行态里选择,也就是说,当前也就是刚刚的那个线程还是有可能会被再次执行到的,并不是说一定会执行其他线程而该线程在下一次中不会执行到了。

Java线程中有一个Thread.yield( )方法,很多人翻译成线程让步。顾名思义,就是说当一个线程使用了这个方法之后,它就会把自己CPU执行的时间让掉,让自己或者其它的线程运行。

打个比方:现在有很多人在排队上厕所,好不容易轮到这个人上厕所了,突然这个人说:“我要和大家来个竞赛,看谁先抢到厕所!”,然后所有的人在同一起跑线冲向厕所,有可能是别人抢到了,也有可能他自己有抢到了。我们还知道线程有个优先级的问题,那么手里有优先权的这些人就一定能抢到厕所的位置吗? 不一定的,他们只是概率上大些,也有可能没特权的抢到了。

Java并发编程的艺术-学习笔记-3

发表于 2017-12-10 | 分类于 并发

Java内存模型

在命令式编程中,线程之间的通信机制有两种:共享内存和消息传递。

在共享内存的并发模型里,线程之间共享程序的公共状态,通过读一写内存中的公共状态进行隐式传递。
在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息来显式进行通信。

Java的并发采用的共享内存模型。

Java内存模型的基础

内存模型的抽象结构

JMM通过控制主内存与每个线程的本地内存之间进行交互,提供内存可见性的保证。

抽象地看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本。
本地内存是JMM的一个抽象概念,并不真实存在,它涵盖了缓存、写缓存区、寄存器与其他的硬件和编译器优化。

在Java中,所有实例域、静态域和数组元素都储存在堆内存中,堆内存在线程之间共享。

源代码到指令序列的重排序

为了提高性能,编译器和处理器会对指令进行一共三种重排序。

a. 编译器优化的重排序(不改变单线程程序语义的前提下)

b. 指令级并行的重排序(不存在数据依赖的前提下)

c. 内存系统的重排序。由于处理器使用缓存和读/写缓冲区,使得加载和存储操作看起来像是在乱序执行。

重排序会导致多线程出现内存不可见性问题。
对于编译器,JMM的编译器重排序规则会禁止特定类型的重排序
对于处理器,JMM的处理器要求编译器生成指令序列时,插入特定类型的内存屏障。

volatile的内存语义

编译器不会对volatile读与volatile读后面的任意内存操作重排序;(volatile读先)
编译器不会对volatile写与volatile写前面的任意内存操作重排序。(volatile写后)
组合两个条件,意味着为了同时实现volatile读和volatile写的内存语义,编译器不能对CAS与CAS前面和后面的任意内存操作重排序。

volatile变量的特性:

  1. 可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
  2. 原子性。任意单个volatile变量的读/写操作具有原子性,但类似于volatile++这个复合操作不具有原子性。

Java锁的内存语义

锁是Java并发编程中最重要的同步机制。锁除了让临界区互斥执行之外,还可以让释放锁的线程获取同一个锁的线程发送消息。

锁释放与volatile的读写有着相同的内存语义

  1. 线程A释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做修改的)消息。
  2. 线程B获取一个锁,实质上线程B接受了某个线程(在释放这个锁之前对共享变量所做修改的)消息。
  3. 线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发送消息。

底层实现

concurrent的实现

由于Java的CAS同时具有volatile读和volatile写的内存语义,因此Java线程之间的通信有了下面4种方式。

  1. 线程A写volatile变量,随后线程B读这个volatile变量
  2. 线程A写volatile变量,随后线程B用CAS更新这个volatile变量
  3. 线程A用CAS更新一个volatile变量,随后线程B用CAS更新这个volatile变量
  4. 线程A用CAS更新一个volatile变量,随后线程B这个volatile变量

Java的CAS会使用现代处理器上提供的高效机器级别的原子指令,这些原子指令以原子方式对内存执行读-写-改操作,是顺序计算图灵机的异步等价机器。volatile变量的读/写和CAS可以实现线程之间的通信。这些特性形成了concurreent包得以实现的基石。

点击加载

final域的内存语义

final域的重排序规则

a. 在构造函数内对一个final域的写入,与随后把这个构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序
b. 初次读一个final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class FinalTest {
int i;
final int j;
static FinalTest obj;
public FinalTest(){ //构造函数
i = 1; //写普通域
j = 2; //写final域
}
public static void writer(){ //写线程A的执行
obj = new FinalTest();
}
public static void reader(){ //读线程B的执行
FinalTest object = obj; //读对象的引用
int a = object.i; //读普通域
int b = object.j; //读final域
}
}

这里假设一个线程A执行writer()方法,随后另一个线程B执行reader()方法。下面我们通过这两个线程的交互来说明重排序规则。

写final域的重排序规则

写final域的重排序规则禁止把final域的写重排序到构造函数之外。实现包括

  1. JMM禁止编译器把final域的写重排序到构造函数之外。
  2. 编译器会在final域的写之后,构造函数return之前,插入一个StoreStore屏障。这个屏障禁止处理器把final域的写重排序到构造函数之外。

在示例代码中,由于线程A,B竞争,由于线程B”看到”对象引用obj时,很可能obj对象还没有构造完成(对普通域i的写操作被重排序到构造函数外,此时初始值1还没有写入普通域)。
而写final域的重排序规则可以确保:在对象引用为任意线程可见之前,对象的final域已经被正确初始化过了,而普通域没有这个保障

读final域的重排序规则

在一个线程中,初次读对象引用与初次读该对象包含的final域,JMM禁止处理器重排序这两个操作。

读final域的重排序规则可以确保:在读一个对象的final域之前,一定会先读包含这个final域的对象的引用。

final域为引用类型

对于final域为引用类型,写final域的重排序规则对编译器和处理器增加了约束:在构造函数内对一个final引用的对象的成员域的写入,与随后在构造函数外把这个被构造函数对象的引用赋值给一个引用变量,这两个操作不能重排序。

比如,引用的final域是int数组,该规则保证了,数组元素再被使用之前,已经被初始化了。

final引用不能从构造函数中”溢出”

写final域引用的重排序规则可以确保:在引用变量为任意线程可见之前,该引用变量指向的对象的final域已经在构造函数中被正确初始化了。其实要得到这个效果,还需要一个保证:在构造函数内部,不能让这个被构造对象的引用为其他线程所见,也就是对象引用不能在构造函数中”溢出”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FinalTest {
final int i;
static FinalTest obj;
public FinalTest(){ //构造函数
i = 1; //1 写final域, 此时初始化未完成
obj = this; //2 this引用在此"溢出"
}
public static void writer(){
new FinalTest();
}
public static void reader(){
if(obj != null){ //3 由于"溢出"(obj != null)
int t = obj.i; //4 读取未被初始化i
}
}
}

有了重排序规则可以确保:在构造函数返回之前,被构造对象的引用不能为其他线程所见,因为此时的final域可能还没有被初始化。在构造函数返回之后,任意线程都将保证能看到final域正确初始化之后的值。

双重检查锁定与延迟初始化

单例模式下的懒汉模式,在多线程的的条件下可能会出现问题。

基于voltatile的解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Instance{
}
public class SafeDoubleCheckedLocking { //1
private volatile static Instance instance; //2
public static Instance getInstance(){ //3
if(instance == null){ //4 第一次检查
synchronized (SafeDoubleCheckedLocking.class){ //5 加锁
if(instance == null){ //6 第二次检查
instance = new Instance(); //7 可能会出现问题的地方
}
}
}
return instance;
}
}

问题根源:如上,在线程执行到第4行,当instance!=null的时候,实际上instance引用的对象初始化可能还没有完成。

解决思路是,使用volatile禁止上面2,3行代码的重排序,来保证线程安全的延迟初始化。

1
2
3
4
5
6
instance = new Instance();
//新建一个对象Instance可分解为如下三行伪代码
memory = allocate(); //1 分配对象的内存空间
ctorInstance(memory); //2 初始化对象
instance = memory; //3 设置Instance指向分配的空间

实际上,上面的伪代码操作2,3可能会重排序,这是不改变单线程程序执行结果的重排序。
但是在多线程,instance可能会先指向一个未被初始化的空间,调用者使用这个未初始化的内存空间,bug就出现了。

剑指Offer_66

发表于 2017-12-07 | 分类于 剑指Offer

题目

地上有一个m行和n列的方格。一个机器人从坐标0,0的格子开始移动,每一次只能向左,右,上,下四个方向移动一格,但是不能进入行坐标和列坐标的数位之和大于k的格子。 例如,当k为18时,机器人能够进入方格(35,37),因为3+5+3+7 = 18。但是,它不能进入方格(35,38),因为3+5+3+8 = 19。请问该机器人能够达到多少个格子?

解题思路

回溯法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Solution {
private int movingCountHelper(int threshold, int rows, int cols, boolean []isVisited, int i, int j){
int pos = i * cols + j;
if(i < 0 || i >= rows || j < 0 || j >= cols || isVisited[pos] || !isSafe(threshold, i, j)){
return 0;
}
isVisited[pos] = true;
return 1 + movingCountHelper(threshold, rows, cols, isVisited, i, j-1)
+ movingCountHelper(threshold, rows, cols, isVisited, i, j+1)
+ movingCountHelper(threshold, rows, cols, isVisited, i+1, j)
+ movingCountHelper(threshold, rows, cols, isVisited, i-1, j);
}
private boolean isSafe(int threshold, int x, int y){
int sum = 0;
while(x != 0){
sum += (x % 10);
x /= 10;
}
while(y != 0){
sum += (y % 10);
y /= 10;
}
return sum > threshold ? false : true;
}
public int movingCount(int threshold, int rows, int cols)
{
boolean []isVisited = new boolean[rows * cols];
return movingCountHelper(threshold, rows, cols, isVisited, 0, 0);
}
}
1234…12
塔头刘德华

塔头刘德华

113 日志
11 分类
8 标签
© 2018 塔头刘德华
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.2