Since ForkJoinPool was introduced in Java 7, developers are often unsure whether to use a ForkJoinPool or a conventional ExecutorService thread pool. In this post I discuss which thread pool to use in which case.
ForkJoinPool is designed for CPU-intensive workloads. By default, a ForkJoinPool created with the no-argument constructor has a parallelism equal to the number of processors available to the JVM. If a worker thread has to wait because it called join() on another ForkJoinTask, the pool can start a compensating thread so that all CPUs stay busy. ForkJoinPool also provides a shared common pool, which you obtain by calling the static method ForkJoinPool.commonPool(); by default its parallelism is one less than the number of processors. The aim of this design is to have a single ForkJoinPool in the application, sized to match the number of processors. Such a pool can use the full computing capacity of the system as long as all ForkJoinTasks are doing computation-intensive work.
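To make the CPU-bound case concrete, here is a minimal sketch of a divide-and-conquer sum running on the common pool. The class SumTaskDemo, the SumTask helper and the THRESHOLD value are assumptions chosen for illustration; they are not part of the sample later in this post.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTaskDemo {
    // Hypothetical CPU-bound task: sums data[from, to) by splitting the range in half.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cut-off for sequential work
        private final long[] data;
        private final int from;
        private final int to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                     // schedule the left half asynchronously
            long rightSum = right.compute(); // work on the right half in this thread
            return left.join() + rightSum;   // wait for the left half and combine
        }
    }

    // Sums 1..n on the common pool.
    static long sumOneTo(int n) {
        long[] data = new long[n];
        for (int i = 0; i < n; i++) {
            data[i] = i + 1;
        }
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, n));
    }

    public static void main(String[] args) {
        System.out.println("processors  = " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism = " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("sum         = " + sumOneTo(100_000));
    }
}
```

Every subtask here only computes, so no worker ever sits idle waiting for IO. This is the kind of workload the pool is sized for.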
In real-life applications, however, tasks are usually a mix of CPU-intensive and IO-intensive work. IO-intensive tasks are a poor fit for a ForkJoinPool, because a worker thread that blocks on IO leaves a CPU idle. Use a separate ExecutorService for IO-intensive tasks: there you can size the thread count according to the IO capacity of your system rather than its CPU capacity.
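A dedicated IO pool could look roughly like the sketch below. The blocking call is only simulated with Thread.sleep, and the names IoPoolDemo, fakeFetch and fetchAll, as well as the thread count of 20, are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class IoPoolDemo {
    // Stand-in for a blocking call such as an HTTP request (the sleep only simulates latency).
    static String fakeFetch(String name) throws InterruptedException {
        Thread.sleep(200);
        return "content of " + name;
    }

    // Runs all fetches on a dedicated pool whose size reflects IO capacity, not CPU count.
    static List<String> fetchAll(List<String> names, int ioThreads) {
        ExecutorService ioPool = Executors.newFixedThreadPool(ioThreads);
        try {
            List<Callable<String>> calls = new ArrayList<>();
            for (String name : names) {
                calls.add(() -> fakeFetch(name));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : ioPool.invokeAll(calls)) {
                results.add(future.get()); // futures are returned in the order of the task list
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("fetch failed", e.getCause());
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 20 threads is an arbitrary illustrative value, not a recommendation.
        List<String> pages = fetchAll(Arrays.asList("a", "b", "c"), 20);
        System.out.println(pages);
    }
}
```

Because the threads spend most of their time waiting, such a pool can reasonably have many more threads than there are processors. The right number depends on what the remote systems and your network can sustain.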
If you need to call an IO-intensive operation from a ForkJoinTask, create a class that implements the ForkJoinPool.ManagedBlocker interface and perform the IO operation in its block() method. Run your ManagedBlocker implementation through the static method ForkJoinPool.managedBlock(). When the caller is a ForkJoinPool worker thread, this method may arrange for a compensating thread to be activated, so that parallelism is maintained while the worker is blocked. It then calls isReleasable() and block() until one of them returns true. The block() method is expected to perform the IO operation and store the result in an instance variable. After ForkJoinPool.managedBlock() returns, call an accessor on your blocker to get the result of the IO operation. This way you can mix CPU-intensive operations with IO-intensive ones. A classic example is a web crawler: fetching pages from the internet is IO-intensive, while parsing the fetched HTML to extract links is CPU-intensive.
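Stripped of any HTTP code, the ManagedBlocker pattern can be sketched as follows. The slow call is simulated with Thread.sleep, and ManagedBlockerDemo, FutureBlocker, awaitManaged and fetchAndMeasure are hypothetical names used only in this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class ManagedBlockerDemo {
    // Hypothetical helper: waits for a Future while letting the ForkJoinPool compensate.
    static final class FutureBlocker implements ForkJoinPool.ManagedBlocker {
        private final Future<?> future;

        FutureBlocker(Future<?> future) {
            this.future = future;
        }

        @Override
        public boolean block() throws InterruptedException {
            try {
                future.get(); // the actual blocking wait
            } catch (ExecutionException e) {
                // Ignored here: the caller sees the failure when it calls get() itself.
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return future.isDone(); // nothing to wait for if the result is already there
        }
    }

    // Waits for the future through managedBlock and unwraps its result.
    static <T> T awaitManaged(Future<T> future) {
        try {
            ForkJoinPool.managedBlock(new FutureBlocker(future));
            return future.get(); // returns immediately, the future is done by now
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("blocking call failed", e.getCause());
        }
    }

    // IO part runs in ioPool, CPU part (here just length()) runs in the common pool.
    static int fetchAndMeasure(String text) {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> slowCall = () -> {
                Thread.sleep(100); // simulated IO latency
                return text;
            };
            Callable<Integer> task = () -> awaitManaged(ioPool.submit(slowCall)).length();
            return ForkJoinPool.commonPool().submit(task).join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAndMeasure("hello"));
    }
}
```

Letting isReleasable() report whether the future is already done means the pool is not asked to compensate when there is nothing left to wait for.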
I have not implemented a full web crawler. The sample code below only fetches web pages through an ExecutorService with 10 threads. ForkJoinTasks are submitted to the common ForkJoinPool. Each ForkJoinTask hands the page-fetch request to the ExecutorService and waits for the result through the static method ForkJoinPool.managedBlock(). Once the page arrives, the task computes the SHA-256 hash of its content and stores it in a ConcurrentHashMap. This way both the CPU capacity and the IO capacity of the system can be put to use.
The sample code is:
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class ForkJoinPoolTest {

    // Waits for a page fetch that runs in the IO pool, so the ForkJoinPool can compensate.
    public static class FetchPage implements ForkJoinPool.ManagedBlocker {
        // Cache of pages that have already been fetched, keyed by URL.
        private static final ConcurrentHashMap<String, byte[]> pagesMap = new ConcurrentHashMap<>();
        private final String url;
        private final ExecutorService executorService;
        private byte[] pageBytes;
        private boolean done; // true once the fetch has either succeeded or failed

        public FetchPage(String url, ExecutorService executorService) {
            this.url = url;
            this.executorService = executorService;
        }

        @Override
        public boolean block() throws InterruptedException {
            if ((pageBytes = pagesMap.get(url)) != null) {
                done = true;
                return true;
            }
            Callable<byte[]> callable = new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    // Close the client and the response even if the request fails.
                    try (CloseableHttpClient client = HttpClients.createDefault();
                         CloseableHttpResponse response = client.execute(new HttpGet(url))) {
                        return EntityUtils.toByteArray(response.getEntity());
                    }
                }
            };
            Future<byte[]> future = executorService.submit(callable);
            try {
                pageBytes = future.get();
                if (pageBytes != null) {
                    pagesMap.put(url, pageBytes); // remember the page for later requests
                }
            } catch (ExecutionException e) {
                pageBytes = null; // the fetch failed; getPage() will return null
            }
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            return done;
        }

        public byte[] getPage() {
            return pageBytes;
        }
    }

    // Maps each URL to the SHA-256 hash of its page content.
    private static final ConcurrentHashMap<String, String> hashPageMap = new ConcurrentHashMap<>();

    public static class MyRecursiveTask extends RecursiveTask<String> {
        private final String url;
        private final ExecutorService executorService;

        public MyRecursiveTask(String url, ExecutorService executorService) {
            this.url = url;
            this.executorService = executorService;
        }

        @Override
        protected String compute() {
            try {
                FetchPage fp = new FetchPage(url, executorService);
                ForkJoinPool.managedBlock(fp); // IO-intensive part
                byte[] bytes = fp.getPage();
                if (bytes != null) {
                    String code = toHexString(getSHA(bytes)); // CPU-intensive part
                    hashPageMap.put(url, code);
                    return code;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } catch (NoSuchAlgorithmException e) {
                // SHA-256 is not available; fall through and report no result
            }
            return null;
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        MyRecursiveTask task1 = new MyRecursiveTask("https://www.yahoo.com", executorService);
        MyRecursiveTask task2 = new MyRecursiveTask("https://www.google.com", executorService);
        Future<String> f1 = forkJoinPool.submit(task1);
        Future<String> f2 = forkJoinPool.submit(task2);
        try {
            System.out.println(f1.get());
            System.out.println(f2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Shut the IO pool down even if a task failed, so the JVM can exit.
            executorService.shutdown();
        }
    }

    public static byte[] getSHA(byte[] input) throws NoSuchAlgorithmException {
        // Obtain a SHA-256 MessageDigest instance
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        // digest() computes the message digest of the input and returns it as a byte array
        return md.digest(input);
    }

    public static String toHexString(byte[] hash) {
        // Interpret the bytes as a non-negative number (signum 1)
        BigInteger number = new BigInteger(1, hash);
        // Convert the number to its hexadecimal representation
        StringBuilder hexString = new StringBuilder(number.toString(16));
        // Pad with leading zeros: a SHA-256 digest is 32 bytes, i.e. 64 hex characters
        while (hexString.length() < hash.length * 2) {
            hexString.insert(0, '0');
        }
        return hexString.toString();
    }
}
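A SHA-256 digest is 32 bytes long, so its hexadecimal form always has 64 characters. As an alternative to going through BigInteger, the digest can be converted byte by byte, which keeps leading zeros without a separate padding step. The following is a minimal sketch; HexDemo, toHex and sha256Hex are hypothetical names that do not appear in the sample above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class HexDemo {
    // Converts each byte to exactly two lowercase hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff)); // mask to treat the byte as unsigned
        }
        return sb.toString();
    }

    // Hashes the UTF-8 bytes of the text with SHA-256 and returns the hex digest.
    static String sha256Hex(String text) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return toHex(md.digest(text.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toHex(new byte[] {0x00, 0x0f, (byte) 0xff})); // prints 000fff
        System.out.println(sha256Hex("abc").length());                   // prints 64
    }
}
```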
This article is also published at GeeksForGeeks