JUC之CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。定义CountDownLatch的时候,需要传入一个正数来初始化计数器(虽然传入0也可以,但这样的话CountDownLatch没什么实际意义)。其countDown方法用于递减计数器,await方法会使当前线程阻塞,直到计数器递减为0。所以CountDownLatch常用于多个线程之间的协调工作。

CountDownLatch示例

假设我们现在有这样一个需求:

  1. 从数据库获取数据

  2. 对这批数据进行处理

  3. 保存这批数据

为了让程序执行效率更高,第2步中我们可以使用多线程来并行处理这批数据,大致过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CountDownLatchTest {

public static void main(String[] args) throws InterruptedException {
// 1. 模拟从数据库获取数据
int[] data = query();
System.out.println("获取数据完毕");

// 2. 数据处理
IntStream.range(0, data.length).forEach(i -> {
ExecutorService.execute(() -> {
System.out.println(Thread.currentThread() + "处理第" + (i + 1) + "条数据");
int value = data[i];
if (value % 2 == 0) {
data[i] = value * 2;
} else {
data[i] = value * 10;
}
});
});

System.out.println("所有数据都处理完了");
// 关闭线程池
ExecutorService.shutdown();
// 3. 保存数据
save(data);
}
private static int[] query() {
return new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
}
private static void save(int[] data) {
System.out.println("保存数据 - " + Arrays.toString(data));
}
}

由于线程获取CPU时间片的不确定性,所以有可能数据还没有处理完毕,第3步就执行完了:

1
2
3
4
5
6
7
8
9
10
11
12
13
获取数据完毕
所有数据都处理完了
保存数据 - [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Thread[pool-1-thread-2,5,main]处理第2条数据
Thread[pool-1-thread-1,5,main]处理第1条数据
Thread[pool-1-thread-2,5,main]处理第3条数据
Thread[pool-1-thread-1,5,main]处理第4条数据
Thread[pool-1-thread-1,5,main]处理第6条数据
Thread[pool-1-thread-2,5,main]处理第5条数据
Thread[pool-1-thread-1,5,main]处理第7条数据
Thread[pool-1-thread-1,5,main]处理第9条数据
Thread[pool-1-thread-2,5,main]处理第8条数据
Thread[pool-1-thread-1,5,main]处理第10条数据

我们可以借助CountDownLatch解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CountDownLatchTest {
private static ExecutorService ExecutorService = Executors.newFixedThreadPool(2);
private static CountDownLatch latch = new CountDownLatch(10);

public static void main(String[] args) throws InterruptedException {
// 1. 模拟从数据库获取数据
int[] data = query();
System.out.println("获取数据完毕");

// 2. 数据处理
IntStream.range(0, data.length).forEach(i -> {
ExecutorService.execute(() -> {
System.out.println(Thread.currentThread() + "处理第" + (i + 1) + "条数据");
int value = data[i];
if (value % 2 == 0) {
data[i] = value * 2;
} else {
data[i] = value * 10;
}
latch.countDown();
});
});

latch.await();
System.out.println("所有数据都处理完了");
// 关闭线程池
ExecutorService.shutdown();
// 3. 保存数据
save(data);
}
private static int[] query() {
return new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
}
private static void save(int[] data) {
System.out.println("保存数据 - " + Arrays.toString(data));
}
}

我们定义了一个CountDownLatch,计数器值为10,和数据量一致。然后在第2步中,当每个线程执行完毕的时候调用countDown方法,让计数器减1。在第3步前调用await方法让main线程阻塞等待,直到计数器被减为0。所以这就保证了只有当所有数据加工完毕才执行保存数据操作。

执行方法,程序输出如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
获取数据完毕
Thread[pool-1-thread-1,5,main]处理第1条数据
Thread[pool-1-thread-1,5,main]处理第3条数据
Thread[pool-1-thread-1,5,main]处理第4条数据
Thread[pool-1-thread-1,5,main]处理第5条数据
Thread[pool-1-thread-1,5,main]处理第6条数据
Thread[pool-1-thread-1,5,main]处理第7条数据
Thread[pool-1-thread-1,5,main]处理第8条数据
Thread[pool-1-thread-1,5,main]处理第9条数据
Thread[pool-1-thread-1,5,main]处理第10条数据
Thread[pool-1-thread-2,5,main]处理第2条数据
所有数据都处理完了
保存数据 - [10, 4, 30, 8, 50, 12, 70, 16, 90, 20]

await有重载方法:await(long timeout, TimeUnit unit),设置最大等待时间,超过这个时间程序将继续执行不再被阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(4);
System.out.println(Thread.currentThread() + "线程执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}, "thread1").start();

latch.await(3, TimeUnit.SECONDS); // 最多等待 3秒
System.out.println("main线程执行完毕");
}
}

输出如下:

1
2
main线程执行完毕
Thread[thread1,5,main]线程执行完毕

请作者喝瓶肥宅水~

TOP