BlockingQueue

为什么要使用阻塞队列

JDK 实现的生产者-消费者模式,并且支持 block即阻塞线程,阻塞通常指的是

  • 队列空时,消费者线程等待队列非空
  • 队列满时,生产者线程等待队列可用

BlockingQueue 核心方法

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法addofferputoffer
删除方法removepolltakepoll
检查方法elementpeek
  • 返回特殊值:如果试图操作无法立即执行,返回一个特殊值,通常是 true/false
  • 阻塞:如果试图操作无法立即执行,则一直阻塞或相应中断
  • 超时退出:如果操作无法立即执行,阻塞指定时间,返回一个特殊值告知操作是否完成,通常是 true/fasle

注意

  • 不能插入 null , 会抛出空指针异常

BlockingQueue 的实现类

ArraryBlockingQueue

  • 数组结构的有界阻塞队列。
  • 容量不可变
  • 默认非公平锁

LinkedBlockingQueue

  • 链表结构的有界阻塞队列
  • 默认大小为 Integer.MAX_VALUE

DelayQueue

  • 队列中元素只有当延迟时间到了,才能从队列中获取到
  • 无界队列

PriorityBlockingQueue

  • 基于优先级无界阻塞队列

SynchronousQueue

  • 没有内部容量
  • 每个 put 必须等待一个 take

阻塞队列原理

阻塞队列利用了 Lock 锁的多条件 Condition 阻塞控制。

我们来分析下 ArrayBlockingQueue 的源码

构造器初始化了队列大小和是否为公平锁,还对锁初始化了两个监视器,分别是 notEmptynotFull. 当线程是put操作时,加上监视器 notFull, 标记为生产者;当线程时 take操作时,加上监视器notFull,标记为消费者。

构造器如下:

public ArrayBlockingQueue(int capacity, boolean fair) {  
    if (capacity <= 0)  
        throw new IllegalArgumentException();  
    this.items = new Object[capacity];  
    lock = new ReentrantLock(fair);  
    notEmpty = lock.newCondition();  
    notFull =  lock.newCondition();  
}

put 方法:

public void put(E e) throws InterruptedException {  
    checkNotNull(e);  
    final ReentrantLock lock = this.lock;  
    // 自旋获取锁
    lock.lockInterruptibly();  
    try {  
	    //队列满
        while (count == items.length)  
            //如果队列满 阻塞线程,释放锁, 标记当前线程为 notFull ,等待唤醒
            notFull.await();  
        enqueue(e);  
    } finally {  
        lock.unlock();  
    }  
}

private void enqueue(E x) {  
    // assert lock.getHoldCount() == 1;  
    // assert items[putIndex] == null;    final Object[] items = this.items;  
    items[putIndex] = x;  
    if (++putIndex == items.length)  
        putIndex = 0;  
    count++;  
    // 唤醒等待的消费者线程
    notEmpty.signal();  
}
  1. 竞争lock锁
  2. 如果阻塞队列满了,调用 await 阻塞当前线程,并标记为 notFull,释放lock锁,等待消费者线程唤醒
  3. 如果队列没满,调用enqueue将元素放进阻塞队列,唤醒一个标记为notEmpty的消费者线程

take 方法源码:

public E take() throws InterruptedException {  
    final ReentrantLock lock = this.lock;  
    lock.lockInterruptibly();  
    try {  
        while (count == 0)  
            notEmpty.await();  
        return dequeue();  
    } finally {  
        lock.unlock();  
    }  
}

private E dequeue() {  
    // assert lock.getHoldCount() == 1;  
    // assert items[takeIndex] != null;    final Object[] items = this.items;  
    @SuppressWarnings("unchecked")  
    E x = (E) items[takeIndex];  
    items[takeIndex] = null;  
    if (++takeIndex == items.length)  
        takeIndex = 0;  
    count--;  
    if (itrs != null)  
        itrs.elementDequeued();  
    notFull.signal();  
    return x;  
}
  1. 竞争 lock 锁
  2. 判断队列是否为空,为空调用 await 阻塞队列,标记为 notEmpty(消费者线程),释放lock锁,等待生产者线程唤醒它
  3. 如果不为空 调用 dequeue方法,唤醒 notFull(生产者)线程

总结

  1. put 和 take 操作都要先获取锁,没有获取到锁的线程会 CAS 自旋,直到获取到锁
  2. 拿到锁后,需要判断队列是否可用(满/空),如果不可用,会调用await,并释放锁
  3. 步骤2被阻塞的线程唤醒后,依然需要拿到锁才能往下执行,拿到了锁依然要while判断队列是否可用

使用场景

  • 生产者-消费者模型
  • 线程池中使用阻塞队列
Last Updated:
Contributors: himcs