0%

Java[并发]-SynchronousQueue实现分析

SynchronousQueue是阻塞队列的一种,它的特点是每个插入操作必须等待另一个线程的移出操作,反之亦然。同步队列中没有任何元素,队列是没有容量的,不能对队列进行迭代。SynchronousQueue是个空队列,所以队列中不存在null元素。
SynchronousQueue可以使用公平策略,默认为非公平策略,以构造方法传入,公平策略可以保证线程以FIFO的方式进行访问。SynchronousQueue实现了公平/非公平版本,公平版本是以队列FIFO的数据结构实现的,而非公平版本是以栈(LIFO)的数据结构实现的。这两个结构都以内部类的形式出现,它们共同实现了一个abstract类Transfer的transfer方法。

1
2
3
static abstract class Transferer {
abstract Object transfer(Object e, boolean timed, long nanos);
}

一、非公平版本的Stack实现

1、数据结构 非公平版本的实现是以Stack为数据结构,TransferStack继承Transfer类,实现了它的方法transfer。TransferStack是一个链表结构,它有一个内部类SNode描述了栈中的一个节点,它有一个next引用指向下一个节点。另外SNode还携带了一个match节点,用于和对方线程进行数据交换。

1
2
3
4
5
6
7
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
}

waiter用来表示当前线程的引用,即阻塞时设置,item即要交换的数据,mode是外部TransferStack类定义的3种状态,当前操作是put或者take,还是满足状态。

1
2
3
4
5
6
/** Node represents an unfulfilled consumer */
static final int REQUEST = 0;
/** Node represents an unfulfilled producer */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;

SNode中的next,match引用的更新都是CAS操作,是通过AtomicReferenceFieldUpdater类来实现的。
TransferStack有一个SNode类型的header引用来表示栈顶的元素:

1
2
/** The head (top) of the stack */
volatile SNode head;

它的更新同样通过AtomicReferenceFieldUpdater来实现。

2、数据交换的实现

数据交换实现了这样一个逻辑,当生产者线程携带数据入栈时,它需要一个消费者线程同样携带数据入栈,然后通过栈中元素是以链表链接这种结构产生关联,进行match匹配,把双方的数据通过match节点进行交换。这两者的顺序是不确定的,也许是消费者线程先入栈。交换完成后,这两个节点都从栈中弹出。
这个逻辑主要是通过TransferStack的transfer方法来实现的,这个方法实现比较复杂,可以通过画出模拟交换过程的线程操作图来理解。

java-concurrence-queue3

图中所示按线程操作来看,这里t1,t2并不特定是生产或消费线程:
①线程t1首先判断首节点h是否为空,为空的话构造节点s1,入栈,把h节点指向s1。然后t1阻塞,等待t2操作;
②线程t2首先判断首节点是否为空,发现h不为空后,构造节点s2节点压入栈中,s2.next=h,因为①把h指向了s1,实际上是s2.next=s1;
③线程t2继续操作s1,把s1中的match节点指向s2,此时,即完成了交换操作。

