- In parallel execution, each thread runs on a separate processing core, so tasks truly execute at the same time.
- In concurrent execution, the threads run on the same core. Tasks are executed in an interleaved fashion, sharing the processing time of that core.
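Whether threads can run in parallel or only concurrently depends on how many cores the JVM can see. A minimal sketch for checking this on your machine; the class name CoreCountSketch and the helper availableCores() are made up for illustration:

```java
public class CoreCountSketch {

    /** Number of logical processors the JVM reports as available. */
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int cores = availableCores();
        System.out.println("Available processors: " + cores);

        if (cores > 1) {
            // Several cores: threads may genuinely run at the same time.
            System.out.println("Threads can execute in parallel.");
        } else {
            // A single core: threads can only take turns on it.
            System.out.println("Threads can only be interleaved.");
        }
    }
}
```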
Don’t worry if you think parallel programming is complex and difficult. As you will see, the Fork/Join framework makes it easy for programmers.

Continue reading, because parallel programming will be part of every programmer’s future.

    if (problemSize < threshold)
        solve problem directly
    else {
        break problem into subproblems
        recursively solve each subproblem
        combine the results
    }
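The pseudocode above maps onto plain recursion even before any threads are involved. Here is a minimal single-threaded sketch, assuming a hypothetical sum() helper and an arbitrary THRESHOLD of 4 chosen only to keep the example small:

```java
public class DivideAndConquerSketch {

    // Hypothetical cut-off: ranges shorter than this are summed with a plain loop.
    static final int THRESHOLD = 4;

    /** Sums data[start..end) by splitting the range in half recursively. */
    static long sum(int[] data, int start, int end) {
        if (end - start < THRESHOLD) {
            // Small enough: solve the problem directly.
            long total = 0;
            for (int i = start; i < end; i++) {
                total += data[i];
            }
            return total;
        }
        // Break the problem into two subproblems.
        int middle = start + (end - start) / 2;
        long left = sum(data, start, middle);   // recursively solve each half
        long right = sum(data, middle, end);
        return left + right;                    // combine the results
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(sum(data, 0, data.length)); // prints 55
    }
}
```

The two recursive calls work on disjoint halves of the array, which is what makes this pattern a natural fit for running the halves on different threads.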
The Fork/Join framework was added to the JDK in Java 7 and improved in Java 8. It is used by several newer features of the Java platform, including the Streams API and parallel array sorting.

- It simplifies thread creation. Threads are created and managed automatically.
- It automatically makes use of multiple processors so programs can scale to make use of available processors.
With support for true parallel execution, the Fork/Join framework can significantly reduce computation time and increase performance when solving very large problems such as image processing, video processing, big data processing, etc.

One interesting point about the Fork/Join framework: it uses a work-stealing algorithm to balance the load among threads. If a worker thread runs out of things to do, it can steal tasks from other threads that are still busy.

protected abstract void compute();
protected abstract V compute();
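To get a rough feel for the work stealing described above, a pool can be asked how many tasks were stolen. The following is a sketch only: RangeSum, sumWithPool() and the cut-off of 10,000 are hypothetical names and values, and getStealCount() returns an estimate that varies from run to run, so no particular number should be expected.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class StealCountSketch {

    /** Hypothetical task: sums the integers in [from, to) by recursive splitting. */
    static class RangeSum extends RecursiveTask<Long> {
        private final long from;
        private final long to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 10_000) {          // arbitrary cut-off for this sketch
                long total = 0;
                for (long i = from; i < to; i++) {
                    total += i;
                }
                return total;
            }
            long middle = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, middle);
            RangeSum right = new RangeSum(middle, to);
            invokeAll(left, right);             // idle workers may steal these subtasks
            return left.join() + right.join();
        }
    }

    static long sumWithPool(ForkJoinPool pool, long n) {
        return pool.invoke(new RangeSum(0, n));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long result = sumWithPool(pool, 10_000_000L);
        System.out.println("Sum: " + result);
        // An estimate only; the value depends on scheduling and on the machine.
        System.out.println("Steal count (estimate): " + pool.getStealCount());
        pool.shutdown();
    }
}
```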
public static ForkJoinPool commonPool()
The common pool is statically constructed and automatically available for use.

    import java.util.concurrent.*;

    /**
     * This class illustrates how to create a ForkJoinTask that does not return
     * a result.
     * @author www.codejava.net
     */
    public class ArrayTransform extends RecursiveAction {
        int[] array;
        int number;
        int threshold = 100_000;
        int start;
        int end;

        public ArrayTransform(int[] array, int number, int start, int end) {
            this.array = array;
            this.number = number;
            this.start = start;
            this.end = end;
        }

        protected void compute() {
            if (end - start < threshold) {
                computeDirectly();
            } else {
                int middle = (end + start) / 2;

                ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
                ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

                invokeAll(subTask1, subTask2);
            }
        }

        protected void computeDirectly() {
            for (int i = start; i < end; i++) {
                array[i] = array[i] * number;
            }
        }
    }

As you can see, this is a subclass of RecursiveAction and it implements the computation in the compute() method.

The array and number are passed in through the constructor. The parameters start and end specify the range of elements in the array to be processed. This makes it possible to split the array into subranges when the range is larger than the threshold; otherwise the computation is performed on the whole range directly.

Look at the else block in the compute() method:
    protected void compute() {
        if (end - start < threshold) {
            computeDirectly();
        } else {
            int middle = (end + start) / 2;

            ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
            ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

            invokeAll(subTask1, subTask2);
        }
    }

Here we divide the array into two parts and create two subtasks, one for each part. Each subtask may in turn be divided into smaller subtasks recursively until the range is smaller than the threshold, at which point the computeDirectly() method is invoked.

You can then execute the main task on a ForkJoinPool like this:
    ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(mainTask);

or execute the task on the common pool:
    ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
    mainTask.invoke();

Here’s the full source code of the test program:
    import java.util.*;
    import java.util.concurrent.*;

    /**
     * This program demonstrates how to execute a resultless ForkJoinTask in
     * a ForkJoinPool
     * @author www.codejava.net
     */
    public class ForkJoinRecursiveActionTest {
        static final int SIZE = 10_000_000;
        static int[] array = randomArray();

        public static void main(String[] args) {
            int number = 9;

            System.out.println("First 10 elements of the array before: ");
            print();

            ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
            ForkJoinPool pool = new ForkJoinPool();
            pool.invoke(mainTask);

            System.out.println("First 10 elements of the array after: ");
            print();
        }

        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();

            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }

            return array;
        }

        static void print() {
            for (int i = 0; i < 10; i++) {
                System.out.print(array[i] + ", ");
            }
            System.out.println();
        }
    }

As you can see, we test with an array of 10 million randomly generated elements. Because the array is too large to print in full, we print only the first 10 elements before and after the computation to see the effect:
    First 10 elements of the array before:
    42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
    First 10 elements of the array after:
    378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
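Printing ten elements is a quick sanity check, but it does not show that every element was transformed. One way to gain more confidence is to compare the parallel result with a plain loop. The sketch below assumes a hypothetical Scale action that stands in for ArrayTransform, with a smaller cut-off so the example splits even on a modest array; it runs without an explicit pool, so its subtasks use the common pool.

```java
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

public class TransformCheckSketch {

    /** Hypothetical stand-in for ArrayTransform: multiplies a range in place. */
    static class Scale extends RecursiveAction {
        private final int[] array;
        private final int factor;
        private final int start;
        private final int end;

        Scale(int[] array, int factor, int start, int end) {
            this.array = array;
            this.factor = factor;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start < 1_000) {            // arbitrary cut-off for this sketch
                for (int i = start; i < end; i++) {
                    array[i] *= factor;
                }
            } else {
                int middle = start + (end - start) / 2;
                invokeAll(new Scale(array, factor, start, middle),
                          new Scale(array, factor, middle, end));
            }
        }
    }

    /** Scales a copy of the input using fork/join; no explicit pool is created. */
    static int[] scaleInParallel(int[] input, int factor) {
        int[] copy = Arrays.copyOf(input, input.length);
        new Scale(copy, factor, 0, copy.length).invoke();
        return copy;
    }

    /** Reference implementation: the same transformation with a plain loop. */
    static int[] scaleSequentially(int[] input, int factor) {
        int[] copy = Arrays.copyOf(input, input.length);
        for (int i = 0; i < copy.length; i++) {
            copy[i] *= factor;
        }
        return copy;
    }

    public static void main(String[] args) {
        int[] input = new int[100_000];
        for (int i = 0; i < input.length; i++) {
            input[i] = i % 100;
        }
        boolean same = Arrays.equals(scaleInParallel(input, 9), scaleSequentially(input, 9));
        System.out.println("Parallel result matches sequential result: " + same);
    }
}
```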
    import java.util.concurrent.*;

    /**
     * This class illustrates how to create a ForkJoinTask that returns a result.
     * @author www.codejava.net
     */
    public class ArrayCounter extends RecursiveTask<Integer> {
        int[] array;
        int threshold = 100_000;
        int start;
        int end;

        public ArrayCounter(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        protected Integer compute() {
            if (end - start < threshold) {
                return computeDirectly();
            } else {
                int middle = (end + start) / 2;

                ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
                ArrayCounter subTask2 = new ArrayCounter(array, middle, end);

                invokeAll(subTask1, subTask2);

                return subTask1.join() + subTask2.join();
            }
        }

        // Counts the even numbers in the range [start, end).
        protected Integer computeDirectly() {
            int count = 0;  // primitive counter avoids boxing on every increment

            for (int i = start; i < end; i++) {
                if (array[i] % 2 == 0) {
                    count++;
                }
            }

            return count;
        }
    }

As you can see, this class extends RecursiveTask and overrides the compute() method, which returns a result (an Integer in this case).

Note that we use the join() method to obtain the results of the subtasks and combine them:
    return subTask1.join() + subTask2.join();

The test program is similar to the RecursiveAction example:
    import java.util.*;
    import java.util.concurrent.*;

    /**
     * This program demonstrates how to execute a ForkJoinTask that returns
     * a result in a ForkJoinPool
     * @author www.codejava.net
     */
    public class ForkJoinRecursiveTaskTest {
        static final int SIZE = 10_000_000;
        static int[] array = randomArray();

        public static void main(String[] args) {
            ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
            ForkJoinPool pool = new ForkJoinPool();
            Integer evenNumberCount = pool.invoke(mainTask);

            System.out.println("Number of even numbers: " + evenNumberCount);
        }

        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();

            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }

            return array;
        }
    }

Run this program and you will see output similar to this:
Number of even numbers: 5000045
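The example above hands both subtasks to invokeAll(). A commonly used alternative is to fork() one half, compute the other half in the current thread, and then join() the forked half. The sketch below illustrates that variant under stated assumptions: EvenCounter, countEvens() and the cut-off of 10,000 are hypothetical and not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinVariantSketch {

    /** Hypothetical task: counts even numbers in array[start..end). */
    static class EvenCounter extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 10_000; // arbitrary cut-off for this sketch
        private final int[] array;
        private final int start;
        private final int end;

        EvenCounter(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start < THRESHOLD) {
                int count = 0;
                for (int i = start; i < end; i++) {
                    if (array[i] % 2 == 0) {
                        count++;
                    }
                }
                return count;
            }
            int middle = start + (end - start) / 2;
            EvenCounter left = new EvenCounter(array, start, middle);
            EvenCounter right = new EvenCounter(array, middle, end);

            left.fork();                       // schedule the left half asynchronously
            int rightCount = right.compute();  // work on the right half in this thread
            int leftCount = left.join();       // wait for the left half and fetch its result
            return leftCount + rightCount;
        }
    }

    static int countEvens(int[] array) {
        return ForkJoinPool.commonPool().invoke(new EvenCounter(array, 0, array.length));
    }

    public static void main(String[] args) {
        int[] array = new int[1_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;                      // 0, 1, 2, ... so exactly half are even
        }
        System.out.println("Number of even numbers: " + countEvens(array)); // prints 500000
    }
}
```

Computing one half directly keeps the current worker thread busy instead of leaving it to wait for both subtasks.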
    import java.util.concurrent.*;

    /**
     * This class illustrates how to create a ForkJoinTask that returns a result.
     * @author www.codejava.net
     */
    public class ArrayCounter extends RecursiveTask<Integer> {
        int[] array;
        int threshold;
        int start;
        int end;

        public ArrayCounter(int[] array, int start, int end, int threshold) {
            this.array = array;
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }

        protected Integer compute() {
            if (end - start < threshold) {
                return computeDirectly();
            } else {
                int middle = (end + start) / 2;

                // Subtasks inherit the same threshold as the parent task.
                ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
                ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);

                invokeAll(subTask1, subTask2);

                return subTask1.join() + subTask2.join();
            }
        }

        // Counts the even numbers in the range [start, end).
        protected Integer computeDirectly() {
            int count = 0;  // primitive counter avoids boxing on every increment

            for (int i = start; i < end; i++) {
                if (array[i] % 2 == 0) {
                    count++;
                }
            }

            return count;
        }
    }

In the test program, the level of parallelism and the threshold are passed as arguments to the program:
    import java.util.*;
    import java.util.concurrent.*;

    /**
     * This program allows you to easily test performance for ForkJoinPool
     * with different values of parallelism and threshold.
     * @author www.codejava.net
     */
    public class ParallelismTest {
        static final int SIZE = 10_000_000;
        static int[] array = randomArray();

        public static void main(String[] args) {
            // First argument: level of parallelism; second argument: threshold.
            int parallelism = Integer.parseInt(args[0]);
            int threshold = Integer.parseInt(args[1]);

            long startTime = System.currentTimeMillis();

            ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
            ForkJoinPool pool = new ForkJoinPool(parallelism);
            Integer evenNumberCount = pool.invoke(mainTask);

            long endTime = System.currentTimeMillis();

            System.out.println("Number of even numbers: " + evenNumberCount);

            long time = (endTime - startTime);
            System.out.println("Execution time: " + time + " ms");
        }

        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();

            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }

            return array;
        }
    }

This program allows you to easily test the performance with different values of parallelism and threshold. Note that it prints the execution time at the end. Try running this program several times with different arguments and observe the execution time. Here are the suggested commands:
    java ParallelismTest 1 100000
    java ParallelismTest 2 100000
    java ParallelismTest 3 100000
    java ParallelismTest 4 100000
    java ParallelismTest 2 500000
    java ParallelismTest 4 500000
    …
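Launching a fresh JVM for every combination works, but the same comparison can also be made in a single run. The sketch below is one possible way to do that, not a rigorous benchmark: EvenCounter and countEvens() are hypothetical names, the parallelism levels and threshold are arbitrary, and timings taken this way are affected by JIT warm-up, so treat them as rough indications only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ParallelismSweepSketch {

    /** Hypothetical task: counts even numbers in array[start..end). */
    static class EvenCounter extends RecursiveTask<Integer> {
        private final int[] array;
        private final int start;
        private final int end;
        private final int threshold;

        EvenCounter(int[] array, int start, int end, int threshold) {
            this.array = array;
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }

        @Override
        protected Integer compute() {
            if (end - start < threshold) {
                int count = 0;
                for (int i = start; i < end; i++) {
                    if (array[i] % 2 == 0) {
                        count++;
                    }
                }
                return count;
            }
            int middle = start + (end - start) / 2;
            EvenCounter left = new EvenCounter(array, start, middle, threshold);
            EvenCounter right = new EvenCounter(array, middle, end, threshold);
            invokeAll(left, right);
            return left.join() + right.join();
        }
    }

    /** Runs the count in a dedicated pool with the given parallelism, then shuts it down. */
    static int countEvens(int[] array, int parallelism, int threshold) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new EvenCounter(array, 0, array.length, threshold));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i % 100;
        }
        int threshold = 100_000;
        for (int parallelism : new int[] {1, 2, 4}) {
            long startTime = System.nanoTime();
            int evens = countEvens(array, parallelism, threshold);
            long elapsedMs = (System.nanoTime() - startTime) / 1_000_000;
            System.out.println("parallelism=" + parallelism
                    + ", even numbers=" + evens
                    + ", time=" + elapsedMs + " ms");
        }
    }
}
```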
- Fork/Join framework is designed to simplify parallel programming for Java programmers.
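- ForkJoinPool is the heart of the Fork/Join framework. It allows many ForkJoinTasks to be executed by a small number of actual threads. A pool created with the no-argument constructor targets as many worker threads as there are available processors, so each thread can run on its own processing core.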
- You can obtain an instance of ForkJoinPool by either using its constructor or static method commonPool() that returns the common pool.
- ForkJoinTask is an abstract class that represents a task that is lighter weight than a normal thread. You implement the computation logic by extending one of its subclasses, RecursiveAction or RecursiveTask, and overriding the compute() method.
- RecursiveAction is a ForkJoinTask that doesn’t return a result.
- RecursiveTask is a ForkJoinTask that returns a result.
- ForkJoinPool differs from other thread pools in that it uses a work-stealing algorithm, which allows a thread that runs out of things to do to steal tasks from other threads that are still busy.
- Worker threads in a ForkJoinPool are daemon threads, so they do not keep the JVM alive and you usually don’t have to shut down the pool explicitly.
- You can execute a ForkJoinTask either by calling its own invoke() or fork() method, or by passing it to the invoke() or execute() method of a ForkJoinPool.
- Calling invoke() or fork() on a ForkJoinTask will cause the task to run in the common pool, if it is not already running in a ForkJoinPool.
- Use the join() method on a ForkJoinTask to wait for its completion and obtain its result, so that the results of subtasks can be combined.
- The invoke() method waits for the task’s completion, but the execute() method does not.
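The difference between invoke() and execute() from the last point can be seen in a few lines. This is a minimal sketch: the Square task and the viaInvoke()/viaExecute() helpers are hypothetical and exist only to show the two call styles.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class InvokeVersusExecuteSketch {

    /** Hypothetical trivial task: returns the square of a number. */
    static class Square extends RecursiveTask<Integer> {
        private final int value;

        Square(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value * value;
        }
    }

    /** invoke() blocks until the task completes and returns its result. */
    static int viaInvoke(ForkJoinPool pool, int value) {
        return pool.invoke(new Square(value));
    }

    /** execute() only schedules the task; join() is needed to wait for the result. */
    static int viaExecute(ForkJoinPool pool, int value) {
        Square task = new Square(value);
        pool.execute(task);     // returns immediately, the task runs asynchronously
        // Other work could be done here while the task runs.
        return task.join();     // wait for completion and fetch the result
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(viaInvoke(pool, 7));   // prints 49
        System.out.println(viaExecute(pool, 7));  // prints 49
        pool.shutdown();
    }
}
```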