Java并发编程的艺术-学习笔记-6

Java并发容器和框架

CurrentHashMap的实现原理和使用

ConcurrentHashMap是线程安全又高效的HashMap

线程不安全的HashMap
在多线程的环境下,使用HashMap进行put操作可能导致死循环。
由于哈希桶的存在,在多线程竞争的条件下,可能导致Entry链表形成环形的数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

效率低下的HashTable
HashTable容器使用synchronized来保证线程安全,当一个线程访问HashTable的同步方法,其他线程也要访问同步方法的时候,会进入阻塞或轮询状态。

锁分段技术提高并发访问效率
HashTabe在高并发访问时的效率低下的原因是,所有访问HashTable的线程都必须竞争同一把锁。
锁分段技术:首先将数据分成一段段的存储,然后把每一段数据分配一把锁,当一个线程占用锁访问其中它管理的数据段,其他段的数据可以被其他线程访问。

ConcurrentHashMap的结构

一个Segment其实就是一个类Hash Table的数据结构,Segment内部维护了一个链表数组,也就是Segment里面有哈希桶,桶里的元素使用链表排序。

点击加载

ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在链表的头部。因此这种结构带来的副作用是Hash过程比普通的HashMap要长,但是带来的好处是写操作的时候只对元素所在的Segment进行加锁即可,不会影响到其他的Segment。这样在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(这些写操作刚好被平均分布在所有Segment上)。

ConcurrentHashMap的初始化

初始化segments数组

1
2
3
4
5
6
7
8
9
10
11
12
13
if(concurrencyLevel > MAX_SEGMENTS){
concurrencyLevel = MAX_SEGMENTS; //最大值为65535
}
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel){
++sshift;
ssize <<= 1; //最大值为65536
}
segmentShift = 32 - sshift; //段偏移量
segmentMask = ssize -1; //段掩码
this.segments = Segment.newArray(ssize);

初始化segmentShift和segmentMask

segmentShift和segmentMask在定位Segment的散列算法中用到。
shift等于ssize从1往左移位的次数,在默认情况下concurrencyLevel等于16,1往左移4次,所以sshift等于4,故segmentShift = 32 -4 = 28。

定位Segment

ConcurrentHashMap使用对元素的hashCode进行散列操作,定位到Segment,接着再散列,目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,提高并发效率。

初始化每个segment

输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadFoctor是每个segment的负载因子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if(initialCapacity > MAXIMUM_CAPACITY){
initialCapacity = MAXIMUM_CAPACITY;
}
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity){
++c;
}
int cap = 1;
//cap就是segment里HashEntry数组的长度
//默认情况下 initialCapacity == 16, loadFactor == 0.75, cap == 1
while(cap < c){
cap <<= 1;
}
//初始化segments[] 数组
for(int i = 0; i < this.segments.length; ++i){
this.segments[i] = new Segment<K, V>(cap, loadFactor);
}

ConcurrentHashMap的操作

get操作

get操作不用加锁(除非读到的值是空才会加锁重读)是高效的原因之一,实现原理如下:
在ConcurrentHashMap的get()方法中,它将要使用到的共享变量都定义为volatile变量。它能保证变量在线程之间的可见性,能被多个线程同时读,但只能被单线程写(有一种情况可以被多线程写, 写入的值不依赖原值)。在get()方法中,只需要读而不需要写共享变量count和value,所以可以不用加锁。

1
2
transient volatile int count;
volatile V value;

put操作

由于put操作里需要对共享变量进行写操作,为了线程安全必须加锁。
put()方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组的链表中。

扩容
插入元素之前会先判断Segment里的HashEntry数组是否超过容量,再进行扩容;
而HashTable是在插入元素之后,再判断是否达到扩容标准,如果之后再也没有元素插入到该哈希桶,造成HashTable进行一次无效的扩容。

ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

ConcurrentLinkedQueue

线程安全的队列
阻塞算法:用一个锁(入队和出队共一把锁)或者两个锁(出入队锁不同)
非阻塞算法:循环CAS方式

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用FIFO的规则对节点进行排序。

ConcurreentLinkedQueue的结构

队列有一个头节点和一个尾节点,头节点不存线程相关信息,只有下一个节点next的指针。默认情况下,head节点存储的元素为空,tail节点等于head节点。在队列非空的情况下,队列的尾节点就是tail。

1
private transient volatile Node<E> tail = head;

入队

入队的过程

点击加载

入队主要做两件事情,第一是将入队节点设置为当前队列尾节点的一个节点;第二是更新tail节点,如果tail节点的next不为空,则将入队节点设置为tail节点,如果tail节点的next节点为空,则将入队节点设置为tail的next节点,所以tail节点不总是尾节点