TransferStack中的transfer方法就是根据上面的步骤来实现的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Object transfer(Object e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//e为null时,表示操作想从栈中获取数据,比如take方法
//e不为空时,表示携带数据入栈,如put方法
int mode = (e == null)? REQUEST : DATA;
//自旋
for (;;) {
SNode h = head;
//头节点为空或匹配操作模式
if (h == null || h.mode == mode) { // empty or same-mode
//等待超时版本,如put/take 的超时方法
if (timed && nanos <= 0) { // can't wait
//如果头节点已经取消,换下一个节点
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//构造s节点指向头节点
//等待对方线程满足匹配
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
//等待中返回后
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
//如果为take操作,返回匹配的节点中的元素,put操作返回当前节点
return mode == REQUEST? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill,栈顶存在节点,如果为mode=0或1时
//头节点取消,把下一个节点替换头节点
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
//更新m,即对方线程的node的match节点为当前线程的节点s
if (m.tryMatch(s)) {
//两个节点都出栈
casHead(s, mn); // pop both s and m
//如果是take操作,返回对方线程node中的元素,否则返回当前线程栈中的元素
return (mode == REQUEST)? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}

实现步骤如下:
①线程t1首先判断是put还是take操作,决定节点的mode;
②然后t1进入自旋,判断如果头节点为空,说明栈为空,则判断操作方法是否为超时版本的方法,如果是判断头节点是否已经取消,如果取消则交换头节点为下一节点,否则构造新的节点S,压入栈中,即头节点引用h指向s节点。
③最后t1调用awaitFulfill方法阻塞自旋等待或者阻塞t1线程,直到match引用指定的节点不为空,即t2把match节点指定向它入栈的节点;
④t2线程同样进入自旋,发现头节点h不为空后,进入分支else if,构造新的节点s2,s2的next节点为h,即为s1节点,然后把h指向s2,即s2压入栈中;
⑤t2线程获取s2.next节点,即s1节点后,调用s1.tryMatch方法进行匹配,实际上是CAS更新s1的match引用指向s2; 然后唤醒阻塞的t1线程,根据节点的mode判断,如果是take操作,返回s1.item,否则返回自身操作节点s2.item;
最后t1线程被唤醒后,判断节点的mode,如果是take操作,则返回s1.match.item,否则返回自己s1.item;
线程t1陷入等待的方法会根据CPU的核心数做了一些优化,目的是避免线程马上陷入阻塞状态,通过计算CPU的核心,得到自旋的时间,先通过循环来消耗时间,如果在此过程中发现t2完成了交换,那么就没有必要调用LockSupport.park来阻塞线程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
int spins = (shouldSpin(s)?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
//获取匹配的节点
SNode m = s.match;
if (m != null)
return m;
//超时逻辑
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel();
continue;
}
}
//需要循环的时间片
if (spins > 0)
spins = shouldSpin(s)? (spins-1) : 0;
else if (s.waiter == null)
//设置s节点中的等待线程
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
//时间用完还未匹配,阻塞当前节点线程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//带超时时间的阻塞
LockSupport.parkNanos(this, nanos);
}
}

二、公平版本的Queue实现

公平版本的实现过程逻辑也类似,但是以FIFO的队列结构为基础,实现类为TransferQueue,它同样包含一个QNode内部类作为队列中的节点元素,也是以链表形式出现的。公平性就是通过队列的FIFO来实现的。

1、数据结构

主要通过QNode类来实现队列中的节点,节点中包括一个next引用指向队列中下一个节点,item引用表示节点中的数据,waiter来表示操作节点的线程,只有陷入阻塞时才会赋值;

1
2
3
4
5
6
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
inal boolean isData;
}

另外QNode同样通过AtomicReferenceFieldUpdater类来实现对next,item的更新,isData用来判断当前的操作是否携带数据,携带数据的为put操作,否则为take操作。TransferQueue类是队列的实现类,它的引用一个head和tail引用来指向队列的头节点和尾节点。

1
2
3
4
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;

在TransferQueue初始化时,会构造一个虚拟的节点,分别把head,tail引用指向它。

1
2
3
4
5
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}

同样,对head,tail引用的更新也是通过AtomicReferenceFieldUpdater来实现的。

2、数据交换的实现
在队列中数据交换是这样实现的,首先,TransferQueue在初始化时,会先创建一个虚拟节点,首尾节点的引用指向它。如果线程t1为生产者线程入队,则构造入队节点QNode,添加到队列的尾部,接着生产者线程t2发现队列的首尾引用不相等时,表示队列中已经有元素。它会把队列尾部的元素返回,接口唤醒t1线程;还有一种情况是消费者线程t2先入队,生产者线程t1后入队,生产者线程会把自己携带的元素和队尾的节点元素进行交换,然后唤醒消费者线程。整个过程如下图所示:

