Java并发编程的艺术-学习笔记-8

Java中的并发工具类

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

假设有如下需要:在一个Excel表有多个sheet数据,每一个线程处理一个sheet的数据。待所有数据处理完毕,主线程发出结束消息。

使用join方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class EasyJoinTest {
static volatile int Count = 0;
static class WorkThread implements Runnable{
int i = ++Count;
@Override
public void run() {
try {
System.out.println("Thread: " + i + " start");
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
} finally {
System.out.println("Thread: " + i +" finished");
}
}
}
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(new WorkThread());
Thread t2 = new Thread(new WorkThread());
t1.start();
t2.start();
//主线程等待t1, t2线程退出才能退出
t1.join();
t2.join();
System.out.println("all done");
}
}

使用更加强大的CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CountDownLatchTest {
static volatile int Count = 0;
static CountDownLatch countDownLatch = new CountDownLatch(2);
static class WorkThread implements Runnable{
int i = ++Count;
@Override
public void run() {
try {
System.out.println("Thread: " + i + " start");
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
} finally {
System.out.println("Thread: " + i +" finished");
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(new WorkThread());
Thread t2 = new Thread(new WorkThread());
Thread t3 = new Thread(new WorkThread());
t1.start();
t2.start();
t3.start();
//由于countDownLatch在初始化是等待2个countDown()方法的调用
//所以可能并不会等待线程3完成 而是先输出 all done, 再输出Thread: 3 finished
countDownLatch.await();
System.out.println("all done");
}

await可以设置超时等待,在主线程超时放弃等待countDown的计数到0

同步屏障CyclicBarrier

让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才能继续运行。

运行的线程会被阻塞在 barrier.await() 处。

构造函数:

1
2
3
4
5
CyclicBarrier(int parties); //等待parties个线程到达
//在有线程到达屏障的时候执行barrierAction线程
//barrier线程在其他线程到达屏障的时候执行
CyclicBarrier(int parties, Runnable barrierAction);

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CyclicBarrierTest {
static CyclicBarrier barrier = new CyclicBarrier(2, new PriorityThread());
static class PriorityThread implements Runnable{
@Override
public void run() {
System.out.println("this is priority thread");
}
}
static class NormalThread implements Runnable{
static volatile int count = 0;
@Override
public void run(){
int c = ++count;
System.out.println("thread: " + c + " is started and sleep 1s");
try{
TimeUnit.SECONDS.sleep(1);
barrier.await(); //线程会被阻塞在这里
}catch (Exception e){
}finally {
System.out.println("thread " + c + " is over");
}
}
}
public static void main(String[] args){
Thread t1 = new Thread(new NormalThread());
Thread t2 = new Thread(new NormalThread());
t1.start();
t2.start();
}
}

CyclicBarrier和CountDownLatch的区别

CountDwonLatch的计数器只能使用一次,而CyclicBarrier的计数器可是使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier还提供以下方法

方法 描述
void reset() 重置计数器,不是初始值,而是剩余的计数值
int getNumberWaiting() 返回CyclicBarrier阻塞的线程的线程数量
boolean isBroken() 了解阻塞的线程是否被中断

控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。加入有一个需求,需要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储都数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2);
static ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
static volatile int cnt = 0;
static class MyThread implements Runnable{
private int c = ++cnt;
@Override
public void run(){
try{
semaphore.acquire();
System.out.println("task " + c + " is running, the number of waiting queue: " + semaphore.getQueueLength());
TimeUnit.SECONDS.sleep(3);
semaphore.release();
}catch (Exception e){
}
}
}
public static void main(String[] args) throws InterruptedException{
for(int i = 0; i < 20; ++i){
Thread t = new Thread(new MyThread());
pool.execute(t);
}
pool.shutdown();
}
}
方法 描述
public Semaphore(int permits) 最大并发数为permits
void acquire() 获取一个许可证, 没有获取到就阻塞
void release() 使用完之后记得归回许可证
int availablePermits() 返回当前可以的许可证
int getQueueLength() 返回正在等待获取许可证的线程数
boolean hasQueuedThreads() 是否有线程正在等待获取许可证

线程间交换数据的Exchanger

Exchanger是一个用于线程间协作的工具类,可以用来进行线程间的数据交换。它提供一个同步点,在这个同步点,两个可以交换彼此的数据。如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产的数据传递给对方。

运用场景

Exchanger可以用于校对工作。

下面代码展示两个线程一直工作直至取到相同的随机数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ExchangerTest {
static Exchanger exchanger = new Exchanger();
static Random random = new Random(47);
static class MyThread implements Runnable{
@Override
public void run(){
try {
for( ; ;) {
int exchangerNumber = random.nextInt(10);
int i = (Integer) exchanger.exchange(exchangerNumber);
System.out.println("my number: " + exchangerNumber + ", your number: " + i);
if( i == exchangerNumber){
break;
}
}
}catch (Exception e){
}
}
}
public static void main(String[] args){
int i = 0;
Thread t1 = new Thread(new MyThread());
Thread t2 = new Thread(new MyThread());
t1.start();
t2.start();
}
}

如果两个线程一直有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(Object o, long timeout, TimeUnit unit)设置最大等待时长。