入队方法永远返回true,所以不要通过返回值判断入队是否成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static final int HOPS = 1;
public boolean offer(E e){
if(e == null){
throw new NullPointerException();
}
//创建准备入队的节点
Node<E> node = new Node<E>(e);
retry:
//死循环, 入队不成功反复入队
for( ; ; ){
Node<E> t = tail; //创建一个指向tail节点的引用
Node<E> p = t; //p用来表示队列的尾节点, 默认情况下等于tail
for(int hops = 0; ; hops++){
Node<E> next = succ(p);
// next != null说明p不是尾节点, 需要更新p后, 在将它指向next节点
if(next != null){
//默认情况下 HOPS == 1
//循环了两次及其以上, 并且当前节点还是不等于尾节点
if(hops > HOPS && t != null){
continue retry;
}
p = next;
}
//如果p是尾节点, 则设置p的next节点为入队节点
//
else if(p.casNext(null, n)){
if(hops >= HOPS){
casTail(t, n);
}
return true;
}
else{
//p有next节点, 表示p的next节点有可能是尾节点
p = succ(p);
}
}
}
}

定位尾节点

1
2
3
4
5
6
//找p节点下一节点
//当 p == next 成立, 说明队列为空, 返回头节点即可
final Node<E> succ(Node<E> p){
Node<E> next = p.getNext();
return (p == next) ? head : next;
}

由于tail节点并不总是队列尾节点,所以每次入队都必须先通过tail节点来找到尾节点。

设置入队节点为尾节点

1
p.casNext(null, n)

用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不是null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

HOPS的设计意图

下面是入队操作的简单理解版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean easyUnderstandOffer(E e){
if(e == null){
throw new NullPointerException();
}
Node<E> node = new Node<E>(e);
for( ; ; ){
Node<E> t = tail;
//每次都假设tail作为队列尾节点
//判断失败就是有其他线程竞争, 该线程重新入队即可
if(t.casNext(null, node) && casTail(t, node)){
return true;
}
}
}

