JUC之CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier示例

使用“人满发车”的例子来演示CyclicBarrier:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(2);
System.out.println("快上车来不及解释了");

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread() + "已上车");
barrier.await();
System.out.println("所有人已上车,发车");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Jane").start();

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread() + "已上车");
barrier.await();
System.out.println("所有人已上车,发车");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Mike").start();
}
}

上面例子中我们定义了一个等待2个线程完成的CyclicBarrier,在两个线程内部调用了await方法,让其阻塞等待,并告知CyclicBarrier我已经到达屏障了。只有当两个线程都执行到barrier.await()这一行时,屏障开启,线程才会继续往下执行。程序输出如下所示:

1
2
3
4
5
快上车来不及解释了
Thread[Mike,5,main]已上车
Thread[Jane,5,main]已上车
所有人已上车,发车
所有人已上车,发车

CyclicBarrier的构造函数支持传入一个回调方法:

1
2
3
CyclicBarrier barrier = new CyclicBarrier(n, () -> {
System.out.println("当所有线程到达屏障时,执行该回调");
});

改造上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(2, new Thread(() -> {
System.out.println("发车,嘟嘟嘟");
}));

System.out.println("快上车来不及解释了");

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread() + "已上车");
barrier.await();
System.out.println("所有人已上车");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Jane").start();

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread() + "已上车");
barrier.await();
System.out.println("所有人已上车");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Mike").start();
}
}

输出如下所示:

1
2
3
4
5
6
快上车来不及解释了
Thread[Mike,5,main]已上车
Thread[Jane,5,main]已上车
发车,嘟嘟嘟
所有人已上车,发车
所有人已上车,发车

设置超时时间

await的重载方法:await(long timeout, TimeUnit unit)可以设置最大等待时长,超出这个时间屏障还没有开启的话则抛出TimeoutException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CyclicBarrierTest2 {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2);

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread1").start();

new Thread(() -> {
try {
barrier.await(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}, "thread2").start();
}
}

QQ截图20190513104938.png

QQ截图20190513105045.png

BrokenBarrierException

抛出BrokenBarrierException异常时表示屏障破损,此时标志位broken=true。抛出BrokenBarrierException异常的情况主要有:

  1. 其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常;

  2. 其他等待的线程超时,则当前线程抛出BrokenBarrierException异常;

  3. 当前线程在等待时,其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常。

模拟第1种情况,其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CyclicBarrierTest2 {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2);

new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(3);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread1").start();

Thread thread2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(1);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread2");

thread2.start();
TimeUnit.SECONDS.sleep(2);
thread2.interrupt();
}
}

输出:

QQ截图20190513112642.png

上面例子中thread2线程睡眠1秒后先到达屏障点,然后进入等待状态。2秒后main线程执行thread2.interrupt()中断等待中的thread2线程,所以程序抛出BrokenBarrierException异常。3秒后thread1线程到达屏障点,此时屏障已经被破坏了,所以也抛出BrokenBarrierException异常。

模拟第2种情况:其他等待的线程超时,则当前线程抛出BrokenBarrierException异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CyclicBarrierTest2 {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2);

new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(3);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread1").start();

new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(1);
barrier.await(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
}
}, "thread2").start();
}
}

输出:

QQ截图20190513113514.png

上面例子中thread2睡眠1秒后到达屏障点,然后进入等待状态(最多等待1秒),然而因为thread1要3秒后才能到达屏障点,所以thread2将抛出TimeoutException。3秒后,thread1到达屏障点,但这时候由于thread2的await方法抛出的异常破坏了屏障,所以thread1将抛出BrokenBarrierException异常。

模拟第3中情况:当前线程在等待时,其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CyclicBarrierTest2 {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2);

new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(3);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread1").start();

new Thread(() -> {
try {

System.out.println(Thread.currentThread() + "开始执行");
TimeUnit.SECONDS.sleep(1);
barrier.await();
System.out.println(Thread.currentThread() + "继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread2").start();

TimeUnit.SECONDS.sleep(2);
System.out.println(barrier.getNumberWaiting());
barrier.reset();
}
}

输出:

QQ截图20190513114412.png

上面例子中,thread2睡眠1秒后到达屏障点,然后进入等待状态。2秒后main线程调用reset方法重置了屏障,所以在等待状态中的thread2抛出BrokenBarrierException异常。3秒后,thread1到达屏障点,由于reset方法重置了屏障,所以thread1并不会抛出BrokenBarrierException异常,而是一直在屏障点进行等待别的线程到达屏障点。

从上面的三个例子中可以看到,无论是哪种情况导致屏障破坏,屏障点后面的代码都没有被执行,main方法也没有退出。

和CountDownLatch区别

  1. CountDownLatch:一个线程(或者多个),等待另外N个线程完成某个事情之后才能执行;CyclicBarrier:N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。

  2. CountDownLatch:一次性的;CyclicBarrier:可以重复使用。

请作者喝瓶肥宅水🥤

0