Understanding Java Fork-Join Framework with Examples
- Details
- Written by Nam Ha Minh
- Last Updated on 13 August 2019   |   Print Email
- In parallel execution, each thread is executed in a separate processing core. Therefore, tasks are really executed in true parallel fashion.
- In concurrent execution, the threads are executed on a same core. That means tasks are actually executed in interleave fashion, sharing processing time of a processing core.
Don’t worry if you think parallel programming is complex and difficult, as you will see the Fork/Join framework makes it easy for programmers.Continue reading because parallel programming will be part of every programmer’s future.1. What is Fork/Join Framework?
Fork/Join framework is a set of APIs that allow programmers to take advantage of parallel execution supported by multicore processors. It uses ‘divide-and-conquer’ strategy: divide a very large problem into smaller parts, which in turn, the small part can be divided further into smaller ones, recursively until a part can be solved directly. This is called ‘fork’.Then all parts are executed in parallel on multiple processing cores. The results of each part are ‘joined’ together to produce the final result. Hence the name of the framework ‘Fork/Join’.The following pseudo code illustrates how the divide and conquer strategies work with Fork/Join framework:if (problemSize < threshold)
solve problem directly
else {
break problem into subproblems
recursively solve each problem
combine the results
}
Fork/Join framework is added to JDK since Java 7 and improved in Java 8. It is used by several new features in the Java programming language, including Streams API and sorting an array in parallel.- It simplifies thread creation. Threads are created and managed automatically.
- It automatically makes use of multiple processors so programs can scale to make use of available processors.
With support for true parallel execution, Fork/Join framework can significantly reduce computation time and increase performance in solving very large problems such as image processing, video processing, big data processing, etc.One interesting point about Fork/Join framework: it uses a work stealing algorithm to balance the load among threads: if a worker thread runs out of things to do, it can steal tasks from other threads that are still busy.2. Understand Fork/Join Framework’s API
The Fork/Join framework API is implemented in the java.util.concurrent package. At its core are the following 4 classes:- ForkJoinTask<V>: an abstract class that defines a task that runs within a ForkJoinPool.
- ForkJoinPool: a thread pool that manages the execution of ForkJoinTasks.
- RecursiveAction: a ForkJoinTask’s subclass for tasks that don’t return values.
- RecursiveTask<V>: a ForkJoinTask’s subclass for tasks that return values.
ForkJoinTask<V>
This is the abstract base class for tasks that run within a ForkJoinPool. The type parameter V specifies the result type of the task. A ForkJoinTask is a thread-like entity that represents lightweight abstraction of a task, rather than an actual thread of execution. This mechanism allows a large number o tasks to be managed by a small number of actual threads in a ForkJoinPool. Its key methods are:- final ForkJoinTask<V> fork()
- final V join()
- final V invoke()
- static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2): execute two tasks.
- static void invokeAll(ForkJoinTask<?>… taskList): execute a list of tasks.
RecursiveAction:
This is a recursive ForkJoinTaskthat doesn’t return a result. “Recursive” means that the task can be split into subtasks of itself by divide-and-conquer strategy (you’ll see how to divide in the code examples in the next section below).You must override its abstract method compute() in which computational code is put.protected abstract void compute();
RecursiveTask<V>:
Similar to RecursiveAction, but a RecursiveTask returns a result whose type is specified by the type parameter V. You also must to put computational code by overriding the compute() method:protected abstract V compute();
ForkJoinPool:
This class is the heart of Fork/Join framework. It’s responsible for the management of threads and execution of ForkJoinTasks. You must first have an instance of ForkJoinPool in order to execute ForkJoinTasks.There are two ways for acquiring a ForkJoinPool instance. The first way creates a ForkJoinPool object using one of its constructors:- ForkJoinPool(): creates a default pool that supports a level of parallelism equal to the number of processors available in the system.
- ForkJoinPool(int parallelism): creates a pool with a custom level of parallelism which must be greater than 0 and not more than the actual number of processors available.
public static ForkJoinPool commonPool()
The common pool is statically constructed and automatically available for use.Execute ForkJoinTasks in a ForkJoinPool
After you have created an instance of ForkJoinPool, you can start executing a task using one of the following methods:- <T> T invoke(ForkJoinTask<T> task): executes the specified task and returns its result upon completion. This call is synchronous, meaning that the calling thread waits until this method returns. For a resultless task (RecursiveAction), the type parameter Tis Void.
- void execute(ForkJoinTask<?> task): executes the specified task asynchronously - the calling code doesn’t wait for the task’s completion - it continues to run.
3. Example #1 - Using RecursiveAction
In this first example, you will learn how to use the Fork/Join framework to execute a task that doesn’t return a result, by extending the RecursiveAction class.Suppose that we need to do a transformation on a very large array of numbers. For the sake of simplicity, the transformation is simply multiply every element in the array by a specified number. The following code is for the transformation task:import java.util.concurrent.*; /** * This class illustrates how to create a ForkJoinTask that does not return * a result. * @author www.codejava.net */ public class ArrayTransform extends RecursiveAction { int[] array; int number; int threshold = 100_000; int start; int end; public ArrayTransform(int[] array, int number, int start, int end) { this.array = array; this.number = number; this.start = start; this.end = end; } protected void compute() { if (end - start < threshold) { computeDirectly(); } else { int middle = (end + start) / 2; ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle); ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end); invokeAll(subTask1, subTask2); } } protected void computeDirectly() { for (int i = start; i < end; i++) { array[i] = array[i] * number; } } }As you can see, this is a subclass of RecursiveAction and it implements the computation in the compute() method.The array and number are passed from its constructor. The parameters start and end specify the range of elements in the array to be processed. This helps splitting the array into sub arrays if its size is greater than a threshold, otherwise perform the computation on the whole array directly.Look at the code snippet in the else block in the compute() method:
protected void compute() { if (end - start < threshold) { computeDirectly(); } else { int middle = (end + start) / 2; ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle); ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end); invokeAll(subTask1, subTask2); } }Here we divide the array into 2 parts and create two subtasks that process each. In turn, the subtask may be also divided further into smaller subtasks recursively until the size is less than the threshold, which invokes the computeDirectly() method.And then you can execute the main task on a ForkJoinPool like this:
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(mainTask);or execute the task on the common pool:
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE); mainTask.invoke();Here’s the full source code of the test program:
import java.util.*; import java.util.concurrent.*; /** * This program demonstrates how to execute a resultless ForkJoinTask in * a ForkJoinPool * @author www.codejava.net */ public class ForkJoinRecursiveActionTest { static final int SIZE = 10_000_000; static int[] array = randomArray(); public static void main(String[] args) { int number = 9; System.out.println("First 10 elements of the array before: "); print(); ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(mainTask); System.out.println("First 10 elements of the array after: "); print(); } static int[] randomArray() { int[] array = new int[SIZE]; Random random = new Random(); for (int i = 0; i < SIZE; i++) { array[i] = random.nextInt(100); } return array; } static void print() { for (int i = 0; i < 10; i++) { System.out.print(array[i] + ", "); } System.out.println(); } }As you can see, we test with an array of 10 million elements that are randomly generated. As the array is too large, we print only the first 10 elements before and after the computation to see the effect:
First 10 elements of the array before: 42, 98, 43, 14, 9, 92, 33, 18, 18, 76, First 10 elements of the array after: 378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
4. Example #2 - Using RecursiveTask
In this second example, you will learn how to implement a task that returns a result. The following task counts the occurrences of even numbers in a large array:import java.util.concurrent.*; /** * This class illustrates how to create a ForkJoinTask that returns a result. * @author www.codejava.net */ public class ArrayCounter extends RecursiveTask<Integer> { int[] array; int threshold = 100_000; int start; int end; public ArrayCounter(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } protected Integer compute() { if (end - start < threshold) { return computeDirectly(); } else { int middle = (end + start) / 2; ArrayCounter subTask1 = new ArrayCounter(array, start, middle); ArrayCounter subTask2 = new ArrayCounter(array, middle, end); invokeAll(subTask1, subTask2); return subTask1.join() + subTask2.join(); } } protected Integer computeDirectly() { Integer count = 0; for (int i = start; i < end; i++) { if (array[i] % 2 == 0) { count++; } } return count; } }As you can see, this class extends the RecursiveTask and overrides the compute() method that returns a result (an Integer in this case).And note that we use the join() method to combine the results of subtasks:
return subTask1.join() + subTask2.join();The test program is similar to the RecursiveAction example:
import java.util.*; import java.util.concurrent.*; /** * This program demonstrates how to execute a ForkJoinTask that returns * a result in a ForkJoinPool * @author www.codejava.net */ public class ForkJoinRecursiveTaskTest { static final int SIZE = 10_000_000; static int[] array = randomArray(); public static void main(String[] args) { ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE); ForkJoinPool pool = new ForkJoinPool(); Integer evenNumberCount = pool.invoke(mainTask); System.out.println("Number of even numbers: " + evenNumberCount); } static int[] randomArray() { int[] array = new int[SIZE]; Random random = new Random(); for (int i = 0; i < SIZE; i++) { array[i] = random.nextInt(100); } return array; } }Run this program and you will see the output something like this:
Number of even numbers: 5000045
5. Example #3 - Experiment with Parallelism
In this last example, you will learn how the level of parallelism affects the computation time.The ArrayCounter class is rewritten to have the threshold passed from constructor like this:import java.util.concurrent.*; /** * This class illustrates how to create a ForkJoinTask that returns a result. * @author www.codejava.net */ public class ArrayCounter extends RecursiveTask<Integer> { int[] array; int threshold; int start; int end; public ArrayCounter(int[] array, int start, int end, int threshold) { this.array = array; this.start = start; this.end = end; this.threshold = threshold; } protected Integer compute() { if (end - start < threshold) { return computeDirectly(); } else { int middle = (end + start) / 2; ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold); ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold); invokeAll(subTask1, subTask2); return subTask1.join() + subTask2.join(); } } protected Integer computeDirectly() { Integer count = 0; for (int i = start; i < end; i++) { if (array[i] % 2 == 0) { count++; } } return count; } }And in the test program, the level of parallelism and threshold are passed as arguments to the program:
import java.util.*; import java.util.concurrent.*; /** * This program allows you to easily test performance for ForkJoinPool * with different values of parallelism and threshold. * @author www.codejava.net */ public class ParallelismTest { static final int SIZE = 10_000_000; static int[] array = randomArray(); public static void main(String[] args) { int threshold = Integer.parseInt(args[0]); int parallelism = Integer.parseInt(args[1]); long startTime = System.currentTimeMillis(); ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold); ForkJoinPool pool = new ForkJoinPool(parallelism); Integer evenNumberCount = pool.invoke(mainTask); long endTime = System.currentTimeMillis(); System.out.println("Number of even numbers: " + evenNumberCount); long time = (endTime - startTime); System.out.println("Execution time: " + time + " ms"); } static int[] randomArray() { int[] array = new int[SIZE]; Random random = new Random(); for (int i = 0; i < SIZE; i++) { array[i] = random.nextInt(100); } return array; } }This program allows you to easily test the performance with different values of parallelism and threshold. Note that it prints the execution time at the end. Try to run this program several times with different arguments and observe the execution time. Here are the suggested commands:
java ParallelismTest 1 100000 java ParallelismTest 2 100000 java ParallelismTest 3 100000 java ParallelismTest 4 100000 java ParallelismTest 2 500000 java ParallelismTest 4 500000 …
6. Conclusion
So far I have walked you through a lesson about Fork/Join framework. Here are the key points to remember:- Fork/Join framework is designed to simplify parallel programming for Java programmers.
- ForkJoinPool is the heart of Fork/Join framework. It allows many ForkJoinTasks to be executed by a small number of actual threads, with each thread running on a separate processing core.
- You can obtain an instance of ForkJoinPool by either using its constructor or static method commonPool() that returns the common pool.
- ForkJoinTask is an abstract class that represents a task that is lighter weight than a normal thread. You implement the computation logic by overriding its compute() method.
- RecursiveAction is a ForkJoinTask that doesn’t return a result.
- RecursiveTask is a ForkJoinTask that returns a result.
- ForkJoinPool is different than other pools as it uses work stealing algorithm which allows a thread that runs out of things to do, to steal tasks from other threads that are still busy.
- Threads in ForkJoinPool are daemon. You don’t have to explicitly shutdown the pool.
- You can execute a ForkJoinTask either by invoking its own methods invoke() or fork(), or by submitting the task to a ForkJoinPool and then call invoke() or execute() on the pool.
- Calling invoke() or fork() on a ForkJoinTask will cause the task to run in the common pool, if it is not already running in a ForkJoinPool.
- Use the join() method on ForkJoinTasks to combine the results.
- The invoke() method waits for the task’s completion, but the execute() method does not.
API References:
Other Java Concurrency Tutorials:
- How to use Threads in Java (create, start, pause, interrupt and join)
- Understanding Deadlock, Livelock and Starvation with Code Examples in Java
- Java Synchronization Tutorial
- Understand Thread Pool and Executors
- Understanding Atomic Variables in Java
Comments
java ParallelismTest 1 100000
java ParallelismTest 2 100000
java ParallelismTest 3 100000
…
Parameters from command line:
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);
So threshold = 1(!)
and parallelism = 100_000(!)
Is an error?!
I think that means the number of the "real concurrent" threads - operating system threads - not threads in Java.
Can you explain this.. we can have more thread than processor