Last Updated on 13 August 2019   |   Print Email
In this Java concurrency tutorial, you will understand and know how to use DelayQueue - a specialized priority queue that orders elements based on their delay time. Only expired elements can be taken from the queue, and the head of the queue contains the element that has expired for the longest time.A DelayQueue accepts only elements that belong to a class of type Delayed, which implements the getDelay() method to return the remaining delay time.
1. Characteristics and Behaviors of DelayQueue
Here are the characteristics of a DelayQueue:
Internal data structure: DelayQueue uses PriorityBlockingQueue as its internal data structure and ReentrantLock to protect public operations from being concurrently accessed by multiple threads.
Capacity: the queue is unbounded so that it can hold unlimited number of elements. That also means that the offer() method always return true and the put() method never blocks.
Order: the queue orders its elements based on their remaining delay time returned by the getDelay() method. The head of the queue contains an element that has the least remaining delay time, and the tail of the queue contains an element that has the longest remaining delay time.
Operations: DelayQueue shares the same performance characteristics of PriorityBlockingQueue because it makes use of PriorityBlockingQueue underneath. Queue insertion is not executed at constant time as it requires re-arrange all the elements to maintain the persistent order. Queue removal operations (poll, take) are executed at constant time. The poll() method returns null if there’s no expired element. And you can always examine elements in the queue using peek() even if the queue has no expired element.
Iterator: the iterator returns both expired and unexpired elements. The iterator is weakly-consistent and returns the elements in no particular order.
Now, let’s see some code examples to understand how to use DelayQueue.
2. Create a new DelayQueue collection
The DelayQueue class provides two constructors, one with no argument and one takes elements from another collection:
DelayQueue()
DelayQueue(Collection<? extends E> c)
Since the DelayQueue accepts only elements belong to a class of Delayed, we create the element class looks like this:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* A task that has a delay time, which can be used in a DelayQueue.
*
* @author www.codejava.net
*/
public class DelayTask implements Delayed {
private String name;
private long delayTime;
public DelayTask(String name, long delayTime) {
this.name = name;
this.delayTime = System.currentTimeMillis() + delayTime;
}
public long getDelay(TimeUnit timeUnit) {
long difference = delayTime - System.currentTimeMillis();
return timeUnit.convert(delayTime, TimeUnit.MILLISECONDS);
}
public int compareTo(Delayed another) {
DelayTask anotherTask = (DelayTask) another;
if (this.delayTime < anotherTask.delayTime) {
return -1;
}
if (this.delayTime > anotherTask.delayTime) {
return 1;
}
return 0;
}
}
As you can see, this DelayTask class implements the Delayed interface so it must implements the getDelay() method:
public long getDelay(TimeUnit timeUnit) {
long difference = delayTime - System.currentTimeMillis();
return timeUnit.convert(difference, TimeUnit.MILLISECONDS);
}
According to Javadoc, this method must return the remaining delay time, or zero or negative values that indicate the delay has expired.This needs thorough explanation because how you implement the getDelay() method affects the correctness of the DelayQueue. The DelayTask class has a delayTime, which is added up to the current time at the time a DelayTask object is created:
public DelayTask(String name, long delayTime) {
this.name = name;
this.delayTime = System.currentTimeMillis() + delayTime;
}
When does the getDelay() method get called? It is called on each element in the queue when poll() or take() is invoked, in order to determine the element that has expired for the longest time. Hence in the getDelay() method we return the time difference between the time when the element gets created and the time when the method gets called.Moreover, the Delayed interface extends the Comparable interface so we must implement the compareTo() method as well:
public int compareTo(Delayed another) {
DelayTask anotherTask = (DelayTask) another;
if (this.delayTime < anotherTask.delayTime) {
return -1;
}
if (this.delayTime > anotherTask.delayTime) {
return 1;
}
return 0;
}
We must implement this method in order to provide an ordering consistent with the getDelay() method. That means the elements compared by the both methods must result in the same order. Therefore, in the compareTo() method, we compare two DelayTask objects based on their delay time.Then we can create a DelayQueue like this:
DelayQueue<DelayTask> queue = new DelayQueue<>();
Or take elements from another collection like this:
List<DelayTask> listTask = ... // a list of DelayTask objects
DelayQueue<DelayTask> queue = new DelayQueue<>(listTask);
3. Insert an element to the DelayQueue
The following code snippet put some DelayTask elements to the DelayQueue:
DelayTask task1 = new DelayTask("Learn Java", 20_000);
DelayTask task2 = new DelayTask("Code Java", 100_000);
DelayTask task3 = new DelayTask("Do Project", 200_000);
queue.put(task1);
queue.put(task2);
queue.put(task3);
Note that since DelayQueue is unbounded, put operation never blocks. The queue doesn’t accept null element (NullPointerException will be thrown) and duplicate elements are allowed.
4. Retrieve and Remove an element from the DelayQueue
The poll() method retrieves and removes the head of the queue, or returns null if the queue has no elements with an expired delay. For example:
The peek() method retrieves, but doesn’t removes the head of the queue (even if there’s no expired element available), or returns null if the queue is empty. For example:
DelayTask task = queue.peek();
if (task != null) {
System.out.println("Process: " + task);
} else {
System.out.println("Queue is empty");
}
5. A Producer-Consumer Example using DelayQueue
Let’s see a producer-consumer example that makes use of a DelayQueue as a shared data structure among different threads. Add the following two methods to the DelayTask class:
public void run() {
long executionTime = new Random().nextInt(10000);
try {
Thread.sleep(executionTime);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
System.out.println("Task " + name + " has been done.");}
public String toString() {
return this.name;
}
The run() method contains code that will be executed when the task is processed. It sleeps for a random time to fake execution time. And we override the toString() method to return the name of the task so we can print the task name to the console.The following class is for the producer:
import java.util.*;
import java.util.concurrent.*;
/**
* A consumer thread that creates random DelayTask objects.
*
* @author www.codejava.net
*/
public class Producer extends Thread {
private DelayQueue<DelayTask> queue;
private static int taskCount;
public Producer(DelayQueue<DelayTask> queue) {
this.queue = queue;
}
public void run() {
while(true) {
queue.put(produce());
}
}
private DelayTask produce() {
String taskName = "Task " + (++taskCount);
long delayTime = new Random().nextInt(10000);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
System.out.println(getName() + ": produced " + taskName);
return new DelayTask(taskName, delayTime);
}
}
As you can see in the produce() method, this producer creates a DelayTask with random delay time (but always less than 10 seconds).Here is the code of the consumer class:
import java.util.concurrent.*;
/**
* A consumer thread that takes delayed tasks from a DelayQueue
* and processes them.
* @author www.codejava.net
*/
public class Consumer extends Thread {
private DelayQueue<DelayTask> queue;
private int taskCount;
public Consumer(DelayQueue<DelayTask> queue) {
this.queue = queue;
}
public void run() {
while(true) {
try {
DelayTask task = queue.take();
consume(task);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
private void consume(DelayTask task) {
System.out.println(getName() + ": prepare to process " + task);
task.run();
}
}
As you can see, this consumer simply takes a DelayTask from the queue and runs it.And the following code is for the test program:
import java.util.concurrent.*;
public class DelayQueueTest {
static int CONSUMER_THREADS = 3;
public static void main(String[] args) {
DelayQueue<DelayTask> queueTasks = new DelayQueue<>();
Producer producer = new Producer(queueTasks);
producer.start();
Consumer[] consumers = new Consumer[CONSUMER_THREADS];
for (int i = 0; i < CONSUMER_THREADS; i++) {
consumers[0] = new Consumer(queueTasks);
consumers[0].start();
}
}
}
In this program, we create on thread for producer and 3 threads for consumers. These threads share the same DelayQueue collection.Run this program and you will see the output something like this:
Thread-0: produced Task 1
Thread-0: produced Task 2
Thread-0: produced Task 3
Thread-0: produced Task 4
Thread-0: produced Task 5
Thread-0: produced Task 6
Thread-0: produced Task 7
Thread-1: prepare to process Task 1
Thread-0: produced Task 8
Thread-2: prepare to process Task 7
Thread-0: produced Task 9
Thread-3: prepare to process Task 6
Thread-0: produced Task 10
Task Task 1 has been done.
Thread-1: prepare to process Task 3
Thread-0: produced Task 11
Now, let experiment this producer-consumer example yourself.
Nam Ha Minh is certified Java programmer (SCJP and SCWCD). He began programming with Java back in the days of Java 1.4 and has been passionate about it ever since. You can connect with him on Facebook and watch his Java videos on YouTube.
public long getDelay(TimeUnit timeUnit) { long difference = delayTime - System.currentTimeMillis(); return timeUnit.convert(delayTime, TimeUnit.MILLISECONDS); }
Comments
This is wrong.
public long getDelay(TimeUnit timeUnit) {
long difference = delayTime - System.currentTimeMillis();
return timeUnit.convert(delayTime, TimeUnit.MILLISECONDS);
}