java并发编程- DelayQueue阻塞队列

    DelayQueue(= BlockingQueue + PriorityQueue + Delayed)是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

    DelayQueue阻塞队列在我们系统开发中也常常会用到,例如:缓存系统的设计,缓存中的对象,超过了空闲时间,需要从缓存中移出;任务调度系统,能够准确的把握任务的执行时间。我们可能需要通过线程处理很多时间上要求很严格的数据,如果使用普通的线程,我们就需要遍历所有的对象,一个一个的检查看数据是否过期等,首先这样在执行上的效率不会太高,其次就是这种设计的风格也大大的影响了数据的精度。一个需要12:00点执行的任务可能12:01才执行,这样对数据要求很高的系统有更大的弊端。由此我们可以使用DelayQueue。

 场景:

   a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

  b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

   c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

实现原理:

// 增删查公用的锁
private transient final ReentrantLock lock = new ReentrantLock();

// 用优先队列存储元素
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 实现Leader-Follower设计模式,以最少等待获取元素
// leader线程用于在队首等待到期元素
// leader线程可以更换,其只需要下一次delay时间即可,而其他Follower线程无限期等待
// leader线程在从take()、poll(...)返回前需要通知其他Follower线程直至其中某个线程成为leader
// 当有更早的过期元素成为队首元素时,通过设置leader为null,使当前leader线程失去leader位置,等待中的某个Follower线程成为leader
// 等待中的Follower线程随时准备好成为leader或失去leader位置
private Thread leader = null;

// 当队首可获取新的元素时或某个Follower线程成为新leader时,会发送available信号
private final Condition available = lock.newCondition();

  DelayQueue内部的实现使用了一个优先队列。当调用DelayQueue的offer方法时,把Delayed对象加入到优先队列q中。

 添加元素

1)获取锁;
      2)将元素委托给优先队列q入队;
      3)若入队的元素delay时间最小,则更换leader;
      4)返回true。

如下:

public boolean offer(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();		// 获取lock锁
	try {
		q.offer(e);     // 委托给优先队列入队
		if (q.peek() == e) {	// 若入队的元素delay时间最小,则更换leader
			leader = null;
			available.signal();
		}
		return true;
	} finally {
		lock.unlock();  // 释放lock锁
	}
}

 DelayQueue take 获取并移除元素:

    步骤:

  1)获取锁;

     2)若队列为空,则阻塞等待;

    3)否则,获取队首元素delay时间;

    4)若delay时间已过期,则释放锁,将队首元素出队(注意:take返回前,如果leader为null且队列不为空,则发送available信号);

    5)若delay时间未到期且已设置leader,则阻塞等待;

    6)若delay时间未到期且未设置leader,则设置当前线程为leader,等待队首元素过期;

    7)循环2)——6),返回前,释放锁。

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();      // 委托给优先队列q获取队首元素
			if (first == null)		 // 队列q为空,阻塞等待
				available.await();
			else {					 // 获取队首元素first
				long delay = first.getDelay(TimeUnit.NANOSECONDS);
				if (delay <= 0)		 // 队首元素delay时间已到,直接将其出队
					return q.poll();
				else if (leader != null) // 队首元素delay时间未到,已设置leader线程,则Follower阻塞等待
					available.await();
				else {					 // 队首元素delay时间未到,还未设置leader线程,设置当前线程为leader,等待队首元素过期
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						available.awaitNanos(delay); 	// leader线程等待delay时间
					} finally {
						if (leader == thisThread)		// delay时间到,更换leader
							leader = null;
					}
				}
			}
		}
	} finally {
		if (leader == null && q.peek() != null) // take返回前,如果leader为null且队列不为空,则发送available信号
			available.signal();
		lock.unlock();
	}
}

public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();				// 获取锁
	try {
		E first = q.peek();
		if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
			return null; // 队列为空或无过期时间元素,则返回null
		else
			return q.poll(); // 有过期元素,将其出队
	} finally {
		lock.unlock();			// 释放锁
	}
}

// 有限阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();
			if (first == null) {    		
				if (nanos <= 0)
					return null;
				else
					nanos = available.awaitNanos(nanos);
			} else {
				long delay = first.getDelay(TimeUnit.NANOSECONDS);
				if (delay <= 0)				
					return q.poll();
				if (nanos <= 0)				 
					return null;
				if (nanos < delay || leader != null)
					nanos = available.awaitNanos(nanos);
				else {
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						long timeLeft = available.awaitNanos(delay);
						nanos -= delay - timeLeft;
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		if (leader == null && q.peek() != null) // poll返回前,如果leader为null且队列不为空,则发送available信号
			available.signal();
		lock.unlock();
	}
}

DelayQueue peek获取元素:

步骤:

1)获取锁;

2)获取队首元素;

3)释放锁。

public E peek() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return q.peek();
	} finally {
		lock.unlock();
	}
}

迭代器

      遍历所有已过期和未到期的元素,不保证元素的顺序。

      弱一致性,基于元素的副本构造:

public Iterator<E> iterator() {
	return new Itr(toArray());
}

private class Itr implements Iterator<E> {
	final Object[] array; // Array of all elements
	int cursor;           // index of next element to return;
	int lastRet;          // index of last element, or -1 if no such

	Itr(Object[] array) {
		lastRet = -1;
		this.array = array;
	}

	public boolean hasNext() {
		return cursor < array.length;
	}

	@SuppressWarnings("unchecked")
	public E next() {
		if (cursor >= array.length)
			throw new NoSuchElementException();
		lastRet = cursor;
		return (E)array[cursor++];
	}

	public void remove() {
		if (lastRet < 0)
			throw new IllegalStateException();
		Object x = array[lastRet];
		lastRet = -1;
		// Traverse underlying queue to find == element,
		// not just a .equals element.
		lock.lock();
		try {
			for (Iterator it = q.iterator(); it.hasNext(); ) {
				if (it.next() == x) {
					it.remove();
					return;
				}
			}
		} finally {
			lock.unlock();
		}
	}
}

特性

   用leader—follower设计模式实现leader获取已过期的优先队列队首元素。

http://express.ruanko.com/ruanko-express_69/tech-overnight5.html

http://www.cnblogs.com/sunzhenchao/p/3515085.html

发表评论