并发编程-CyclicBarrier的使用

前言

上一篇里CountDownLatch的使用,这一篇来学习一个和CountDownLatch功能差不多的线程同步工具类CyclicBarrier。

CyclicBarrier的概念

同样的CyclicBarrierjava.util.concurrent包下用来实现多线程之间同步的工具类,比起CountDownLatch,CyclicBarrier通过一个默认状态(所有的线程都被挂起)来作为是否继续执行的标记,当所有的线程都到达这个状态时,线程才可以继续往下执行。

CountDownLatch的用法

CountDownLatch提供了两个构造函数

1
2
3
4
5
6
//指定等待到默认状态的线程数
public CyclicBarrier(int parties) {
}
//指定所有线程达到默认状态后,会执行的操作barrierAction,该操作将由最后一个达到默认状态的线程执行
public CyclicBarrier(int parties, Runnable barrierAction) {
}

核心方法

1
2
3
4
//调用此方法的线程,实际上就是被挂起,达到默认状态
public int await() throws InterruptedException, BrokenBarrierException { };
//指定超时时间,超时后,达到默认状态的线程也会继续执行,同时会抛出异常 生产建议使用超时时间,避免死等造成内存溢出
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

来看个例子,通过CyclicBarrier实现两个线程的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
final int workerNum = 2;
final CyclicBarrier barrier = new CyclicBarrier(workerNum);
for (int i =0;i<workerNum;i++){
new Worker(barrier).start();
}
}
static class Worker extends Thread{
private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("所有线程执行完毕,继续执行其他东西...");
}
}

执行结果

1
2
3
4
5
6
子线程Thread-0开始执行
子线程Thread-1开始执行
子线程Thread-0执行完毕
子线程Thread-1执行完毕
所有线程执行完毕,继续执行其他东西...
所有线程执行完毕,继续执行其他东西...

如果不使用 cyclicBarrier.await(),执行结果:

1
2
3
4
5
6
子线程Thread-1开始执行
子线程Thread-0开始执行
子线程Thread-1执行完毕
所有线程执行完毕,继续执行其他东西...
子线程Thread-0执行完毕
所有线程执行完毕,继续执行其他东西...

从结果可以看出,通过cyclicBarrier.await(),每个线程执行完毕后,都在等待其他线程执行完毕,然后执行后续其他东西的执行。
同时我们可以通过实现barrierAction进行额外的一些操作来实现更多复杂的业务场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
final int workerNum = 2;
final CyclicBarrier barrier = new CyclicBarrier(workerNum, new Runnable() {
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getName()+"开始执行其他一些操作了");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
for (int i =0;i<workerNum;i++){
new Worker(barrier).start();
}
}
static class Worker extends Thread{
private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("所有线程执行完毕,继续执行其他东西...");
}
}

执行结果:

1
2
3
4
5
6
7
子线程Thread-0开始执行
子线程Thread-1开始执行
子线程Thread-0执行完毕
子线程Thread-1执行完毕
当前线程:Thread-1开始执行其他一些操作了
所有线程执行完毕,继续执行其他东西...
所有线程执行完毕,继续执行其他东西...

从结果可以看出只有Thread-1执行了barrierAction,且是它完成操作后,各个线程才执行后续的操作。
从await方法注释里我们知道,是最后一个到达默认状态的线程执行了barrierAction

1
2
3
4
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.

与CountDownLatch区别

CountDownLatch主要用于一个线程与其他多个线程的同步上,barrierAction主要是用于多个线程之间的同步,同步相对简单,且能支持复杂的业务场景。

坚持原创技术分享,您的支持将鼓励我继续创作!