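This project is about learning Spark on AWS. The assignment consists of three tasks: processing a social graph dataset, computing PageRank, and learning to use GraphX for graph computation.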
General Suggestions
- Read the Scala and Spark primers before attempting this project.
- Spark programs can be written in Python, Scala and Java. However, we suggest you choose your language wisely for this project. Some facts you should consider:
  - Spark is written in Scala. Scala has the most comprehensive support, and Spark programs written in Scala closely reflect the Spark way of thinking.
  - Python code is easy to deploy in Spark (you don’t need to worry much about dependencies, jar builds, etc.), but it can be too slow for you to get the bonus.
  - GraphX (which you will be using in Task 3) only has a Scala API for the time being.
- Spark has a Web UI that you may find useful for troubleshooting cluster problems and job failures. See Monitoring and Instrumentation: Web Interfaces (http://spark.apache.org/docs/latest/monitoring.html#web-interfaces) for details.
- Do not use an RDD action when it is not necessary. Once you call an action, a job is created: Spark runs a computation on your distributed dataset and returns a value to the driver. For example, many students from last semester who encountered out-of-memory problems were trying to copy a big RDD to the driver with someBigRdd.collect().
 You may notice that one container fails and then the rest fail one by one. Think about why this happens.
- You should realize that shuffle operations are expensive. Shuffles add to your disk I/O, data serialization, network I/O and memory pressure (from garbage collection)! For example, join is one of the most expensive operations. If your application uses it, the Web UI will show you how long it takes.
- Choose your transformations wisely. For example, to sort an RDD by value, use sortBy rather than inverting key and value first and then using sortByKey.
 We strongly recommend becoming familiar with the basic operations before writing any code.
- Use reduceByKey() instead of groupByKey() when possible. For a simple word count, the two approaches differ greatly in performance. Try to find out the reason.
- Do not cache your RDDs everywhere. Cache RDDs when necessary.
- Partitions are the basic units of parallelism in Spark. Use repartition when it is necessary (one partition is created for each block of the file in HDFS). You should be aware of the number of partitions of your RDD. Having too many or too few partitions is a problem.
- If you are using some “static” or “global” data structure for some reason, try to broadcast that variable. Be careful: global variables that are big will also lead to OOM problems.
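The word count comparison mentioned in the suggestions above can be sketched in PySpark roughly as follows. This is a minimal sketch, not the handout's original listing: the function names are made up for illustration, and each function expects an RDD of text lines (for example one created with sc.textFile in the pyspark shell). Both functions produce the same (word, count) pairs; working out why their running times differ is left as the exercise suggested above.

```python
from operator import add


def word_count_reduce_by_key(lines):
    """Word count with reduceByKey. `lines` is an RDD of strings."""
    return (lines.flatMap(lambda line: line.split())
                 .map(lambda word: (word, 1))
                 .reduceByKey(add))


def word_count_group_by_key(lines):
    """Word count with groupByKey. Produces the same pairs as above."""
    return (lines.flatMap(lambda line: line.split())
                 .map(lambda word: (word, 1))
                 .groupByKey()
                 .mapValues(lambda ones: sum(ones)))


# Usage in the pyspark shell, where `sc` is predefined
# (the input path is a placeholder, not a real dataset):
#   word_count_reduce_by_key(sc.textFile("hdfs:///some/text")).take(5)
```

Comparing the two jobs in the Web UI, in particular the shuffle read and write sizes of each stage, is one way to investigate the difference.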
Task 1
Scenario
You have built a successful search engine, but no one seems to be using it. You try to spread the word by asking all your 2773 Facebook friends and 32 Twitter followers to use the Mellon Search Input Text Predictor (MSITP).
Unfortunately, this doesn’t work. After one week, only 7 people have used your website. You realize that, for it to be a success, you need to showcase your service to highly influential people who are easily impressed - Twitter celebrities!
You encounter a dataset and some research by Kwak [1], describing the analysis of a network of Twitter users. Some further digging reveals the PageRank algorithm for identifying influential nodes in a network. You download the dataset and decide to use your MapReduce skills to run PageRank and find the influential nodes to target.
Unfortunately, many network analysis and machine learning algorithms rely on multiple iterations of execution. This is where MapReduce works poorly - after each iteration of Map and Reduce, it spills all the data to disk and spends a lot of time saving and loading the data.
Fortunately, the Cloud Computing course introduces you to Spark at exactly the right time. Spark is optimized for iterative jobs because it can store intermediate results in memory. In this module, you will be introduced to Spark through an increasingly harder set of tasks, and use it to perform PageRank on the dataset of Twitter users to find the influencers. Afterwards, you will implement a second degree centrality algorithm using Spark’s GraphX.
Tasks and Instructions
We are going to use the Apache Spark framework to run a few graph computations on the Twitter social graph. The dataset details are as follows:
Table 1: Dataset for this project
| File Name | Location | Size | 
|---|---|---|
| Graph | s3://cmucc-datasets/p42/Graph | 10.4GB | 
Use aws s3 cp or wget to get our data and files from our S3 bucket.
The graph is stored in edge list format, which lists the source and destination vertices of each edge of the graph. Each node represents a user in the Twitter social network, and an edge (u, v) means user u follows user v on Twitter. For your convenience, the first 10 lines of the file are listed below (note that fields are separated by \t).
5510    3430913
5510    3506997
5510    4289028
5510    11752536
5510    13433256
5510    14380596
5510    17413259
5510    25643118
5510    26929986
5510    30403395
Task 1: Getting Started with Spark and the Social Graph Dataset
The first task is for you to get familiar with Spark. We will ask you to find the number of edges and vertices in the Twitter social graph, as well as the number of followers of each user. The edges in the graph are directed, so if there are edges (u, v) and (v, u), you should count them as two edges.
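To make these definitions concrete, here is a small plain-Python sketch (no Spark involved) that applies them to the 10 sample lines of the edge list. The helper names are illustrative only, and the sketch assumes every input line counts as one edge. A vertex is any id that appears in either column, and a user's follower count is the number of edges that point at that user.

```python
from collections import Counter

# First 10 lines of the edge list (tab-separated: follower, followee).
SAMPLE_LINES = [
    "5510\t3430913", "5510\t3506997", "5510\t4289028", "5510\t11752536",
    "5510\t13433256", "5510\t14380596", "5510\t17413259", "5510\t25643118",
    "5510\t26929986", "5510\t30403395",
]


def parse_edge(line):
    """Split one line into (u, v), meaning user u follows user v."""
    u, v = line.split("\t")
    return int(u), int(v)


def graph_stats(lines):
    edges = [parse_edge(line) for line in lines]
    vertices = {node for edge in edges for node in edge}
    followers = Counter(v for _, v in edges)  # one follower per incoming edge
    return len(edges), len(vertices), followers


num_edges, num_vertices, followers = graph_stats(SAMPLE_LINES)
print(num_edges, num_vertices)  # 10 11
```

In the sample, user 5510 follows ten distinct users, so there are 10 edges and 11 vertices, each followee has one follower, and 5510 itself has none.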
We will explore two different ways of working with Spark: using Spark Shell, and submitting Spark programs.
Spark Shell
- You should count the number of edges and vertices by running commands in the Spark Shell (either Scala spark-shell or Python pyspark). You will find the interactive REPL very useful for iterative data processing. You need to put the result you get in a file called answer to get points.
Spark programs
You will need to achieve the same goal using two different APIs: RDDs and DataFrames. In your two Spark programs, you need to count the number of followers for each user, and sort by the number of followers. Note that you will need to run our submitter to run your Spark programs.
As mentioned in the Spark primer, DataFrames use Catalyst (https://spark-summit.org/2016/events/deep-dive-intocatalyst-apache-spark-20s-optimizer/) to optimize code. Typically, DataFrame and Dataset programs with Catalyst run faster than RDD programs. But in this task, because loading a text file into a DataFrame takes more time than loading it into an RDD, you may not see a big difference in performance between the two APIs. In our tests, DataFrames were 2-10 times faster than RDDs (excluding loading).
You can use System.nanoTime to print out your program execution time to see how fast the DataFrame version is.
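System.nanoTime applies to Java and Scala programs. For Python scripts, a comparable measurement can be taken with time.perf_counter_ns from the standard library. The helper below is a minimal sketch; the name timed and the stand-in workload are illustrative assumptions, not part of the project template.

```python
import time


def timed(label, fn):
    """Run fn(), print the elapsed wall-clock time, and return fn's result."""
    start = time.perf_counter_ns()
    result = fn()
    elapsed_ms = (time.perf_counter_ns() - start) / 1e6
    print(f"{label}: {elapsed_ms:.1f} ms")
    return result


# Stand-in workload. In a Spark job, wrap the call that triggers an action,
# because transformations alone are lazy and return almost immediately.
total = timed("sum of squares", lambda: sum(i * i for i in range(100_000)))
```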
Building your project
If you choose to use maven, you can find the pom.xml file for this project here: s3://cmucc-public/s17/p42/ScalaSparkMavenTemplate.zip .
If you choose to use sbt, you can find the build.sbt file for this project here: s3://cmucc-public/s17/p42/build.sbt .
Steps to complete this task and submit
- Make sure you use EMR 5.4.0 (check the Spark primer for details). There is no limitation on the instance type and number in this task.
- Download the submitter from s3://cmucc-public/s17/p42/submitter1.tgz on the master node of your Spark cluster.
- Run the Spark commands in shell mode to complete counting vertices and edges.
- Enter the numbers you got from your shell run in the file answer .
- The RDD program should produce output in the following format for the entire Twitter social graph: [user_id]\t[num_followers]
 Only store the top 100 records in hdfs:///followerRDD-output . Your code should directly store your output into that path. Our submitter will look for RDD output in this path.
- In the DataFrame program, the final DataFrame should be in this format (use show()):
- If you write a Python script, name your RDD script exactly followerRDD.py and your DataFrame script exactly followerDF.py. If you implement in Java or Scala, name the RDD and DataFrame classes FollowerRDD and FollowerDF , compile them into one jar package, and name it exactly p42.jar.
- Our submitter will run the following commands in the home directory to run your Spark programs. Please make sure you can run them without errors before you run the submitter. For Python developers:
 spark-submit followerRDD.py
 spark-submit followerDF.py
- You should not merge your output files; our submitter will take care of that.
- Make sure to copy ALL the shell scripts and source code (.py, .java and .scala files) into the src folder.
- Modify the reference file and note down all the help you got from the Internet and from other students.
- Once you are confident about your code, you can first chmod +x submitter1 and run ./submitter1 .
Task 2: Rank Each User by Influence
Let us now run an iterative computation on the Twitter social graph. For this task, you will rank each user by their influence. The problem is to find the influential or important nodes. Given a graph, which node is more “important”?
We solve the problem by using the PageRank algorithm. PageRank is a type of random walk algorithm. Imagine there is an agent walking on a graph. The agent can randomly jump from one node to another node over the edges in the graph. The agent tirelessly walks the graph. At the end of the day, influential nodes are the ones that were frequently visited by the agent. Similarly, the PageRank algorithm finds the score for each node iteratively. When the score of every node no longer changes across iterations, we say that the algorithm has converged. When it converges, the final score of each node represents the probability of being visited by the agent. Therefore, the bigger the score is, the more influential the node is.
PageRank Implementation Rules
- Initial Rank Values. The initial value of the rank of each user should be 1/n . This value needs to be assigned to every vertex, so it’s easy to think of this as being a map operation.
- Damping Factor. There are many possible values for the damping factor, and in this task we set it to 0.85 .
- Output Format. You must ensure that the output of your PageRank function matches the syntax of its input, so that the algorithm can iteratively compute the ranks.
- Dangling Users. You need to handle the case of dangling nodes (nodes with zero out-degrees). The weight of the dangling users must be redistributed across all the users during each iteration (see Figure 2.2). Remember, the sum of all PageRank scores should always be 1 in each iteration.
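The rules above can be illustrated with a small plain-Python sketch (no Spark, and far too slow for the real dataset). It assumes one common formulation in which the total rank held by dangling nodes is spread evenly over all n nodes before the damping factor is applied, which keeps the scores summing to 1. The function name and the toy graph are illustrative, and the handout's Figure 2.2 remains the authoritative description of the redistribution.

```python
from collections import defaultdict


def pagerank(edges, iterations=10, damping=0.85):
    """Return {node: rank} for directed edges (u, v), meaning u follows v."""
    out_links = defaultdict(list)
    nodes = set()
    for u, v in edges:
        out_links[u].append(v)
        nodes.update((u, v))
    n = len(nodes)
    ranks = {node: 1.0 / n for node in nodes}  # initial rank is 1/n

    for _ in range(iterations):
        contrib = defaultdict(float)
        dangling = 0.0
        for node in nodes:
            targets = out_links.get(node)
            if targets:
                share = ranks[node] / len(targets)
                for target in targets:
                    contrib[target] += share
            else:
                dangling += ranks[node]  # node with zero out-degree
        ranks = {
            node: (1 - damping) / n + damping * (contrib[node] + dangling / n)
            for node in nodes
        }
    return ranks


# Tiny graph: node 3 is dangling because it follows nobody.
ranks = pagerank([(1, 2), (2, 1), (1, 3)])
print(round(sum(ranks.values()), 10))  # 1.0
```

Checking that the ranks sum to 1 after every iteration, as done here on a toy graph, is a cheap sanity test for a Spark implementation as well.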
Steps to complete this task and submit
- Make sure you use EMR 5.4.0 .
- If you want to submit your code, you have to launch 5 r3.xlarge core instances.
- Download the submitter from s3://cmucc-public/s17/p42/submitter2.tgz on the master node of your Spark cluster.
- Write a Spark program that computes the PageRank value for each node in the Twitter social graph. Your program should follow the implementation rules described above and produce the following output for the entire graph by running 10 iterations of the computation. [user_id]\t[PageRank_value]
- If you write a Python script, please name your script exactly pagerank.py . If you implement in Java or Scala, name your jar package exactly p42.jar and the main class should be PageRank .
- Please make sure you can run it without errors before you run the submitter.
- Our submitter will look for output in hdfs:///pagerank-output .
- Again, do not merge or sort your output files; our submitter will take care of that.
- Make sure to copy all the source code (.py, .java and .scala files) into the src folder.
- Modify the reference file and note down all the help you got from the Internet and from other students.
- Once you are confident about your code, you can run chmod +x submitter2 and run the submitter: ./submitter2 .
Bonus Task: Speed up Spark!
Chances are that it will take hours to run the 10-iteration PageRank on Spark. After you have passed the correctness check, it is time to speed it up! We want you to look into advanced topics of Spark to shorten the execution time of your PageRank to less than 30 minutes. Note that you can get the bonus only if you got full marks in Task 2.
Here are some suggestions you can try to improve the performance of your program:
- Review our general suggestions above.
- Do some research about the programming languages in Spark.
- Improve your code. Develop a better understanding of RDD manipulations. Understand the “lazy” transformations in Spark. Think carefully about whether and when you should use cache() , collect() , persist() and unpersist(). Check Spark Programming Guide: RDD Persistence (https://spark.apache.org/docs/latest/programming-guide#rdd-persistence) to learn more.
- Monitor your instances to make sure they are fully utilized. You can enable detailed CloudWatch monitoring on each instance in your Spark cluster. Metrics of disk and network I/O are captured, which can help you decide if you need more compute resources in your cluster. Alternatively, you could choose to use htop, and utilities like iotop and iostat.
- Spark is a tunable framework with many parameters that you can configure to make the best use of the resources you have. You might want to understand the meaning of parameters such as spark.driver.memory , spark.executor.memory , spark.executor.cores , and spark.python.worker.memory . Check Spark Configuration (http://spark.apache.org/docs/latest/configuration.html) to learn more and configure your Spark cluster to achieve better performance.
- Notice that RDDs are read-only and your PageRank program iterates 10 times, so there can be many “orphan” intermediate RDDs or garbage. Thinking about garbage collection can contribute a lot to performance improvement. The parameters of spark.memory.fraction and spark.memory.storageFraction are closely related to this topic. For more references, check Tuning Spark: Garbage Collection Tuning (https://spark.apache.org/docs/latest/tuning#garbage-collection-tuning).
- Using accumulators (http://spark.apache.org/docs/latest/programming-guide.html#accumulators) is a good way to count or sum across different nodes.
- Be careful when using repartition on your RDD. It might improve your CPU utilization, but this transformation always shuffles all data over the network, which increases the amount of work the Spark engine has to do. Another way is to set spark.default.parallelism to change the default number of partitions of an RDD. Don’t change the default number until you know what you are doing. Learn more about RDD partitioning, and choose the best strategy. You can use rdd.partitions.size (Scala) to check the number of partitions of an RDD.
How to submit the bonus task
Please use EMR 5.4.0 as you did in the previous tasks.
Please use the same submitter you used for the PageRank task. Download and run the submitter on your master node. Follow the same instructions from last task to run your application. Once your performance is better than 1800s (30 min), you can get a bonus up to 10%! (If you simply get below 1800s (30 min) you will earn a 10% bonus. Bonus only applies if you already got full points in Task 2.)
Note: We are not done yet! Don’t forget to do the AssessMe and unlock Task 3. It is worth 20% of this project. Moreover, the only choice for Task 3 is Scala because that’s all GraphX supports for the time being.
Task 3: Graph Processing using GraphX
In the previous tasks, you gained some experience with using Spark to write an iterative program. The PageRank algorithm you implemented performs several iterations on the graph to analyze the links and compute the PageRank score of each node. During each iteration, a node needs values from all of its neighbors. This property makes PageRank fit perfectly into the graph-parallel model. In fact, there are graph processing frameworks that were developed to help us do this kind of analytics. These frameworks include GraphLab (developed at CMU), GraphX (a part of Apache Spark), and others.
In this task, you will use GraphX to do further analysis based on your PageRank results. Don’t worry if you didn’t (yet) do well in Task 2. You are allowed to use GraphX’s built-in pagerank() to calculate the PageRank result, so that you can get full points in Task 3 regardless of your score in Task 2. Be careful: if you use GraphX’s built-in pagerank() function for Task 2, you will incur a 100% penalty.
By completing this task, you will gain experience in developing graph-parallel programs and a deeper understanding of the advantage of adopting the graph-parallel programming model to deal with iterative applications where the data is highly dependent.
GraphX
GraphX is a component in Spark for graphs and graph-parallel computation. Spark users will find it familiar and easy to get started with because in GraphX a graph is constructed from two RDDs: edges and vertices. Also, properties of arbitrary types can be attached to each edge and each vertex (for us to analyze).
GraphX provides a set of basic graph operators, such as numVertices , numEdges , degrees , subgraph , joinVertices , etc. A complete list of operators can be found in the GraphX Programming Guide (https://spark.apache.org/docs/latest/graphx-programming-guide#summary-list-of-operators). Apart from the basic graph operations, there are generalized and powerful operations like aggregateMessages and pregel that you can use to build graph processing solutions for a variety of problems ( pregel is an optimized variant of the Pregel (https://www.cs.cmu.edu/~pavlo/courses/fall2013/static/papers/p135-malewicz.pdf) API). In addition, GraphX includes a growing collection of specialized graph algorithms and builders to simplify graph analytics tasks. As an example, you will find that there is a pagerank implementation in GraphX.
Task Description
In this task, you will learn how to use aggregateMessages to perform graph-parallel analytics on a large dataset. We will continue to use the same Twitter graph dataset. In addition, we are going to add properties to the graph based on your PageRank result.
The 2nd-degree influential score is a type of centrality score for a node, i.e. a measure of how influential the node is in the graph. The PageRank score is a type of centrality score that mainly considers the directional 1st-degree edges of a node. For example, the following graph (though node 4 is not possible in the graph of this project) has 5 nodes. What are their final PageRank scores?
The converged PageRank scores for nodes 0 to 4 are [0.036, 0.4402, 0.451, 0.036, 0.036]. As expected, nodes 1 and 2 are the most important nodes. But node 4 has the same score as nodes 0 and 3. Do you think that is reasonable? One might argue that node 0 seems to be more important than node 4 because it has a few connections to central nodes (node 2 and node 1). We can fix the PageRank score by considering the 2nd-degree influential score described in this section.
High-level instructions for you to approach this task are illustrated as in Figure 3.2 and described as follows:
- Make sure you have the two input datasets (graph and properties). If you think your PageRank result is flawed, feel free to use GraphX’s pagerank to generate the data. But remember, GraphX is not allowed in Task 2! Some may notice that GraphX doesn’t give the same values for each user, but that won’t affect your scores for this task.
- Create a graph by loading the graph edge list dataset.
- Attach the influence values of each user to the corresponding vertex. This can be done by a graph join operation between the graph and the properties dataset.
- Now you should have a property graph in which each node has its influence value. To continue, for each user, find the most influential person he/she follows. Hint: you may want to use aggregateMessages here and attach (join) this intermediate result to the graph so that the next step can perform aggregateMessages again. This is called iterative processing!
- Now you should have a new property graph in which each node knows its most influential followee. Based on this result, for each user, find the most influential person his/her followees follow. Save this result to an output file.
 ... O. Then A, B, C, and O will aggregate (reduce) the values they receive to find the max (to be sent later). Note that for the vertices that do not receive values (like A1), the max value is 0. In the second round, A, B, C each send their max value to O. Then O will aggregate (reduce) to find the max.
In this task, we are going to use the formula below to calculate the new score for one user. new_influential_score = 0.5 * pagerank_score + 0.5 * most_influential_second_degree_user_score
For example, if the PageRank score of Node O in Figure 3.2 is 0.02, and the maximum PageRank score of O’s 2nd-degree neighbors is 0.01, then O’s final score is (0.02+0.01)/2=0.015.
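The two aggregation rounds and the scoring formula can be traced on a toy graph with the plain-Python sketch below. It is not GraphX code: the function names, the integer user ids and the PageRank values are made up for illustration, and breaking score ties by the larger user id is an assumption rather than a rule from this handout. Users who receive no value fall back to id=0 with influence 0.0.

```python
DEFAULT = (0.0, 0)  # (influence, user_id) used when nobody is found


def max_followee(edges, value):
    """For each user u, keep the largest (score, id) among the users u follows."""
    best = {}
    for u, v in edges:  # edge (u, v) means u follows v
        candidate = value.get(v, DEFAULT)
        if u not in best or candidate > best[u]:
            best[u] = candidate
    return best


def second_degree_scores(edges, pagerank):
    own = {u: (score, u) for u, score in pagerank.items()}
    round1 = max_followee(edges, own)  # most influential followee
    carried = {u: round1.get(u, DEFAULT) for u in pagerank}
    round2 = max_followee(edges, carried)  # best among followees' followees
    result = {}
    for u, score in pagerank.items():
        best_score, best_id = round2.get(u, DEFAULT)
        result[u] = (best_id, 0.5 * score + 0.5 * best_score)
    return result


# User 1 follows 2 and 3; user 2 follows 4; user 3 follows 5.
edges = [(1, 2), (1, 3), (2, 4), (3, 5)]
pagerank = {1: 0.02, 2: 0.3, 3: 0.2, 4: 0.01, 5: 0.005}
scores = second_degree_scores(edges, pagerank)
best_id, new_score = scores[1]
print(best_id, f"{new_score:.3f}")  # 4 0.015
```

User 1 plays the role of node O in the example above: its own score is 0.02, its most influential second-degree user has score 0.01, and the new score is 0.015.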
Neighborhood Aggregation
Unlike most other graph operators in GraphX, aggregateMessages is a general purpose mechanism to enable parallel computation on a property graph. You will find that other operators in the GraphX source code (https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala), such as collectNeighborIds and degrees , are actually implemented gracefully using aggregateMessages . Here is a short explanation of how it works: 1) for each edge (or strictly speaking, triplet, using GraphX’s terminology), send some sort of message to one or both of its two ends - this step is like a map ; 2) for each vertex, process all the messages it gets from its connecting edges - this step is like a reduce . See the GraphX Programming Guide: Neighborhood Aggregation (https://spark.apache.org/docs/latest/graphx-programming-guide#neighborhood-aggregation) for details.
As revealed in the signature, the function will return an RDD of Msg type, which is a generic type and represents the type of the messages you send. To help you understand how to send and aggregate messages, let’s take a look at the degree calculation of an undirected graph as an example.
The code is very succinct. Since this is an undirected graph, every edge contributes 1 to the degree of both of its ends. So we send message 1 to both vertices, and then sum the ones up on each vertex of the whole graph to get the degree RDD.
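The same send-then-merge idea can be modeled in a few lines of plain Python. This is only a model of the semantics, not the GraphX API: in GraphX the operator is the Scala method aggregateMessages, which takes a sendMsg function and a mergeMsg function, whereas the aggregate_messages helper below is a hypothetical stand-in that works on a list of edges.

```python
def aggregate_messages(edges, send, merge):
    """Plain-Python model of a send/merge neighborhood aggregation.

    send(src, dst) returns (vertex, message) pairs for one edge;
    merge(a, b) combines two messages that arrive at the same vertex.
    """
    inbox = {}
    for src, dst in edges:
        for vertex, msg in send(src, dst):
            inbox[vertex] = merge(inbox[vertex], msg) if vertex in inbox else msg
    return inbox


# Degree of every vertex in an undirected graph: each edge sends 1 to both ends,
# and the messages that arrive at a vertex are summed.
edges = [(1, 2), (1, 3), (2, 3), (3, 4)]
degrees = aggregate_messages(
    edges,
    send=lambda src, dst: [(src, 1), (dst, 1)],
    merge=lambda a, b: a + b,
)
print(sorted(degrees.items()))  # [(1, 2), (2, 2), (3, 3), (4, 1)]
```

Vertices that receive no message simply do not appear in the result, which is why Task 3 asks you to assume a default value for users who follow nobody.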
Steps to complete this task and submit
- Make sure you use EMR 5.4.0. No limitation on instance type and number in this task.
- Download the submitter from s3://cmucc-public/s17/p42/submitter3.tgz on the master node of your Spark cluster.
- Write a Spark GraphX program to compute second degree centrality for each user in the Twitter social graph. Your program should follow the implementation rules described above and produce the following output for the entire graph. [user_id]\t[most_influential_second_degree_user_id]\t[new_user_influential_score]
 Note: If no user is found (e.g. I am not following anyone), assume a user with id=0 and influence=0.0 when you aggregate messages.
- Name your jar package exactly p42.jar with a main class called Task3 .
- Please make sure you can run it without errors before you run the submitter.
- Our submitter will look for output in hdfs:///task3-output .
- Again, do not merge or sort your output files; our submitter will take care of that.
- Make sure to copy all of the source code (for this task, .scala files) into the src folder.
- Modify the reference file and note down all the help you got from the Internet and from other students.
- Once you are confident about your code, you can run chmod +x submitter3 and run the submitter: ./submitter3 .