Java LinkedBlockingQueue Example
- Written by Nam Ha Minh
- Last Updated on 13 August 2019
This Java Concurrency tutorial helps you understand the characteristics of LinkedBlockingQueue with a detailed code example.
LinkedBlockingQueue is an implementation of the BlockingQueue interface with the following characteristics:
- Internal data structure: LinkedBlockingQueue uses singly-linked nodes. Each node contains the element and a reference to the next node. New elements are linked at the tail and removed from the head, so insertion and removal run in constant time.
- Capacity: if created with the no-argument constructor, LinkedBlockingQueue can store up to Integer.MAX_VALUE elements, so it can be considered unbounded. To get a bounded queue, pass a fixed capacity to the constructor LinkedBlockingQueue(int capacity).
- Order: the queue orders elements FIFO (first-in first-out). New elements are inserted at the tail and retrieved from the head. That means the element at the head has been on the queue the longest time, and the element at the tail has been on the queue the shortest time.
- Operations: it implements the typical operations of a BlockingQueue. put() and take() block until they can proceed, while offer() and poll() either return immediately or wait up to a given timeout.
- Iterator behavior: the iterator is weakly consistent, which means it can be used concurrently with other operations and will never throw ConcurrentModificationException. However, it may or may not reflect modifications made after the iterator is constructed.
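Before moving on to the threaded example, the characteristics above can be checked with a small single-threaded sketch. The class name QueueCharacteristicsDemo and its helper methods are made up for this illustration; only the LinkedBlockingQueue calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the producer-consumer program
class QueueCharacteristicsDemo {

    // Capacity: the no-argument constructor allows up to Integer.MAX_VALUE elements
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // Bounded queue: the non-blocking offer() returns false when the queue is full
    static boolean offerToFullQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2);
        return queue.offer(3);
    }

    // Order: elements leave the queue in the order they were inserted
    static List<Integer> drainInFifoOrder() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.offer(10);
        queue.offer(20);
        queue.offer(30);
        List<Integer> result = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) {
            result.add(head);
        }
        return result;
    }

    // Iterator: changing the queue during iteration does not throw
    // ConcurrentModificationException. Whether the new element is visited
    // is not guaranteed, so only the number of visited elements is returned.
    static int iterateWhileModifying() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.offer(1);
        queue.offer(2);
        int visited = 0;
        Iterator<Integer> iterator = queue.iterator();
        while (iterator.hasNext()) {
            iterator.next();
            visited++;
            if (visited == 1) {
                queue.offer(99); // modify the queue in the middle of the iteration
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("Default capacity: " + defaultCapacity());
        System.out.println("offer() on a full queue: " + offerToFullQueue());
        System.out.println("Drained in order: " + drainInFifoOrder());
        System.out.println("Elements visited: " + iterateWhileModifying());
    }
}
```

Note that the iterator check only asserts that at least the two original elements are visited, because a weakly consistent iterator is free to include or skip the element added during the iteration.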
Now, let’s see a complete code example based on producer-consumer processing.
The producer thread continuously creates objects and places them on the queue, and the consumer thread processes objects from the queue as they arrive.
The following code is for the producer class:

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * A producer that puts elements on a BlockingQueue
     * @author www.codejava.net
     */
    public class Producer implements Runnable {
        private BlockingQueue<Integer> queue;

        public Producer (BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                for (int i = 0; i < 50; i++) {
                    Integer number = produce();
                    queue.put(number);
                    System.out.println("PRODUCER: created " + number);
                }

                queue.put(-1);    // indicates end of producing
                System.out.println("PRODUCER: STOPPED.");
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }

        private Integer produce() {
            Random random = new Random();
            Integer number = random.nextInt(100);

            // fake producing time
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }

            return number;
        }
    }

As you can see, this producer puts 50 objects on the queue. The objects are created by the produce() method, which uses the Random class to generate random numbers and to fake a random producing time. Finally, the producer puts the negative number “-1” on the queue to indicate that no more objects will be created.
Next, the consumer class looks like this:

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * A consumer that takes elements from a BlockingQueue
     * @author www.codejava.net
     */
    public class Consumer implements Runnable {
        private BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true) {
                    Integer number = queue.take();

                    if (number == -1) {
                        System.out.println("Consumer: STOPPED.");
                        break;
                    }

                    consume(number);
                }
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }

        private void consume(Integer number) {
            // fake consuming time
            Random random = new Random();

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }

            System.out.println("Consumer: processed " + number);
        }
    }

As you can see, this consumer takes elements from the queue until it finds the negative number “-1”, which indicates that the producer has stopped. The consume() method uses the Random class to fake a processing time.
And the following code is the test program:

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * A producer-consumer test program for LinkedBlockingQueue
     * @author www.codejava.net
     */
    public class LinkedBlockingQueueTest {
        public static void main(String[] args) {
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

            Thread producer = new Thread(new Producer(queue));
            Thread consumer = new Thread(new Consumer(queue));

            producer.start();
            consumer.start();
        }
    }

As you can see, this program uses a bounded LinkedBlockingQueue with a capacity of 10 elements. Then it creates and starts two threads: one for the producer and one for the consumer.
Run this program and observe the output. The output is random but typically it looks like this:

    PRODUCER: created 80
    PRODUCER: created 68
    PRODUCER: created 80
    PRODUCER: created 95
    Consumer: processed 80
    Consumer: processed 68
    ...
    PRODUCER: created 16
    Consumer: processed 80
    PRODUCER: created 19
    Consumer: processed 95
    PRODUCER: created 36
    Consumer: processed 75
    PRODUCER: created 71
    Consumer: processed 74
    ...
    Consumer: processed 13
    Consumer: processed 68
    PRODUCER: created 10
    PRODUCER: created 95
    PRODUCER: created 22
    PRODUCER: STOPPED.
    Consumer: processed 59
    Consumer: processed 43
    Consumer: processed 46
    Consumer: processed 73
    Consumer: processed 10
    Consumer: processed 95
    Consumer: processed 22
    Consumer: STOPPED.

At the beginning, the producer gets ahead of the consumer and the queue starts to fill up. Once the queue reaches its capacity, the producer and the consumer work in tandem: put() blocks while the queue is full, so the producer can put a new element on the queue only after the consumer removes one.
And at the end, the producer stops first and the consumer processes the remaining elements in the queue.
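The blocking behavior of put() on a full queue can also be observed in isolation. The following is a minimal sketch, not part of the program above: the class name BlockingPutDemo, the capacity of 2 and the 200 ms pause are arbitrary choices made for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: shows that put() waits while a bounded queue is full
class BlockingPutDemo {

    // Returns true if the producer stayed blocked in put() until a slot was freed
    static boolean producerBlocksWhileFull() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put(3); // blocks until the consumer side removes an element
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            Thread.sleep(200); // give the producer time to reach put()
            boolean blocked = producer.isAlive() && queue.size() == 2;

            queue.take();        // frees one slot, so put(3) can complete
            producer.join(2000); // wait for the producer thread to finish

            return blocked && !producer.isAlive() && queue.size() == 2;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Producer blocked while the queue was full: "
                + producerBlocksWhileFull());
    }
}
```

The sleep is only a simple way to let the producer thread reach put() before the check; it is good enough for a demonstration but is not a pattern for production code.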
You can experiment further by using the timed versions of the offer() and poll() methods.
Update the run() method of the Producer class to the following code:

    public void run() {
        try {
            for (int i = 0; i < 50; i++) {
                Integer number = produce();

                boolean success = queue.offer(number, 100, TimeUnit.MILLISECONDS);

                if (success) {
                    System.out.println("PRODUCER: created " + number);
                } else {
                    System.out.println("PRODUCER: gave up");
                }
            }

            queue.put(-1);    // indicates end of producing
            System.out.println("PRODUCER: STOPPED.");
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    }

Here, with the offer() method, if the queue is full the producer waits up to the specified time and then gives up, whereas the put() method waits until space becomes available.
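To see the return value of the timed offer() without running the whole program, a small sketch along these lines can help. The class name TimedOfferDemo and the helper method tryOffer() are assumptions made for this illustration; the capacity of 1 is chosen so that the second call is guaranteed to find the queue full.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: timed offer() on a bounded queue
class TimedOfferDemo {

    // Waits up to 100 ms for free space; returns false if the wait timed out
    static boolean tryOffer(BlockingQueue<Integer> queue, int value) {
        try {
            return queue.offer(value, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);

        // The queue is empty, so the first element is accepted right away
        System.out.println("First offer accepted: " + tryOffer(queue, 1));

        // The queue is full and nobody consumes, so this call gives up after the timeout
        System.out.println("Second offer accepted: " + tryOffer(queue, 2));
    }
}
```

An element that was rejected this way is simply not on the queue, so the caller has to decide whether to retry, drop it or report it.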
And update the run() method of the Consumer class to the following code:

    public void run() {
        try {
            while (true) {
                Integer number = queue.poll(100, TimeUnit.MILLISECONDS);

                if (number != null) {
                    if (number == -1) {
                        System.out.println("Consumer: STOPPED.");
                        break;
                    }

                    consume(number);
                } else {
                    System.out.println("Consumer: gave up");
                }
            }
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    }

Here, with the poll() method, if the queue is empty the consumer waits up to the specified time and then gives up (poll() returns null), whereas the take() method waits until an element becomes available.
You can test how these timeout methods work by increasing the producing and consuming times to longer values and observing the results.
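The two possible outcomes of the timed poll() can be sketched as follows. The class name TimedPollDemo, the helper tryPoll(), the value 42 and the timeouts are all invented for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: timed poll() on an empty queue
class TimedPollDemo {

    // Waits up to the given time for an element; returns null if none arrived
    static Integer tryPoll(BlockingQueue<Integer> queue, long millis) {
        try {
            return queue.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt status
            return null;
        }
    }

    // Nobody produces, so the consumer gives up after the timeout
    static Integer pollEmptyQueue() {
        return tryPoll(new LinkedBlockingQueue<Integer>(), 100);
    }

    // The element arrives while the consumer is still waiting, so poll() returns it
    static Integer pollWhileProducerIsLate() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // fake producing time
                queue.put(42);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        return tryPoll(queue, 2000);
    }

    public static void main(String[] args) {
        System.out.println("Empty queue: " + pollEmptyQueue());
        System.out.println("Late producer: " + pollWhileProducerIsLate());
    }
}
```

Because null means "timed out", the null check has to come before any comparison with the element value, as in the consumer's run() method above.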
Other Java Concurrent Queues:
- Java ArrayBlockingQueue Examples
- Java DelayQueue Examples
- Java PriorityBlockingQueue Examples
- Java SynchronousQueue Examples
Other Java Concurrency Tutorials:
- How to use Threads in Java (create, start, pause, interrupt and join)
- Java Synchronization Tutorial
- Understanding Deadlock, Livelock and Starvation with Code Examples in Java
- Understanding Java Fork-Join Framework with Examples
- Understanding Java Thread Pool and Executors