This Java Concurrency tutorial helps you understand SynchronousQueue - a special BlockingQueue implementation with no internal capacity.

SynchronousQueue is designed to hand off objects from a producer thread to consumer threads synchronously: when a producer thread puts an object into the queue, it must wait for a corresponding take from a consumer thread. The reverse also holds: when a consumer thread wants to take an element from the queue, it must wait for a corresponding put from a producer thread.

Imagine I want to give you a book, but there is no table for me to put the book on. So I must hand you the book directly when you hold out your hand, right? And you hold out your hand only when I hold out mine, right? SynchronousQueue works like that.

Elements are transferred from the producer to the consumers without being held in the queue, unlike other blocking queues, in which elements are stored until consumer threads take them. So SynchronousQueue is a good choice when there are enough consumer threads that the producer thread doesn’t have to enqueue elements.
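To make the hand-off described above concrete, here is a minimal sketch. The class name HandOffDemo and the method putWaitsForTake() are made up for illustration, and the 300 ms sleep is just an arbitrary pause to give the producer time to reach put():

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: class and method names are hypothetical.
class HandOffDemo {

    // Returns true if put() was still waiting before take() was called
    // and completed once the element was taken.
    static boolean putWaitsForTake() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        AtomicBoolean putReturned = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put("book");          // waits until a consumer takes it
                putReturned.set(true);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(300);                  // no consumer has shown up yet
        boolean blockedBeforeTake = !putReturned.get();

        String received = queue.take();     // completes the hand-off
        producer.join();

        return blockedBeforeTake && putReturned.get() && "book".equals(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("put() waited for take(): " + putWaitsForTake());
    }
}
```

The producer cannot get past put() until the main thread calls take(), which is exactly the "no table" situation from the book analogy.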

Since SynchronousQueue has no capacity, it acts like an empty collection and has special behaviors:

- isEmpty(): always returns true

- iterator(): returns an empty iterator in which hasNext() always returns false

- peek(): always returns null

- size(): always returns zero.
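The four behaviors listed above can be checked with a small sketch like the following. The class name EmptyViewDemo and the method describe() are made up for illustration. A producer thread is left waiting inside put() on purpose, to show that the queue still looks empty while a hand-off is pending:

```java
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch only: class and method names are hypothetical.
class EmptyViewDemo {

    // Collects isEmpty(), iterator().hasNext(), peek() and size()
    // while a producer thread is waiting inside put().
    static String describe() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(1);               // waits for the take() below
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        Thread.sleep(200);                  // give the producer time to reach put()

        String result = queue.isEmpty()
                + " " + queue.iterator().hasNext()
                + " " + queue.peek()
                + " " + queue.size();

        queue.take();                       // release the waiting producer
        producer.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe());     // prints: true false null 0
    }
}
```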

Now, let’s see how to use SynchronousQueue in detail with code examples.

 

1. Create a new SynchronousQueue

There are two constructors that allow you to create a SynchronousQueue with different access policies:

  • SynchronousQueue() : creates a SynchronousQueue with nonfair access policy. That means if multiple threads are waiting, they will be granted access in unspecified order.
  • SynchronousQueue(boolean fair): creates a SynchronousQueue with a fairness access policy (fair = true): the waiting threads will be granted access in FIFO (First-In First-Out) order.

For example, the following statement creates a SynchronousQueue of String elements with nonfair access policy:

BlockingQueue<String> syncQueue = new SynchronousQueue<>();

And the following statement creates a SynchronousQueue of Integer elements with fair access policy:

BlockingQueue<Integer> syncQueue = new SynchronousQueue<>(true);
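To get a feel for the access policy, you can try a rough sketch like the one below. The class name FairnessDemo and the method takeFromWaitingProducers() are made up for illustration. Three producers start 100 ms apart, which is only a crude way to make them arrive in a known order, and then the main thread takes three times. With fair = true you would normally expect the values in arrival order, while with fair = false the order is unspecified:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch only: class and method names are hypothetical.
class FairnessDemo {

    // Starts three producers that wait in put(), then takes three elements
    // and returns them in the order they were received.
    static List<Integer> takeFromWaitingProducers(boolean fair) throws InterruptedException {
        BlockingQueue<Integer> queue = new SynchronousQueue<>(fair);
        List<Thread> producers = new ArrayList<>();

        for (int i = 1; i <= 3; i++) {
            final int value = i;
            Thread t = new Thread(() -> {
                try {
                    queue.put(value);       // waits for a take() below
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            producers.add(t);
            Thread.sleep(100);              // crude way to stagger arrival
        }

        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            received.add(queue.take());
        }
        for (Thread t : producers) {
            t.join();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fair:    " + takeFromWaitingProducers(true));
        System.out.println("nonfair: " + takeFromWaitingProducers(false));
    }
}
```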

 

2. Insert an Element to SynchronousQueue

Use the put() method to add an element to the queue. The current thread may wait for another thread to receive the element:

try {
	syncQueue.put("Element");
} catch (InterruptedException ie) {
	ie.printStackTrace();
}

This method causes the current thread to block until another thread has received the element.

 

3. Retrieve an Element from SynchronousQueue

Use the take() method to retrieve and remove an element from the queue. The current thread may wait for another thread to insert an element:

try {
	String element = syncQueue.take();
} catch (InterruptedException ie) {
	ie.printStackTrace();
}

This method causes the current thread to block until another thread has inserted an element.
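If you don't want a thread to wait indefinitely, the BlockingQueue interface also defines offer() and poll(), with and without a timeout. They are not covered in this tutorial, so take the following only as a small sketch. The class name NonBlockingDemo and the method tryWithoutPartner() are made up for illustration. Because no other thread is waiting on the queue here, every call gives up instead of blocking forever:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: class and method names are hypothetical.
class NonBlockingDemo {

    // Calls offer() and poll() on a queue that no other thread is using.
    static String tryWithoutPartner() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // No consumer is waiting, so the element is not accepted.
        boolean offered = queue.offer("Element");
        boolean offeredTimed = queue.offer("Element", 100, TimeUnit.MILLISECONDS);

        // No producer is waiting, so there is nothing to receive.
        String polled = queue.poll();
        String polledTimed = queue.poll(100, TimeUnit.MILLISECONDS);

        return offered + " " + offeredTimed + " " + polled + " " + polledTimed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(tryWithoutPartner());   // prints: false false null null
    }
}
```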

 

4. A SynchronousQueue Producer-Consumer Example

Let’s look at a complete example using SynchronousQueue as a shared data structure between producer and consumers.

The following code is of the Producer class:

import java.util.*;
import java.util.concurrent.*;

/**
 * A producer that puts elements to a BlockingQueue
 *
 * @author www.codejava.net
 */
public class Producer extends Thread {
	private BlockingQueue<Integer> queue;

	public Producer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		while (true) {
			try {

				queue.put(produce());

			} catch (InterruptedException ie) {
				ie.printStackTrace();
			}
		}
	}

	private Integer produce() {
		Random randomer = new Random();
		Integer number = randomer.nextInt(1000);

		try {
			Thread.sleep(randomer.nextInt(1000));
		} catch (InterruptedException ie) {
			ie.printStackTrace();
		}

		System.out.println("Producer: created number: " + number);

		return number;
	}
}

As you can see, this producer simply creates random Integer numbers and puts them into the queue.

And the following code is for the consumer class:

import java.util.concurrent.*;

/**
 * A consumer that takes elements from a BlockingQueue
 *
 * @author www.codejava.net
 */
public class Consumer extends Thread {
	private BlockingQueue<Integer> queue;

	public Consumer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		while (true) {
			try {

				Integer number = queue.take();
				consume(number);

			} catch (InterruptedException ie) {
				ie.printStackTrace();
			}
		}
	}

	private void consume(Integer number) {
		String message = "Consumer [" + getName() + "]: ";
		message += " processed number: " + number;
		System.out.println(message);
	}
}

This consumer takes elements from the queue and processes them.

And the test program looks like this:

import java.util.concurrent.*;

/**
 * A program tests for producer-consumer using SynchronousQueue
 *
 * @author www.codejava.net
 */
public class SynchronousQueueTest {
	static final int NUMBER_OF_CONSUMERS = 10;

	public static void main(String[] args) {

		BlockingQueue<Integer> syncQueue = new SynchronousQueue<>();

		Producer producer = new Producer(syncQueue);
		producer.start();

		Consumer[] consumers = new Consumer[NUMBER_OF_CONSUMERS];

		for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
			consumers[i] = new Consumer(syncQueue);
			consumers[i].start();
		}
	}
}

As you can see, this test program creates one producer thread and 10 consumer threads, all sharing a single SynchronousQueue instance.

Run this program and you will see output like this:

Producer: created number: 333
Producer: created number: 673
Consumer [Thread-8]:  processed number: 333
Consumer [Thread-4]:  processed number: 673
Producer: created number: 167
Producer: created number: 949
Producer: created number: 234
Producer: created number: 732
Producer: created number: 307
Producer: created number: 28
Consumer [Thread-4]:  processed number: 167
Consumer [Thread-3]:  processed number: 28
Producer: created number: 898
Consumer [Thread-9]:  processed number: 307
Consumer [Thread-6]:  processed number: 732
Consumer [Thread-10]:  processed number: 234
Consumer [Thread-8]:  processed number: 949
Consumer [Thread-3]:  processed number: 898

Note that the program runs forever so you must press Ctrl + C to stop it.
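If you would rather have a run that stops by itself, one possible approach is to send a stop marker to each consumer after the last element. The following is only a sketch of that idea and not part of the example above. The class name BoundedRunDemo, the method runOnce() and the STOP value of -1 are all made up for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: class, method and STOP marker are hypothetical.
class BoundedRunDemo {
    static final int STOP = -1;             // tells a consumer to finish

    // Hands the numbers 1..items to the consumers, then stops them.
    // Returns the sum of all numbers the consumers received.
    static int runOnce(int consumers, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();
        AtomicInteger sum = new AtomicInteger();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // Keep taking until the stop marker arrives.
                    for (int n = queue.take(); n != STOP; n = queue.take()) {
                        sum.addAndGet(n);
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (int n = 1; n <= items; n++) {
            queue.put(n);                   // each put waits for a consumer
        }
        for (int i = 0; i < consumers; i++) {
            queue.put(STOP);                // one stop marker per consumer
        }
        for (Thread w : workers) {
            w.join();
        }
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum received: " + runOnce(3, 5));   // prints: Sum received: 15
    }
}
```

A consumer that receives the stop marker exits and never takes again, so each marker ends up with a different consumer and all threads finish.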

 


About the Author:

is certified Java programmer (SCJP and SCWCD). He began programming with Java back in the days of Java 1.4 and has been passionate about it ever since. You can connect with him on Facebook and watch his Java videos on YouTube.
