0%

Java[并发]-Condition实现分析

每个Java Object上都有监视器相关的方法wait/notify等,配合Synchronized关键字,可以实现等待/通知的模式。Condition接口和Lock配合,也能实现等待/通知模式。这两种模式存在一定的差异性,如下图:

java-concurrence-condition

Condition接口定义的方法如下:

java-concurrence-condition2

java-concurrence-condition3

一、Condition的实现分析

Condition的实现类是AbstractQueuedSynchronizr中的ConditionObject类,每个Condition对象都包含一个等待队列,这个队列是实现等待/通知的关键。

1、等待队列
等待队列是一个FIFO队列,队列中每个节点都包含了一个线程引用,该线程就是Condtion对象上等待的线程。如果线程调用了condition.await方法,该线程就会释放锁,构成节点并加入等待队列。这里节点复用了AQS类的节点,队列中包含首节点firstWaiter和尾节点lastWaiter,condition对象的等待队列如下图所示:

java-concurrence-condition4

代码如下:

1
2
3
4
/**condition队列的第一个节点 */
private transient Node firstWaiter;
/**condition队列的最后一个节点*/
private transient Node lastWaiter;

2、等待(await)

等待的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
//响应中断
if (Thread.interrupted())
throw new InterruptedException();
//把当前线程相关信息包装后加入等待队列
Node node = addConditionWaiter();
//释放当前线程占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//遍历AQS同步队列,查看当前节点是否在同步队列中,如果没有在,则继续睡眠
LockSupport.park(this);
//直到被唤醒或中断才退出
//checkInterruptWhileWaiting,检查等待中的中断标识,
//如果在signal之前被中断,interruptMode=THROW_IE
//如果在signal之后,则interruptMode=REINTERRUPT,重置中断标识
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//唤醒后加入AQS同步队列竞争锁,竞争失败则继续睡眠
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
//移出已经取消等待的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
//根据interruptMode抛出InterruptedException或者重置interrupted标识
reportInterruptAfterWait(interruptMode);
}

await()方法是一个可以响应中断的方法,它的实现流程如下:
1)、如果线程为中断状态,则抛出异常;否则把当前线程包装成Node节点,加入等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 添加新的等待节点到队列
* @return 新的等待节点
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果 lastWaiter 已经取消, 把它清除出去.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//包装一个CONDITION的新节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//没有尾节点,新的节点成为头节点
firstWaiter = node;
else
//追加到以前尾节点的nextWaiter,成为尾节点
t.nextWaiter = node;
//指向尾节点引用lastWaiter
lastWaiter = node;
return node;
}

2)、释放当前线程持有的锁。
3)、await的语义实现,一个while循环,遍历检查当前节点是否存在在同步队列中,如果不存在,则调用LockSupport.park(this)阻塞线程。checkInterruptWhileWaiting方法不断检查等待节点的中断标识:

1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
return (Thread.interrupted()) ?
((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
0;
}

定义一个interruptMode=0的变量,如果在signal之前中断,则其值为THROW_IE,即抛出中断异常。如果在signal之后中断,值为REINTERRUPT,表示需要重置中断标识。
4)、当节点加入同步队列后(被signal),就会退出while循环,await结束;此时,开始竞争锁,acquireQueued
成功则interruptMode值为REINTERRUPT,查看当前节点的下一节点,nextWaiter,把已经取消的节点移出等待队列(unlinkCancelledWaiters):
5)、如果interruptMode值不为0,则需要设置它的中断标识:

1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

3、通知(Signal)
通知的相关方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final void signal() {
//如果当前线程不是持有锁的线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//修改头节点,完成旧节点移出Condition对象的等待队列
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&//将老的头节点,移动到AQS的同步队列中
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* 如果不能修改waitStatus,则当前节点已经被取消
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 追加到队列并且尝试设置前驱节点的waitStatus,暗示线程可能在waiting;
* 如果取消或尝试设置waitStatus失败,唤醒线程去重新同步(在这种情况 下waitStatus很快变化)
*/
//加入同步队列,返回前驱节点
Node p = enq(node);
int ws = p.waitStatus;
//如果前驱节点的状态为cancel或修改waitStatus为SIGNAL失败,则直接唤醒该节点线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

通知实现的相关流程如下:
1)、判断当前线程是不是持有锁的线程,不是则抛出异常;
2)、修改首节点,把旧的首节点对象移出等待队列;
3)、transferForSignal方法负责把节点移动到AQS的同步队列的队尾,并判断它的前驱节点的状态,如果是取消或者不是SIGNAL状态,则直接唤醒当前节点的线程。

二、Condition使用

​ 下面的代码是使用ReentrantLock和Condition实现的一个有界队列,有界队列在队列满时,会阻塞入队线程,在队列空时会阻塞出队线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class BoundedQueue<T> {

private Object[] items;
private int addIndex,removeIndex,count;
private Lock lock = new ReentrantLock();
private Condition notEmptyCondition = lock.newCondition();
private Condition notFullCondition = lock.newCondition();

public BoundedQueue(int size){
items = new Object[size];
}

public void add(T t) throws InterruptedException{
lock.lock();
try{
while(count == items.length){
notFullCondition.await();
}
items[addIndex] = t;
if(++addIndex == items.length)
addIndex = 0;
++count;
notEmptyCondition.signal();
}finally{
lock.unlock();
}
}

public T remove(T t) throws InterruptedException{
lock.lock();
try{
while(count == 0){
notEmptyCondition.await();
}
Object x = items[removeIndex];
if(++removeIndex == items.length)
removeIndex = 0;
--count;
notFullCondition.signal();
return (T)x;
}finally{
lock.unlock();
}
}

public static void main(String[] args){
final BoundedQueue<String> queue = new BoundedQueue<String>(5);
for(int i = 0; i < 10;i++){
Thread t = new Thread(new Runnable(){
@Override
public void run(){
try {
SleepUtils.mills(500);
queue.add("aaaaa");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"addthread-"+String.valueOf(i));
t.start();
}
for(int i = 0; i < 10;i++){
Thread t = new Thread(new Runnable(){
@Override
public void run(){
try {
SleepUtils.mills(100);
queue.remove("aaaaa");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"removeThread-"+String.valueOf(i));
t.start();
}
}

}

在添加或删除方法中用while循环而不是if条件,是为了防止过早或意外的通知,只有条件符合才能退出循环。