java并发编程-PriorityBlockingQueue阻塞队列

  PriorityBlockingQueue :基于数组实现的无界阻塞队列,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),里面存储的对象必须是实现Comparable接口。队列通过这个接口的compare方法确定对象的priority。规则是:当前和其他对象比较,如果compare方法返回负数,那么在队列里面的优先级就比较搞。

否则:

blob.png

问题1:PriorityBlockingQueue添加元素时,是否是基于全部元素来重新排序?

 答:否;PriorityBlockingQueue队列添加新元素时候不是将全部元素进行顺序排列,而是从某个指定位置开始将新元素与之比较,一直比到队列头,这样既能保证队列头一定是优先级最高的元素,可以查看PriorityBlockingQueue源码,添加元素:

private void siftUp(int k, E x) {                  // 插入元素 向上调整
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }
    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)   // 小根堆,当 待插入元素 比 当前位置的父元素 大的时候,
            代表 待插入元素 可以插入到当前位置。
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

问题1:PriorityBlockingQueue由于上述问题1导致,取完队列头,后续的顺序不会不对吗?

 答 否;取完队列头时候,后面的剩余的元素不是排序的,岂不是不符合要求了,继续查看源码,发现每取一个头元素时候,都会对剩余的元素做一次调整,这样就能保证每次队列头的元素都是优先级最高的元素 ,可以查看PriorityBlockingQueue源码,取元素:

private void siftDown(int k, E x) {                  // 删除元素 向下调整
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)         // 比子节点小,插入当前位置即可
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

案例分析:

public class PriorityBlockingQueueTest {

	static Random r = new Random(47);

	public static void main(String[] args) {

		final PriorityBlockingQueue<PriorityEntity> q=new PriorityBlockingQueue<PriorityEntity>();  
		
		ExecutorService se = Executors.newCachedThreadPool();

		se.execute(new Runnable() {
			public void run() {
				int i = 0;
				while (true) {
					PriorityEntity entity= new PriorityEntity(r.nextInt(10), i++);
					System.out.println("put-----"+entity.toString());
					q.put(entity);
					try {
						TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		});
		
		se.execute(new Runnable() {
			public void run() {
				while (true) {
					try {
						System.out.println("take-- "+q.take()+" left:-- ["+q.toString()+"]");
						try {  
                            TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        } 
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					} 
				}
			}
		});
		
		try {  
            TimeUnit.SECONDS.sleep(5);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
		System.out.println("shutdown");  

	}

}

class PriorityEntity implements Comparable<PriorityEntity> {
	private int priority;
	private int index = 0;

	public PriorityEntity(int _priority, int _index) {
		this.priority = _priority;
		this.index = _index;
	}

	public int getPriority() {
		return priority;
	}

	public void setPriority(int priority) {
		this.priority = priority;
	}

	public int getIndex() {
		return index;
	}

	public void setIndex(int index) {
		this.index = index;
	}

	/**
	 *  //数字小,优先级高 
	 */
	/*@Override
	public int compareTo(PriorityEntity o) {
		if(this.priority > o.priority){
			return 1;
		}else if(this.priority < o.priority){
			return -1;
		}
		return 0;
	}*/

	@Override
	public String toString() {
		return "PriorityEntity [priority=" + priority + ", index=" + index
				+ "]";
	}

	/**
	 *  //数字大,优先级高 
	 */
	@Override
	public int compareTo(PriorityEntity o) {
		if(this.priority < o.priority){
			return 1;
		}else if(this.priority > o.priority){
			return -1;
		}
		return 0;
	}
}

发表评论