PriorityQueue源码解析

PriorityQueue源码解析

概述

  • PriorityQueue 采用数组的形式保存数据,逻辑上采用二叉堆储存
  • PriorityQueue 数组排序并非按照插入顺序,而是需要根据比较器判断; 插入自定义对象时,自定义对象需要实现Comparable接口 ,或者使用外部比较器对象,外部比较器对象需实现Compartor接口
  • PriorityQueue 不能加入null
  • 非线程安全
  • 自动扩容。如果容量小于64,两倍增长扩容;否则增长50%
  • 入队出队的时间复杂度O(log(n))

PriorityQueue的使用场景

  • 根据权值和优先级。返回前n 条记录。如根据投诉记录条数排序,返回投诉记录最多的前三个商家。
  • 消息中间件mq根据消息优先级优先发送等

储备知识

二叉堆

(如果已有了解可跳过)

二叉堆本质上是一种完全二叉树,它分为两个类型:

  • 最大堆(大根堆)

  • 最小堆(小根堆)

最大堆:任何一个父节点的值都大于左右孩子结点的值。

最大堆

最小堆:任何一个父结点的值都小于左右孩子结点的值。

image-20210615112954521

根节点称为堆顶,最大堆堆顶为最大的元素,最小堆堆顶为最小的元素。

具体的插入删除操作不作详细说明。

根据最大堆和最小堆的性质,我们可以类比优先级进行看待,堆顶元素最小或最大,那么对应的优先级也就最小或最大,由于即使插入删除后会通过调整一直保持该种特性,我们即实现了根据优先级进行排列。

结构认识

类图

PriorityQueue集成了AbstractQueue,实现了Serializable接口。不支持克隆,没有什么特别的

源码解析

字段解释

PriorityQueue是基于数组实现的逻辑上的二叉堆。

对于数组中某个元素queue[n],其左孩子在queue[2n+1]位置上,右孩子queue[2(n+1)]位置,它的父亲则在queue[(n-1)/2]上,而根的位置则是[0]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 默认初始大小11
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
* 英文注释的意思即,是数组实现的二叉堆,对于数组中某个元素queue[n],其左孩子在queue[2n+1]位置上,右孩子queue[2(n+1)]位置,它的父亲则在queue[(n-1)/2]上,而根的位置则是[0]。
*/
transient Object[] queue; // non-private to simplify nested class access
// 队列中元素大小
int size;
// 比较器
private final Comparator<? super E> comparator;

transient int modCount; // non-private to simplify nested class access

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

PriorityQueue的字段不多,比较容易,没有ConcurrentHashMap那么恐怖。

构造函数解释

PriorityQueue构造函数一共7个,我们来看看。

构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

// 指定初始容量,但是不指定比较器
public PriorityQueue(int initialCapacity) {
this(initialCapacity, null);
}

// 指定比较器,初始容量默认11
public PriorityQueue(Comparator<? super E> comparator) {
this(DEFAULT_INITIAL_CAPACITY, comparator);
}

// 指定初始容量、也指定比较器
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}

/**
* Creates a {@code PriorityQueue} containing the elements in the
* specified collection. If the specified collection is an instance of
* a {@link SortedSet} or is another {@code PriorityQueue}, this
* priority queue will be ordered according to the same ordering.
* Otherwise, this priority queue will be ordered according to the
* {@linkplain Comparable natural ordering} of its elements.
* 英文注释大致意思就是如果传入的集合c是有序集合或者是另外一个priorityQueue,那么就按照原来的顺序进行排列,否则按照自然排序的方式排列
*/
public PriorityQueue(Collection<? extends E> c) {
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
initElementsFromCollection(ss);
}
else if (c instanceof PriorityQueue<?>) {
PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
initFromPriorityQueue(pq);
}
else {
this.comparator = null;
initFromCollection(c);
}
}

