每个Java Object上都有监视器相关的方法wait/notify等,配合Synchronized关键字,可以实现等待/通知的模式。Condition接口和Lock配合,也能实现等待/通知模式。这两种模式存在一定的差异性,如下图:

Condition接口定义的方法如下:


一、Condition的实现分析
Condition的实现类是AbstractQueuedSynchronizr中的ConditionObject类,每个Condition对象都包含一个等待队列,这个队列是实现等待/通知的关键。
1、等待队列
等待队列是一个FIFO队列,队列中每个节点都包含了一个线程引用,该线程就是Condtion对象上等待的线程。如果线程调用了condition.await方法,该线程就会释放锁,构成节点并加入等待队列。这里节点复用了AQS类的节点,队列中包含首节点firstWaiter和尾节点lastWaiter,condition对象的等待队列如下图所示:

代码如下:
1 2 3 4
| private transient Node firstWaiter; private transient Node lastWaiter;
|
2、等待(await)
等待的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
await()方法是一个可以响应中断的方法,它的实现流程如下:
1)、如果线程为中断状态,则抛出异常;否则把当前线程包装成Node节点,加入等待队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
2)、释放当前线程持有的锁。
3)、await的语义实现,一个while循环,遍历检查当前节点是否存在在同步队列中,如果不存在,则调用LockSupport.park(this)阻塞线程。checkInterruptWhileWaiting方法不断检查等待节点的中断标识:
1 2 3 4 5
| private int checkInterruptWhileWaiting(Node node) { return (Thread.interrupted()) ? ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) : 0; }
|
定义一个interruptMode=0的变量,如果在signal之前中断,则其值为THROW_IE,即抛出中断异常。如果在signal之后中断,值为REINTERRUPT,表示需要重置中断标识。
4)、当节点加入同步队列后(被signal),就会退出while循环,await结束;此时,开始竞争锁,acquireQueued
成功则interruptMode值为REINTERRUPT,查看当前节点的下一节点,nextWaiter,把已经取消的节点移出等待队列(unlinkCancelledWaiters):
5)、如果interruptMode值不为0,则需要设置它的中断标识:
1 2 3 4 5 6 7
| private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
|
3、通知(Signal)
通知的相关方法代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
通知实现的相关流程如下:
1)、判断当前线程是不是持有锁的线程,不是则抛出异常;
2)、修改首节点,把旧的首节点对象移出等待队列;
3)、transferForSignal方法负责把节点移动到AQS的同步队列的队尾,并判断它的前驱节点的状态,如果是取消或者不是SIGNAL状态,则直接唤醒当前节点的线程。
二、Condition使用
下面的代码是使用ReentrantLock和Condition实现的一个有界队列,有界队列在队列满时,会阻塞入队线程,在队列空时会阻塞出队线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| public class BoundedQueue<T> {
private Object[] items; private int addIndex,removeIndex,count; private Lock lock = new ReentrantLock(); private Condition notEmptyCondition = lock.newCondition(); private Condition notFullCondition = lock.newCondition(); public BoundedQueue(int size){ items = new Object[size]; } public void add(T t) throws InterruptedException{ lock.lock(); try{ while(count == items.length){ notFullCondition.await(); } items[addIndex] = t; if(++addIndex == items.length) addIndex = 0; ++count; notEmptyCondition.signal(); }finally{ lock.unlock(); } } public T remove(T t) throws InterruptedException{ lock.lock(); try{ while(count == 0){ notEmptyCondition.await(); } Object x = items[removeIndex]; if(++removeIndex == items.length) removeIndex = 0; --count; notFullCondition.signal(); return (T)x; }finally{ lock.unlock(); } } public static void main(String[] args){ final BoundedQueue<String> queue = new BoundedQueue<String>(5); for(int i = 0; i < 10;i++){ Thread t = new Thread(new Runnable(){ @Override public void run(){ try { SleepUtils.mills(500); queue.add("aaaaa"); } catch (InterruptedException e) { e.printStackTrace(); } } },"addthread-"+String.valueOf(i)); t.start(); } for(int i = 0; i < 10;i++){ Thread t = new Thread(new Runnable(){ @Override public void run(){ try { SleepUtils.mills(100); queue.remove("aaaaa"); } catch (InterruptedException e) { e.printStackTrace(); } } },"removeThread-"+String.valueOf(i)); t.start(); } }
}
|
在添加或删除方法中用while循环而不是if条件,是为了防止过早或意外的通知,只有条件符合才能退出循环。