Last Updated on 13 August 2019
This Java Concurrency tutorial helps you understand the characteristics, behaviors and how to use PriorityBlockingQueue - a concurrent collection provided in the java.util.concurrent package. As its name suggests, this implementation is a thread-safe, blocking version of PriorityQueue so it uses the same ordering rules as PriorityQueue and supplies blocking retrieval operations.
1. PriorityBlockingQueue’s Characteristics and Behaviors
Here are the key points you need to know about PriorityBlockingQueue:
Internal data structure: this implementation uses an array representation of a binary heap, which is a complete binary tree. This data structure allows the queue to order its elements efficiently based on their values. It also uses a ReentrantLock to protect public operations in a multi-threaded context.
Capacity: this queue is unbounded, so there is no fixed limit on the number of elements it can hold. In practice it is limited only by available memory: an insertion can fail with OutOfMemoryError when resources are exhausted.
Order: instead of FIFO (First-In First-Out) order, this queue orders its elements based on their natural ordering by default. In that case the class of the elements must implement the Comparable interface, and inserting non-comparable objects results in ClassCastException. You can also use a custom order by supplying a Comparator when constructing a new PriorityBlockingQueue object.
Operations: since this queue is unbounded, the offer() and put() operations never return false or block. Insertion is not executed in constant time, though: the new element is sifted up through the heap to restore the heap order, which takes O(log n) time. The removal operations (poll, take) also take O(log n) time because the heap must be re-adjusted after the head is removed. Only peek() runs in constant time.
Iterator: iterators returned by the iterator() method are weakly consistent: they never throw ConcurrentModificationException and they support the remove operation. They are not guaranteed to return the elements in any particular order.
PriorityBlockingQueue does not allow null elements. Duplicates are allowed.
Now, let’s see how to use PriorityBlockingQueue in detail with code examples.
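Several of the points above can be checked with a small experiment. The following is only a sketch: the class name QueueCharacteristicsDemo and its helper methods are made up for illustration and are not part of the JDK. It shows that duplicates are accepted, that poll() returns elements in priority order while the iterator (used by toString()) may not, and that null and non-comparable elements are rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueCharacteristicsDemo {

    // Drains the queue with poll(), which always removes the current smallest element.
    static List<Integer> pollAll(PriorityBlockingQueue<Integer> queue) {
        List<Integer> result = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) {
            result.add(head);
        }
        return result;
    }

    // Returns true if inserting null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        try {
            new PriorityBlockingQueue<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Returns true if inserting a non-comparable object fails with a ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean rejectsNonComparable() {
        try {
            PriorityBlockingQueue raw = new PriorityBlockingQueue();
            raw.offer(new Object());
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int n : new int[] {5, 1, 4, 1, 3}) {
            queue.offer(n); // the duplicate 1 is accepted
        }
        // toString() uses the iterator, so this order is unspecified
        System.out.println("Iteration order: " + queue);
        // poll() follows the priority order: [1, 1, 3, 4, 5]
        System.out.println("Poll order: " + pollAll(queue));
        System.out.println("Rejects null: " + rejectsNull());
        System.out.println("Rejects non-comparable: " + rejectsNonComparable());
    }
}
```

If you need a sorted view without emptying the queue, one option is to copy the elements with toArray() and sort the copy.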
2. Constructing a new PriorityBlockingQueue object
The no-argument constructor creates a PriorityBlockingQueue with the default initial capacity of 11 elements, and the elements are ordered according to their natural ordering. The following code snippet creates such a PriorityBlockingQueue of String elements:
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.put("one");
queue.put("two");
queue.put("three");
queue.put("four");
String element = queue.poll();
while (element != null) {
    System.out.println(element);
    element = queue.poll();
}
As you can see, the code inserts 4 elements “one”, “two”, “three”, “four” into the queue, but when polling them out it prints the following output:
four one three two
That means the queue orders elements by their natural ordering (in this example, the String’s natural ordering is alphabetical order).
You can also create a PriorityBlockingQueue with initial elements taken from another collection, using this overloaded constructor:
PriorityBlockingQueue(Collection<? extends E> c)
The elements in the specified collection must be comparable, otherwise a ClassCastException is thrown. Here’s an example:
List<Integer> list = Arrays.asList(28, 8, 20, 17, 15, 40);
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(list);
System.out.println(queue);
This creates a PriorityBlockingQueue with some initial elements taken from a list. It prints the following output:
[8, 15, 20, 17, 28, 40]
You can see it prints the elements in an unspecified order because the PriorityBlockingQueue’s toString() method uses the queue’s iterator, which is not guaranteed to return elements in any particular order.
The third constructor allows you to specify the initial capacity of the queue, that is, the initial size of its internal array (the queue still grows as needed):
PriorityBlockingQueue(int initialCapacity)
And the last constructor allows you to specify both initial capacity and a custom comparator:
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
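It may help to see that the initial capacity is not an upper bound. The sketch below assumes a hypothetical helper class named InitialCapacityDemo. It creates a queue with an initial capacity of 2 and a reverse-order comparator, then inserts 10 elements without blocking.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class InitialCapacityDemo {

    // Creates a queue with the given initial capacity and a reverse-order comparator,
    // then inserts the numbers 1..count.
    static PriorityBlockingQueue<Integer> fill(int initialCapacity, int count) {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(initialCapacity, Comparator.<Integer>reverseOrder());
        for (int i = 1; i <= count; i++) {
            queue.put(i); // never blocks: the internal array grows as needed
        }
        return queue;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = fill(2, 10);
        System.out.println(queue.size());              // 10
        System.out.println(queue.remainingCapacity()); // Integer.MAX_VALUE (2147483647)
        System.out.println(queue.poll());              // 10, because of the reverse order
    }
}
```

Note that an initial capacity less than 1 causes the constructor to throw IllegalArgumentException.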
Consider the following Person class:
/**
 * A model class that implements the Comparable interface
 * so its objects can be held in a PriorityBlockingQueue
 * @author www.codejava.net
 */
public class Person implements Comparable<Person> {
    private String name;
    private int age;
    private int income;

    public Person(String name, int age, int income) {
        this.name = name;
        this.age = age;
        this.income = income;
    }

    // getters and setters...

    public int compareTo(Person another) {
        return this.name.compareTo(another.name);
    }

    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.income;
    }
}
As you can see, this Person class implements the Comparable interface to define its natural ordering:
public int compareTo(Person another) {
    return this.name.compareTo(another.name);
}
This method defines the natural ordering as the alphabetical order of the persons’ names.
The following code snippet creates a PriorityBlockingQueue that holds Person elements:
BlockingQueue<Person> queue = new PriorityBlockingQueue<>();
queue.put(new Person("Tony", 30, 50000));
queue.put(new Person("John", 25, 60000));
queue.put(new Person("Alex", 35, 80000));
queue.put(new Person("Garry", 23, 99000));
Person person = queue.poll();
while (person != null) {
    System.out.println(person);
    person = queue.poll();
}
Output:
Alex 35 80000
Garry 23 99000
John 25 60000
Tony 30 50000
This output indicates that the queue sorts its elements according to their natural ordering, which is the names of the persons in alphabetical order.
Now, create a Comparator that defines a different order, according to the person’s age. Here’s the code:
Comparator<Person> ageComparator = new Comparator<Person>() {
    public int compare(Person person1, Person person2) {
        // Integer.compare avoids the overflow risk of subtracting two ints
        return Integer.compare(person1.getAge(), person2.getAge());
    }
};
And pass this comparator when creating the PriorityBlockingQueue like this:
BlockingQueue<Person> queue = new PriorityBlockingQueue<>(10, ageComparator);
queue.put(new Person("Tony", 30, 50000));
queue.put(new Person("John", 25, 60000));
queue.put(new Person("Alex", 35, 80000));
queue.put(new Person("Garry", 23, 99000));
Person person = queue.poll();
while (person != null) {
    System.out.println(person);
    person = queue.poll();
}
Now the output is different - it sorts persons according to their ages:
Garry 23 99000
John 25 60000
Tony 30 50000
Alex 35 80000
Similarly, you can create another comparator that sorts the persons according to their income:
Comparator<Person> incomeComparator = new Comparator<Person>() {
    public int compare(Person person1, Person person2) {
        // Integer.compare avoids the overflow risk of subtracting two ints
        return Integer.compare(person1.getIncome(), person2.getIncome());
    }
};
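On Java 8 or later, comparators like these can also be written with the factory methods of the Comparator interface. The following sketch is self-contained, so it declares its own trimmed-down Person class. The getters getName(), getAge() and getIncome() are assumed to match the ones elided in the class above, and the class name ComparatorFactoryDemo is made up for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class ComparatorFactoryDemo {

    // Trimmed-down stand-in for the tutorial's Person class (getters are assumed).
    static class Person {
        private final String name;
        private final int age;
        private final int income;

        Person(String name, int age, int income) {
            this.name = name;
            this.age = age;
            this.income = income;
        }

        String getName() { return name; }
        int getAge() { return age; }
        int getIncome() { return income; }
    }

    // Fills a queue ordered by the given comparator and returns the names in poll order.
    static List<String> namesInOrder(Comparator<Person> comparator) {
        PriorityBlockingQueue<Person> queue = new PriorityBlockingQueue<>(10, comparator);
        queue.put(new Person("Tony", 30, 50000));
        queue.put(new Person("John", 25, 60000));
        queue.put(new Person("Alex", 35, 80000));
        queue.put(new Person("Garry", 23, 99000));

        List<String> names = new ArrayList<>();
        Person person;
        while ((person = queue.poll()) != null) {
            names.add(person.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        Comparator<Person> byAge = Comparator.comparingInt(Person::getAge);
        Comparator<Person> byIncomeDescending =
                Comparator.comparingInt(Person::getIncome).reversed();

        System.out.println(namesInOrder(byAge));              // [Garry, John, Tony, Alex]
        System.out.println(namesInOrder(byIncomeDescending)); // [Garry, Alex, John, Tony]
    }
}
```

The reversed() call is a convenient way to turn the queue into a "largest first" queue without writing a second comparator by hand.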
3. Adding elements to the queue
As mentioned previously, since PriorityBlockingQueue is unbounded, insert operations never block or return false. Here are some code examples:
BlockingQueue<Person> queue = new PriorityBlockingQueue<>();
Person person = new Person("Tom", 48, 120000);
// this never returns false:
boolean success = queue.offer(person);
// this never returns false or blocks:
boolean result = queue.offer(person, 1, TimeUnit.SECONDS);
// this never blocks:
queue.put(person);
Remember that PriorityBlockingQueue doesn’t order elements in FIFO order, so the tail of the queue is not always the newly inserted element. Elements with equal priority are not guaranteed to come out in insertion order either. Also, the execution time varies depending on the element inserted, as the queue must rearrange its elements to maintain a consistent order.
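If you need elements of equal priority to leave the queue in the order they were inserted, one common approach is to add a sequence number as a tie-breaker. The sketch below illustrates the idea under that assumption. The Task class, its fields and the FifoTieBreakDemo class are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FifoTieBreakDemo {

    // Hypothetical element type: a priority plus an insertion sequence number.
    static class Task implements Comparable<Task> {
        private static final AtomicLong COUNTER = new AtomicLong();

        final int priority;
        final String label;
        final long sequence = COUNTER.getAndIncrement();

        Task(int priority, String label) {
            this.priority = priority;
            this.label = label;
        }

        @Override
        public int compareTo(Task other) {
            int byPriority = Integer.compare(priority, other.priority);
            // Equal priorities fall back to creation order (smaller sequence first)
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    // Inserts tasks with repeated priorities and returns the labels in poll order.
    static List<String> run() {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.put(new Task(2, "b1"));
        queue.put(new Task(1, "a1"));
        queue.put(new Task(2, "b2"));
        queue.put(new Task(1, "a2"));
        queue.put(new Task(2, "b3"));

        List<String> labels = new ArrayList<>();
        Task task;
        while ((task = queue.poll()) != null) {
            labels.add(task.label);
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a1, a2, b1, b2, b3]
    }
}
```

The sequence number is assigned when the Task is created, so this only reflects insertion order if each task is queued right after it is constructed.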
4. Retrieve and remove the head of the queue
PriorityBlockingQueue’s removal operations behave exactly like those of a typical BlockingQueue: they wait if necessary until an element becomes available. The following code snippet explains the removal operations:
// this will block if the queue is empty,
// waiting until an element becomes available
Person firstOne = queue.take();
// this will block if the queue is empty,
// waiting up to the specified timeout
Person nextOne = queue.poll(2, TimeUnit.SECONDS);
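The head element is always the first element of the array used internally by the queue, so reading it with peek() takes constant time. Removing it with poll() or take() is not a constant-time operation, however: after the head is removed the queue must restore the heap order, which takes O(log n) time.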
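The difference between the retrieval methods can be seen in a short experiment. This is a sketch only: the class RetrievalDemo and its helper methods are invented for illustration, and the 200 ms and 100 ms delays are arbitrary values.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RetrievalDemo {

    // Blocks in take() until a producer thread inserts an element about 200 ms later.
    static Integer takeAfterDelayedPut() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            queue.put(42);
        });
        producer.start();
        try {
            return queue.take(); // waits until the producer has inserted 42
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Waits up to 100 ms on an empty queue; the timed poll gives up and returns null.
    static Integer timedPollOnEmptyQueue() {
        try {
            return new PriorityBlockingQueue<Integer>().poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(7);
        queue.put(3);
        System.out.println(queue.peek()); // 3 (head is read but not removed)
        System.out.println(queue.poll()); // 3
        System.out.println(queue.poll()); // 7
        System.out.println(queue.poll()); // null (non-blocking poll on an empty queue)

        System.out.println(takeAfterDelayedPut());   // 42
        System.out.println(timedPollOnEmptyQueue()); // null
    }
}
```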
5. A Producer-Consumer Example Using PriorityBlockingQueue
Take the file text search program from the tutorial Java ArrayBlockingQueue Examples. We can modify it to use PriorityBlockingQueue so that the program tends to process larger files first. In other words, the program treats larger files with “higher priority” than smaller ones.
Create a comparator class as shown below:
/**
 * A comparator that compares two Files based on their length.
 * @author www.codejava.net
 */
class FileComparator implements Comparator<File> {
    public int compare(File file1, File file2) {
        // Larger files sort first. Files of equal length compare as 0,
        // which the Comparator contract requires.
        return Long.compare(file2.length(), file1.length());
    }
}
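The same largest-first ordering can also be expressed with the Comparator factory methods. The following sketch checks it against temporary files of known sizes. The class LargestFileFirstDemo, the constant LARGEST_FIRST and the helper tempFileOfSize() are hypothetical names, and the file sizes are arbitrary.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class LargestFileFirstDemo {

    // Larger files compare as "smaller", so they reach the head of the queue first.
    // Files of equal length compare as 0.
    static final Comparator<File> LARGEST_FIRST =
            Comparator.comparingLong(File::length).reversed();

    // Creates a temporary file holding the given number of bytes.
    static File tempFileOfSize(int bytes) {
        try {
            Path path = Files.createTempFile("pbq-demo", ".tmp");
            Files.write(path, new byte[bytes]);
            File file = path.toFile();
            file.deleteOnExit();
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Queues three files of different sizes and returns the size of the first one polled.
    static long sizeOfFirstPolled() {
        PriorityBlockingQueue<File> queue = new PriorityBlockingQueue<>(10, LARGEST_FIRST);
        queue.put(tempFileOfSize(10));
        queue.put(tempFileOfSize(300));
        queue.put(tempFileOfSize(20));
        return queue.poll().length();
    }

    public static void main(String[] args) {
        System.out.println(sizeOfFirstPolled()); // 300
    }
}
```

Keep in mind that File.length() reads the size from the file system on every comparison, so the ordering reflects the file sizes at the time the elements are compared.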
Using this comparator, the PriorityBlockingQueue orders its elements (of type File) based on their size. Therefore, the main program is updated to use PriorityBlockingQueue like this:
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * A file text search program that uses PriorityBlockingQueue
 * @author www.codejava.net
 */
public class FileTextSearch {
    public static void main(String[] args) {
        String dirPath = args[0];
        String extension = args[1];
        String keyword = args[2];

        BlockingQueue<File> queue = new PriorityBlockingQueue<>(100, new FileComparator());

        DirectoryLister lister = new DirectoryLister(queue, new File(dirPath), extension);
        lister.start();

        for (int i = 0; i < 10; i++) {
            FileParser parser = new FileParser(queue, keyword);
            parser.start();
        }
    }
}
Update the parseFile() method in the FileParser class to print the size of each file processed:
private void parseFile(File file) throws IOException {
    List<String> lines = Files.readAllLines(file.toPath());
    int lineCount = 0;
    for (String aLine : lines) {
        lineCount++;
        if (aLine.contains(keyword)) {
            String result = "Found in %s at line %d. Size: %d\n";
            System.out.printf(result,
                    file.getAbsolutePath(), lineCount, file.length());
            break;
        }
    }
}
The DirectoryLister class remains unchanged. Run the program with three command-line arguments: the directory path, the file extension and the keyword to search for.
It prints output like this (the actual paths are replaced by three dots for brevity):
Found in ...\ForkJoinPool.java at line 38. Size: 142047
Found in ...\SynchronousQueue.java at line 180. Size: 45697
Found in ...\ScheduledThreadPoolExecutor.java at line 44. Size: 47357
Found in ...\LinkedTransferQueue.java at line 209. Size: 58902
Found in ...\ForkJoinTask.java at line 218. Size: 60647
Found in ...\CompletableFuture.java at line 47. Size: 113096
Found in ...\ConcurrentHashMap.java at line 2197. Size: 258237
Found in ...\LinkedBlockingQueue.java at line 199. Size: 34092
Found in ...\ThreadPoolExecutor.java at line 48. Size: 80896
Found in ...\ConcurrentSkipListMap.java at line 322. Size: 135
You can see the program prefers bigger files over smaller ones, though sometimes it processes some bigger files later, because the producer and the consumers execute concurrently: the producer keeps adding files from the given directory while the consumers are already polling the queue.
Now, let’s practice and experiment with the code in this tutorial.
Nam Ha Minh is a certified Java programmer (SCJP and SCWCD). He began programming with Java back in the days of Java 1.4 and has been passionate about it ever since. You can connect with him on Facebook and watch his Java videos on YouTube.