队列Queue–非阻塞队列

本章章节:

      Queue与Deque简介与使用;

      PriorityQueue简介与使用;

     ConcurrentLinkedQueue简介与使用;


    Queue用于模拟"队列"这种数据结构(先进先出 FIFO)。队列的头部保存着队列中存放时间最长的元素,队列的尾部保存着队列中存放时间最短的元素。新元素插入(offer)到队列的尾部,访问元素(poll)操作会返回队列头部的元素,peek 获取最后的一个元素,队列不允许随机访问队列中的元素。结合生活中常见的排队就会很好理解这个概念。

    Deque接口代表一个"双端队列",双端队列可以同时从两端来添加、删除元素,因此Deque的实现类既可以当成队列使用、也可以当成栈使用。

    如实现排队案例:

public class Test {
	public static void main(String[] args) {
		Queue<String> queue=new ArrayDeque<>();
		for (int i = 0; i < 5; i++) {
			queue.offer("user-"+i);
		}
		while (queue.peek()!=null) {
			System.out.println(queue.poll());
		}
	}
}

   用Deque实现类似堆栈的效果:—Stack

public class MyStack<T> {
    // 堆栈容器
    private Deque<T> constains = new ArrayDeque<>();
    private int cap;
    public MyStack(int cap) {
        super();
        this.cap = cap;
    }
    /*
     * 进栈
     */
    public boolean push(T t) {
        if (constains.size()+1 > cap) {
            return false;
        }
        return constains.offerLast(t);
    }
    /*
     * 出栈
     */
    public T prop() {
        return constains.pollLast();
    }

    public T peek() {
        return constains.peekLast();
    }
    public int size() {
        return constains.size();
    }
}

    非阻塞队列有:PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)、ConcurrentLinkedQueue ;

     PriorityQueue:是个基于优先级堆的极大优先级队列。优先队列本质上就是一个最小堆。

    此队列按照在构造时所指定的顺序对元素排序,既可以根据元素的自然顺序来指定排序(参阅 Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法。优先级队列不允许 null 元素。依靠自然排序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)

     优先级队列是无界的,但是有一个内部容量(默认是11),控制着用于存储队列元素的数组大小。它通常至少等于队列的大小。随着不断向优先级队列添加元素,其容量会自动增加。无需指定容量增加策略的细节

public static void main(String[] args) {
		PriorityQueue<Object> queue = new PriorityQueue<Object>();
		queue.add("AAAAA"); // Add接受的参数是Obj,PriorityQueue使用integer
		queue.add("BBBBB");
		queue.add("CCCCC");
		queue.add("DDDDD");
		System.out.println(queue.peek()); // 获取但不移除此队列的头
		System.out.println(queue.poll()); // 获取并移除此队列的头
}

   定义比较器:

 private static PriorityQueue queue = new PriorityQueue(10,new Comparators());  
     public static void main(String[] args) {  
          QueueObject queueObject = new QueueObject();  
          queueObject.setId(4);  
          queueObject.setObject("AAAAA");  
          queue.add(queueObject);  
          QueueObject queueObject1 = new QueueObject();  
          queueObject1.setId(1);  
          queueObject1.setObject("BBBBB");  
          queue.add(queueObject1);  
          QueueObject queueObject2 = new QueueObject();  
          queueObject2.setId(3);  
          queueObject2.setObject("CCCCC");  
          queue.add(queueObject2);  
            
          System.out.println(((QueueObject)queue.poll()).getObject());  
          System.out.println(((QueueObject)queue.poll()).getObject());  
          System.out.println(((QueueObject)queue.poll()).getObject());  
     }  
}  
class QueueObject {  
    private int id;  
    private Object object;  
    public int getId() {  
        return id;  
    }  
    public void setId(int id) {  
        this.id = id;  
    }  
    public Object getObject() {  
        return object;  
    }  
    public void setObject(Object object) {  
        this.object = object;  
    }  
}  
@SuppressWarnings("unchecked")  
class Comparators implements Comparator{  
    public int compare(Object arg0, Object arg1) {  
        int val1 = ((QueueObject)arg0).getId();  
        int val2 = ((QueueObject)arg1).getId();  
        return val1 < val2 ? 0 : 1;  
    }  
}

  详细的内部解析:http://shmilyaw-hotmail-com.iteye.com/blog/1827136

    注意:该队列是用数组实现,但是数组大小可以动态增加,容量无限。此实现不是同步的。不是线程安全的。如果多个线程中的任意线程从结构上修改了列表, 则这些线程不应同时访问 PriorityQueue 实例,这时请使用线程安全的PriorityBlockingQueue 类。不允许使用 null 元素。

    ConcurrentLinkedQueue:

        基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。

public class ConcurrentLinkedQueueTest {

	private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

	public static void main(String[] args) throws InterruptedException {
		long timeStart = System.currentTimeMillis();

		ConcurrentLinkedQueueTest.offer();
		System.out.println("----生成完毕,进行并发消费------");
		ExecutorService executorService = Executors.newFixedThreadPool(4);
		for (int i = 0; i < 3; i++) {
			executorService.submit(new Poll());
		}
		System.out.println("cost time "
				+ (System.currentTimeMillis() - timeStart) + "ms");
	}

	/**
	 * 生产
	 */
	public static void offer() {
		for (int i = 0; i < 10; i++) {
			queue.offer(i);
		}
	}

	/**
	 * 消费
	 * 
	 * @author Tony
	 * 
	 */
	static class Poll implements Runnable {
		public void run() {
			while (!queue.isEmpty()) {
				System.out.println(queue.poll());
			}
		}
	}

}

     内部原理:http://ifeve.com/concurrentlinkedqueue/

发表评论