Monday 9 September 2019

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

I got the following error while trying to run Spark Streaming code for one of my assignments.

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

When I searched the internet, I found that this is a known bug which is fixed in the latest version. The link to the bug is: https://jira.apache.org/jira/browse/SPARK-19185
It is fixed in Spark version 2.4.0.

I used the following pom.xml file to fix this problem.

Running Map-Reduce program on AWS EMR

In this post we will learn how to run a Map-Reduce program on an AWS EMR cluster. This post is a continuation of my earlier post:
https://blog.bigdatawithjasvant.com/2019/09/setting-up-machine-code-and-data-for.html
Please read it before continuing.

To run any job on AWS EMR, the code and input data need to be in S3 storage. There should also be a directory where log files will be stored. The output directory should not exist, because the job creates it. If the output directory already exists, the Map-Reduce program will fail.
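The output-directory rule above can be checked from the AWS CLI before launching anything. The following is only a sketch: the helper name `s3_prefix_exists` is made up for illustration, and it assumes the AWS CLI is already installed and configured. It relies on `aws s3 ls` printing nothing when no object exists under a prefix.

```shell
# Hypothetical helper: succeeds (exit status 0) if anything exists
# under the given S3 prefix, and fails otherwise.
s3_prefix_exists() {
    # "aws s3 ls" prints one line per object or sub-prefix; empty output
    # means there is nothing under the prefix.
    [ -n "$(aws s3 ls "$1/" 2>/dev/null)" ]
}

# Usage, with the output path used in this post:
#   if s3_prefix_exists s3://bigdatawithjasvant-emr/output; then
#       echo "Output directory already exists; the job would fail." >&2
#   fi
# An old output directory can be removed with:
#   aws s3 rm s3://bigdatawithjasvant-emr/output --recursive
```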



We put the jar file containing the code for the Map-Reduce program in S3 storage.


The input for the Map-Reduce program is also placed in S3 storage.


Now open the EMR service console in AWS.


Now create a new cluster. Provide a name for the cluster and a directory where the log files for the cluster will be placed. These files can be used later to analyze the reason for a failure. To add steps to the cluster, choose "Custom Jar" as the "Step type" and click on the "Configure" button to specify the JAR file which contains our Map-Reduce program and the arguments for the program.



In the pop-up dialog box, select the jar file which contains the code. In the arguments box, provide the following arguments.


com.wordcount.WordCountHashPartitioner

s3://bigdatawithjasvant-emr/bigtext

s3://bigdatawithjasvant-emr/output


The first line contains the main class name. The second line contains the location of the input files. The third line contains the location of the output directory. The output directory should not already exist; if it exists, the job will fail. Select "Terminate the cluster" as the "Action on failure" because we don't want to keep the cluster running if our Map-Reduce program fails.


Now we select the number of nodes in the cluster. I will be using 5 m5.xlarge nodes: one as the master node and 4 as core nodes of the cluster.
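The same cluster settings can also be expressed with the AWS CLI instead of the console. This is a rough sketch, not the method used in this post: the jar file name `wordcount.jar`, the cluster name and the release label `emr-5.26.0` are assumptions, and `--use-default-roles` assumes the default EMR roles already exist in the account. The `jar` and `logs` folders are the ones created in the earlier post.

```shell
BUCKET=s3://bigdatawithjasvant-emr
JAR="$BUCKET/jar/wordcount.jar"   # hypothetical jar name, use your own
RELEASE=emr-5.26.0                # assumed EMR release label

# Build the step definition: custom JAR, three arguments (main class,
# input location, output location), terminate the cluster on failure.
wordcount_step() {
    printf 'Type=CUSTOM_JAR,Name=WordCount,ActionOnFailure=TERMINATE_CLUSTER,Jar=%s,Args=[%s,%s,%s]\n' \
        "$JAR" com.wordcount.WordCountHashPartitioner "$BUCKET/bigtext" "$BUCKET/output"
}

# Create a 5-node m5.xlarge cluster (1 master + 4 core nodes) that runs
# the step and shuts down when the step is done. Not called here.
create_wordcount_cluster() {
    aws emr create-cluster \
        --name "wordcount-cluster" \
        --release-label "$RELEASE" \
        --applications Name=Hadoop \
        --log-uri "$BUCKET/logs/" \
        --instance-type m5.xlarge \
        --instance-count 5 \
        --use-default-roles \
        --auto-terminate \
        --steps "$(wordcount_step)"
}

# Print the step definition so it can be reviewed before creating anything.
wordcount_step
```

Calling `create_wordcount_cluster` starts a billable cluster, so it is worth reviewing the printed step definition first.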



Once we click on the "Create cluster" button, the cluster will be created and started. It takes around 3 minutes for the cluster to come up. You can keep clicking the refresh button to update the status.


Once the cluster is in the running state, we can see the status of the steps which we configured in the cluster.



Once our step is in the running state, we can click "View jobs" to see the jobs running as part of that step.


To see details of the tasks of our job, we need to click on the "View tasks" link.


You can see the total number of tasks, along with the completed, running and pending tasks, on the tasks page. As you can see, 13 tasks are currently running. There are 4 cores in each of the 4 core instances, for a total of 16 cores and 64GB of RAM across the core instances. I don't know the size of one execution slot in EMR in terms of RAM and CPU. The number of running tasks peaked at 13, which hints that the total number of slots was close to 13.

You can look at the console logs of the job from the web interface.


You can also look at the controller logs using the web console.


Once the Map-Reduce job step is complete, the cluster will shut down automatically. In my case it took around 5 minutes for my program to run. Around 5 more minutes were taken by startup and shutdown.

AWS EMR is a good option if you have a lot of data and want to process it quickly. You can launch a cluster with up to 20 instances and tear it down once it is not needed. In our case we were billed for 10 minutes for each of the 5 instances, plus the EMR usage fee for 10 minutes for each of the 5 instances. In total you are billed for 50 instance-minutes and for 50 minutes of EMR usage fee. Assuming the same work would take about 50 minutes on a single node, you save about 40 minutes of time by running your program on a multi-node cluster.
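The billing arithmetic above can be written out as a small calculation. This sketch only multiplies the numbers quoted in this post (5 instances, about 10 minutes each). It does not use any real AWS price, and the function name is made up for illustration.

```shell
# Instance-minutes billed for a cluster: instances x minutes per instance.
billed_minutes() {
    echo $(( $1 * $2 ))
}

instances=5       # 1 master + 4 core nodes
minutes_each=10   # about 5 minutes of job time + 5 minutes startup/shutdown

echo "EC2 instance-minutes:  $(billed_minutes "$instances" "$minutes_each")"
echo "EMR fee minutes:       $(billed_minutes "$instances" "$minutes_each")"
```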

Sunday 1 September 2019

Setting up code and data for AWS EMR

This post is a continuation of my earlier post. Please read that post before reading this one. The link to that post is:
https://blog.bigdatawithjasvant.com/2019/09/configuring-aws-cli.html

In this post I am going to tell you how to set up code and data for running a Map-Reduce program on an AWS EMR cluster. I am going to use a sample word count program and run it on an EMR cluster. In this post I am concentrating on running the program on EMR and not on how to write a Map-Reduce program. So let's get started.


Step 1: Download Word count program

Download word count program from 
and extract it in a folder called workspace. I am using D:\workspace in my example.

Now start Eclipse with D:\workspace as the workspace.

Click on Import project in Eclipse.

Set the project type to Existing Maven Project.


Select the project folder.

Build the project.

The JAR file will be created in the target folder.

Step 2: Download data file to AWS instance

Download the https://www.bigdatawithjasvant.com/blogdata/00/0000/data/bigtext.tar.gz file to the AWS instance and extract it using the following commands.

curl -O https://www.bigdatawithjasvant.com/blogdata/00/0000/data/bigtext.tar.gz
tar -xvzf bigtext.tar.gz


Run the ./generate.sh command to generate the bigtext.dat file. It is created by repeating complet_work.txt 700 times.
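The contents of generate.sh are not shown in this post. A script with the same effect could look roughly like the sketch below, which appends a source file to an output file a given number of times. The function name `repeat_file` is an assumption for illustration, not the actual script.

```shell
# Write COUNT copies of SRC, one after another, into DEST.
repeat_file() {
    src=$1
    count=$2
    dest=$3
    : > "$dest"                  # start with an empty output file
    i=0
    while [ "$i" -lt "$count" ]; do
        cat "$src" >> "$dest"    # append one more copy
        i=$(( i + 1 ))
    done
}

# Usage with the file names from this post:
#   repeat_file complet_work.txt 700 bigtext.dat
```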

Step 3: Grant permission on S3 buckets to AWS CLI user

Open IAM in the AWS console and click on Users.

Select the user to edit.

Click on the Add permissions button.

Click on Attach existing policies directly.


Select the S3 permissions and add them to the user.


Click the Add permissions button.


Step 4: Upload jar file and input data to S3

Create a new bucket where you can upload the test data and the jar file. The bucket name needs to be globally unique across AWS. I used the following command:
aws s3 mb s3://bigdatawithjasvant-emr

Copy the bigtext.dat file to the S3 bucket using the following command:

aws s3 cp bigtext.dat  s3://bigdatawithjasvant-emr/bigtext/


Now log in to the AWS console, go to the S3 service and click on the bucket.

Create a logs folder in the S3 bucket.

Create a jar folder in the S3 bucket.

Click on the jar folder.

Click on the Upload button and click on select file in the upload dialog box.


Select the jar file to upload.

Click on the Upload button to upload the jar file containing the Map-Reduce program.

Now you can see the uploaded jar file in the S3 bucket.
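The console steps above also have AWS CLI equivalents. The sketch below is an alternative, not what was done in this post, and the jar file name in the usage comment is hypothetical. S3 has no real folders: copying a file to a key under `jar/` is enough to make that folder appear, and an empty folder such as `logs/` can be created as a zero-byte object whose key ends with a slash.

```shell
BUCKET_NAME=bigdatawithjasvant-emr
BUCKET="s3://$BUCKET_NAME"

# Create an empty "folder" (a zero-byte object whose key ends in "/").
make_folder() {
    aws s3api put-object --bucket "$BUCKET_NAME" --key "$1/"
}

# Copy a local jar into the jar/ folder, then list that folder to confirm.
upload_jar() {
    aws s3 cp "$1" "$BUCKET/jar/" && aws s3 ls "$BUCKET/jar/"
}

# Usage:
#   make_folder logs
#   upload_jar target/wordcount.jar    # hypothetical jar file name
```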

Now we are all set to execute the Map-Reduce program on the EMR cluster. The code and input data are already available in the S3 bucket.

Configuring AWS CLI

In this post I will explain the steps to configure the AWS CLI in a CentOS virtual machine running in AWS. I have chosen CentOS instead of Amazon Linux because it is free and it is also available for your local PC and other cloud providers like Google Cloud.

First you need to create an AWS instance by clicking the Launch Instance button in the AWS EC2 dashboard.

Choose AWS Marketplace in the Choose AMI screen.


Search for CentOS in the marketplace and select the CentOS 7 AMI.


Select t2.micro as the instance type. This instance type is free tier eligible, which means that use of this instance is free for the first year after registration.


Next you need to select a subnet for the instance. I already have a subnet created, so I am using that. Please make sure that you have the Auto-assign Public IP setting enabled. If you don't enable it, your instance will not get a public IP address and you will not be able to connect to it.


Next you need to add storage to the instance. Up to 30GB of General Purpose storage is free for the first year after registration, so I am using 30GB of storage space.


Now add a Name tag to the instance so that you can identify it.



Next you need to attach a security group to the instance. I am using an existing security group. A security group configures which incoming and outgoing network connections are permitted. I am using it to allow incoming connections to the AWS instance only from my PC, so that no one can hack my AWS instance. If you allow incoming connections from the whole world, there are attacks from China and Russia which consume a lot of bandwidth, and you get charged for it. So make sure that you allow incoming connections from your PC only.


Next you need to create a public-private key pair. The public key will be installed on the machine and the private key will be downloaded to your PC. Using the private key you will be able to log in to the AWS instance without a password. Authentication is based on the private key present on your PC. Keep copies of the private key in multiple locations so that you can recover from a disk failure or any other problem with your PC. Without the private key you can't log in to your AWS instance. Download the key to your local PC and launch the instance.


Once the instance is started, click on the security group to configure the incoming connection permissions for the AWS instance. Security group permissions apply to all instances in the security group. Currently we have only one instance in the security group.


Click on Edit Inbound rules.


In the inbound rules, remove all existing rules and add a single rule which allows incoming connections on all ports from your PC. You can click on the combo box in the Source column and select "My IP" to fill in your public IP address. Once the rule is set, it allows incoming connections only from your IP address. For some ISPs the public IP address keeps changing, so a connection attempt using PuTTY will fail if your public IP address has changed since you set the inbound rule. To fix this problem you need to allow all IP addresses which your ISP can assign to you. The IP addresses assigned to you are not random but share a common prefix. You can add that prefix to the inbound rules to allow all IP addresses with that prefix. If you add 122.177.95.0/24 as the source, it will allow all 256 IP addresses in the range 122.177.95.0 to 122.177.95.255.
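The size of the range opened by a prefix like /24 follows from the prefix length: a /N IPv4 block contains 2^(32-N) addresses. A small sketch of that arithmetic, with helper names made up for illustration:

```shell
# Number of IPv4 addresses covered by a prefix length, e.g. 24 for /24.
cidr_size() {
    echo $(( 1 << (32 - $1) ))
}

# First and last address of a /24 block, given its network address.
cidr24_range() {
    base=${1%.*}                 # drop the last octet: 122.177.95.0 -> 122.177.95
    echo "$base.0 - $base.255"
}

cidr_size 24                     # prints 256
cidr24_range 122.177.95.0        # prints 122.177.95.0 - 122.177.95.255
```

A shorter prefix opens the rule to more addresses: /16 already covers 65536 of them, so it is better to use the longest prefix that still matches the addresses your ISP assigns.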


Now you need to convert the private key downloaded earlier to the format recognized by PuTTY. PuTTY can't use the downloaded private key directly. You need to run PUTTYGEN.EXE and click on the Load button.

Select the private key file downloaded earlier. In my case the private key file is named jasvant2.pem


Once the key file is imported successfully, you need to save it in PuTTY's format by clicking on the Save private key button.

When you click the Save private key button, it shows a warning that the private key is not protected with a passphrase. If no passphrase is provided, anybody who has the private key file can use it to connect to the AWS instance. If a passphrase is provided, you will be prompted for it every time you use the key file to connect to the server. I chose not to provide a passphrase. Click on the OK button to dismiss the warning message.


Provide a file name for the private key in PuTTY's private key format and click on the Save button.


Now click on PUTTY.EXE to run PuTTY and enter the public IP address of the running AWS instance.


Now click on Auth and select the private key file in PuTTY's private key format.


Select the private key.


Once the private key is selected and the connect button is pressed, PuTTY shows a warning with the public key of the server and asks you to confirm that you accept it. Click on the Yes button.


Provide centos as username to connect to AWS instance.


Now follow the instructions provided on this web page:
https://docs.aws.amazon.com/cli/latest/userguide/install-linux.html

Run the python --version command to check the Python version installed on the AWS instance.


Run the curl -O https://bootstrap.pypa.io/get-pip.py command to download the pip installer, which is required for installing the AWS CLI.

Run the python get-pip.py --user command to install pip, which is required by the AWS CLI.

Add an export command at the end of your ~/.bash_profile file.
export PATH=~/.local/bin:$PATH
Reload the profile into your current session to put those changes into effect.
source ~/.bash_profile
Use pip to install the AWS CLI.
pip install awscli --upgrade --user


Now open the AWS console and click on IAM.

Click on the Add user button.


Provide a user name and select programmatic access.


Add a permission boundary. I am setting it to a custom policy which allows access to all S3 operations.

AWS gives a warning that the user has no permissions attached to it. This is a mistake on my part and I will correct it in my next post, where I will add permissions to the user before performing any S3 operations. Click on the Create user button.


Now download the .csv file with the credentials to be used for accessing the AWS APIs.


Note down the region ID for the region which you will be using with the AWS CLI. For Mumbai it is ap-south-1.


Now open the credentials.csv file in Notepad++ and copy the Access Key ID and Secret Access Key. You will need them for configuring the AWS CLI.


Now run the aws configure command to configure the credentials for accessing the AWS APIs using the aws command.
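The aws configure command asks for the values interactively. As an alternative sketch, the same settings can be written one by one with `aws configure set`. The function name and the placeholder values in the usage comment are hypothetical; substitute the keys from your own credentials.csv file.

```shell
# Store access key, secret key and default region for the default profile.
configure_cli() {
    aws configure set aws_access_key_id "$1"
    aws configure set aws_secret_access_key "$2"
    aws configure set region "$3"
}

# Usage, with placeholder keys and the Mumbai region from this post:
#   configure_cli "YOUR_ACCESS_KEY_ID" "YOUR_SECRET_ACCESS_KEY" ap-south-1
# To check which user the stored keys belong to:
#   aws sts get-caller-identity
```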

In my next post I will use the AWS CLI to access S3 buckets and run a Map-Reduce program on an EMR cluster.