// 传入一个PriorityQueue,重新建立堆,深拷贝一个。
public PriorityQueue(PriorityQueue<? extends E> c) {
this.comparator = (Comparator<? super E>) c.comparator();
initFromPriorityQueue(c);
}

// 传入一个已经排序的Set,根据其排序规则创建PriorityQueue
public PriorityQueue(SortedSet<? extends E> c) {
this.comparator = (Comparator<? super E>) c.comparator();
initElementsFromCollection(c);
}

initFromPriorityQueue(pq);中使用了heaplify(),进行构建堆,我们后面再看。

主要方法解释

  • 添加

添加即对二叉堆进行插入操作。二叉堆的插入操作有一个上浮操作,对应到数组上怎么操作呢?

还是根据前面的逻辑,对于数组中某个元素queue[n],其左孩子在queue[2n+1]位置上,右孩子queue[2(n+1)]位置,它的父亲则在queue[(n-1)/2]上,而根的位置则是[0]

看下面的图可以更清晰认识(源自网络):

img
img

结合上面的图解,我们来说明一下二叉堆的添加元素过程:

  1. 将元素2添加在最后一个位置(队尾)(图2)。

  2. 由于2(queue[12])比其父亲6[queue[(12-1)/2=5]]要小,所以将元素2上移,交换2和6的位置(图3);

  3. 然后由于2比5小,继续将2上移,交换2和5的位置(图4),此时2大于其父亲(根节点)1,结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
// 当前元素个数大于等于队列底层数组的长度,则进行扩容
if (i >= queue.length)
grow(i + 1);
// 调用siftUp方法,将元素添加到尾部,进行上移判断
siftUp(i, e);
size = i + 1;
return true;
}

上浮的方法:siftUp()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void siftUp(int k, E x) {
if (comparator != null)
// 如果比较器comparator不为空,则调用siftUpUsingComparator方法进行上移操作
siftUpUsingComparator(k, x, queue, comparator);
else
// 否则调用siftUpComparable方法进行上移操作
siftUpComparable(k, x, queue);
}

private static <T> void siftUpUsingComparator(
int k, T x, Object[] es, Comparator<? super T> cmp) {
// k>0表示判断k不是根的情况下,也就是元素x有父节点
while (k > 0) {
// 计算元素x的父节点位置[(n-1)/2]
int parent = (k - 1) >>> 1;
Object e = es[parent];
// 如果新增的元素k比其父亲e大,则不需要"上移",跳出循环结束
if (cmp.compare(x, (T) e) >= 0)
break;
// // x比父亲小,则需要进行"上移"
es[k] = e;
// // 将新插入元素的位置k指向父亲的位置,进行下一层循环
k = parent;
}
// 找到新增元素x的合适位置k之后进行赋值
es[k] = x;
}

