
Monday, 20 July 2020

Tuning ThreadPoolExecutor and BlockingQueueSize

ThreadPoolExecutor uses a blocking queue to hold requests until an executor thread is available to run them. If requests arrive faster than they are processed, an unbounded queue keeps growing, which ultimately leads to an OutOfMemoryError, and the application will fail.

The solution to this problem is to use a bounded queue with a fixed size and to reject requests when the queue is full. The difficulty is finding the optimum queue size: large enough to buffer bursts of requests, but not so large that it brings the system down or delays responses so much that they are as good as failures.
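As a minimal sketch of the bounded-queue-with-rejection idea, the class below builds a pool with a fixed number of threads and an ArrayBlockingQueue, then counts how many submissions are turned away. The class name BoundedQueueDemo, the method countRejected, and the sizes used are hypothetical and chosen only for illustration; the tasks block on a latch so that the queue is guaranteed to fill.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original experiment.
public class BoundedQueueDemo {

    /** Submits `tasks` blocking tasks and returns how many were rejected. */
    public static int countRejected(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // throw when the queue is full
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++; // a real caller would shed load or report "busy" here
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots can hold 5 tasks, so 5 of 10 are rejected.
        System.out.println("rejected=" + countRejected(2, 3, 10));
    }
}
```

AbortPolicy is the executor's default handler; it is passed explicitly here only to make the rejection behaviour visible. Other handlers, such as CallerRunsPolicy, trade rejection for back-pressure on the submitting thread.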

I conducted a small experiment using the following code to find the optimum queue size for ThreadPoolExecutor. As shown, the code uses a SynchronousQueue, which has a capacity of 0; with it the pool stabilizes at 18 worker threads. When I use a LinkedBlockingQueue with a capacity of 10, the pool stabilizes at 13 worker threads, with 0 or 1 items in the queue in the steady state. While there were only 10 worker threads, the queue filled up to 10 items, which caused more worker threads to be created; once the pool had grown to 13 threads, the need for queue space dropped. With a queue size of 20 the thread pool also stabilized at 13 worker threads. 13 worker threads were needed to process the average workload, so increasing the queue size to 20 did not help; the queue simply filled up eventually. A queue size of 10 was good enough.

My recommendation is to keep the queue size equal to the number of core threads.


package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
 
 public static class Producer implements Runnable {
  ThreadPoolExecutor threadPool;
  public Producer(ThreadPoolExecutor es) {
   threadPool =es;
  }

  @Override
  public void run() {
   for(int i=0; i<180; i++) {
    int delay = ThreadLocalRandom.current().nextInt(2000);
    try {
     Thread.sleep(delay);
    } catch (InterruptedException e) {
     // Restore the interrupt flag and stop producing
     Thread.currentThread().interrupt();
     return;
    }
    threadPool.execute(new Task());
    int queueSize = threadPool.getQueue().size();
    int poolSize = threadPool.getPoolSize();
    System.out.println("Submitted task, queueSize="+ queueSize +" poolSize="+ poolSize);
   }   
  }
  
 }
 
 public static class Task implements Runnable{

  @Override
  public void run() {
   int delay = ThreadLocalRandom.current().nextInt(2000);
   try {
    Thread.sleep(delay);
   } catch (InterruptedException e) {
    return;
   }   
  }
  
 }

 public static void main(String[] args) {
  // Swap in new LinkedBlockingQueue<>(10) to repeat the bounded-queue run
  BlockingQueue<Runnable> bq = new SynchronousQueue<>();
  ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 20, 2, TimeUnit.MINUTES,bq);
  for(int i=0; i<10; i++) {
   new Thread(new Producer(tp)).start();   
  }

 }

}
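The growth pattern seen in the experiment follows from a documented ThreadPoolExecutor rule: threads beyond the core pool size are created only when the queue refuses a new task. The sketch below makes that order visible with deliberately small numbers. The class name PoolGrowthDemo, the method trace, and the sizes are hypothetical; the tasks block on a latch so the pool and queue sizes are stable when they are sampled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original experiment.
public class PoolGrowthDemo {

    /** Submits blocking tasks one by one and records pool and queue size after each. */
    public static List<String> trace(int core, int max, int capacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(capacity));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker so nothing drains the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> lines = new ArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            String outcome = "accepted";
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                outcome = "rejected";
            }
            lines.add("task " + i + " " + outcome
                    + " pool=" + pool.getPoolSize()
                    + " queue=" + pool.getQueue().size());
        }
        release.countDown(); // let every task finish
        pool.shutdown();
        return lines;
    }

    public static void main(String[] args) {
        trace(2, 4, 2, 7).forEach(System.out::println);
    }
}
```

With 2 core threads, a maximum of 4 and a queue capacity of 2, the pool stays at 2 threads until the queue holds 2 items, then grows to 3 and 4, and the seventh task is rejected. The same ordering explains why the experiment's pool grew past 10 threads only after its queue had filled.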

java.lang.OutOfMemoryError: Unable to create new native thread

Recently I received the error "java.lang.OutOfMemoryError: Unable to create new native thread", and when we debugged the issue on the Linux machine the root cause was not related to memory but something totally different. When the OS refuses to create more threads because the limit on the number of processes has been reached, Java reports this as java.lang.OutOfMemoryError, because Java defines no specific error for a thread that cannot be created due to the process limit.

If you want to check the limits on a Linux machine, run the following command:

$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 62837
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 16384
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 10240
cpu time               (seconds, -t) unlimited
max user processes              (-u) 1024
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited


"max user processes" defines the maximum number of processes a user can have at one time; on Linux each thread counts against this limit. There are soft limits and hard limits: the soft limit is the value actually enforced, and a process can raise its own soft limit up to the hard limit, while the hard limit is a ceiling that only root can raise.

In our case the soft limit was configured as 1024 and we were trying to create more threads than that.
The limit is defined in the file "/etc/security/limits.d/90-nproc.conf". We changed the number of processes to 2048 and our application started working.


# Default limit for number of user's processes to prevent
# accidental fork bombs.
# See rhbz #432903 for reasoning.

*          soft    nproc     1024
root       soft    nproc     unlimited

In Java, the OS refusing to create a native thread surfaces as java.lang.OutOfMemoryError, so the real reason might not be related to memory.
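When chasing this error it helps to know how many threads the JVM itself is running, so the number can be compared with the "max user processes" value. The following is a minimal sketch using the standard ThreadMXBean; the class name ThreadCountReport and the method report are hypothetical. Note that the limit applies to all of the user's processes and threads, so the JVM's own count is only part of the total.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical helper class for illustration.
public class ThreadCountReport {

    /** Returns a one-line summary of this JVM's live, peak and daemon thread counts. */
    public static String report() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return "live=" + threads.getThreadCount()
                + " peak=" + threads.getPeakThreadCount()
                + " daemon=" + threads.getDaemonThreadCount();
    }

    public static void main(String[] args) {
        System.out.println(report());
    }
}
```

Logging this line periodically, or just before thread pools are created, gives a rough idea of how close the application is to the configured limit.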

Setting Compression Level in GZIPOutputStream

Most of the time people want to compress the files they generate to save disk space and transmission bandwidth. Apart from saving space, compression can actually speed up the application, because smaller files mean less disk I/O. For this, compression and decompression need to be done in memory, not after writing the whole uncompressed contents to disk.

There are two formats you can use when generating compressed files from Java: GZIPOutputStream generates GZIP files, and ZipOutputStream generates ZIP files. There is one basic difference between GZIP and ZIP files. A GZIP file contains only one file, and storing that file's name is optional, whereas a ZIP file is an archive of multiple files, and every entry must be given a name when the ZIP file is created. Because a ZIP file may hold multiple files, it cannot simply be passed through a filter that decompresses it on the fly from an input stream: the filter cannot pick one file out of the several that may be present.

For seamless processing of compressed files while reading, the GZIP format is the most suitable one. Unfortunately, the Java API for GZIPOutputStream lacks a method for controlling the compression level to achieve BEST_SPEED or BEST_COMPRESSION as needed. This facility is available in ZipOutputStream. Sometimes people just use ZipOutputStream with the compression level set to BEST_SPEED to gain performance when they actually need GZIPOutputStream for compressing their data. This creates a problem for the reader, who now has to handle an archive that can potentially contain multiple files rather than a single compressed file. A single decompressing filter cannot simply be wrapped around the input, because the ZIP file may contain multiple files; the reader has to walk the archive entry by entry (for example with ZipInputStream) instead of treating the contents as one stream of data.
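To illustrate why GZIP suits on-the-fly reading, here is a minimal sketch that wraps a compressed stream in GZIPInputStream and reads it line by line without ever materializing the uncompressed data on disk. The class name GzipStreamRead and the helper methods gzip and countLines are hypothetical; the data is compressed in memory only to keep the example self-contained.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class for illustration.
public class GzipStreamRead {

    /** Compresses text in memory so the example needs no files. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decompresses on the fly: GZIPInputStream is just another filter in the chain. */
    static int countLines(InputStream compressed) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(compressed), StandardCharsets.UTF_8))) {
            int lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = gzip("first\nsecond\nthird\n");
        System.out.println("lines=" + countLines(new ByteArrayInputStream(data)));
    }
}
```

The same countLines method would work unchanged on a FileInputStream or a network stream, which is the convenience the single-file GZIP format provides.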

Fortunately, you can also set the compression level in GZIPOutputStream by creating a subclass of GZIPOutputStream and exposing a setLevel(int level) method in it. We did this in our code and achieved even slightly better results than with ZipOutputStream at the BEST_SPEED compression level. The following is a comparison when compressing a 5.4 GB file:


Zip compression with BEST_SPEED      48078219 bytes     80 seconds
GZip compression with BEST_SPEED     48078113 bytes     78 seconds

Here is the code for MyGZIPOutputStream class:

import java.util.zip.*;
import java.io.*;

public class MyGZIPOutputStream extends GZIPOutputStream
{
    /**
     * Creates a new output stream with the specified buffer size.
     * @param out the output stream
     * @param size the output buffer size
     * @exception IOException If an I/O error has occurred.
     * @exception IllegalArgumentException if size is <= 0
     */
    public MyGZIPOutputStream(OutputStream out, int size) throws IOException {
        super(out, size);
    }

    /**
     * Creates a new output stream with a default buffer size.
     * @param out the output stream
     * @exception IOException If an I/O error has occurred.
     */
    public MyGZIPOutputStream(OutputStream out) throws IOException {
        this(out, 512);
    }

    /**
     * Sets the compression level of the underlying Deflater for data
     * written after this call. The default setting is DEFAULT_COMPRESSION.
     * @param level the compression level (0-9)
     * @exception IllegalArgumentException if the compression level is invalid
     */
    public void setLevel(int level) {
        def.setLevel(level);
    }
}

Sample program that uses BEST_SPEED compression with MyGZIPOutputStream:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;

public class GZipCompression {

    public static void main(String[] args) throws IOException {
        compressInputFile("a.txt", "a.txt.gz");
    }

    public static void compressInputFile(String inputFileName,
            String outputFileName) throws IOException {
        byte[] buffer = new byte[1024];
        long startTime = System.currentTimeMillis();

        // try-with-resources closes both streams even if an I/O error occurs
        try (FileInputStream fis = new FileInputStream(inputFileName);
             MyGZIPOutputStream gzos = new MyGZIPOutputStream(
                     new FileOutputStream(outputFileName))) {
            gzos.setLevel(Deflater.BEST_SPEED);
            int length;
            while ((length = fis.read(buffer)) > 0) {
                gzos.write(buffer, 0, length);
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.println("Time taken to gzip " + (endTime - startTime) + " milliseconds.");
    }
}
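A quick way to gain confidence in the subclass approach is an in-memory round trip: compress with a chosen level, then read the result back with the standard GZIPInputStream. The sketch below does that with its own small subclass, which sets the level in the constructor rather than through a setLevel method; the names GzipLevelRoundTrip and LevelGzipOutputStream are hypothetical and are not the classes used for the measurements above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class for illustration.
public class GzipLevelRoundTrip {

    /** GZIP stream whose Deflater level is fixed at construction time. */
    static class LevelGzipOutputStream extends GZIPOutputStream {
        LevelGzipOutputStream(OutputStream out, int level) throws IOException {
            super(out);
            def.setLevel(level); // 'def' is the protected Deflater inherited from DeflaterOutputStream
        }
    }

    static byte[] compress(byte[] data, int level) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (LevelGzipOutputStream out = new LevelGzipOutputStream(bytes, level)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] decompress(byte[] gz) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gz))) {
            byte[] buffer = new byte[1024];
            int length;
            while ((length = in.read(buffer)) > 0) {
                bytes.write(buffer, 0, length);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = "hello gzip hello gzip hello gzip".getBytes(StandardCharsets.UTF_8);
        byte[] fast = compress(data, Deflater.BEST_SPEED);
        byte[] small = compress(data, Deflater.BEST_COMPRESSION);
        System.out.println("round trip ok: " + Arrays.equals(data, decompress(fast)));
        System.out.println("BEST_SPEED bytes=" + fast.length
                + " BEST_COMPRESSION bytes=" + small.length);
    }
}
```

Whatever level is chosen, the output remains an ordinary GZIP stream, so readers do not need to know which level the writer used.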

Wednesday, 8 July 2020

Mounting EFS volume on EC2 machine

In my previous post I explained how to mount an EFS volume inside a Fargate task container. EFS serves as persistent storage for ephemeral containers. Sometimes you may want to see the data that containers are storing in EFS. Most of the time these are special-purpose containers, such as MongoDB, which do not provide any interface for browsing the file system.

The easy way to look at EFS is to mount it on an EC2 instance. An EFS volume can be mounted on multiple machines, so you can mount it on an EC2 instance and inside the container at the same time. You need to look up the DNS name of the EFS volume.

This DNS name is used when mounting the EFS volume as an NFS drive on the EC2 instance.

Create a directory for mounting the EFS volume. For example, I am creating /efs3:


sudo mkdir /efs3

Now you can mount the EFS volume using the following command. Replace the DNS name with that of your EFS volume:

sudo mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport fs-9ce1684d.efs.ap-south-1.amazonaws.com:/ /efs3

For more information, see https://docs.aws.amazon.com/efs/latest/ug/mounting-fs-mount-cmd-dns-name.html