Java并发编程的艺术-学习笔记-5

Java中的锁

Lock接口

在Java SE 5 之后,并发包新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显示地获取和释放锁。虽然它缺少了(通过synchronized快或方法所提供的)隐式获取和释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。

Lock使用的基本方式

1
2
3
4
5
6
7
8
9
Lock lock = new ReentrantLock();
lock.lock();
try{
}catch(Exception e){
}finally{
lock.unlock();
}

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

不要将获取锁的过程中写在try块中,因为如果获取锁(自定义锁的实现)发生了异常,异常抛出的时候,也会导致锁无故释放。

Lock是一个接口,它定义了锁获取和释放的基本操作

Lock接口提供的synchronized关键字不具备的主要特性

特性 描述
尝试非阻塞获取锁 当前线程尝试获取锁
能够中断地获取锁 与synchronized不同,获取锁的线程能够响应中断,当获取到锁的线程被中断,中断异常会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果超时了还未获取锁,返回

Lock接口的API

方法名称 描述
void lock() 获取锁,调用该方法当前线程会获取锁,一旦获取,立刻返回
void lockInterruptibly() throws IE 可中断地获取锁,在锁的获取中可以中断当前线程
boolean tryLock() 尝试非阻塞获取锁,调用该方法后立刻返回
boolean tryLock(time, unit) throws IE 超时的获取锁,可能情况:1. 获取锁 2. 超时返回false 3. 时间内被中断
void unlock() 释放锁

注:throws IE == throws InterruptedExcption

队列同步器

队列同步器AbstractQueuedSynchronized(AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

AQS的主要使用方法是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,需要AQS提供的三个方法 getState()、setState(int state)和compareAndSetState(int expect, int update),它们能够保证状态的改变是安全的。子类被推荐定义为自定义同步组件的静态内部类。

同步器是是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

队列同步器的接口和示例

重写同步器的方法,需要使用AQS提供的三个方法来访问或修改同步状态。

方法名称 描述
getState() 获取当前同步状态
setState(int newState) 设置当前同步状态
comparAndSetState(int expect, int update) 使用原子性的CAS保证安全地设置状态

可重写的五个同步器方法(在AQS是protected属性),所以锁的调用者不能直接调用AQS的以下方法

这个五个方法在AQS中的样子,不重写无法正常使用

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//独占式获取同步状态, 实现该方法使用CAS方法
protected boolean tryAcquire(int arg)
//独占式释放同步状态
protected boolean tryRelease(int arg)
//共享式获取同步状态, 返回大于等于0的值, 表示获取成功, 反之失败
protected int tryAcquireShared(int arg)
//共享式释放同步状态
protected boolean tryReleaseShared(int arg)
//表示是否被当前线程所独占
protected boolean isHeldExclusively()

同步器提供的可用模板方法(在AQS中是public属性),可以供锁的使用者调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//独占式获取同步状态,
//若当前线程获取成功,返回;
//否则线程进入同步等待队列, 调用可重写的tryAcquire(int)
void acquire(int arg)
//与acquire(int arg)一样, 但是该方法响应中断
//线程未获取锁, 进入同步等待队列, 若被中断, 方法抛出InterruptedException并返回
void acquireInterruptibly(int arg)
//在acquireInterruptibly(int arg)基础上加上超时限制
//线程在时间内获取到同步状态, 返回true, 否则返回false
boolean tryAcquireNanos(int arg, long nanos)
//共享式的获取同步状态
//与独占式的获取主要区别就是同一个时刻可以有多个线程获取到同步状态
void acquireShared(int arg)
//与acquireShared(int)一样, 并且响应中断
void acquireSharedInterruptibly(int arg)
//独占式释放同步状态
//该方法在释放同步状态之后, 还会将同步队列中的第一个节点包含的线程唤醒
boolean release(int arg)
//共享式的释放同步状态
boolean releaseShared(int arg)
//获取等待在同步队列上的线程集合
Collection<Thread> getQueueThreads()

同步器提供的模板方法基本上分为3类,独占式获取和释放同步状态、共享式获取和释放同步状态和查询同步队列中的等待线程。

独占锁就是在同一个时刻只能有一个线程可以获取到锁,其他获取不到的线程只能处于同步等待队列之中,只有获取了锁的线程释放了锁,后继的线程才能有机会获取到锁。

自定义实现的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class MyMutex implements Lock {
//静态内部类继承AQS
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean isHeldExclusively(){
return getState() == 1;
}
@Override
public boolean tryAcquire(int acquire){
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int release){
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition(){
return new ConditionObject();
}
}//结束静态内部类
//仅需将操作代理到Sync上既可以
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); }
}