private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (key.compareTo((T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = key;
}

添加元素超过length会自动扩容,所以没有抛出异常。下面看看扩容方法

  • grow()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
// 如果当前队列长度小于64则扩容2倍,否则扩容1.5倍
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
// 如果扩容后newCapacity超出int的范围,则将newCapacity赋值为Integer.Max_VALUE
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// 数组copy进行扩容
queue = Arrays.copyOf(queue, newCapacity);
}
  • remove()出队操作,出队永远是要删除根元素,即根节点不断下沉到尾结点,最后被覆盖。
img
img
img

结合上面的图解,我们来说明一下二叉堆的出队过程:

  1. 将找出队尾的元素8,并将它在队尾位置上删除(图2);

  2. 此时队尾元素8比根元素1的最小孩子3要大,所以将元素1下移,交换1和3的位置(图3);

  3. 然后此时队尾元素8比元素1的最小孩子4要大,继续将1下移,交换1和4的位置(图4);

  4. 然后此时根元素8比元素1的最小孩子9要小,不需要下移,直接将根元素8赋值给此时元素1的位置,1被覆盖则相当于删除(图5),结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean remove(Object o) {
int i = indexOf(o);
if (i == -1)
return false;
else {
removeAt(i);
return true;
}
}

E removeAt(int i) {
// assert i >= 0 && i < size;
final Object[] es = queue;
modCount++;
int s = --size;
// 如果要删除的是数组最后一个元素,直接删除
if (s == i) // removed last element
es[i] = null;
else {
E moved = (E) es[s];
es[s] = null;
// 否则进行下沉,将元素下沉到队尾
siftDown(i, moved);
if (es[i] == moved) {
siftUp(i, moved);
if (es[i] != moved)
return moved;
}
}
return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void siftDown(int k, E x) {
// 如果比较器comparator不为空,则调用siftDownUsingComparator方法进行下移操作
if (comparator != null)
siftDownUsingComparator(k, x);
// 比较器comparator为空,则调用siftDownComparable方法进行下移操作
else
siftDownComparable(k, x);
}
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
// 通过size/2找到一个没有叶子节点的元素
int half = size >>> 1; // loop while a non-leaf
// 比较位置k和half,如果k小于half,则k位置的元素就不是叶子节点
while (k < half) {
// 找到根元素的左孩子的位置[2n+1]
int child = (k << 1) + 1; // assume left child is least
// 左孩子的元素
Object c = queue[child];
// 找到根元素的右孩子的位置[2(n+1)]
int right = child + 1;
// 如果左孩子大于右孩子,则将c复制为右孩子的值,这里也就是找出左右孩子哪个最小
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue [right]) > 0)
c = queue[child = right];
// 如果队尾元素比根元素孩子都要小,则不需"下移",结束
if (key.compareTo((E) c) <= 0)
break;
// 队尾元素比根元素孩子都大,则需要"下移"
// 交换跟元素和孩子c的位置
queue[k] = c;
// 将根元素位置k指向最小孩子的位置,进入下层循环
k = child;
}
// 找到队尾元素x的合适位置k之后进行赋值
queue[k] = key;
}

// 这个方法和上面的操作一样,不多说了
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue [right]) > 0)
c = queue[child = right];
if (comparator .compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}

删除不是直接将根元素删除,然后再将下面的元素做上移,重新补充根元素;而是找出队尾的元素,并在队尾的位置上删除,然后通过根元素的下移,给队尾元素找到一个合适的位置,最终覆盖掉根元素,从而达到删除根元素的目的。

  • 堆的构造:heapify()
1
2
3
4
5
6
7
8
9
10
11
12
private void heapify() {
final Object[] es = queue;
// i是为了找寻最后一个非叶子节点,然后倒序进行"下移"siftDown操作
int n = size, i = (n >>> 1) - 1;
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
for (; i >= 0; i--)
siftDownComparable(i, (E) es[i], es, n);
else
for (; i >= 0; i--)
siftDownUsingComparator(i, (E) es[i], es, n, cmp);
}

如果把数组中的数一个个取出来然后调用入队的方法会导致每次入队都要频繁地调整元素,性能消耗较大。

为了方便,我们将上面我们图解中的数组去掉几个元素,只留下7、6、5、12、10、3、1、11、15、4(顺序已经随机打乱)。ok、那么接下来,我们就按照当前的顺序建立一个二叉堆,暂时不用管它是否符合标准。

int a = [7, 6, 5, 12, 10, 3, 1, 11, 15, 4 ];

img

我们观察下用数组a建成的二叉堆,很明显,对于叶子节点4、15、11、1、3来说,它们已经是一个合法的堆。所以只要最后一个节点的父节点,也就是最后一个非叶子节点a[4]=10开始调整,然后依次调整a[3]=12,a[2]=5,a[1]=6,a[0]=7,分别对这几个节点做一次"下移"操作就可以完成了堆的构造。

img
img
img

小结

PriorityQueue其实实质就是二叉堆,是使用大根堆还是小根堆进行排列。入队和出队操作都是通过二叉堆的插入和删除操作实现的。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!