JUC之Exchanger

JUC中的Exchanger允许成对的线程在指定的同步点上通过exchange方法来交换数据。如果第一个线程先执行exchange方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将当前线程生产 出来的数据传递给对方。

Exchanger示例

两个线程通过Exchanger交换数据的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
System.out.println("thread1开始");
try {
String exchange = exchanger.exchange("来自thread1的数据");
System.out.println("接收thread2发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1结束");
}, "thread1").start();

new Thread(() -> {
System.out.println("thread2开始");
try {
String exchange = exchanger.exchange("来自thread2的数据");
System.out.println("接收thread1发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2结束");
}, "thread2").start();
}
}

在定义Exchanger的时候需要指定交换的数据类型,这里为String类型。exchange方法用于向另一个线程发送数据,方法的返回值为另一个线程发送过来的数据。上面例子输出如下:

1
2
3
4
5
6
thread1开始
thread2开始
接收thread2发送的数据:来自thread2的数据
thread1结束
接收thread1发送的数据:来自thread1的数据
thread2结束

上面说过,只有当成对的线程都到达同步点的时候,才会执行数据交换操作。现在我们让thread2休眠一会儿,看看thread1是否会进入等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
System.out.println("thread1开始");
try {
String exchange = exchanger.exchange("来自thread1的数据");
System.out.println("接收thread2发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1结束");
}, "thread1").start();

new Thread(() -> {
System.out.println("thread2开始");
try {
TimeUnit.SECONDS.sleep(3); // thread1也会进入等待,直到双方都准备好交换数据。
String exchange = exchanger.exchange("来自thread2的数据");
System.out.println("接收thread1发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2结束");
}, "thread2").start();
}
}

程序输出如下所示:

exchanger1.gif

那么如果线程不成对会出现什么情况呢?我们添加thread3线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
System.out.println("thread1开始");
try {
String exchange = exchanger.exchange("发送数据-thread1");
System.out.println("接收数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1结束");
}, "thread1").start();

new Thread(() -> {
System.out.println("thread2开始");
try {
String exchange = exchanger.exchange("发送数据-thread2");
System.out.println("接收数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2结束");
}, "thread2").start();

new Thread(() -> {
System.out.println("thread3开始");
try {
String exchange = exchanger.exchange("发送数据-thread3");
System.out.println("接收数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread3结束");
}, "thread3").start();
}
}

程序输出如下所示:

1
2
3
4
5
6
7
thread1开始
thread3开始
接收数据:发送数据-thread1
thread3结束
thread2开始
接收数据:发送数据-thread3
thread1结束

可看到thread1和thread3交换了数据然后正常停止了,而thread2由于没有线程和它交换数据而苦苦等待,线程永远不会停止。查看线程快照可以证明这点:

QQ截图20190514092846.png

线程匹配是随机的,所以也有可能thread1和thread2匹配,thread3进入无休止的等待,这就类似于…

QQ截图20190514093233.png

另一个值得一提的点就是通过Exchanger交换的是同一个对象,而不是对象的拷贝:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<Object> exchanger = new Exchanger<>();

new Thread(() -> {
System.out.println("thread1开始");
Object object = new Object();
System.out.println("thread1发送数据:" + object);
try {
Object exchange = exchanger.exchange(object);
System.out.println("接收thread2发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1结束");
}, "thread1").start();

new Thread(() -> {
System.out.println("thread2开始");
Object object = new Object();
System.out.println("thread2发送数据:" + object);
try {
Object exchange = exchanger.exchange(object);
System.out.println("接收thread1发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2结束");
}, "thread2").start();
}
}

程序输出如下:

1
2
3
4
5
6
7
8
thread1开始
thread2开始
thread2发送数据:java.lang.Object@6d559005
thread1发送数据:java.lang.Object@7702c19
接收thread2发送的数据:java.lang.Object@6d559005
接收thread1发送的数据:java.lang.Object@7702c19
thread2结束
thread1结束

可以看到thread1发送的对象和thread2接收的对象句柄是一致的。

设置超时时间

如果不想线程在交换数据的时候等待过长的时间,我们可以使用exchanger的重载方法exchange(V x, long timeout, TimeUnit unit)来指定超时时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
System.out.println("thread1开始");
try {
String exchange = exchanger.exchange("来自thread1的数据", 5, TimeUnit.SECONDS);
System.out.println("接收thread2发送的数据:" + exchange);
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
}
System.out.println("thread1结束");
}, "thread1").start();

new Thread(() -> {
System.out.println("thread2开始");
try {
TimeUnit.SECONDS.sleep(10);
String exchange = exchanger.exchange("来自thread2的数据");
System.out.println("接收thread1发送的数据:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2结束");
}, "thread2").start();
}
}

上面例子中,thread2休眠10秒后才开始交换数据,而thread1在等待5秒后没能成功交换数据就抛出TimeoutException异常了。10秒后由于没有线程再和thread2交换数据,所以thread2会一直等待:

QQ截图20190514093752.png

QQ截图20190514093839.png


TOP