解读 Java 并发队列 BlockingQueue

作者:微信小助手

发布时间:2018-12-08T22:47:18

(点击上方公众号,可快速关注)


来源:JavaDoop ,

javadoop.com/post/java-concurrent-queue


最近得空,想写篇文章好好说说 java 线程池问题,我相信很多人都一知半解的,包括我自己在仔仔细细看源码之前,也有许多的不解,甚至有些地方我一直都没有理解到位。


说到线程池实现,那么就不得不涉及到各种 BlockingQueue 的实现,那么我想就 BlockingQueue 的问题和大家分享分享我了解的一些知识。


本文没有像之前分析 AQS 那样一行一行源码分析了,不过还是把其中最重要和最难理解的代码说了一遍,所以不免篇幅略长。本文涉及到比较多的 Doug Lea 对 BlockingQueue 的设计思想,希望有心的读者真的可以有一些收获,我觉得自己还是写了一些干货的。


本文直接参考 Doug Lea 写的 java doc 和注释,这也是我们在学习 java 并发包时最好的材料了。希望大家能有所思、有所悟,学习 Doug Lea 的代码风格,并将其优雅、严谨的作风应用到我们写的每一行代码中。


BlockingQueue


开篇先介绍下 BlockingQueue 这个接口的规则,后面再看其实现。


首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。


BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。


BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。总结如下:



BlockingQueue 的各个实现都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可。


对于 BlockingQueue,我们的关注点应该在 put(e) 和 take() 这两个方法,因为这两个方法是带阻塞的。


BlockingQueue 不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表失败,还是获取的值就是 null 值。


一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)。


BlockingQueue 是设计用来实现生产者-消费者队列的,当然,你也可以将它当做普通的 Collection 来用,前面说了,它实现了 java.util.Collection 接口。例如,我们可以用 remove(x) 来删除任意一个元素,但是,这类操作通常并不高效,所以尽量只在少数的场合使用,比如一条消息已经入队,但是需要做取消操作的时候。


BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现。


BlockingQueue 不支持 close 或 shutdown 等关闭操作,因为开发者可能希望不会有新的元素添加进去,此特性取决于具体的实现,不做强制约束。


最后,BlockingQueue 在生产者-消费者的场景中,是支持多消费者和多生产者的,说的其实就是线程安全问题。


相信上面说的每一句都很清楚了,BlockingQueue 是一个比较简单的线程安全容器,下面我会分析其具体的在 JDK 中的实现,这里又到了 Doug Lea 表演时间了。


BlockingQueue 实现之 ArrayBlockingQueue


ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。


其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。


如果读者看过我之前写的《一行一行源码分析清楚 AbstractQueuedSynchronizer(二)》 的关于 Condition 的文章的话,那么你一定能很容易看懂 ArrayBlockingQueue 的源码,它采用一个 ReentrantLock 和相应的两个 Condition 来实现。


https://javadoop.com/post/AbstractQueuedSynchronizer-2


ArrayBlockingQueue 共有以下几个属性:


// 用于存放元素的数组

final Object[] items;

// 下一次读取操作的位置

int takeIndex;

// 下一次写入操作的位置

int putIndex;

// 队列中的元素数量

int count;

 

// 以下几个就是控制并发用的同步器

final ReentrantLock lock;

private final Condition notEmpty;

private final Condition notFull;


我们用个示意图来描述其同步机制:



ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。


对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:


  1. 队列容量,其限制了队列中最多允许的元素个数;

  2. 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;

  3. 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。


更具体的源码我就不进行分析了,因为它就是 AbstractQueuedSynchronizer 中 Condition 的使用,感兴趣的读者请看我写的《一行一行源码分析清楚 AbstractQueuedSynchronizer(二)》,因为只要看懂了那篇文章,ArrayBlockingQueue 的代码就没有分析的必要了,当然,如果你完全不懂 Condition,那么基本上也就可以说看不懂 ArrayBlockingQueue 的源码了。


