BlockingQueue
为什么要使用阻塞队列
JDK 实现的生产者-消费者模式,并且支持 block
即阻塞线程,阻塞通常指的是
- 队列空时,消费者线程等待队列非空
- 队列满时,生产者线程等待队列可用
BlockingQueue 核心方法
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add | offer | put | offer |
删除方法 | remove | poll | take | poll |
检查方法 | element | peek |
- 返回特殊值:如果试图操作无法立即执行,返回一个特殊值,通常是 true/false
- 阻塞:如果试图操作无法立即执行,则一直阻塞或相应中断
- 超时退出:如果操作无法立即执行,阻塞指定时间,返回一个特殊值告知操作是否完成,通常是 true/fasle
注意
- 不能插入 null , 会抛出空指针异常
BlockingQueue 的实现类
ArraryBlockingQueue
- 数组结构的有界阻塞队列。
- 容量不可变
- 默认非公平锁
LinkedBlockingQueue
- 链表结构的有界阻塞队列
- 默认大小为
Integer.MAX_VALUE
DelayQueue
- 队列中元素只有当延迟时间到了,才能从队列中获取到
- 无界队列
PriorityBlockingQueue
- 基于优先级的无界阻塞队列
SynchronousQueue
- 没有内部容量
- 每个 put 必须等待一个 take
阻塞队列原理
阻塞队列利用了 Lock
锁的多条件 Condition
阻塞控制。
我们来分析下 ArrayBlockingQueue
的源码
构造器初始化了队列大小和是否为公平锁,还对锁初始化了两个监视器,分别是 notEmpty
和 notFull
. 当线程是put
操作时,加上监视器 notFull
, 标记为生产者;当线程时 take
操作时,加上监视器notFull
,标记为消费者。
构造器如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put 方法:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 自旋获取锁
lock.lockInterruptibly();
try {
//队列满
while (count == items.length)
//如果队列满 阻塞线程,释放锁, 标记当前线程为 notFull ,等待唤醒
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null; final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒等待的消费者线程
notEmpty.signal();
}
- 竞争lock锁
- 如果阻塞队列满了,调用
await
阻塞当前线程,并标记为notFull
,释放lock
锁,等待消费者线程唤醒 - 如果队列没满,调用
enqueue
将元素放进阻塞队列,唤醒一个标记为notEmpty
的消费者线程
take 方法源码:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null; final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
- 竞争 lock 锁
- 判断队列是否为空,为空调用
await
阻塞队列,标记为notEmpty
(消费者线程),释放lock
锁,等待生产者线程唤醒它 - 如果不为空 调用
dequeue
方法,唤醒notFull
(生产者)线程
总结
- put 和 take 操作都要先获取锁,没有获取到锁的线程会 CAS 自旋,直到获取到锁
- 拿到锁后,需要判断队列是否可用(满/空),如果不可用,会调用
await
,并释放锁 - 步骤2被阻塞的线程唤醒后,依然需要拿到锁才能往下执行,拿到了锁依然要
while
判断队列是否可用
使用场景
- 生产者-消费者模型
- 线程池中使用阻塞队列