0%

Java[并发]-LinkedBlockingQueue实现分析

LinkedBlockingQueue是基于链表的阻塞队列,默认最大的容量为Integer的MAX_VALUE,不接收null元素,它与ArrayBlockingQueue在并发控制上的不同之处在于,它有两个ReetrantLock,分别对应于添加和删除两个操作,并发能力得到提高。它的每个Lock都有一个Condition对象,用于通知/等待。

一、LinkedBlockingQueue的结构

LinkedBlockingQueue包含一个Node内部类代表队列节点,Node中有next引用代表下一个节点,item代表节点中的元素;一个Node类型的head引用代表头节点,last代表尾节点;两个ReentrantLock分别代表添加和移除节点用到的锁对象,ReentrantLock各一个Condition对象用于实现等待/通知模式的并发控制;另外还有一个AtomicInteger类型的对象,用于代表队列中的节点数量。
相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** 队列的容量,如果没设置则为 Integer.MAX_VALUE */
private final int capacity;
/**元素的数量 */
private final AtomicInteger count = new AtomicInteger(0);
/** 链表的头节点*/
private transient Node<E> head;
/** 链表的尾节点 */
private transient Node<E> last;
/** 移除操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 移除操作的等待条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加操作的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 添加操作的等待条件 */
private final Condition notFull = putLock.newCondition();

二、LinkedBlockingQueue主要操作的实现

LinkedBlockingQueue同样实现了阻塞队列的基本方法,offer/put/poll/take等。这些方法都会用到共用的入队和出队方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
//构造一个新节点,添加到链接的尾节点
last = last.next = new Node<E>(x);
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
//获取头节点引用,指向h
Node<E> h = head;
//获取头节点的下一节点引用
Node<E> first = h.next;
h.next = h; // help GC
head = first;
//把元素出队
E x = first.item;
first.item = null;
return x;
}

1、put方法实现
offer方法和它的超时版本实现都比较简单,和put方法类似,下面是put方法的有关实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/**
* 如果元素数量等于队列容量,说明队列已经满
* 释放锁,进入putLock的Condition等待队列
* 有两种情况可以激活阻塞的put线程,
* 1、某个put线程添加元素后,发现队列未满,就会调用notFull.signal()激活阻塞的put线程;
* 2、take线程获取元素时,发现队列已满,则取出元素后notFull.signal()激活阻塞的put线程
*/
while (count.get() == capacity) {
notFull.await();
}
//从等待返回获取锁后,入队
enqueue(e);
//增加元素数量计数
c = count.getAndIncrement();
//如果队列未满,激活可能存在的阻塞的put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果c==0,说明take线程已经陷入阻塞,激活阻塞 的take线程,
if (c == 0)
signalNotEmpty();
}
/**
* 添加时通知等待移除的竞争条件
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

从代码可以看出,put方法的实现流程如下:
1).如果添加的元素为null,则抛出异常;
2).获取putLock并获取当前队列中元素的数量;
3).加锁,如果当前队列已经满,则putLock的Condition对象notFull.await();注意这里用的是while,不是if。阻塞put的线程,等待唤醒;
4).如果被唤醒后,则继续,把元素加入队列中,原子变量操作增加队列的元素数量,这里有个优化:如果当前队列未满,则唤醒put操作中阻塞的线程;
5).如果当前队列的元素数量为0,则说明take线程已经阻塞,通过takeLock的Condition对象notEmpty.signal()唤醒take操作中阻塞的线程;

2、take方法实现
同样,移除的方法poll和poll的超时版本和take方法类似,只分析take方法的相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空,释放锁,进入等待队列
while (count.get() == 0) {
notEmpty.await();
}
//从等待中返回,出队
x = dequeue();
c = count.getAndDecrement();
//发现队列中元素数量>1,则调用notEmpty.signal()激活take线程继续消费
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果发现队列已经满,说明put线程已经阻塞,则触发putLock的notFull通知
if (c == capacity)
signalNotFull();
return x;
}
/**
* 移除时通知等待添加的竞争条件
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

从代码可以看出,take方法的实现流程如下:
1).获取当前队列的元素数量,并获取takeLock对象;
2).加锁(可响应中断),如果队列为空,则释放锁,进入takeLock的Condition对象的等待队列,notEmpty.await();
3).从await中返回后,出队,并原子变量操作队列的数量减1 ;
4).如果队列中的元素数量大于1,则继续通知阻塞在takeLock的Condition对象的等待队列中的线程;
5).如果发现队列已经满,则说明put线程已经阻塞,则触发putLock的Condition对象notFull进行通知;

三、总结

从LinkedBlockingQueue可以看到,在并发上使用了两个锁分别对添加和移除操作进行加锁,并使用原子变量count来作为队列数量的计数器。Put/take方法在利用这个count变量进行并发的优化,减少线程阻塞在Condtion对象的等待队列中。