Assignment #4: Three’s Company
Due date: Monday 24 April 2017, 11:59:00 pm
This assignment is a social network version of the triangle counting problem, which has become an archetypal problem in graph analysis. The blog has a condemnation of Map/Reduce for the task. It also has a very inefficient implementation. Your program will output all triangles (which is different than a count of triangles) in a single map/reduce program.
Identify threesomes of users that are mutual friends in a social network, i.e. 100 is friends with 200, 200 is friends with 300, and 300 is friends with 100.
The output should enumerate the mutual friends for each user and avoid duplicate entries, i.e. the trio of users 100, 200, and 300 will contribute output:
100 200 300
200 100 300
300 100 200
NOTE: The last two elements of each output line are sorted as integers. As such, we would never get the output 100 300 200.
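The output rule can be illustrated with a short Python sketch. This covers only the formatting of one trio that has already been found, not how to find it, and the function name `format_trio` is hypothetical.

```python
def format_trio(a, b, c):
    """Return the three output lines for one trio of mutual friends."""
    trio = (a, b, c)
    lines = []
    for user in sorted(trio):
        # The two remaining ids are sorted as integers, not as strings.
        others = sorted(x for x in trio if x != user)
        lines.append("{} {} {}".format(user, others[0], others[1]))
    return lines


if __name__ == "__main__":
    print("\n".join(format_trio(300, 100, 200)))
```

Whatever order the three ids arrive in, each user appears first exactly once and the other two ids follow in increasing integer order.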
This needs to be implemented in the Map/Reduce paradigm, using no auxiliary data structures and no shared data. The only files used are the inputs to the map stage. The solution will demonstrate one potential parallelism tradeoff: it will expand the data, sending more data than is intuitively necessary over the network, and examine the expanded data in the reducers. The expansion of data prevents random I/O (e.g., the reading of multiple friend lists by the mappers and reducers) and allows for a high degree of parallelism.
The input to the Map/Reduce program is a set of friend lists. Each file in the input contains the id of the user followed by a list of her/his friends. All identifiers are integer values separated by spaces and all friends in the list appear on a single line. For example, the file:
117 2149 84 57 6048
is the list of user 117’s friends, consisting of users 2149, 84, 57, and 6048. You may assume that the input has two properties:
- Symmetry: If 100 is a friend of 200 then 200 is a friend of 100
- No Duplicates: Each friend appears in a list at most once.
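When debugging against a small input, it can help to confirm that these two properties really hold. The sketch below is a debugging aid only: it loads every friend list into memory, which the Map/Reduce solution itself must not do. The function names `parse_friend_line` and `check_input` are hypothetical.

```python
def parse_friend_line(line):
    """Split one input line into (user, [friends]) as integers."""
    ids = [int(tok) for tok in line.split()]
    return ids[0], ids[1:]


def check_input(lines):
    """Return a list of violations of the symmetry / no-duplicates properties."""
    friends = {}
    problems = []
    for line in lines:
        if not line.strip():
            continue
        user, flist = parse_friend_line(line)
        if len(set(flist)) != len(flist):
            problems.append("duplicate friend in list of {}".format(user))
        friends[user] = set(flist)
    # Symmetry: every friendship must be listed from both sides.
    for user, fset in sorted(friends.items()):
        for f in sorted(fset):
            if user not in friends.get(f, set()):
                problems.append("{} lists {} but not vice versa".format(user, f))
    return problems
```

An empty result means both properties hold for the lines that were checked.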
Auxiliary data structures
In this case I mean you should not have data structures that are asymptotically larger than the friends list of a single user. For instance, you should not need to read the contents of two files (e.g., 100 and 200) before emitting key, value pairs for reducers. The key, value pairs for a single user should be generated from that user’s own friends list alone.
We provide two input data sets. The first, friends.simple, is a small set of files for testing and debugging. The larger data set, friends1000, is a set of 1000 userids numbered 0-999 for larger scale testing.
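For local experiments between these two sizes, one option is to generate small synthetic friend lists that satisfy the same input properties. The following is a sketch under assumptions: the function `make_friend_lists`, its parameters, and the random graph it builds are illustrative and say nothing about how the provided data sets were produced.

```python
import random


def make_friend_lists(num_users, num_edges, seed=0):
    """Build random symmetric friend lists for users 0..num_users-1."""
    rng = random.Random(seed)
    friends = {u: set() for u in range(num_users)}
    # Never ask for more friendships than distinct pairs exist.
    target = min(num_edges, num_users * (num_users - 1) // 2)
    added = 0
    while added < target:
        a, b = rng.sample(range(num_users), 2)  # two distinct users
        if b not in friends[a]:
            friends[a].add(b)  # record the friendship on both sides
            friends[b].add(a)
            added += 1
    lines = []
    for u in range(num_users):
        ids = [u] + sorted(friends[u])
        lines.append(" ".join(str(i) for i in ids))
    return lines
```

Each returned line has the same layout as one input file: the user id followed by that user's friends.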
The inputs to the project are available on Amazon’s S3, for use in Amazon’s Elastic Map/Reduce, at s3:///friends1000/. These can be downloaded via AWS command line tools:
s3cmd get s3://friends.simple/*
s3cmd is documented here.
AWS command line tools are found and documented here.
Development Virtual Machine
To assist you in developing your code, we are providing a VirtualBox virtual machine running Ubuntu 14.04 with Hadoop preconfigured.
You can download the VM here.
To import the VM into VirtualBox, go to
File and then
The default user is parallel, and the password is project3. Note that the VM only includes Ubuntu 14.04 server edition (no graphical user interface). To access the VM, you may want to configure port forwarding. Using VirtualBox, select the VM and click on “Settings”. Select “Network”, click “Advanced”, and then “Port Forwarding”.
I forwarded port 22 on the VM to port 2222 of my Mac, like so:
Once port 22 is forwarded, you can SSH to the VM, e.g.
ssh -p 2222 parallel@localhost
and transfer files using
scp -P 2222 <arg1> <arg2>
Hadoop is installed in the hadoop directory in the parallel user’s home directory. The hadoop/bin directory has been added to your path, so commands like hdfs will work.
Step 1: Getting a Streaming Version to Work
Write a mapper program and reducer program in your favorite scripting language, like Python.
My mapper is called fof.mapper.py and my reducer is called fof.reducer.py.
Make sure the scripts are executable. For instance, I put a shebang line at the top of my Python scripts and run chmod +x fof.*.py to make them executable.
The neat thing about streaming is that you can test a serial version of your code using the following command sequence from the shell:
cat simple.input/* | ./fof.mapper.py | sort | ./fof.reducer.py
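The same map, sort, reduce sequence can also be emulated inside a single Python process, which is sometimes convenient for unit tests. The sketch below uses a deliberately trivial toy job (counting how many friend lists each id appears in), not the Three’s Company algorithm, and the function names are hypothetical. It shows the usual streaming convention: the mapper writes key, tab, value lines, and the reducer sees them sorted and must group runs of equal keys itself.

```python
from itertools import groupby


def mapper(lines):
    """Toy mapper: emit 'friend<TAB>1' for every id in a friend list."""
    for line in lines:
        ids = line.split()
        for friend in ids[1:]:
            yield "{}\t1".format(friend)


def reducer(sorted_lines):
    """Toy reducer: sum the values of each run of identical keys."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "{}\t{}".format(key, sum(int(v) for _, v in group))


def run_local(lines):
    """Emulate `cat input | mapper | sort | reducer` in one process."""
    return list(reducer(sorted(mapper(lines))))
```

In real streaming scripts the `lines` argument would be `sys.stdin` and each result would be printed to standard output. The shell pipeline above remains the real test of the scripts themselves.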
If this produces the correct output, you can run these scripts in the Map/Reduce streaming mode, which looks something like:
Start Hadoop! services
Run the following command to get Hadoop! running:
Hadoop expects your files to be in HDFS. You can find a listing of all HDFS commands
here or by typing
Organize files in HDFS
You must move your files from the local filesystem (Linux) into HDFS.
hadoop fs -mkdir /simple.input
hadoop fs -copyFromLocal simple.input/* /simple.input/
You can verify your files are in HDFS using:
hadoop fs -ls /simple.input
Run your streaming job in Hadoop!
This process is surprisingly easy (or at least we hope it is).
hadoop jar hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -mapper ./fof.mapper.py -reducer ./fof.reducer.py -input /simple.input -output /simple.output
You can check your output using the following:
hadoop fs -cat /simple.output/part*
- Research where to find the error logs for Hadoop! and use them; they are very useful! TAs/CAs are not expected to help you debug your code.
- Read the messages printed to stderr; they are very verbose, but also very informative.
Streaming on Amazon’s Elastic Map/Reduce
You will need to upload your mapper and reducer programs to a bucket in Amazon’s S3 (Simple Storage Service).
Login to the AWS Management Console for Elastic Map Reduce and create a streaming job flow.
I recommend that you experiment with the small input set first and configure your job to use only a single small instance of Map/Reduce. This way, each failed attempt only costs $0.10 and you find out quickly whether you have configured your job correctly.
Step 2: Java Implementation
The streaming Map/Reduce takes some liberties with typing in that the data are not really separated into keys and values. In this step, you will discover how sorting actually works in Map/Reduce. Reimplement your algorithm in Java. The examples in class and the Wordcount 1.0 example provide some guidelines as to how the toolchain works. I had to configure Java
My compilation and execution process looked something like:
First, set up the Java environment:
hadoop com.sun.tools.javac.Main FoF.java
jar cf FoF.jar FoF*.class
hadoop jar ./FoF.jar FoF /simple.input /simple.output
Note that the path to your jar file is relative to your cluster’s master root drive, whereas the input and output paths are again relative to HDFS.
This should produce similar output to your streaming version, although not necessarily identical. NOTE: you need not use your Reducer as a combiner class as they do in the wordcount example. Leave the combiner class undefined.
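One likely reason the two outputs can differ in order, offered here as an assumption rather than something the assignment states, is the way keys are compared. Streaming keys are text and sort character by character, while a Java job that uses an integer key type sorts numerically. A small Python illustration of the difference:

```python
keys = ["100", "20", "3"]

as_text = sorted(keys)           # lexicographic, character by character
as_ints = sorted(keys, key=int)  # numeric comparison

print(as_text)
print(as_ints)
```

As text, "100" sorts before "20" because the comparison stops at the first differing character. As integers, 3 comes first.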
Step 3: Custom JAR on Amazon’s Elastic Map/Reduce
Having gotten the Java implementation of your Three’s Company program running, you can now execute your code on the AWS Map Reduce service.
First, create an S3 bucket (folder) to upload your JAR file and display your output results. The bucket name should be
e.g., Vin Diesel’s could be vdiesel1-pp2017-FastAndFurious. Click on the bucket and then click upload to add your JAR file from earlier. You may want to set up logging for your bucket to track problems or issues by clicking on Properties, enabling logging, and specifying a directory in your bucket to log to.
From the AWS dashboard, go to the IAM (Identity and Access Management) service. Click on Roles and then Create New Role. Give the role a name, select Amazon EC2 from the list (should be the first option), and then click the checkbox next to the following policies:
- AmazonElasticMapReduceRole
- AmazonS3FullAccess
- AmazonElasticMapReduceforEC2Role
- AmazonElasticMapReduceFullAccess
- AmazonElasticMapReduceforAutoScalingRole
This creates a custom role that is able to run EMR and access the S3 service within the Map/Reduce cluster you will be running soon.
Next, go to the EMR service from the dashboard. First, we need to create a cluster on which you will run your custom JAR job. Again, experiment with the small input and only one or two instances in order to save money.
Follow these steps for your cluster:
- Name the cluster
- Specify your S3 folder that you created for logging (e.g., s3:///logs); this will let you know why your job failed
- Leave the default software configuration (Amazon, emr-5.4.0, Core Hadoop)
- Specify the hardware configuration as needed (start with 1-2 small instances, move to 4 c4.xlarge later)
- Choose your EC2 key pair that you created from previous assignments
- Choose Custom Permissions, using EMR_DefaultRole for the EMR role and the IAM role you created for the EC2 instance profile
- Click create cluster
Next, the cluster will begin booting up and provisioning resources. As it is doing this, you can add MR jobs by clicking the Add Step button. You should choose Custom JAR job, give it a name, then specify the JAR location and runtime arguments. My JAR location and arguments looked like this:
Jar location: s3://rbjhu/fof.jar
Jar arguments: FoF s3://friends.simple/ s3://rbjhu/output/friends-simple/
Having gotten everything working, run the same job on the large input using 4 c4.xlarge, High-CPU Extra Large instances. Do not do this until you’re certain of your implementation to save your credits.
Please store the output in the S3 bucket as follows:
Where output, friends-simple and friends-1000 are all directories.
I would also recommend following the Hadoop! tutorial http://developer.yahoo.com/hadoop/tutorial/, particularly Module 3. If you don’t do this now, no problem, you will do it later when you start working with Hadoop! in Java.
Submit the following to Gradescope Project 4 Writeup:
Please prepare a PDF document that provides the following items and answers the following questions:
Describe your map/reduce algorithm for solving the three’s company problem.
a. Describe the operation of the mapper and reducer. How does this combination solve the three’s company problem?
b. What is the potential parallelism? How many mappers does your implementation allow for? Reducers?
a. Why do we leave the combiner class undefined?
Let’s gently analyze the total work done by the algorithm.
a. How many messages do your mappers output? Assuming the Hadoop runtime has to sort these messages, how much total work does the algorithm do (in Big-O notation)? You should assume that there are n friend lists, each of length O(n).
b. How does this compare with the performance of serial implementations linked here? Describe the tradeoff between complexity and parallelism. (Note: it is more reasonable to compare your implementation with the naive O(n^3) algorithm than the optimized O(n^2.37) algorithm.)
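For reference when making that comparison, a naive serial enumeration can be sketched as below. It tests every triple of users, which gives the cubic bound, and it holds all friend lists in memory, so it is a serial baseline (and a handy oracle for checking output on friends.simple) rather than anything compatible with the Map/Reduce constraints. The function name `naive_trios` and the dictionary layout are assumptions.

```python
from itertools import combinations


def naive_trios(friends):
    """Enumerate trios of mutual friends by testing every triple of users.

    `friends` maps each user id to the set of that user's friends and is
    assumed to be symmetric, as the assignment guarantees for the input.
    """
    users = sorted(friends)
    trios = []
    for a, b, c in combinations(users, 3):
        # With symmetric lists, three one-directional checks are enough.
        if b in friends[a] and c in friends[a] and c in friends[b]:
            trios.append((a, b, c))
    return trios
```

Each trio is reported once, with its ids in increasing order.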
Submit the following to Gradescope Project 4 Code:
FoF.java
fof.mapper.py (or fof.mapper.pl etc ..)
fof.reducer.py (or fof.reducer.pl etc ..)
s3bucket.txt
s3bucket.txt should contain one line with your bucket name only e.g
Reminder: This bucket and all its contents must be PUBLIC to all. You will be penalized if it is not.
Sharing your bucket names with your classmates is a violation of Hopkins ethics policies.
The fof’s within each of your output files should be formatted as follows:
100<space>200<space>300<newline>
200<space>100<space>300<newline>
300<space>100<space>200<newline>
So the real output (without markup) should look like this:
100 200 300
200 100 300
300 100 200
REMINDER: The last two elements of each output line are sorted as integers. As such, we would never get the output 100 300 200.
If you format this incorrectly and the grading scripts cannot parse your output there will be a loss of points.
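Before uploading, the format can be checked locally with a few lines of Python. This is only a sketch based on the rules stated above. It is not the grading script, and the function name `validate_output` is hypothetical.

```python
def validate_output(lines):
    """Return a list of problems found in fof output lines."""
    problems = []
    seen = set()
    for num, line in enumerate(lines, 1):
        text = line.rstrip("\n")
        fields = text.split(" ")
        # Exactly three non-negative integers separated by single spaces.
        if len(fields) != 3 or not all(f.isdigit() for f in fields):
            problems.append("line {}: expected three integers "
                            "separated by single spaces".format(num))
            continue
        user, second, third = (int(f) for f in fields)
        if second >= third:
            problems.append("line {}: last two ids are not in "
                            "increasing integer order".format(num))
        if text in seen:
            problems.append("line {}: duplicate entry".format(num))
        seen.add(text)
    return problems
```

An empty list means every line passed these particular checks. It does not say whether the trios themselves are correct.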
Keep track of your AWS usage. The $40 voucher should be more than sufficient for all course assignments. I did this project for <$2. If you expect to exceed the amount in your voucher, please consult the instructor or Head TA. You are responsible for the charges incurred on AWS.