Java SynchronousQueue Examples
- Details
- Written by Nam Ha Minh
- Last Updated on 13 August 2019   |   Print Email
This Java Concurrency tutorial helps you understand SynchronousQueue - a special BlockingQueue implementation with no internal capacity.
SynchronousQueue is designed to hand off objects from a producer thread to consumer threads synchronously: when the producer thread puts an object to the queue, it must wait for a corresponding take from a consumer thread, and vice versa: when the consumer thread wants to take element from the queue, it must wait for a corresponding put from the producer thread.
Imagine when I give you a book, for example, without any table for me to put the book on. So I must give you the book directly, when you show your hand, right? And you show your hand only if I show my hand, right? Think SynchronousQueueworks like that.
Elements are transferred from the producer to the consumers without being held in the queue, unlike other blocking queues to which the elements are put, waiting to be taken by consumer threads. So SynchronousQueue is a choice when there are enough consumer threads so the producer thread doesn’t have to enqueue elements.
Since SynchronousQueue has no capacity, it acts like an empty collection and has special behaviors:
- isEmpty(): always returns true
- iterator(): returns an empty iterator in which hasNext() always returns false
- peek(): always returns null
- size(): always return zero.
Now, let’s see how to use SynchronousQueue in details with code examples.
1. Create a new SynchronousQueue
There are two constructors that allow you to create a SynchronousQueue with different access policy:
- SynchronousQueue() : creates a SynchronousQueue with nonfair access policy. That means if multiple threads are waiting, they will be granted access in unspecified order.
- SynchronousQueue(boolean fair): creates a SynchronousQueue with a fairness access policy (fair = true): the waiting threads will be granted access in FIFO (First-In First-Out) order.
For example, the following statement creates a SynchronousQueue of String elements with nonfair access policy:
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
And the following statement creates a SynchronousQueue of Integer elements with nonfair access policy:
BlockingQueue<Integer> syncQueue = new SynchronousQueue<>(true);
2. Insert an Element to SynchronousQueue
Use the put() method to add an element to the queue. The current thread may wait for another thread to receive the element:
try { syncQueue.put("Element"); } catch (InterruptedException ie) { ie.printStackTrace(); }
This method causes the current thread block until another thread has received the element.
3. Retrieve an Element from SynchronousQueue
Use the take() method to retrieve and remove an element from the queue. The current thread may wait for another thread to insert an element:
try { String element = syncQueue.take(); } catch (InterruptedException ie) { ie.printStackTrace(); }
This method causes the current thread block until another thread has inserted an element.
4. A SynchronousQueue Producer-Consumer Example
Let’s look at a complete example using SynchronousQueue as a shared data structure between producer and consumers.
The following code is of the Producer class:
import java.util.*; import java.util.concurrent.*; /** * A producer that puts elements to a BlockingQueue * * @author www.codejava.net */ public class Producer extends Thread { private BlockingQueue<Integer> queue; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } public void run() { while (true) { try { queue.put(produce()); } catch (InterruptedException ie) { ie.printStackTrace(); } } } private Integer produce() { Random randomer = new Random(); Integer number = randomer.nextInt(1000); try { Thread.sleep(randomer.nextInt(1000)); } catch (InterruptedException ie) { ie.printStackTrace(); } System.out.println("Producer: created number: " + number); return number; } }
As you can see, this producer simply creates random Integer numbers and put them to the queue.
And the following code is for the consumer class:
import java.util.concurrent.*; /** * A consumer that takes elements from a BlockingQueue * * @author www.codejava.net */ public class Consumer extends Thread { private BlockingQueue<Integer> queue; public Consumer(BlockingQueue<Integer> queue) { this.queue = queue; } public void run() { while (true) { try { Integer number = queue.take(); consume(number); } catch (InterruptedException ie) { ie.printStackTrace(); } } } private void consume(Integer number) { String message = "Consumer [" + getName() + "]: "; message += " processed number: " + number; System.out.println(message); } }
This consumer takes elements from the queue and processes them.
And the test program looks like this:
import java.util.concurrent.*; /** * A program tests for producer-consumer using SynchronousQueue * * @author www.codejava.net */ public class SynchronousQueueTest { static final int NUMBER_OF_CONSUMERS = 10; public static void main(String[] args) { BlockingQueue<Integer> syncQueue = new SynchronousQueue<>(); Producer producer = new Producer(syncQueue); producer.start(); Consumer[] consumers = new Consumer[NUMBER_OF_CONSUMERS]; for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) { consumers[i] = new Consumer(syncQueue); consumers[i].start(); } } }
As you can see, this test program creates one producer thread and 10 consumer threads - all is sharing an instance of a SynchronousQueue.
Run this program you will see something like this:
Producer: created number: 333 Producer: created number: 673 Consumer [Thread-8]: processed number: 333 Consumer [Thread-4]: processed number: 673 Producer: created number: 167 Producer: created number: 949 Producer: created number: 234 Producer: created number: 732 Producer: created number: 307 Producer: created number: 28 Consumer [Thread-4]: processed number: 167 Consumer [Thread-3]: processed number: 28 Producer: created number: 898 Consumer [Thread-9]: processed number: 307 Consumer [Thread-6]: processed number: 732 Consumer [Thread-10]: processed number: 234 Consumer [Thread-8]: processed number: 949 Consumer [Thread-3]: processed number: 898
Note that the program runs forever so you must press Ctrl + C to stop it.
API References:
Other Java Concurrent Queues:
- Java ArrayBlockingQueue Examples
- Java DelayQueue Examples
- Java LinkedBlockingQueue Example
- Java PriorityBlockingQueue Examples
Other Java Concurrency Tutorials:
- How to use Threads in Java (create, start, pause, interrupt and join)
- Java Synchronization Tutorial
- Understanding Deadlock, Livelock and Starvation with Code Examples in Java
- Understanding Java Fork-Join Framework with Examples
- Understanding Java Thread Pool and Executors
Comments