这种做法代码量少,而且逻辑非常清晰和易懂。但是它的缺点是每次都要使用循环更新CAS更新tail节点。如果能够减少CAS更新tail节点的次数,就能提高入队效率。
使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新为尾节点,而是当tail和尾节点的距离大于等于HOPS(默认等于1)时才更新tail节点,tail和尾节点距离越长,使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点。
但是这样依然可以提高入队效率,本质上来看它通过增加对volatile的读操作来减少对volatile变量的写操作,而写操作的开销要远大于读操作。

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll(){
Node<E> h = head;
// p表示头节点, 需要出队的节点
Node<E> p = h;
for(int hops = 0; ; ++ hops){
E item = p.getItem();
//如果p节点的元素不为空, 使用CAS设置p节点引用的元素为null
//如果成功返回p节点的元素
if(item != null && p.casItem(item, null)){
if(hops >= HOPS){
//将p节点一个节点设置为head节点
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
//如果头节点元素为空或者头节点发生了变化, 说明头节点已经被另外一个线程修改了
//那么获取p节点的一个节点
Node<E> next = succ(p);
//如果p的下一个节点也为空, 说为这个队列已经空了
if(next == null){
//更新头节点
updateHead(h, p);
break;
}
//如果下一个元素不为空, 则将头节点的一个节点设置为头节点
p = next;
}
return null;
}

Java中的阻塞队列

概念介绍

阻塞队列是一个支持两个附加操作的队列

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞的移除方法:意思是当队列空时,获取元素的线程会等待队列变为非空。
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时等待
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove(e) poll(e) take() poll(time. unit)
检查方法 element() peek() 不可用 不可用

抛出异常:IllegalStateException(“Queue Full”)/NoSuchElementException
返回特殊值:插入成功返回true/取出失败返回null
一直阻塞:可响应中断退出

在无界阻塞队列中,队列永远不会满,所以所有使用put和offer方法永远不会被阻塞,而且使用offer方法永远返回true。

7种阻塞队列

1. ArrayBlockingQueue

一个用数组实现的有界阻塞队列。按照FIFO的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格。
为了保证公平性,通常会降低吞吐量。

当传递fair参数为true创建一个公平的阻塞队列

1
2
3
4
5
6
7
8
9
10
//访问的公平性是使用可重入锁实现的
public ArrayBlockingQueue(int capacity, boolean fair){
if(capacity <= 0){
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newConditon();
}

2. LinkedBlockingQueue

链表实现的有界阻塞队列。默认和最大的长度为Integer.MAX_VALUE。队列元素使用FIFO进行排序。

3. PriorityBlockingQueue

支持优先级别的无界阻塞队列。默认情况元素采用自然顺序生序。也可以自定类实现compareTo()方法指定排序规则,或者初始化PriorityBlockQueue时,指定构造参数Comparator来对元素进行排序。不能保证同优先级别元素的顺序。

4. DelayQueue

支持延时获取元素的无界阻塞队列。队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

运用场景

1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DealyQueue,一旦能从DealyQueue中获取元素,表示缓存有效期到了。

2.定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就可以执行。比如TimerQueue就是使用DelayQueue实现的。

DelayQueue的运用实例

以下内容参考博客孙振超
设计一个具有过期时间的缓存

要求:
1.当向缓存中添加key-value对时,如果这个key在缓存中,还没有过期,需要用这key对应的更新时间。
2.为了能够让DelayQueue将保存的过期键删除,需要重写实现Delayed接口添加到DelayQueue的实例类的hashCode函数和equals函数
3.当缓存关闭,监控时间线程也应该关闭,因而监控线程应当作为守护线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
class DelayedItem<T> implements Delayed {
private T t;
private long liveTime; //存活的时间
private long removeTime; //设置过期时间
public DelayedItem(T t, long liveTime){
this.t = t;
this.liveTime = liveTime;
this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
}
//返回值大于0表示还在时间以内
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.nanoTime(), unit);
}
@Override
@SuppressWarnings("uncheck")
public int compareTo(Delayed o) {
if(o == null){
return 1;
}
if(o == this){
return 0;
}
if(o instanceof DelayedItem){
DelayedItem<T> delayedItem = (DelayedItem<T>)o;
if(liveTime > delayedItem.liveTime){
return 1;
}
else if (liveTime < delayedItem.liveTime){
return -1;
}
else{
return 0;
}
}
long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return diff > 0 ? 1 : ((diff < 0) ? -1 : 0);
}
@Override
public int hashCode(){
return t.hashCode();
}
@Override
public boolean equals(Object o){
if(o instanceof DelayedItem){
return o.hashCode() == this.hashCode();
}
return false;
}
public T getT(){
return this.t;
}
}
class Cache<K, V>{
public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
public DelayQueue<DelayedItem<K>> queue = new DelayQueue<>();
public Cache(K k, V v, long liveTime){
Thread t = new Thread(){
@Override
public void run(){
}
};
t.setDaemon(true);
t.start();
}
public void dameonCheckOverdueKey(){
for( ; ; ){
DelayedItem<K> delayedItem = queue.poll();
if(delayedItem != null){
map.remove(delayedItem.getT());
System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
}
try{
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.getStackTrace();
}
}
}
public void put(K k, V v, long liveTime){
//put()返回旧值
V v2 = map.put(k, v);
DelayedItem<K> tmpItem = new DelayedItem<>(k, liveTime);
if(v2 != null){
queue.remove(v2);
}
queue.put(tmpItem);
}
}

5. SynchronousQueue

一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。

6. LinkedTransferQueue

一个链表结构组成的无界阻塞队列。
transfer()方法
如果当前有消费者正在等待接受元素(消费者使用take()方法或者带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻传输给消费者。如果没有消费者等待接受元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

tryTransfer()方法

试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,立刻返回false。
tryTransfer还有接受时间限制的参数。

7. LinkedBlockingDeque

链表构成的双向阻塞队列。
在初始化队列的同时可以设置容量防止其过度膨胀。双向阻塞队列可以运用在”工作窃取”模式中。

阻塞队列的实现原理

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者可以添加元素。

下面是ArrayBlockingQeueu的部分源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair){
//省略其他代码
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
public void put(E e) throws InterruptedException{
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
while(count == items.length){
notFull.await();
}
insert(e); //插入值, 并且通知notEmpty
}finally {
lock.unlock();
}
}
public E take() throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
while(count == 0){
notEmpty.await();
}
return dequeue(); //返回值, 并且通知notFull
}finally {
lock.unlock();
}
}

Fork/Join框架

Fork/Join的运行流程图

点击加载

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

优点:充分利用线程进行并行计算,减少了线程之间的竞争
缺点:在某些情况下还是存在竞争,比如双端队列中只有一个任务时,并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join框架的设计

步骤1. 分割任务。首先我们需要有一个fork类来把大任务分割成子任务,直到足够小。
步骤2. 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列中获取任务执行。子任务执行完成的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两个事情。
1.ForkJoinTask:首先要创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。通常情况下继承ForkJoinTask的子类。
RecursiveAction:用于没有返回结果的任务
RecursiveTask:用于有返回结果的任务

2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时候,它会随机从其他工作线程的队列尾部获取一个任务。

使用Fork/Join框架的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.myConcurrent.Chapter6;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer>{
private static final int THRESHOLD = 2; //阈值
private int start;
private int end;
public CountTask(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if(canCompute){
for(int i = start; i <= end; ++i){
sum += i;
}
}
else{
//任务大于设定的阈值, 分割任务
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完毕, 得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args){
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask countTask = new CountTask(1, 4);
Future<Integer> result = forkJoinPool.submit(countTask);
try{
System.out.println(result.get());
}catch (InterruptedException ie){
}catch (ExecutionException ee){
}
}
}