如上所式,独占锁MyMutex是自定义的一个同步组件,它在同一个时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并且实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法,如果经过CAS设置成功(同步状态为1),则表示获取了同步状态,而在tryRelease(int release)方法中只是将同步状态重置为0。
用户使用MyMutex并不会直接和内部同步器实现打交道,而是调用MyMutex提供的方法。这样便可以大大降低一个可靠自定实现的同步组件的门槛。

队列同步器的实现分析

包括4个部分

  1. 同步队列
  2. 独占式同步状态的获取与释放
  3. 共享式同步状态获取与释放
  4. 超时获取同步状态

同步队列

同步器依赖内部实现的同步队列(一个FIFO双向队列)来完成同步状态管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点

int WaitStatus等待状态 描述
CANCELEED 值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
SIGNAL 值为-1,后继节点的线程等于等待状态,而当前节点的线程如果释放了同步状态或者取消,将会通知后继节点运行
CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列
PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件传播下去???
INITIAL 值为0,初始状态

Node nextWaiter: 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARRED常量,即节点类型(独占和共享)和等待队列中的后继节点共用同一个字段

同步队列的CAS

当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成节点并加入同步队列中,而这个加入队列的过程必须要保证是线程安全的。
同步器提供一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点和当前节点。
设置首节点并不需要CAS来保证,由于只有一个线程能够成功获取到同步状态。

Node的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//head和tail是不存线程信息的节点(只用于标识)
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
static final Node SHARED = new Node(); //共享式同步状态, 使用同一个节点
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
//获取当前节点的前驱节点
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

