阻塞队列是指一个支持两个附加操作的队列,这两个附加操作支持阻塞地插入和移除的方法。
• 支持阻塞地插入方法,队列满时,队列会阻塞插入元素的线程,直到队列有空位;
• 支持阻塞地移除方法,当队列为空时,队列会阻塞移除元素的线程,直到队列有元素。
阻塞队列一般用于生产-消费的场景,插入和移除操作的4种处理方法如下:

• 抛出异常:当队列满/空时,再往队列添加/移除元素,就会抛出异常;
• 返回特殊值:当队列插入元素时,会返回插入是否成功的布尔值,当移除元素时,则从队列取回一个元素,不存在返回NULL。
• 一直阻塞:当队列满时,再往队列put元素,队列会一直阻塞生产线程直到队列可用或响应中断退出;当队列为空时,如果从队列take出一个元素,则会一直阻塞消费线程直到队列不为空。
• 超时退出:当队列满/空时,如果添加/移除一个元素,则会线程阻塞到指定的时间后退出。
JDK6一共提供了6种阻塞队列,以应对各种场景的生产-消息模型,以下是这6种阻塞队列的类关系图:

这6种队列都实现了BlockingQueue接口和继承AbstraceQueue类,有部分是使用ReentrantLock和Condition实现的,比如ArrayBlockingQueue/LinkedBlockingQueue/LinkedBlockingDeque/PriorityBlockingQueue/DelayQueue
一、ArrayBlockingQueue的实现分析
ArrayBlockingQueue的并发控制主要是依赖ReentrantLock和它的两个Condition对象来实现的,它是一个有界队列,使用时必须指定它的容量大小,另外它还接受公平锁版本的初始化,因为它使用了ReentrantLock,而ReentrantLock是区分公平锁和非公平锁的。ArrayBlockingQueue的读写都必须在锁的控制下进行的。即由Lock和condition使用的方式决定的。
1、ArrayBlockingQueue的基本结构
ArrayBlockingQueue主要由一个final的对象数组items,和代表下一个要添加的索引位置putIndex,下一个要移除的索引位置takeIndex组成,并发控制主要靠ReentrantLock对象和它的两个Condition,notEmpty表示等待移除的条件,notFull表示等待添加的条件。
主要代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private final E[] items; private int takeIndex; private int putIndex; private int count; private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
|
因为都是在lock保护下进行的,所有takeIndex/putIndex/count这些整型变量并没有定义为volatile的。另外items定义为final的,说明必须要初始化此数组。
2、常用方法实现分析
offer/put/poll/take等方法都基于两个基本的方法来实现的,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
|
1)offer方法:
offer有两个版本的方法,返回boolean值的比较简单,如果队列满则直接返回false,否则添加元素后返回 true.
offer还有一个支持等待超时的方法,如果超出指定时间队列,则返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try { nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { lock.unlock(); } }
|
2)、put方法
put方法在队列满时会一直阻塞添加元素的线程直到有位置,才添加元素后返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); throw ie; } insert(e); } finally { lock.unlock(); } }
|
3)、poll/take方法
同理,poll方法也有2个版本的实现,take方法在队列空时会一直阻塞直到队列有元素移出才返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { l ong nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); throw ie; } } } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
|