https://javadoop.com/post/AbstractQueuedSynchronizer-2/


BlockingQueue 实现之 LinkedBlockingQueue


底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。看构造方法:


// 传说中的无界队列

public LinkedBlockingQueue() {

    this(Integer.MAX_VALUE);

}

// 传说中的有界队列

public LinkedBlockingQueue(int capacity) {

    if (capacity <= 0) throw new IllegalArgumentException();

    this.capacity = capacity;

    last = head = new Node<E>(null);

}


我们看看这个类有哪些属性:


// 队列容量

private final int capacity;

 

// 队列中的元素数量

private final AtomicInteger count = new AtomicInteger(0);

 

// 队头

private transient Node<E> head;

 

// 队尾

private transient Node<E> last;

 

// take, poll, peek 等读操作的方法需要获取到这个锁

private final ReentrantLock takeLock = new ReentrantLock();

 

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件

private final Condition notEmpty = takeLock.newCondition();

 

// put, offer 等写操作的方法需要获取到这个锁

private final ReentrantLock putLock = new ReentrantLock();

 

// 如果写操作的时候队列是满的,那么等待 notFull 条件

private final Condition notFull = putLock.newCondition();


这里用了两个锁,两个 Condition,简单介绍如下:


takeLock 和 notEmpty 怎么搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。


putLock 需要和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。


首先,这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再开始分析源码:



看懂这个示意图,源码也就简单了,读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个就可以了。


先上构造方法:


public LinkedBlockingQueue(int capacity) {

    if (capacity <= 0) throw new IllegalArgumentException();

    this.capacity = capacity;

    last = head = new Node<E>(null);

}


注意,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也总是获取头节点后面的一个节点。count 的计数值不包括这个头节点。


我们来看下 put 方法是怎么将元素插入到队尾的:


public void put(E e) throws InterruptedException {

    if (e == null) throw new NullPointerException();

    // 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。

    int c = -1;

    Node<E> node = new Node(e);

    final ReentrantLock putLock = this.putLock;

    final AtomicInteger count = this.count;

    // 必须要获取到 putLock 才可以进行插入操作

    putLock.lockInterruptibly();

    try {

        // 如果队列满,等待 notFull 的条件满足。

        while (count.get() == capacity) {

            notFull.await();

        }

        // 入队

        enqueue(node);

        // count 原子加 1,c 还是加 1 前的值

        c = count.getAndIncrement();

        // 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。

        // 哪些线程会等待在 notFull 这个 Condition 上呢?

        if (c + 1 < capacity)

            notFull.signal();

    } finally {

        // 入队后,释放掉 putLock

        putLock.unlock();

    }

    // 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),

    // 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作

    if (c == 0)

        signalNotEmpty();

}

 

// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素

// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作

private void enqueue(Node<E> node) {

    // assert putLock.isHeldByCurrentThread();

    // assert last.next == null;

    last = last.next = node;

}

 

// 元素入队后,如果需要,调用这个方法唤醒读线程来读

private void signalNotEmpty() {

    final ReentrantLock takeLock = this.takeLock;

    takeLock.lock();

    try {

        notEmpty.signal();

    } finally {

        takeLock.unlock();

    }

}


我们再看看 take 方法:


public E take() throws InterruptedException {

    E x;

    int c = -1;

    final AtomicInteger count = this.count;

    final ReentrantLock takeLock = this.takeLock;

    // 首先,需要获取到 takeLock 才能进行出队操作

    takeLock.lockInterruptibly();

    try {

        // 如果队列为空,等待 notEmpty 这个条件满足再继续执行

        while (count.get() == 0) {

            notEmpty.await();

        }

        // 出队

        x = dequeue();

        // count 进行原子减 1

        c = count.getAndDecrement();

        // 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程

        if (c > 1)

            notEmpty.signal();

    } finally {

        // 出队后释放掉 takeLock

        takeLock.unlock();

    }

    // 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的

    // 既然出队了一个,那么意味着队列不满了,唤醒写线程去写

    if (c == capacity)

        signalNotFull();

    return x;

}

