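import java.util.stream.IntStream;

public class CustomCommonPoolSize {

    public void testParallelOperation() {
        // Must be set before the common pool is first used; the property is read only once
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
        long start = System.currentTimeMillis();
        IntStream s = IntStream.range(0, 20);
        s.parallel().forEach(i -> {
            try {
                Thread.sleep(100); // simulate a blocking task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            // Elapsed time since start, not the duration of this single task
            System.out.print((System.currentTimeMillis() - start) + " ");
        });
        System.out.println("\nOverall time consumed: " + (System.currentTimeMillis() - start) + " ms");
    }
}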
Java 8 Parallel Stream custom ThreadPool
Upasana | August 23, 2020 | 2 min read | 3,876 views | Multithreading and Concurrency
There are two ways to configure a custom-sized thread pool for Java 8 parallel stream operations: configuring the default common pool, or running the parallel stream operation inside a custom ForkJoinPool.
Java does not provide any direct mechanism to control the number of threads or the thread pool used by the parallel() method of the Stream API, but there are two indirect ways to configure them.
Configure default Common Pool
It is documented that the parallel() method uses the common pool, which exists once per classloader per JVM. The configuration of this default common pool can be controlled with the following three system properties:
- java.util.concurrent.ForkJoinPool.common.parallelism: the parallelism level, a non-negative integer
- java.util.concurrent.ForkJoinPool.common.threadFactory: the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory
- java.util.concurrent.ForkJoinPool.common.exceptionHandler: the class name of a Thread.UncaughtExceptionHandler
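For example, set the system property before running the parallel stream, as the CustomCommonPoolSize listing on this page does. A sample run prints the elapsed time in milliseconds for each task: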
192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192
Overall time consumed: 193 ms
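We see that all 20 tasks run in parallel, so the overall time is just 193 ms. Each task only sleeps for 100 ms; the 192 ms printed per task is the time elapsed since the start, which presumably also includes start-up overhead. Run one after another, the 20 tasks would need at least 2 seconds.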
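The common pool reads these properties only once, when it is created, so setting them after the pool has already been used has no effect. A minimal sketch for checking which parallelism is actually in effect is shown below; the class name CommonPoolInspector and its helper method are made up for illustration and are not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInspector {

    // Property name taken from the list above.
    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Hypothetical helper: returns the parallelism the common pool is really using.
    static int effectiveParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Set the property first: nothing above this line has touched ForkJoinPool yet.
        System.setProperty(PARALLELISM_PROPERTY, "20");
        System.out.println("Requested parallelism: " + System.getProperty(PARALLELISM_PROPERTY));
        System.out.println("Effective parallelism: " + effectiveParallelism());
    }
}
```

If the two printed values differ, the common pool was most likely initialized before the property was set. Passing the value on the command line with -Djava.util.concurrent.ForkJoinPool.common.parallelism=20 avoids this ordering problem.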
Run the parallel() operation inside a custom ForkJoinPool
There is a trick for executing a parallel operation in a specific fork-join pool: if you execute it as a task in a fork-join pool, it stays there and does not use the common one. The Stream API does not document this behavior, so it is best treated as an implementation detail. The trick is based on the ForkJoinTask.fork() documentation, which specifies:
Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

import static java.lang.Math.sqrt;
import static java.util.stream.Collectors.toList;

class StreamExampleJava8 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4); // parallelism level of the custom pool
        try {
            // The pipeline is submitted as a task of forkJoinPool, so its subtasks stay in that pool
            List<Integer> primes = forkJoinPool.submit(() -> IntStream.range(1, 1_000_000)
                    .parallel()
                    .filter(StreamExampleJava8::isPrime)
                    .boxed()
                    .collect(toList()))
                    .get(); // block until the pipeline has finished
            System.out.println("Primes found: " + primes.size());
        } finally {
            forkJoinPool.shutdown(); // release the worker threads even if the task fails
        }
    }

    private static boolean isPrime(long n) {
        return n > 1 && IntStream.rangeClosed(2, (int) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
In the above code block, the ForkJoinPool is created with a parallelism level of 4, and the parallel stream operations run inside this custom pool instead of the common pool.
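One way to convince yourself that the stream really runs in the custom pool is to record which threads process the elements. The following is a minimal sketch under that assumption; the class PoolMembershipCheck and the method workerNames are hypothetical names introduced only for this illustration. It relies on ForkJoinTask.getPool(), which returns the pool of the current worker thread, or null when called outside any fork-join pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

class PoolMembershipCheck {

    // Runs a parallel stream inside a custom pool and returns the names of the
    // threads that processed elements. Names of threads that do not belong to
    // the custom pool are prefixed with "outside-custom-pool:".
    static Set<String> workerNames(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            pool.submit(() -> IntStream.range(0, 1_000).parallel().forEach(i -> {
                String name = Thread.currentThread().getName();
                if (ForkJoinTask.getPool() == pool) {
                    names.add(name);
                } else {
                    names.add("outside-custom-pool:" + name);
                }
            })).join(); // wait for the whole pipeline to finish
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(workerNames(4));
    }
}
```

With a parallelism level of 4, the printed set would typically hold no more than four worker names, none of them carrying the "outside-custom-pool:" prefix.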
That’s all for this tutorial.