java并发编程-ArrayBlockingQueue阻塞队列

本章章节:

  1、介绍什么是阻塞队列;

  2、常见的队列实现;

  3、阻塞队列实现的几个重要方法;

  4、ArrayBlockingQueue队列实现分析;

什么是阻塞队列?

     队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略。使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞。

blob.png

常见的阻塞队列?

       自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

  1.  a、ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。本章主要介绍;

       b、LinkedBlockingQueue:基于链表实现的一个阻塞队列,此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。 新元素插入到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列, 但是在大多数并发应用程序中,其可预知的性能要低。 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。 如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。是作为生产者消费者的首选.

     c、PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

    d、DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。  DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

阻塞队列中的几个主要方法?

     阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法: 

    put(E e)
  take()
  offer(E e,long timeout, TimeUnit unit)
  poll(long timeout, TimeUnit unit) 
  put方法用来向队尾存入元素,如果队列满,则等待;
  take方法用来从队首取元素,如果队列为空,则等待;
  offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
  poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

 如 例子:

    当限制阻塞队列数量为5时,添加了5个元素之后,继续添加将会队列外阻塞等待,此时程序并未终止。直到我们将队首元素移除,则可以继续向阻塞队列中添加元素:

	public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String>  blockingQueue=new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 20; i++) {
        //	blockingQueue.put("加入元素" + i); //put方法用来向队尾存入元素,如果队列满,则等待;
        	 boolean result=blockingQueue.offer("加入元素" + i);
        	 System.out.println("向阻塞队列中添加了元素:" + i +";result "+result);
        	 if(i>6){
        		 System.out.println("移除队首元素"+blockingQueue.take());
        	 }
		}
        System.out.println("程序到此运行结束,即将退出----");
		
	}

ArrayBlockingQueue原理分析?

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;
    /** The queued items */
    final Object[] items;
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    /** items index for next put, offer, or add */
    int putIndex;
    /** Number of elements in the queue */
    int count;
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    // Internal helper methods

  可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

 lock是一个可重入锁,notEmpty和notFull是等待条件。

 下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,  Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

 然后看它的两个关键方法的实现:put()和take():

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

 当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

 我们看一下insert方法的实现:

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

 它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

 下面是take()方法的实现:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

    跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:

private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    跟insert方法也很类似。

     其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。

实例:

 通过使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式:

public class Test {

	private int queueSize = 10;

	private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);

	public static void main(String[] args) {

		Test test = new Test();
		Product producer = test.new Product();
		Comsumer consumer = test.new Comsumer();

		producer.start();
		consumer.start();

	}

	class Comsumer extends Thread {
		@Override
		public void run() {
			while (true) {
				synchronized (queue) {
					while (queue.size() == 0) {
						try {
							System.out.println("队列空,等待数据");
							queue.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
							queue.notify();
						}
					}
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					Integer integer = queue.poll();
					System.out.println("消费=" + integer);
					queue.notify();
					System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
				}
			}
		}
	}

	class Product extends Thread {
		@Override
		public void run() {
			while (true) {
				synchronized (queue) {
					while (queue.size() == queueSize) {
						try {
							System.out.println("队列满,等待有空余空间");
							queue.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
							queue.notify();
						}
					}
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					queue.offer(1);
					queue.notify();
					System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size()));
				}
			}
		}
	}

}

    这个是经典的生产者-消费者模式,通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。

 具体的线程间通信方式(wait和notify的使用)在后续问章中会讲述到。

    下面是使用阻塞队列实现的生产者-消费者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

    在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。


 参考链接:http://blog.csdn.net/ydj7501603/article/details/17246949

       http://www.cnblogs.com/dolphin0520/p/3932906.html


发表评论