独占式同步状态获取和释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(Node.EXCLUSIVE, arg)){
selfInterrupt();
}
}
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode);
//快速尝试在尾部添加节点
//满足两个条件: 1.尾节点已经存在 2.没有线程竞争出现导致的问题
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred, tail)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//返回的并非新建尾节点, 而是旧的尾节点
//通过"死循环", 完成线程安全的正确设置尾节点
private Node enq(final Node node){
for( ; ;){
Node t = tail;
if(t == null){
if(compareAndSetTail(new Node())){
tail = node;
}
}
else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
//每个节点(线程)在同步队列中不断地自省观察
//条件满足, 获取同步状态; 否则, 自旋并阻塞节点(线程)
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try{
boolean interrupted = false;
for( ; ; ){
//获取当前node的前一节点p
//如果前驱节点是头节点, 并且满足自定义的tryAcquire条件
//那么获取同步状态成功, 否则继续自旋, 自旋的同时判断是否有阻塞和中断信
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire()方法
//Checks and updates status for a node that failed to acquire.
//Returns true if thread should block. This is the main signal
//control in all acquire loops.
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
//不能从自旋中获取同步状态
if(failed){
//Cancels an ongoing attempt to acquire.
//取消尝试获取同步锁
cancelAcquire(node);
}
}
}
public final boolean release(int arg){
//从自定义的释放锁中返回
if(tryRealease(arg)){
Node h = head;
//waitStatus == 0 标识初始状态.
if(h != null && h.waitStatus != 0){
unparkSuccessor(h); //Wakes up node's successor, if one exists.
}
return true;
}
return false;
}

共享式同步状态获取和释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//共享式同步状态的获取和释放
public final void acqurireShared(int arg){
//要求自定义的tryAcquireShared
//第一次获取不到同步状态, 进入死循环获取同步状态
if(tryAcqureShared(arg) < 0){
doAcquireShared(arg);
}
}
/*
* tryAcquireShared在返回 >= 0时, 表示能够获取到同步状态
* 自旋过程中, 如果当前节点的前驱节点为头节点, 尝试获取同步状态, 返回值 >= 0, 成功
*/
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try{
boolean interrupted = false;
for( ; ; ){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
if(interrupted){
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
/*
* 该方法在释放同步状态之后, 将会唤醒后续处于等待状态的节点.
* tryReleaseShared方法必须保证同步状态线程安全地释放, 一般通过循环和CAS保证
* 因为释放共享同步状态的操作会来自多个线程
*/
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}

独占式超时获取同步状态

响应中断的同步状态获取过程:

在Java5之前,当一个线程获取不到锁被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized,等待着获取锁。在Java5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
超时获取同步状态过程可以被视为响应中断获取同步状态过程的”增强版”,doAcquireNanos(int arg, long nanosTimeout)方法在支持响应中断基础上,增加了超时获取的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try{
for( ; ; ){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return true;
}
//超时返回false
if(nanosTimeout <= 0){
return false;
}
//超时等待时间 > threshold(1000ns), 进入超时等待状态
//否则, 按照进入快速自旋过程
if(shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold){
LockSupport.parkNanos(this, nanosTimeout);
}
//计算nanosTimeout剩余的睡眠时间
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if(Thread.interrupted()){
throw new InterruptedException();
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}

可重入锁

ReentrantLock,支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平选择。
Synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在执行时,执行线程在获取了锁之后仍然连续多次地获取该锁。
ReentrantLock在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

锁的公平性问题:公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说是锁获取是顺序的。ReentrantLock提供一个构造函数,能够控制锁是否公平。

实现重进入

重进入是指任意线程在获取到锁之后,该线程能够再次获取该锁而不会被锁给阻塞。需要解决以下两个特性:

  1. 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
  2. 锁的最终释放。线程重复了n次获取锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,计数为0表示锁已经成功释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//ReentrantLock获取锁的非公平方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //获取当前锁的状态, [计数值]
//该锁第一次被获取, 只要CAS成功即可获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断是否, 当前线程已经获取锁, 并再次想获取该锁
//如果锁已被其他线程锁占有, 当前线程并不能获取锁资源, 返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
//公平方法, 线程对于同步状态的获取是基于FIFO的
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0){
//hasQueuedPredecessors()方法, 同步队列中当前节点是否有前驱节点的判断
//返回true, 表示有线程比该线程很早请求锁, 没有遵守FIFO规则
//因此需要等待前面的线程获取并释放锁, 该线程才能获取
if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if(current == Thread.currentThread()){
int nextc = c + acquires;
if(nextc < 0){
throw new Error("maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
//ReentrantLock的tryRelease方法
protected final boolean tryRealease(int releases){
int c = getState() - releases;
//当前线程并没有占有ReentrantLock, 但是尝试释放该锁, 会抛出异常
if(Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
//当锁计数为0的, 没有线程占有该锁了
if(c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

在ReentrantLock中,默认设置是非公平地获取同步状态。
非公平锁可能会造成线程”饥饿”,但是极少的线程切换,开销更小,保证了更大的吞吐量。

读写锁

读写锁的特点:在同一个时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他线程均被阻塞。读写锁通过维护一个读锁和一个写锁,使得并发相比一般的排他锁有了很大的提升。特别是在读多于写的情况下。

读写锁的设计实现

读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是同步器的同步状态。
同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。在一个整型变量上维护多种状态,使用”按位切割使用”这个变量,如下图所示。

点击加载

当前图的同步状态:表示一个线程已经获取了写锁,并且重进入了2次;并且连续两次获取了读锁。

写锁的获取和释放

写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态;如果当前线程在获取写锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* 当 读写锁中有读锁, 那么写锁不能被获取
* 原因: 读写锁要确保写锁的操作对读锁可见,
* 只有当所有的读线程释放了锁, 写锁才可以被当前线程获取
* 一旦写锁被获取, 其他读写线程的后续访问都被阻塞
*/
protected final boolean tryAcqure(int acquires){
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); //返回值为写锁的状态
if(c != 0){
//c != 0 && w == 0 写锁不存在, 但存在读锁; 或者出现了线程竞争
if(w == 0 || current != Thread.currentThread()){
return false;
}
if(w + exclusiveCount(acquires) > MAX_COUNT){
throw new Error("Maximum lock count exceeded");
}
setState(c + acquires);
return true;
}
//出现了需要阻塞写线程的情况
if(writerShouldBlock() || !compareAndSetState(c, c+acquires)){
return false;
}
//c == 0, 读写锁未被占有
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases){
int c = getState() - releases;
if(Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
// c == 0 返回 true
// 表示没有写锁(并且已知没有读锁), 所以目前读写锁没有被获取
if(c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

读锁的获取和释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取。
如果当前线程已经获取了读锁,增加读状态;如果当前线程在获取读锁的时,写锁已经被其他线程获取,则进入等待状态。

Java5到6变得复杂许多,由于是新增了一些功能。比如getReadHoldCount(),返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数(可重入)的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLock中,使得获取读锁状态变得复杂。

下面的代码是删减版的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final int tryAcqureShared(int unused){
for( ; ; ){
int c = getState();
int nextc = c + (1 << 16);
if(nextc < c){
throw new Error("Maximum lock count exceeded");
}
//写锁存在或者出现线程竞争
if(exclusiveCount(c) != 0 || owner != Thread.currentThread()){
return -1;
}
if(compareAndSetState(c, nextc)){
return 1;
}
}
}

锁降级

锁降级指的是写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称为锁降级。
锁降级指的是,把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

下面看一个锁降级的示例。因为数据不常变化,所以要多个线程可以并发的进行数据读取,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他线程被阻塞,直到当前线程完成数据的准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void processData(){
readLock.lock(); //这里的readLock和writeLock应该要是同一把锁,分开是易于标识
// update 是一个volatile修饰的boolean变量
// 此时所有访问processData()方法的线程都能够感知到变化,
// 但是只有一个线程能够获取到写锁, 其他线程的读写状态获取均被阻塞
if(!update){
//必须先释放读锁, 锁降级从写锁的获取开始
readLock.unlock(); // 1
writeLock.lock(); // 2 . 1和2操作可以设置为原子操作
try{
if(!update){
//准备数据的流程(略)
//现在数据被更新了
update = true;
}
// 获取读锁, 但肯定会被阻塞
// 目的在于在释放写锁的那一刹那, 不让其他线程获取到写锁
readLock.lock();
}finally {
writeLock.unlock();
}
}
try{
//使用数据的流程(略)
}finally {
readLock.unlock();
}
}

读锁第二次上锁的必要性:
主要是为了保证数据的可见性,如果当前线程不获取读锁,而是直接释放写锁,假设此刻另一个线程(记住线程T)获取了写锁并修改了数据,那么当前的线程是无法感知到线程T的数据更新。

LockSupport工具

当需要阻塞或者唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。

方法名称 描述
void park() 阻塞当前线程,如果调用unpark(Thread t)方法或者被当前线程被中断,才能从park()返回
void parkNanos(long nanos) 超时返回的阻塞线程方法
void parkUntil(long deadline) 阻塞当前线程,直到deadline(从1970到deadline的时间的毫秒数)
void unpark(Thread t) 唤醒处于阻塞状态的线程t

Condition接口

任一Java对象,都拥有一组监视器方法,主要包括wait()、notify()等方法,与synchronized关键字组合,可以实现等待/通知模式。
Condition接口提供了类似Object的方法,与Lock配合可以实现等待/通知模式。

Object监视器方法和Condition接口的比较

对比项 Object Monitor Methods Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition()获取Condition对象
调用方式 直接调用如:object.wait() 直接调用如:condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态是否必须响应中断 必须响应 可以不响应中断
当前线程释放锁并进入超时等待 支持 支持
当前线程释放锁,并且超时等待直到deadline 不支持 支持
唤醒等待队列中的一个或全部线程 支持 支持

Condition接口方法

Condition的获取是依赖锁的。

Condition的实现分析

Object监视器模型上,一个对象拥有一个同步队列和等待队列。而并发包中的Lock同步器中拥有一个同步队列和多个等待队列。

点击加载

等待

调用了等待方法的线程是成功获取锁的线程,也就是同步队列中的首节点,该方法会将当前线程的节点,构造成一个新的等待节点加入等待队列,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

从等待方法返回的线程,一定是获取了Condition相关锁的线程。

点击加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void await() throws InterruptedException{
if(Thread.interrupted()){
throw new InterruptedException();
}
//当前线程加入等待队列
Node node = addConditionWaiter();
//当前线程从同步队列中释放, 即当前线程释放了锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue 当线程在不在同步队列中, 返回false,循环
// 一旦返回true(线程从等待队列移到同步队列), 或者被打断
// 线程从等待队列中移除
while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
//线程被唤醒后, 进入同步队列, 加入到获取同步状态的竞争
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters(); // Unlinks cancelled waiter nodes from condition queue
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}

通知

点击加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//被唤醒的线程可以从await()返回, 此时该线程已经成功地获取了锁
//调用signal方法会唤醒等待队列中等待时间最长的节点(首节点)
//在唤醒节点之前, 会将节点移到同步队列的队尾
public final void signal(){
//节点从等待队列移到同步队列的前置条件是
//当前线程必须获取到了锁
if(!isHeldExclusively){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first);
}
}