JUC之Semaphore

JUC的Semaphore俗称信号量,可用来控制同时访问特定资源的线程数量。通过它的构造函数我们可以指定信号量(称为许可证permits可能更为明确)的数量,线程可以调用Semaphore对象的acquire方法获取一个许可证,调用release来归还一个许可证。

下面举个Semaphore的基本使用示例。

Semaphore示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SemaphoreTest {

public static void main(String[] args) throws InterruptedException {
// 定义许可证数量
final Semaphore semaphore = new Semaphore(2);

IntStream.range(0, 4).forEach(i -> {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始");
try {
semaphore.acquire(); // 一次拿一个许可证
System.out.println(Thread.currentThread().getName() + "获取许可证");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放许可证");
semaphore.release();
}
System.out.println(Thread.currentThread().getName() + "结束");
}, "thread" + (i + 1)).start();
});
}
}

上面的例子中,我们定义许可证的数量为2个,然后4个线程通过acquire方法去获取许可证,结束行通过release方法释放许可证。acquire方法默认一次只拿一个许可证,所以上面的例子中,同一时刻最多只有两个线程同时执行。

程序输出如下所示: asfasdfa.gif

acquire的重载方法acquire(int permits)允许线程一次性获取N个许可证;同样的release的重载方法release(int permits)允许线程一次性释放N个许可证。

Semaphore还有一个tryAcquire,它允许线程尝试去获取1个许可证,如果许可证不足没有获取到的话,线程也会继续执行,而非阻塞等待。tryAcquire方法的重载方法tryAcquire(long timeout, TimeUnit unit)可以指定尝试获取许可证的超时时间。

acquireUninterruptibly

从上面的例子我们会发现acquire方法会抛出InterruptedException异常,说明这个方法是可以被打断的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SemaphoreTest {

public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore = new Semaphore(1);
Thread thread1 = new Thread(() -> {
try {
semaphore.acquire(2);
} catch (InterruptedException e) {
System.err.println("semaphore InterruptedException");
e.printStackTrace();
}
});

thread1.start();
TimeUnit.MICROSECONDS.sleep(500);
thread1.interrupt();
}
}

上面例子thread1线程获取2个许可证,但许可证总数只有1个,所以会阻塞等待。main线程通过调用thread1的interrupt方法去打断thread1线程,结果如下:

QQ截图20190515164441.png

而通过acquireUninterruptibly方法去获取许可证是不可被打断的:

1
2
3
4
5
6
7
8
9
10
11
12
public class SemaphoreTest {

public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore = new Semaphore(1);
Thread thread1 = new Thread(() -> {
semaphore.acquireUninterruptibly(2);
});
thread1.start();
TimeUnit.MICROSECONDS.sleep(500);
thread1.interrupt();
}
}

上面程序并不会抛出InterruptedException,thread1会一直处于阻塞状态。

drainPermits

drainPermits方法一次性获取所有许可证(drain抽干榨干😮):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore = new Semaphore(5);

new Thread(() -> {
System.out.println("availablePermits: " + semaphore.availablePermits());
semaphore.drainPermits(); // 获取所有许可证,抽干
System.out.println("availablePermits: " + semaphore.availablePermits());
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release(5);
System.out.println(Thread.currentThread().getName() + "结束");
}, "thread1").start();

}
}

availablePermits方法用于获取当前可用许可证数量的预估值。程序输出如下:

QQ截图20190515164951.png

别的API

hasQueuedThreads方法用于判断是否有处于等待获取许可证状态的线程;getQueueLength用于获取处于等待获取许可证状态的线程的数量;getQueuedThreads用于获取处于等待获取许可证状态的线程集合。

getQueuedThreadsprotected的,所以要使用它,我们得自定义一个Semaphore的子类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class SemaphoreTest {

public static void main(String[] args) {
// 定义许可证数量
final MySemaphore semaphore = new MySemaphore(1);

IntStream.range(0, 4).forEach(i -> {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始");
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取许可证");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放许可证");
semaphore.release();
}
System.out.println(Thread.currentThread().getName() + "结束");
}, "thread" + (i + 1)).start();
});

while (true) {
if (semaphore.hasQueuedThreads()) {
System.out.println("等待线程数量:" + semaphore.getQueueLength());
Collection<Thread> queuedThreads = semaphore.getQueuedThreads();
System.out.println("等待线程:" + queuedThreads.stream().map(Thread::getName).collect(Collectors.joining(",")));
}
}
}

static class MySemaphore extends Semaphore {

private static final long serialVersionUID = -2595494765642942297L;

public MySemaphore(int permits) {
super(permits);
}

public MySemaphore(int permits, boolean fair) {
super(permits, fair);
}

public Collection<Thread> getQueuedThreads() {
return super.getQueuedThreads();
}
}
}

程序输出如下所示(截取一部分): QQ截图20190515165623.png

Api总结

总结下Semaphore常用的方法:

方法描述
acquire()获取一个许可证,可以被打断,没有足够的许可证时阻塞等待
acquire(int permits)获取指定数量的许可证,可以被打断,没有足够的许可证时阻塞等待
acquireUninterruptibly()获取一个许可证,不可被打断,没有足够的许可证时阻塞等待
acquireUninterruptibly(int permits)获取指定数量的许可证,不可被打断,没有足够的许可证时阻塞等待
tryAcquire()尝试获取一个许可证,没有足够的许可证时程序继续执行,不会被阻塞
tryAcquire(int permits)尝试获取指定数量的许可证,没有足够的许可证时程序继续执行,不会被阻塞
tryAcquire(long timeout, TimeUnit unit)在指定的时间范围内尝试获取1个许可证,没有足够的许可证时程序继续执行,
不会被阻塞,在该时间方位内可以被打断
tryAcquire(int permits, long timeout, TimeUnit unit)在指定的时间范围内尝试获取指定数量的许可证,没有足够的许可证时程序
继续执行,不会被阻塞,在该时间方位内可以被打断
release()释放一个许可证
drainPermits()一次性获取所有可用的许可证
availablePermits()获取当前可用许可证数量的预估值
hasQueuedThreads()判断是否有处于等待获取许可证状态的线程
getQueueLength()获取处于等待获取许可证状态的线程的数量的预估值
getQueuedThreads()获取处于等待获取许可证状态的线程集合
请作者喝瓶肥宅水🥤

0