0%

Java[并发]ArrayBlockingQueue实现分析

阻塞队列是指一个支持两个附加操作的队列,这两个附加操作支持阻塞地插入和移除的方法。
• 支持阻塞地插入方法,队列满时,队列会阻塞插入元素的线程,直到队列有空位;
• 支持阻塞地移除方法,当队列为空时,队列会阻塞移除元素的线程,直到队列有元素。

阻塞队列一般用于生产-消费的场景,插入和移除操作的4种处理方法如下:

java-concurrence-queue

• 抛出异常:当队列满/空时,再往队列添加/移除元素,就会抛出异常;
• 返回特殊值:当队列插入元素时,会返回插入是否成功的布尔值,当移除元素时,则从队列取回一个元素,不存在返回NULL。
• 一直阻塞:当队列满时,再往队列put元素,队列会一直阻塞生产线程直到队列可用或响应中断退出;当队列为空时,如果从队列take出一个元素,则会一直阻塞消费线程直到队列不为空。
• 超时退出:当队列满/空时,如果添加/移除一个元素,则会线程阻塞到指定的时间后退出。

JDK6一共提供了6种阻塞队列,以应对各种场景的生产-消息模型,以下是这6种阻塞队列的类关系图:

blockingqueue

这6种队列都实现了BlockingQueue接口和继承AbstraceQueue类,有部分是使用ReentrantLock和Condition实现的,比如ArrayBlockingQueue/LinkedBlockingQueue/LinkedBlockingDeque/PriorityBlockingQueue/DelayQueue

一、ArrayBlockingQueue的实现分析

ArrayBlockingQueue的并发控制主要是依赖ReentrantLock和它的两个Condition对象来实现的,它是一个有界队列,使用时必须指定它的容量大小,另外它还接受公平锁版本的初始化,因为它使用了ReentrantLock,而ReentrantLock是区分公平锁和非公平锁的。ArrayBlockingQueue的读写都必须在锁的控制下进行的。即由Lock和condition使用的方式决定的。

1、ArrayBlockingQueue的基本结构
ArrayBlockingQueue主要由一个final的对象数组items,和代表下一个要添加的索引位置putIndex,下一个要移除的索引位置takeIndex组成,并发控制主要靠ReentrantLock对象和它的两个Condition,notEmpty表示等待移除的条件,notFull表示等待添加的条件。
主要代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** 队列元素  */
private final E[] items;
/**下一个要移出的元素索引位置*/
private int takeIndex;
/**下一个要添加的元素索引位置*/
private int putIndex;
/**队列中元素的数量*/
private int count;
/** 保护所有访问的锁 */
private final ReentrantLock lock;
/** 等待移除条件 */
private final Condition notEmpty;
/** 等待添加条件 */
private final Condition notFull;

因为都是在lock保护下进行的,所有takeIndex/putIndex/count这些整型变量并没有定义为volatile的。另外items定义为final的,说明必须要初始化此数组。

2、常用方法实现分析
offer/put/poll/take等方法都基于两个基本的方法来实现的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void insert(E x) {
//把元素添加到putIndex位置
items[putIndex] = x;
//putIndex自增
putIndex = inc(putIndex);
//队列元素数量增加
++count;
//通知等待移除线程
notEmpty.signal();
}
private E extract() {
final E[] items = this.items;
//获取要移出的元素
E x = items[takeIndex];
//设置元素下标对应元素为null
items[takeIndex] = null;
//takeIndex增加
takeIndex = inc(takeIndex);
//元素数量减少
--count;
//通知等待添加线程
notFull.signal();
return x;
}

1)offer方法:
offer有两个版本的方法,返回boolean值的比较简单,如果队列满则直接返回false,否则添加元素后返回 true.
offer还有一个支持等待超时的方法,如果超出指定时间队列,则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
//如果超时,则返回false
if (nanos <= 0)
return false;
try {
//等待队列有空位的通知
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}

2)、put方法
put方法在队列满时会一直阻塞添加元素的线程直到有位置,才添加元素后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
//支持中断
lock.lockInterruptibly();
try {
try {
//队列已满,进入等待
while (count == items.length)
//进入notFull condition等待队列
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
//从等待中返回,添加到队列中
insert(e);
} finally {
lock.unlock();
}
}

3)、poll/take方法

同理,poll方法也有2个版本的实现,take方法在队列空时会一直阻塞直到队列有元素移出才返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
l ong nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//队列中有元素,直接移出
if (count != 0) {
E x = extract();
return x;
}
//超时,返回null
if (nanos <= 0)
return null;
try {
//notEmpty条件等待
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}

}
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//支持中断
lock.lockInterruptibly();
try {
try {
//队列为空,线程进入notEmpty条件等待队列
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
//从等待中返回,移除元素
E x = extract();
return x;
} finally {
lock.unlock();
}
}