Large-Scale Distributed Sentiment Analysis with RNN

Reproduction Instructions

Our source code can be found at link

Table of Contents

  1. Data Preprocessing
  2. RNN with Distributed SGD
  3. Running the distributed version
  4. Dependencies and System Information

Data Preprocessing


Uploading Files to S3 Bucket

Create a new bucket with the default settings and upload the following files:

Deploying CPU Cluster on AWS

Go to the EMR dashboard, select Create cluster, and then select Go to advanced options.

Running MapReduce

After the cluster has started and bootstrapped, go to the Steps tab and select Add step:

Combining Generated h5 Files

Launching Instance

Go to the EC2 dashboard and select Launch Instance.

Installing Essential Packages

After connecting to the instance, you need to first install boto3 and h5py with the following commands:

sudo apt update

sudo apt install python-pip

pip install boto3

pip install h5py

Modifying Instance Volume

Go to the EC2 dashboard, select Volumes, and modify the instance volume to 64GB.

Run sudo growpart /dev/xvda 1 on the instance and restart it.

If you run df -h, you should find that the disk space has been expanded.
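The same check can be done from Python, which is convenient before running the combining script. This is a minimal sketch using only the standard library; the helper name disk_summary is ours for illustration and is not part of the project code.

```python
import shutil

def disk_summary(path="/"):
    """Return total, used, and free space of the filesystem holding `path`, in GiB."""
    usage = shutil.disk_usage(path)
    gib = 1024 ** 3
    return {
        "total_gib": usage.total / gib,
        "used_gib": usage.used / gib,
        "free_gib": usage.free / gib,
    }

summary = disk_summary("/")
for name, value in summary.items():
    print(f"{name}: {value:.1f}")
```

After the volume has been expanded and the instance restarted, total_gib should be close to the new volume size.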

Running Python File

Specify the bucket name and the names of the files to be combined in the Python file. You can find the file names on the S3 bucket page. You may also adjust the output file name.

Next, upload the Python file to the instance and run it with python.
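For orientation, the combining step can be sketched as follows. This is not our actual script: the function names, the dataset parameter (the name of the dataset inside each h5 file), and the assumption that parts are concatenated along the first axis are all illustrative. boto3 and h5py are imported lazily so that the pure-Python helper can be tried without them.

```python
from itertools import accumulate

def row_ranges(lengths):
    """Return the (start, stop) rows each part occupies in the combined dataset."""
    stops = list(accumulate(lengths))
    starts = [0] + stops[:-1]
    return list(zip(starts, stops))

def combine_parts(bucket, keys, out_path, dataset):
    """Download each part from S3 and concatenate one dataset along axis 0.

    `dataset` is a hypothetical dataset name; adjust it to the real file layout.
    """
    import boto3  # third-party, imported lazily
    import h5py   # third-party, imported lazily

    s3 = boto3.client("s3")
    local_paths = []
    for key in keys:
        local = key.replace("/", "_")
        s3.download_file(bucket, key, local)
        local_paths.append(local)

    # First pass: collect shapes so the output can be allocated once.
    shapes, dtype = [], None
    for path in local_paths:
        with h5py.File(path, "r") as part:
            shapes.append(part[dataset].shape)
            dtype = part[dataset].dtype
    lengths = [shape[0] for shape in shapes]

    # Second pass: copy each part into its row range.
    with h5py.File(out_path, "w") as out:
        combined = out.create_dataset(
            dataset, shape=(sum(lengths),) + shapes[0][1:], dtype=dtype
        )
        for path, (start, stop) in zip(local_paths, row_ranges(lengths)):
            with h5py.File(path, "r") as part:
                combined[start:stop] = part[dataset][...]

print(row_ranges([3, 2, 5]))
```

Allocating the output once and writing each part into its own row range avoids holding all parts in memory at the same time, which matters at the data sizes that motivated the 64GB volume.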

Final Setting

Make the preprocessed h5 data file public so that the later steps can access it through its Object URL.

Go to the S3 bucket, select the Permissions tab, and set all options under Public access settings to False.

Find the combined h5 file and select Make public under the Actions tab.
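Once the object is public, its Object URL follows the standard S3 virtual-hosted form, so it can be fetched without credentials. The sketch below builds that URL and downloads the file with the standard library; the bucket name my-bucket is a placeholder, and the function names are ours for illustration.

```python
from urllib.parse import quote
from urllib.request import urlretrieve

def object_url(bucket, key, region=None):
    """Build the virtual-hosted-style URL of a public S3 object."""
    if region is None:
        host = f"{bucket}.s3.amazonaws.com"
    else:
        host = f"{bucket}.s3.{region}.amazonaws.com"
    return f"https://{host}/{quote(key)}"

def download_public_object(bucket, key, dest, region=None):
    """Download a public object to `dest`; raises an HTTP error if it is not public."""
    urlretrieve(object_url(bucket, key, region), dest)
    return dest

# "my-bucket" is a placeholder bucket name.
print(object_url("my-bucket", "combined_result_5class.h5"))
```

If the download fails with a 403 error, the object is most likely not public yet; recheck the two settings above.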

RNN with Distributed SGD

Deploying GPU Cluster on AWS

Go to the EC2 dashboard and select Launch Instance.

Additional configuration for the security group is required so that the nodes can communicate with each other.

Please go to Security Groups and edit the rules

Environment Setup

This setup needs to be done for each node individually.

First, activate the pytorch environment.

source activate pytorch_p36

Install PyTorch 1.1 (the latest version at the time of writing):

conda install pytorch torchvision cudatoolkit=10.0 -c pytorch

Install h5pickle (a wrapper around h5py for distributed computing):

conda config --add channels conda-forge

conda install h5pickle
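The reason a wrapper is needed is that an open file handle cannot be pickled, while data-loading worker processes receive a pickled copy of the dataset object. The sketch below illustrates the general idea (drop the handle when pickling, reopen it on first use) with a plain binary file standing in for the h5 file. It is not h5pickle's implementation, and the class name ReopeningReader is ours.

```python
import os
import pickle
import tempfile

class ReopeningReader:
    """Reads byte ranges from a file and survives pickling by reopening lazily."""

    def __init__(self, path):
        self.path = path
        self._handle = None  # opened on first use, never pickled

    def _file(self):
        if self._handle is None:
            self._handle = open(self.path, "rb")
        return self._handle

    def read_at(self, offset, size):
        handle = self._file()
        handle.seek(offset)
        return handle.read(size)

    def __getstate__(self):
        # Drop the open handle; the unpickled copy reopens the file itself.
        state = self.__dict__.copy()
        state["_handle"] = None
        return state

# Toy data file standing in for the h5 file.
path = os.path.join(tempfile.gettempdir(), "reopening_reader_demo.bin")
with open(path, "wb") as f:
    f.write(b"0123456789")

reader = ReopeningReader(path)
first = reader.read_at(2, 3)                 # opens the file in this process
clone = pickle.loads(pickle.dumps(reader))   # what a worker process would receive
second = clone.read_at(2, 3)                 # the copy opens its own handle
print(first, second)
```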

Find the name of the network interface that carries the node's private IP by running ifconfig (usually ens3) and export it as the NCCL socket interface:

export NCCL_SOCKET_IFNAME=ens3 (add to .bashrc to make this change permanent)
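If the interface name differs between instance types, it can also be chosen inside the script before the process group is initialized. The following is a sketch under our own assumptions: the preference list ("ens3", then "eth0") and both function names are illustrative, and the selection is by name only, without checking which interface holds the private IP.

```python
import os
import socket

def pick_interface(names, preferred=("ens3", "eth0")):
    """Pick a preferred interface if present, else the first non-loopback one."""
    for candidate in preferred:
        if candidate in names:
            return candidate
    non_loopback = [name for name in names if name != "lo"]
    return non_loopback[0] if non_loopback else None

def configure_nccl_interface():
    """Set NCCL_SOCKET_IFNAME for this process from the local interface list."""
    names = [name for _, name in socket.if_nameindex()]
    chosen = pick_interface(names)
    if chosen is not None:
        os.environ["NCCL_SOCKET_IFNAME"] = chosen
    return chosen

print(pick_interface(["lo", "ens3"]))
```

Setting the variable through os.environ only affects the current process and its children, and it has to happen before NCCL is initialized; the export in .bashrc remains the simpler option.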

Upload the scripts to each node or git clone from the repository.

Also, upload the data to each node if running without NFS (Network File System) setup.

Getting the processed data

Download the data processed using MapReduce by executing:


Running the sequential version

Run the following command on one node:

python --dir <Input Path> --batch <Batch Size> --lr <Learning Rate> --epochs <# Epochs> --workers <# Workers> --n_vocab 10003 --filename <Model Name> > log.out &

where <Input Path> is the path to the input file; <# Workers> is the number of CPU workers used to load the data; <Model Name> is the name under which the RNN model is saved; and log.out is the output log file.
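To make the flags concrete, here is a sketch of an argument parser that accepts the options used in the command above. The flag names come from the command; the types, defaults, and help strings are our assumptions and may differ from the actual script.

```python
import argparse

def build_parser():
    """Parser mirroring the command-line flags of the sequential run (assumed types)."""
    parser = argparse.ArgumentParser(description="Train the RNN sentiment model")
    parser.add_argument("--dir", required=True, help="path to the input h5 file")
    parser.add_argument("--batch", type=int, default=128, help="batch size")
    parser.add_argument("--lr", type=float, default=0.1, help="learning rate")
    parser.add_argument("--epochs", type=int, default=10, help="number of epochs")
    parser.add_argument("--workers", type=int, default=8, help="data-loading workers")
    parser.add_argument("--n_vocab", type=int, default=10003, help="dictionary size")
    parser.add_argument("--filename", default="model", help="name used to save the model")
    return parser

# Parse the flags from an explicit list instead of sys.argv.
args = build_parser().parse_args([
    "--dir", "../data/combined_result_5class.h5",
    "--batch", "128", "--lr", "0.1", "--epochs", "10",
    "--workers", "8", "--n_vocab", "10003", "--filename", "model_1n_1g",
])
print(args)
```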

One example would be:

python --dir ../data/combined_result_5class.h5 --batch 128 --lr 0.1 --epochs 10 --workers 8 --n_vocab 10003 --filename model_1n_1g > log_1n_1g_b128.out &

Profiling the sequential version

If you want to profile the sequential code, please replace python with python -m cProfile in the command above, as shown below:

python -m cProfile -o --dir <Input Path> --batch <Batch Size> --lr <Learning Rate> --epochs <# Epochs> --workers <# Workers> --n_vocab 10003 --filename <Model Name> > log.out &
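A profile can also be collected from inside Python with the standard library's cProfile and pstats modules. The sketch below uses a toy function as a stand-in for the training run; busy_work, profile_to_file, and the output file name toy_profile.prof are illustrative names, not part of our code. The resulting file has the same format as the one written by python -m cProfile -o.

```python
import cProfile
import os
import pstats
import tempfile

def busy_work(n):
    """Toy workload standing in for a training run."""
    return sum(i * i for i in range(n))

def profile_to_file(func, *args, out_path):
    """Run func(*args) under cProfile and dump the stats to out_path."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)
    profiler.dump_stats(out_path)
    return result

out_path = os.path.join(tempfile.gettempdir(), "toy_profile.prof")
result = profile_to_file(busy_work, 10000, out_path=out_path)

# Print the five entries with the largest cumulative time.
stats = pstats.Stats(out_path)
stats.sort_stats("cumulative").print_stats(5)
```

Sorting by cumulative time is usually the quickest way to see whether data loading or the training step dominates.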

To visualize the profiling result, install snakeviz by executing:

pip install snakeviz

Visualize the profiling by running:


Running the distributed version

Note: this command is for the case where each node keeps a local copy of the data.

For each node run:

python -m torch.distributed.launch --nproc_per_node=<#GPU per Node> --nnodes=<Total # of Nodes> --node_rank=<i> --master_addr="<Master Node Private IP>" --master_port=<Free Port> --dir <Input Path> --epochs <# Epochs> --workers <# Workers> --n_vocab <# Words in Dictionary> --dynamic <Dynamic Mode> --filename <Model Name> > log.out &

where <#GPU per Node> is the number of GPUs on each node; <Total # of Nodes> is the total number of nodes; <i> is the rank assigned to this node (starting from 0 for the master node); <Master Node Private IP> is the private IP of the master node, which can be found by running ifconfig and looking under ens3; <Free Port> is any free port; <Input Path> is the path to the input file; <# Workers> is the number of CPU workers used to load the data; <# Words in Dictionary> is the number of words in the dictionary (10003 in our case); <Model Name> is the name under which the RNN model is saved; <Dynamic Mode> controls the dynamic load balancer, where a negative value disables it, 0 runs it only once after the 1st epoch, and any positive integer j rebalances the load every j epochs; and log.out is the output log file.
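The launcher arguments determine how many processes take part in training and which global rank each one gets: torch.distributed.launch starts <#GPU per Node> processes on every node and passes each one its local rank. The sketch below shows that arithmetic together with the usual initialization call. The function names are ours, and init_distributed is an assumed, typical setup rather than an excerpt from our script; torch is imported lazily so the arithmetic can be tried without a GPU.

```python
def world_size(nproc_per_node, nnodes):
    """Total number of training processes across all nodes."""
    return nproc_per_node * nnodes

def global_rank(nproc_per_node, node_rank, local_rank):
    """Global rank of the process with `local_rank` on node `node_rank`."""
    return nproc_per_node * node_rank + local_rank

def init_distributed(local_rank):
    """Typical initialization for a process started by torch.distributed.launch."""
    import torch                     # third-party, imported lazily
    import torch.distributed as dist

    torch.cuda.set_device(local_rank)
    # env:// reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE set by the launcher.
    dist.init_process_group(backend="nccl", init_method="env://")
    return dist.get_rank(), dist.get_world_size()

# Two nodes with one GPU each, as in the two-node example.
print(world_size(1, 2), global_rank(1, 1, 0))
```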

For example, to run on 2 nodes without dynamic load balancing, in the background, and with output written to a log file:

Node 1:

python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="" --master_port=23456 --dir ../data/combined_result_5class.h5 --batch 128 --lr 0.1 --epochs 10 --dynamic -1 --workers 8 --n_vocab 10003 --filename model_2n_1g > log.out &

Node 2:

python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="" --master_port=23456 --dir ../data/combined_result_5class.h5 --batch 128 --lr 0.1 --epochs 10 --dynamic -1 --workers 8 --n_vocab 10003 --filename model_2n_1g > log.out &
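Since the per-node commands differ only in --node_rank, they can be generated rather than typed by hand. This is a small sketch; the script name your_training_script.py and the address 10.0.0.1 are placeholders, and launch_command is an illustrative helper, not part of our code.

```python
import shlex

def launch_command(script, node_rank, nnodes, nproc_per_node,
                   master_addr, master_port, script_args):
    """Build the torch.distributed.launch command line for one node."""
    parts = [
        "python", "-m", "torch.distributed.launch",
        f"--nproc_per_node={nproc_per_node}",
        f"--nnodes={nnodes}",
        f"--node_rank={node_rank}",
        f"--master_addr={master_addr}",
        f"--master_port={master_port}",
        script,
    ] + list(script_args)
    return " ".join(shlex.quote(part) for part in parts)

script_args = [
    "--dir", "../data/combined_result_5class.h5", "--batch", "128",
    "--lr", "0.1", "--epochs", "10", "--dynamic", "-1",
    "--workers", "8", "--n_vocab", "10003", "--filename", "model_2n_1g",
]

# Placeholder script name and master IP; replace with the real values.
commands = [
    launch_command("your_training_script.py", rank, 2, 1, "10.0.0.1", 23456, script_args)
    for rank in range(2)
]
for command in commands:
    print(command)
```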

Configure NFS for file sharing

This is inspired by Harvard CS205 - Spring 2019 - Infrastructure Guide - I7 - MPI on AWS, but with modifications to bypass the extra user account created, which is unnecessary in this setting.

Let master$ denote the master node and node$ denote any other node.

Run the following commands on master node:

Configure the NFS client on other nodes:

Running with NFS mounted directory

Please upload the data to the NFS-mounted directory cloud first.

For each node run:

python -m torch.distributed.launch --nproc_per_node=<#GPU per Node> --nnodes=<Total # of Nodes> --node_rank=<i> --master_addr="<Master Node Private IP>" --master_port=<Free Port> --dir <Mounted Input Path> --epochs <# Epochs> --workers <# Workers> --n_vocab <# Words in Dictionary> --dynamic <Dynamic Mode> --filename <Model Name> > log.out &

where <#GPU per Node> is the number of GPUs on each node; <Total # of Nodes> is the total number of nodes; <i> is the rank assigned to this node (starting from 0 for the master node); <Master Node Private IP> is the private IP of the master node, which can be found by running ifconfig and looking under ens3; <Free Port> is any free port; <Mounted Input Path> is the path to the input file in the mounted directory; <# Workers> is the number of CPU workers used to load the data; <# Words in Dictionary> is the number of words in the dictionary (10003 in our case); <Model Name> is the name under which the RNN model is saved; <Dynamic Mode> controls the dynamic load balancer, where a negative value disables it, 0 runs it only once after the 1st epoch, and any positive integer j rebalances the load every j epochs; and log.out is the output log file.
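The three cases of the dynamic mode can be written as a small predicate. This is a sketch of the schedule as described above, assuming epochs are counted from 1 and the check happens after each completed epoch; the function name should_rebalance is ours, and the real script may count epochs differently.

```python
def should_rebalance(dynamic, completed_epochs):
    """Decide whether the load balancer runs after `completed_epochs` epochs."""
    if dynamic < 0:
        return False                      # balancer disabled
    if dynamic == 0:
        return completed_epochs == 1      # once, after the 1st epoch
    return completed_epochs % dynamic == 0  # every `dynamic` epochs

# Epochs (out of 6) after which rebalancing happens for each mode.
schedule = {
    mode: [epoch for epoch in range(1, 7) if should_rebalance(mode, epoch)]
    for mode in (-1, 0, 2)
}
print(schedule)  # {-1: [], 0: [1], 2: [2, 4, 6]}
```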

For example, to run on 2 nodes without dynamic load balancing, in the background, and with output written to a log file:

Node 1:

python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="" --master_port=23456 --dir ../cloud/combined_result_5class.h5 --batch 128 --lr 0.1 --epochs 10 --dynamic -1 --workers 8 --n_vocab 10003 --filename model_2n_1g > log.out &

Node 2:

python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="" --master_port=23456 --dir ../cloud/combined_result_5class.h5 --batch 128 --lr 0.1 --epochs 10 --dynamic -1 --workers 8 --n_vocab 10003 --filename model_2n_1g > log.out &

System Information


GPU Instances Information

Instance     GPUs  vCPUs  Mem (GiB)  GPU Mem (GiB)  Max Bandwidth (Mbps)  Max Throughput (MB/s, 128 KB I/O)  Max IOPS (16 KB I/O)  GPU Card
p2.xlarge    1     4      61         12             750                   93.75                              6000                  NVIDIA Tesla K80
g3.4xlarge   1     16     122        8              3500                  437.5                              20000                 NVIDIA Tesla M60
g3.16xlarge  4     64     488        32             14000                 1750                               80000                 NVIDIA Tesla M60

CPU Instance on AWS EMR

Instance    vCPUs  Model Name                               Memory (L2)  Operating System
m4.xlarge   4      Intel(R) Xeon(R) CPU E5-2686 v4 2.30GHz  256K         Ubuntu 16.04.5 LTS
m4.2xlarge  8      Intel(R) Xeon(R) CPU E5-2686 v4 2.30GHz  256K         Ubuntu 16.04.5 LTS

CUDA Information
