Syntax highlighter header

Monday, 20 July 2020

Tuning ThreadPoolExecutor and BlockingQueueSize

ThreadPoolExecutor uses a blocking queue for storing requests till some executor thread is available to execute the request. But if request are arriving at a rate higher than the rate at which requests are getting processed then this queue will keep growing and ultimately will lead to OutOfMemoryError and application with fail.

Solution to this problem is to use a bounded queue with fixed size and reject requests when queue is full. The problem is to find optimum size of the queue so that it can be sufficient to buffer the busts in requests and not too big to fail the system or provide late response which is as good as failure.

I conducted a small experiment using the following code to find out optimum queue size for ThreadPoolExecutor. This code uses a SynchronusQueue which has a queue size 0. This code stabilizes at 18 worker threads. When I use a Linked blocking Queue with size 10 this code stabalizes a 13 worker threads and stable queue size being 0 or 1 items in queue. When number of worker threads were 10 then the queue size increased to 10 and that lead to increase in worker threads but once worker threads were increased to 13 the requirement of queue size decreased. With queue size of 20 also thread pool stabilized at 13 worker threads. 13 worker threads were needed for processing average work load so increasing queue size to 20 did not help and was filled up eventually. Queue size of 10 was good enough. 

My recommendation is to keep the queue size equal to number of core threads.


package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
 
 public static class Producer implements Runnable {
  ThreadPoolExecutor threadPool;
  public Producer(ThreadPoolExecutor es) {
   threadPool =es;
  }

  @Override
  public void run() {
   for(int i=0; i<180; i++) {
    int delay = ThreadLocalRandom.current().nextInt(2000);
    try {
     Thread.sleep(delay);
    } catch (InterruptedException e) {
     
    }
    threadPool.execute(new Task());
    int queueSize = threadPool.getQueue().size();
    int poolSize = threadPool.getPoolSize();
    System.out.println("Submitted task, queueSize="+ queueSize +" poolSize="+ poolSize);
   }   
  }
  
 }
 
 public static class Task implements Runnable{

  @Override
  public void run() {
   int delay = ThreadLocalRandom.current().nextInt(2000);
   try {
    Thread.sleep(delay);
   } catch (InterruptedException e) {
    return;
   }   
  }
  
 }

 public static void main(String[] args) {
  BlockingQueue bq = new SynchronousQueue<>();
  ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 20, 2, TimeUnit.MINUTES,bq);
  for(int i=0; i<10; i++) {
   new Thread(new Producer(tp)).start();   
  }

 }

}

No comments:

Post a Comment