BlockingQueue
接口
BlockingQueue<E>
是 Java 并发包 java.util.concurrent
中的一个接口,它扩展了 Queue<E>
接口。
BlockingQueue
提供了额外的插入、提取操作,这些操作可以在队列为空时阻塞提取操作,在队列满时阻塞插入操作,直到队列有空闲空间。
主要特性
- 阻塞操作:如果队列为空,那么获取元素的操作(如
take()
)会被阻塞,直到队列中有元素可用。如果队列已满,那么插入元素的操作(如put()
)会被阻塞,直到队列中有空间。 - 线程安全:
BlockingQueue
的所有操作都是线程安全的,通过内部锁或其他并发控制机制实现。 - 不支持
null
元素:BlockingQueue
不接受null
元素,尝试插入null
将抛出NullPointerException
。 - 容量限制:
BlockingQueue
可以是容量受限的,也可以是无界的(例如总是报告Integer.MAX_VALUE
的剩余容量)。 - 内存一致性:放入队列的操作
happen-before
从队列中取出或移除元素的操作。
主要方法
插入方法
boolean add(E e)
- 尝试将元素
e
添加到队列的末尾。如果队列已满,则抛出IllegalStateException
。如果添加成功,返回true
。
- 尝试将元素
boolean offer(E e)
- 尝试将元素
e
添加到队列的末尾。如果队列已满,则返回false
。如果添加成功,返回true
。
- 尝试将元素
void put(E e) throws InterruptedException
- 将元素
e
添加到队列的末尾,如果队列已满,则阻塞当前线程,直到队列有空间。
- 将元素
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
- 尝试在给定的时间内将元素
e
添加到队列的末尾。如果在超时时间内队列有空间,则返回true
。如果超时,返回false
。
- 尝试在给定的时间内将元素
删除方法
E remove()
- 移除并返回队列的头部元素。如果队列为空,则抛出
NoSuchElementException
。
- 移除并返回队列的头部元素。如果队列为空,则抛出
E poll()
- 获取并移除队列的头部元素,如果队列为空,则返回
null
。
- 获取并移除队列的头部元素,如果队列为空,则返回
E take() throws InterruptedException
- 获取并移除队列的头部元素,如果队列为空,则阻塞当前线程,直到队列有元素。
E poll(long timeout, TimeUnit unit) throws InterruptedException
- 尝试在给定的时间内获取并移除队列的头部元素。如果在超时时间内队列有元素,则返回该元素。如果超时,返回
null
。
- 尝试在给定的时间内获取并移除队列的头部元素。如果在超时时间内队列有元素,则返回该元素。如果超时,返回
检查方法
boolean contains(Object o)
- 检查队列是否包含指定的元素
o
。
- 检查队列是否包含指定的元素
int size()
- 返回队列中元素的数量。
int remainingCapacity()
- 返回队列还可以接受多少个额外的元素而不会阻塞。对于无界队列,返回
Integer.MAX_VALUE
。
- 返回队列还可以接受多少个额外的元素而不会阻塞。对于无界队列,返回
批量操作
int drainTo(Collection<? super E> c)
- 将队列中所有可用的元素移除并添加到给定的集合
c
中。
- 将队列中所有可用的元素移除并添加到给定的集合
int drainTo(Collection<? super E> c, int maxElements)
- 将队列中最多
maxElements
个可用的元素移除并添加到给定的集合c
中。
- 将队列中最多
ArrayBlockingQueue
实现
ArrayBlockingQueue
实现了 BlockingQueue
接口,并且扩展了 AbstractQueue
类。ArrayBlockingQueue
是一个基于数组的阻塞队列,它按照先进先出(FIFO)的顺序存储元素。
主要特点
- 基于数组:
ArrayBlockingQueue
使用固定大小的数组来存储元素。 - FIFO 顺序:新元素被添加到队列的末尾,而队列的操作(如
take
和poll
)从队列的开头获取元素。 - 阻塞操作:如果队列已满,试图向队列中添加元素的操作会阻塞;如果队列为空,试图从队列中移除元素的操作也会阻塞。
- 公平性:
ArrayBlockingQueue
支持公平性设置,默认情况下不保证公平性。如果设置为true
,则等待的生产者和消费者线程会按照它们进入队列的顺序被服务。 - 序列化:
ArrayBlockingQueue
实现了java.io.Serializable
接口,可以被序列化和反序列化。
内部类
Itrs
源码中的文档翻译如下:
迭代器和它们队列之间的共享数据,允许队列修改在元素被移除时更新迭代器。
这增加了许多复杂性,以正确处理一些不常见的操作,但是圆形数组和支持内部移除(即那些不在头部的移除)
有时会导致迭代器失去它们的位置和/或(重新)报告它们不应该报告的元素。为了避免这种情况,当队列有一个或多个迭代器时,
它会通过以下方式保持迭代器状态的一致性:
(1) 跟踪“周期”的数量,即takeIndex回绕到0的次数。
(2) 当内部元素被移除时(因此其他元素可能会被移动),通过回调removedAt通知所有迭代器。
这些足以消除迭代器的不一致性,但不幸的是,增加了维护迭代器列表的次要责任。我们使用一个简单的链表(仅在队列锁定时访问)
跟踪所有活动的迭代器,该链表由对Itr的弱引用组成。该列表通过以下3种机制进行清理:
(1) 每当创建新的迭代器时,进行一些O(1)的检查以查找过时的列表元素。
(2) 每当takeIndex回绕到0时,检查是否有迭代器在一个周期以上未被使用。
(3) 每当队列变空时,通知所有迭代器,并丢弃整个数据结构。
因此,除了必要的removedAt回调以保证正确性之外,迭代器还有shutdown和takeIndexWrapped回调,帮助从列表中移除过时的迭代器。
每当检查列表元素时,如果GC确定迭代器已被丢弃,或者迭代器报告它已经“分离”(不需要进一步的状态更新),则该元素会被清除。
当takeIndex从不前进,迭代器在耗尽之前被丢弃,且所有移除操作都是内部移除时,开销达到最大。但即使在这种情况下,我们也没有增加摊销复杂度。
必须小心保持列表清理方法不会递归地调用另一个此类方法,从而引起微妙的腐败错误。
Itrs
内部类是用于维护迭代器状态和队列元素之间的一致性的。当队列中存在迭代器时,它通过以下方式保持迭代器状态与队列操作同步:
- 跟踪循环次数:每当
takeIndex
到达队列末尾并回到队列开头时,循环次数增加。这有助于确定队列的当前状态,并确保迭代器能够正确地跟踪队列中的元素。 - 通知迭代器:当队列中删除内部元素时(即不在队列开头的位置删除),
removedAt
方法会被调用,以通知所有迭代器元素移动的情况。这有助于迭代器更新其内部状态,以反映队列中元素的变化。 - 清理过时的迭代器:
doSomeSweeping
方法用于清理过时的迭代器。当迭代器不再需要跟踪队列状态(例如,迭代器已完成或被垃圾回收器回收)时,它会从迭代器列表中移除。
Itrs
类包含以下关键成员和方法:
cycles
:记录takeIndex
循环次数的变量。head
:指向迭代器列表的链表头。sweeper
:用于在迭代器列表中清理过时元素的链表节点。Node
:内部类,用于在迭代器列表中维护迭代器的弱引用。register
:将新迭代器添加到迭代器列表的方法。takeIndexWrapped
:当takeIndex
到达队列末尾时调用,以通知所有迭代器队列状态的变化。removedAt
:当队列中删除内部元素时调用,以通知所有迭代器元素移动的情况。queueIsEmpty
:当队列变空时调用,以通知所有迭代器队列的状态。elementDequeued
:当队列中元素被移除时调用,以通知所有迭代器队列状态的变化。
Itr
Itr
内部类是用于实现 ArrayBlockingQueue
的迭代器。迭代器用于遍历队列中的元素,并且需要与队列的修改操作保持一致性。以下是 Itr
类的主要特性和方法:
主要特性
- 弱一致性:迭代器与队列的修改操作保持弱一致性。为了确保在调用
next()
时始终有元素可用,迭代器会在队列末尾预读取一个元素。 - 迭代器状态:迭代器跟踪队列中的元素位置,并在必要时调整其状态以反映队列的修改。
- 脱离模式:当所有索引都变为负数或
hasNext()
第一次返回false
时,迭代器会进入“脱离模式”。在脱离模式下,迭代器可以被立即从Itrs
列表中移除,而不需要等待垃圾回收。
主要方法
hasNext()
:检查是否还有下一个元素可以返回。next()
:返回下一个元素,并更新迭代器状态。forEachRemaining(Consumer<? super E> action)
:从调用处开始对剩余元素应用action
。如果forEachRemaining
返回,迭代器将被视为已完成,不再支持remove()
操作。remove()
:从队列中移除迭代器上次返回的元素。removedAt(int removedIndex)
:当队列中删除内部元素时调用,以通知迭代器元素移动的情况。takeIndexWrapped()
:当takeIndex
到达队列末尾时调用,以通知迭代器队列状态的变化。shutdown()
:当队列变空或迭代器严重滞后时调用,以通知迭代器放弃进一步迭代。incorporateDequeues()
:调整迭代器的状态以反映队列中的删除操作。detach()
:将迭代器设置为脱离模式。isDetached()
:检查迭代器是否处于脱离模式。distance(int index, int prevTakeIndex, int length)
:计算索引与prevTakeIndex
之间的距离。
区别
在 ArrayBlockingQueue
类中,Itrs
和 Itr
是两个不同的内部类,它们各自承担着不同的角色和职责。
Itrs 类
Itrs
类是ArrayBlockingQueue
的内部类,用于维护迭代器状态和队列元素之间的一致性。- 它跟踪队列中的迭代器数量,并确保迭代器能够正确地跟踪队列中的元素状态,即使队列正在进行修改操作。
Itrs
类包含一个循环计数器cycles
,用于跟踪takeIndex
循环次数。- 它还包含一个链表
head
,用于维护所有活跃迭代器的弱引用。 Itrs
类提供方法来添加新迭代器、清理过时迭代器、以及通知迭代器队列状态的变化。
Itr 类
Itr
类是ArrayBlockingQueue
的另一个内部类,它是迭代器的实现。- 它跟踪队列中的元素位置,并在必要时调整其状态以反映队列的修改。
Itr
类提供方法来检查是否有下一个元素、返回下一个元素、以及从队列中移除元素。- 它还包含一个
detach()
方法,用于将迭代器设置为脱离模式,这允许迭代器可以被立即从Itrs
列表中移除,而不需要等待垃圾回收。
区别
- 职责:
Itrs
类负责维护迭代器列表和队列状态的一致性,而Itr
类是迭代器的具体实现,负责跟踪队列中的元素位置。 - 状态跟踪:
Itrs
类跟踪迭代器的数量和状态,而Itr
类跟踪迭代器当前指向的队列元素位置。 - 通知机制:
Itrs
类包含方法来通知迭代器队列状态的变化,而Itr
类包含方法来处理这些通知并更新其状态。 - 操作同步:
Itrs
类操作需要在ArrayBlockingQueue
的锁控制下进行,而Itr
类操作可以在没有锁的情况下进行,因为它们不直接修改队列状态。
ArrayBlockingQueue
属性
属性名 | 类型 | 描述 | 说明 |
---|---|---|---|
items | Object[] | 存储队列元素的数组 | 这是一个最终数组,它用于存储队列中的元素。ArrayBlockingQueue 中的所有元素都存储在这个数组中。 |
takeIndex | int | 下一个从队列中提取元素的索引 | 当消费者从队列中获取元素时,会从 takeIndex 指向的元素开始。每次 take 操作后,takeIndex 会向后移动一个位置。 |
putIndex | int | 下一个向队列中添加元素的索引 | 当生产者向队列中添加元素时,会从 putIndex 指向的位置开始。每次 put 操作后,putIndex 会向后移动一个位置。 |
count | int | 队列中元素的个数 | count 变量记录了队列中当前有多少个元素。它是当前队列状态的一个重要指标。 |
lock | ReentrantLock | 用于同步队列操作的锁 | ReentrantLock 是一个可重入的互斥锁,用于确保对 ArrayBlockingQueue 的所有操作都是线程安全的。 |
notEmpty | Condition | 用于等待队列非空的条件 | 当队列空时,生产者会等待在这个条件上,直到队列中有元素可用。 |
notFull | Condition | 用于等待队列非满的条件 | 当队列满时,消费者会等待在这个条件上,直到队列中有空间可用。 |
itrs | Itrs | 用于维护迭代器状态的对象 | 当有迭代器正在遍历队列时,itrs 对象用于跟踪这些迭代器的当前状态,并确保它们能够正确地反映队列的修改。 |
主要方法
ArrayBlockingQueue
实现了 BlockingQueue
接口的所有方法,以及 Collection
和 Iterator
接口的一些可选方法。
boolean add(E e)
ArrayBlockingQueue
的add(E e)
方法调用的是AbstractQueue
的add(E e)
方法。
public boolean add(E e) {
return super.add(e);
}
调用offer(e)
方法:AbstractQueue
的add(E e)
方法会调用offer(E e)
方法。这个方法负责将元素e
添加到队列中。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
检查元素是否为null:在offer(E e)
方法内部,首先通过Objects.requireNonNull(e)
确保传入的元素e
不为null。
获取并锁定独占锁:使用final ReentrantLock lock = this.lock;
获取队列的独占锁,并调用lock.lock()
方法上锁,确保在添加元素时的线程安全。
检查队列是否已满:通过if (count == items.length)
检查队列是否已满。如果队列已满,则返回false
。
查看代码
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
添加元素到队列:如果队列未满,则调用enqueue(e)
方法将元素e
添加到队列的putIndex
位置。然后更新putIndex
索引,如果putIndex
等于数组长度,则重置为0,可以实现循环队列的效果,但在本例中由于之前判断了数组长度,因此不会出现循环效果。同时增加队列的元素计数count
。
查看代码
private void enqueue(E e) { // ArrayBlockingQueue的enqueue
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
通知等待的线程:在enqueue(e)
方法中,通过notEmpty.signal()
方法唤醒在notEmpty
条件上等待的线程,这些线程可能在等待队列中的元素。
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
处理等待线程:signal()
方法内部会调用doSignal(first, false)
,因此传入的是false
,这个方法会尝试唤醒firstWaiter
指向的条件队列上的第一个节点。
查看代码
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}
如果条件队列的头节点存在,并且它的状态允许被唤醒,则将其加入到AQS队列中,并最终通过LockSupport.unpark(node.waiter)
唤醒对应的线程。
查看代码
final void enqueue(Node node) { // AQS的enqueue
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
异常处理:如果offer(e)
返回false
,表示队列已满,add
方法会抛出IllegalStateException("Queue full")
异常。
释放独占锁:在finally
块中调用lock.unlock()
释放锁,无论添加操作是否成功。
boolean remove(Object o)
- 检查传入对象是否为 null:
- 如果传入的元素
o
为null
,则直接返回false
,因为ArrayBlockingQueue
不允许null
元素。
- 如果传入的元素
- 获取独占锁:
- 使用
final ReentrantLock lock = this.lock;
获取队列的独占锁,并通过lock.lock()
方法上锁,确保在移除元素时的线程安全。
- 使用
- 检查队列是否为空:
- 通过
if (count > 0)
检查队列是否为空。如果队列为空,则直接返回false
,因为没有元素可以移除。
- 通过
- 遍历队列以查找元素:
- 如果队列不为空,则获取队列元素的数组
final Object[] items = this.items;
。 - 使用一个循环遍历队列元素,查找与
o
相等的元素。由于ArrayBlockingQueue
是一个循环数组,循环会处理两个部分:从takeIndex
到putIndex
(如果takeIndex
小于putIndex
),然后从数组的开始到putIndex
(如果takeIndex
大于或等于putIndex
)。
- 如果队列不为空,则获取队列元素的数组
- 移除找到的元素:
- 如果找到了与
o
相等的元素,则调用removeAt(i)
方法移除该元素,并返回true
。
- 如果找到了与
- 释放独占锁:
- 在
finally
块中,通过lock.unlock()
释放锁,无论移除操作是否成功。
- 在
查看代码
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final Object[] items = this.items;
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
}
return false;
} finally {
lock.unlock();
}
}
removeAt(int removeIndex)
方法:
- 如果移除的是队列头部的元素(即
removeIndex == takeIndex
),则只需将takeIndex
向前移动一位,并将原位置的元素设置为null
。如果takeIndex
达到数组末尾,则重置为0。 - 如果移除的是队列中间的元素,则需要将
removeIndex
之后的所有元素向前移动一位,直到putIndex
。如果putIndex
达到数组末尾,则从数组开始位置继续移动。 - 更新队列的元素计数
count
,并调用itrs.elementDequeued()
或itrs.removedAt(removeIndex)
更新迭代器状态。 - 最后,通过
notFull.signal()
唤醒在notFull
条件上等待的线程,因为队列可能现在有空间可以插入新的元素。
查看代码
void removeAt(final int removeIndex) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
LinkedBlockingDeque
实现
LinkedBlockingDeque
实现了一个线程安全的双端队列(deque),这意味着元素可以从两端插入和移除。
类声明
LinkedBlockingDeque
是一个泛型类,可以处理任何类型的元素,由类型参数 E
指定。
它扩展了 AbstractQueue
类,这意味着它继承了一些队列的基本实现。
它实现了 BlockingDeque
接口,这个接口扩展了 Deque
接口,并添加了阻塞操作。
它还实现了 Serializable
接口,这意味着这个类的实例可以被序列化和反序列化。
主要特性
- 线程安全:所有操作都是线程安全的,内部通过使用锁来保证并发访问时的数据一致性。
- 双端队列:允许元素从两端插入和移除。
- 可选容量限制:可以指定队列的最大容量,如果没有指定,则容量是无限的。
内部实现
LinkedBlockingDeque
使用内部节点类Node
来表示队列中的元素,每个节点包含元素值和指向前一个和后一个节点的引用。- 它使用
ReentrantLock
,用于头部操作和尾部操作,以及相应的条件变量来管理阻塞操作。
主要方法
E takeFirst()
获取并移除双端队列的前端元素,如果队列为空则阻塞等待。
查看代码
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
E takeLast()
获取并移除双端队列的后端元素,如果队列为空则阻塞等待。
putFirst(E e)
putFirst(E e)
方法将元素插入到双端队列的前端。
如果队列已满(即达到其容量限制),该方法会阻塞,直到队列中有空间可以插入新元素。
在等待期间,线程可能会被中断,如果发生这种情况,方法会抛出 InterruptedException
。
只有当元素成功插入队列后,方法才会返回。
查看代码
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
offerFirst(E e)
offerFirst(E e)
方法同样将元素插入到双端队列的前端。
如果队列已满,该方法不会阻塞,而是立即返回 false
。
方法不会抛出 InterruptedException
,即使线程在等待锁时被中断。
查看代码
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}