java-concurrence-queue4

img

实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
Object transfer(Object e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
//e!=null,表示添加元素,如put
//e =null,表示移除元素,如take
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
//空队列或队列的尾节点的操作模式与当前线程操作相同
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
//tail节点已经被别的线程修改
if (t != tail) // inconsistent read
continue;
//t有next节点,说明它已经不是尾节点,把tn作为尾节点
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
//超时版本的方法并且已经超时,返回null
if (timed && nanos <= 0) // can't wait
return null;
//创建一个队列节点,如果两个线程都创建成功,在casNext时,只能一个成功
if (s == null)
s = new QNode(e, isData);
//把s节点连接到队列尾节点的next上
//如果同时两个线程都到达,只有一个成功,失败者跳出此逻辑到达else分支
if (!t.casNext(null, s)) // failed to link in
continue;
//把tail节点指向s节点
advanceTail(t, s); // swing tail and wait
//等待对方匹配线程进行交换
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null)? x : e;
} else { // complementary-mode
//当前头节点的下一节点,即对方线程创建的节点
QNode m = h.next; // node to fulfill
//非一致性读
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
//x != null代表是数据节点,表明对方线程是put操作,而本方是get操作,已经满足交换条件;
//x == null代表对方线程为take操作,那么本方为put操作,判断x==m,如果成立,则表示m已经取消,则出队后重试;
//x!=m,则casItem交换m节点中的元素成立则继续下面的操作;否则出队然后重试
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
//头节点h指向下一节点
advanceHead(h, m); // successfully fulfilled
//唤醒对方等待的线程
LockSupport.unpark(m.waiter);
return (x != null)? x : e;
}
}
}

实现流程如下:
①首先判断操作的类型,e==null表示为take操作,e!=null表示put操作;然后进入自旋;
②判断首尾节点的引用是否相同,如果相同表示队列中存在1个节点,则构造一个新节点q入队,通过CAS操作添加到队尾,同时把tail引用指向入队的节点,然后调用awaitFulfill方法陷入等待;需要注意的时,可能同时有多个线程到达这个分支,但最后只有一个线程能添加到队尾;
③另一个线程发现首尾节点引用不同后,到达else分支,如果当前线程为消费者线程,则当前节点为生产者线程设置的,即包含有数据,则直接返回节点中的数据,然后把head引用指向当前节点,即初始化的虚拟节点出队;如果当前线程为生产者线程,说明当前节点是消费者线程设置的,没有包含数据(item=null),则通过CAS操作,把当前线程携带的元素和当前节点中的item交换,然后唤醒对方的线程;

awaitFulfill方法的实现和TransferStack中的实现基本相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
//与TransferStack.awaitFulfill的实现类似
long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
//线程被中断,取消
if (w.isInterrupted())
s.tryCancel(e);
//获取节点中的元素
Object x = s.item;
//如果当前节点的元素和原来的元素不一样,说明已经被对方线程交换
//返回对方的元素
if (x != e)
return x;
//超时版本
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(e);
continue;
}
}
//自旋次数
if (spins > 0)
--spins;
else if (s.waiter == null)
//自旋次数小于等于0,设置等待的线程
s.waiter = w;
else if (!timed)
//阻塞当前线程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//有超时的阻塞当前线程
LockSupport.parkNanos(this, nanos);
}
}

三、SynchronousQueue主要的方法实现

SynchronousQueue对队列进行添加和删除的方法都是通过TransferStack或TransferQueue中的transfer方法来实现的,通过构造方法传入boolean值来判断,如果是公平模式则使用TransferQueue,否则使用TransferStack,默认为TransferStack。

1、添加操作:

实现都比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public boolean offer(E o, long timeout, TimeUnit unit)
throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}

2、移除操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E poll() {
return (E)transferer.transfer(null, true, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return (E)e;
throw new InterruptedException();
}
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0);
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}