0%

Java[并发]-PriorityBlockingQueue实现分析

PriorityBlockingQueue,优先级阻塞队列,它是一个无界的阻塞队列,它主要依赖类PriorityQueue来实现入队和出队操作,但增加了并发控制。它的特点是对于入队操作不会阻塞,因为是无界的,但出队操作有阻塞,因为队列可能为空。队列不允许使用null元素,虽然理论上是无界的(Integer.MAX_VALUE),但无限制地入队并且出队速度跟不上,可能引起资源耗尽而出现OOM异常;PriorityBlockingQueue的另一个特点是它不是FIFO的队列,它是按优先级来出队的,优先级的实现可以自己定义,使用Comparator接口来实现;

一、PriorityBlockingQueue的结构

PriorityBlockingQueue的结构比较简单,只有一个ReentrantLock对象和它的Condtion对象notEmpty用于并发控制,因为它只有在获取时才会阻塞。另外持有一个PriorityQueue的引用,所有的出队入队操作都委托给了这个对象来实现。另外从代码可以看到,用的是公平锁,这是因为要保证线程获取的优先级顺序。

1
2
3
private final PriorityQueue<E> q;
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition notEmpty = lock.newCondition();

二、PriorityBlockingQueue的主要操作实现

1、入队操作
入队操作主要使用offer/put方法来完成,因为put方法没有阻塞,所以它是以offer方法来实现的,offer方法的实现也比较简单,出队操作委托给了PriorityQueue来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean ok = q.offer(e);
assert ok;
//通知阻塞在等待队列中的出队线程
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}

2、出队操作
出队的操作主要是通过poll/poll超时方法/take来实现,主要看take方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
//如果队列为空,则进入等待队列
while (q.size() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
//从等待中返回,出队
E x = q.poll();
assert x != null;
return x;
} finally {
lock.unlock();
}
}