java并发编程-LinkedBlockingQueue阻塞队列

   LinkedBlockingQueue:基于链表(单向)实现的一个阻塞队列,此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。 新元素插入到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。是作为生产者消费者的首选

  如下:LinkedBlockingQueue数据结构图:

blob.png

对于LinkedBlockingQueue需要掌握以下几点

   创建

   入队(添加元素)

   出队(删除元素)

LinkedBlockingQueue实现原理:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {   
static class Node<E> {
        E item;//节点封装的数据
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */

        Node<E> next;//下一个节点
        Node(E x) { item = x; }
    }

    /** 指定链表容量  */
    private final int capacity;

    /** 当前的元素个数 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 链表头节点 */
    private transient Node<E> head;

    /** 链表尾节点 */
    private transient Node<E> last;

    /** 出队锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 出队等待条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入队锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入队等待条件 */
    private final Condition notFull = putLock.newCondition();

  可以看出,LinkedBlockingQueue中用来存储元素是基于Node链条元素,head和last分别表示队首元素和队尾元素,count表示队列中元素的个数。ReentrantLock是一个可重入锁,notEmpty和putLock是等待条件。

     下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

      第一个构造器默认以Integer.MAX_VALUE为指定的最大容量,第二个构造器可以指定容量,第三个构造器可以指定个集合进行初始化,并初始转换成链表结构。

  入队:

    public boolean offer(E e)

   原理:

      在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

   使用方法: abq.offer("hello1");

/**
     * 在队尾插入一个元素, 容量没满,可以立即插入,返回true; 队列满了,直接返回false
     * 注:如果使用了限制了容量的队列,这个方法比add()好,因为add()插入失败就会抛出异常
     */
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count; // 获取队列中的元素个数
        if (count.get() == capacity) // 队列满了

            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock; // 获取入队锁

        putLock.lock();
        try {
            if (count.get() < capacity) { // 容量没满
                enqueue(node); // 入队
                c = count.getAndIncrement();// 容量+1,返回旧值(注意)
                if (c + 1 < capacity) // 如果添加元素后的容量,还小于指定容量(说明在插入当前元素后,至少还可以再插一个元素)
                    notFull.signal();// 唤醒等待notFull条件的其中一个线程
            }
        } finally {
            putLock.unlock();// 释放入队锁
        }
        if (c == 0) // 如果c==0,这是什么情况?一开始如果是个空队列,就会是这样的值,要注意的是,上边的c返回的是旧值
            signalNotEmpty();
        return c >= 0;
    }
/**
     * 创建一个节点,并加入链表尾部
     * @param x
     */
    private void enqueue(E x) {
        /*
         * 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点
         */
        last = last.next = new Node<E>(x);
    }
  private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//获取出队锁
        try {
            notEmpty.signal();//唤醒等待notEmpty条件的线程中的一个
        } finally {
            takeLock.unlock();//释放出队锁
        }
    }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

 原理:

    在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:

     被唤醒

     等待时间超时

     当前线程被中断

 使用方法:

try {
           abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
/**
     * 在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况: 
     * 1、被唤醒 
     * 2、等待时间超时 
     * 3、当前线程被中断
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);// 转换为纳秒
        int c = -1;
        final ReentrantLock putLock = this.putLock;// 入队锁
        final AtomicInteger count = this.count;// 总数量
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {// 容量已满
                if (nanos <= 0)// 已经超时
                    return false;
                /*
                 * 进行等待: 在这个过程中可能发生三件事: 
                 * 1、被唤醒-->继续当前这个while循环
                 * 2、超时-->继续当前这个while循环 
                 * 3、被中断-->抛出中断异常InterruptedException
                 */
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);// 入队
            c = count.getAndIncrement();// 入队元素数量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

注意:

  awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。

public void put(E e) throws InterruptedException

 原理:

  在队尾插入一个元素,如果队列满了,一直阻塞,直到队列不满了或者线程被中断

 使用方法:

  try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
/**
     * 在队尾插一个元素
     * 如果队列满了,一直阻塞,直到队列不满了或者线程被中断
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;//入队锁
        final AtomicInteger count = this.count;//当前队列中的元素个数
        putLock.lockInterruptibly();//加锁
        try {
            while (count.get() == capacity) {//如果队列满了 
                /*
                 * 加入notFull等待队列,直到队列元素不满了,
                 * 被其他线程使用notFull.signal()唤醒
                 */
                notFull.await();
            }
            enqueue(e);//入队
            c = count.getAndIncrement();//入队数量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

出队

  public E poll()

原理:

   如果没有元素,直接返回null;如果有元素,出队

使用方法:

abq.poll();

/**
     * 出队: 
     * 1、如果没有元素,直接返回null 
     * 2、如果有元素,出队
     */
    public E poll() {
        final AtomicInteger count = this.count;// 获取元素数量
        if (count.get() == 0)// 没有元素
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();// 获取出队锁
        try {
            if (count.get() > 0) {// 有元素
                x = dequeue();// 出队
                // 元素个数-1(注意:该方法是一个无限循环,直到减1成功为止,且返回旧值)
                c = count.getAndDecrement();
                if (c > 1)// 还有元素(如果旧值c==1的话,那么通过上边的操作之后,队列就空了)
                    notEmpty.signal();// 唤醒等待在notEmpty队列中的其中一条线程
            }
        } finally {
            takeLock.unlock();// 释放出队锁
        }
        if (c == capacity)// c == capacity是怎么发生的?如果队列是一个满队列,注意:上边的c返回的是旧值
            signalNotFull();
        return x;
    }
  /**
     * 从队列头部移除一个节点
     */
    private E dequeue() {
        Node<E> h = head;//获取头节点:x==null
        Node<E> first = h.next;//将头节点的下一个节点赋值给first
        h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备)
        head = first;//将当前将要出队的节点作为了头节点
        E x = first.item;//获取出队节点的值
        first.item = null;//将出队节点的值置空
        return x;
    }
private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

注意:出队逻辑如果不懂,查看最后总结部分的图

public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

   从队头删除一个元素,如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

  被唤醒

  等待时间超时

  当前线程被中断

使用方法:

try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
   /**
     * 从队列头部删除一个元素,
     * 如果队列不空,出队;
     * 如果队列已空,判断时间是否超时,如果已经超时,返回null
     * 如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
     * 1、被唤醒
     * 2、等待时间超时
     * 3、当前线程被中断
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {//如果队列没有元素
                if (nanos <= 0)//已经超时
                    return null;
                /*
                 * 进行等待:
                 * 在这个过程中可能发生三件事:
                 * 1、被唤醒-->继续当前这个while循环
                 * 2、超时-->继续当前这个while循环
                 * 3、被中断-->抛出异常
                 */
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//出队
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
public E take() throws InterruptedException

原理:

  将队头元素出队,如果队列空了,一直阻塞,直到队列不为空或者线程被中断

使用方法:

try {
            abq.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
/**
     * 出队:
     * 如果队列空了,一直阻塞,直到队列不为空或者线程被中断
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;//获取队列中的元素总量
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//获取出队锁
        try {
            while (count.get() == 0) {//如果没有元素,一直阻塞
                /*
                 * 加入等待队列, 一直等待条件notEmpty(即被其他线程唤醒)
                 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了)
                 */
                notEmpty.await();
            }
            x = dequeue();//出队
            c = count.getAndDecrement();//元素数量-1
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

总结:

1、具体入队与出队的原理图:

   图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用。

1.1、初始化

blob.png

 初始化之后,初始化一个数据为null,且head和last节点都是这个节点。

1.2、入队两个元素过后

blob.png

这个可以根据入队方法enqueue(E x)来看,源代码再贴一遍:

 /**
    * 创建一个节点,并加入链表尾部
    *
    * @param x
    */
   private void enqueue(E x) {
       /*
        * 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点
        */
       last = last.next = new Node<E>(x);
   }

1.3、出队一个元素后

blob.png

表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

这一块对应着源代码来看:dequeue():

  /**
     * 从队列头部移除一个节点
     */
    private E dequeue() {
        Node<E> h = head;// 获取头节点:x==null
        Node<E> first = h.next;// 将头节点的下一个节点赋值给first
        h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备)
        head = first;// 将当前将要出队的节点作为了头节点
        E x = first.item;// 获取出队节点的值
        first.item = null;// 将出队节点的值置空
        return x;
    }

2、三种入队对比:

  offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false–>不阻塞

  put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断–>阻塞

  offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:–>阻塞

   被唤醒

   等待时间超时

   当前线程被中断

3、三种出队对比: 

  poll():如果没有元素,直接返回null;如果有元素,出队

  take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断–>阻塞

  poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到 出现以下三种情况:

   被唤醒

   等待时间超时

   当前线程被中断

4、ArrayBlockingQueue与LinkedBlockingQueue对比

    ArrayBlockingQueue:一个对象数组+一把锁+两个条件,入队与出队都用同一把锁,在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高,采用了数组,必须指定大小,即容量有限.

   LinkedBlockingQueue:一个单向链表+两把锁+两个条件,两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多,采用了链表,最大容量为整数最大值,可看做容量无限

发表评论