// 取队头,出队

private E dequeue() {

    // assert takeLock.isHeldByCurrentThread();

    // assert head.item == null;

    // 之前说了,头结点是空的

    Node<E> h = head;

    Node<E> first = h.next;

    h.next = h; // help GC

    // 设置这个为新的头结点

    head = first;

    E x = first.item;

    first.item = null;

    return x;

}

// 元素出队后,如果需要,调用这个方法唤醒写线程来写

private void signalNotFull() {

    final ReentrantLock putLock = this.putLock;

    putLock.lock();

    try {

        notFull.signal();

    } finally {

        putLock.unlock();

    }

}


源码分析就到这里结束了吧,毕竟还是比较简单的源码,基本上只要读者认真点都看得懂。


BlockingQueue 实现之 SynchronousQueue


它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。


我们比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用,感兴趣的读者可以在看完这个后去看看相应的使用。


虽然上面我说了队列,但是 SynchronousQueue 的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。


你不能在 SynchronousQueue 中使用 peek 方法(在这里这个方法直接返回 null),peek 方法的语义是只读取不移除,显然,这个方法的语义是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代的。虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的。当然,这个类也是不允许传递 null 值的(并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)。


接下来,我们来看看具体的源码实现吧,它的源码不是很简单的那种,我们需要先搞清楚它的设计思想。


源码加注释大概有 1200 行,我们先看大框架:


// 构造时,我们可以指定公平模式还是非公平模式,区别之后再说

public SynchronousQueue(boolean fair) {

    transferer = fair ? new TransferQueue() : new TransferStack();

}

abstract static class Transferer {

    // 从方法名上大概就知道,这个方法用于转移元素,从生产者手上转到消费者手上

    // 也可以被动地,消费者调用这个方法来从生产者手上取元素

    // 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者

    // 如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素

    // 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值

    // 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。

    abstract Object transfer(Object e, boolean timed, long nanos);

}


Transferer 有两个内部实现类,是因为构造 SynchronousQueue 的时候,我们可以指定公平策略。公平模式意味着,所有的读写线程都遵守先来后到,FIFO 嘛,对应 TransferQueue。而非公平模式则对应 TransferStack。



我们先采用公平模式分析源码,然后再说说公平模式和非公平模式的区别。


接下来,我们看看 put 方法和 take 方法:


// 写入值

public void put(E o) throws InterruptedException {

    if (o == null) throw new NullPointerException();

    if (transferer.transfer(o, false, 0) == null) { // 1

        Thread.interrupted();

        throw new InterruptedException();

    }

}

// 读取值并移除

public E take() throws InterruptedException {

    Object e = transferer.transfer(null, false, 0); // 2

    if (e != null)

        return (E)e;

    Thread.interrupted();

    throw new InterruptedException();

}


我们看到,写操作 put(E o) 和读操作 take() 都是调用 Transferer.transfer(…) 方法,区别在于第一个参数是否为 null 值。


我们来看看 transfer 的设计思路,其基本算法如下:


  1. 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。

  2. 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。


其实这里有个隐含的条件被满足了,队列如果不为空,肯定都是同种类型的节点,要么都是读操作,要么都是写操作。这个就要看到底是读线程积压了,还是写线程积压了。


我们可以假设出一个男女配对的场景:一个男的过来,如果一个人都没有,那么他需要等待;如果发现有一堆男的在等待,那么他需要排到队列后面;如果发现是一堆女的在排队,那么他直接牵走队头的那个女的。


既然这里说到了等待队列,我们先看看其实现,也就是 QNode:


static final class QNode {

    volatile QNode next;          // 可以看出来,等待队列是单向链表

    volatile Object item;         // CAS'ed to or from null