Developing and Running a Spark WordCount Application
This tutorial describes how to write, compile, and run a simple Spark word count application in three of the languages supported by Spark: Scala, Python, and Java. The Scala and Java code was originally developed for a Cloudera tutorial written by Sandy Ryza.
Writing the Application
The example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:
- Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class. When running a shell, the SparkContext is created for you.
- Gets a word frequency threshold.
- Reads an input set of text documents.
- Counts the number of times each word appears.
- Filters out all words that appear fewer times than the threshold.
- For the remaining words, counts the number of times each letter occurs.
In MapReduce, this requires two MapReduce applications, as well as persisting the intermediate data to HDFS between them. In Spark, this application requires about 90 percent fewer lines of code than one developed using the MapReduce API.
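The steps listed above can be sketched without Spark to make the logic concrete. The following plain Python version is only an illustration: the function name `letter_counts` and the two-document corpus are invented for this sketch, and it runs in a single process rather than on a cluster.

```python
from collections import Counter


def letter_counts(documents, threshold):
    """Count letters in the words that occur at least `threshold` times."""
    # Split each document on single spaces, as the Spark programs do.
    words = [word for doc in documents for word in doc.split(" ")]
    # Count the number of times each word appears.
    word_counts = Counter(words)
    # Keep only the words that meet the threshold.
    popular = [word for word, count in word_counts.items() if count >= threshold]
    # Count each letter once per remaining distinct word.
    return Counter(letter for word in popular for letter in word)


# Hypothetical corpus: one document per line, already stripped of punctuation.
docs = ["the cat saw the dog", "the dog ran"]
counts = letter_counts(docs, 2)
print(sorted(counts.items()))
# [('d', 1), ('e', 1), ('g', 1), ('h', 1), ('o', 1), ('t', 1)]
```

Only "the" (3 occurrences) and "dog" (2 occurrences) pass the threshold. As in the Spark programs, letters are counted once per distinct remaining word and are not weighted by how often the word appears.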
// package matches the --class argument passed to spark-submit
package com.cloudera.sparkwordcount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}
import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

    # create Spark context with Spark configuration
    conf = SparkConf().setAppName("Spark Count")
    sc = SparkContext(conf=conf)

    # get threshold
    threshold = int(sys.argv[2])

    # read in text file and split each document into words
    tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))

    # count the occurrence of each word
    wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)

    # filter out words with fewer than threshold occurrences
    filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)

    # count characters; flatMap over a word yields its individual characters
    charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)

    # collect the results on the driver and print them without the enclosing brackets
    results = charCounts.collect()
    print(repr(results)[1:-1])
// package matches the --class argument passed to spark-submit
package com.cloudera.sparkwordcount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {

    // create Spark context with Spark configuration
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // get threshold
    final int threshold = Integer.parseInt(args[1]);

    // read in text file and split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, Integer> tup) {
          return tup._2() >= threshold;
        }
      }
    );

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        @Override
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        @Override
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    System.out.println(charCounts.collect());
  }
}
Because Java 7 does not support anonymous functions, this Java program is considerably more verbose than the Scala and Python programs, but it still requires a fraction of the code needed in an equivalent MapReduce program. Java 8 supports anonymous functions (lambda expressions), and using them can further streamline the Java application.
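All three programs read the input path and the threshold positionally and fail with an index or number-format error when an argument is missing or malformed. As a minimal sketch of stricter argument handling for the Python version, the standard `argparse` module can be used. The helper name `parse_args` is hypothetical, and rejecting a threshold below 1 is an assumption of this sketch, not a requirement stated by the tutorial.

```python
import argparse


def parse_args(argv):
    """Parse the input path and threshold (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="Spark word count")
    parser.add_argument("input_path", help="text file with one document per line")
    parser.add_argument("threshold", type=int,
                        help="minimum number of occurrences for a word to be kept")
    args = parser.parse_args(argv)
    # Assumption of this sketch: a threshold below 1 is treated as a usage error.
    if args.threshold < 1:
        parser.error("threshold must be a positive integer")
    return args


# In the application this would be parse_args(sys.argv[1:]).
args = parse_args(["hdfs://namenode_host:8020/path/to/inputfile.txt", "2"])
print(args.input_path, args.threshold)
```

With this in place, the application could use `args.input_path` and `args.threshold` instead of `sys.argv[1]` and `int(sys.argv[2])`. A missing or non-integer argument then produces a usage message instead of a traceback.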
Compiling and Packaging the Scala and Java Applications
The tutorial uses Maven to compile and package the Scala and Java programs. Excerpts of the tutorial pom.xml are included below. For best practices using Maven to build Spark applications, see Building Spark Applications.
To compile Scala, include the Scala tools plug-in:
<plugin>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
which requires the scala-tools plug-in repository:
<pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>
Also, include Scala and Spark as dependencies:
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.7.0</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
To generate the application JAR, run:
$ mvn package
This creates sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar in the target directory.
Running the Application
- The input to the application is a large text file in which each line contains all the words in a document, stripped of punctuation. Put an input file in a directory on HDFS. You can use the tutorial example input file:
$ wget --no-check-certificate .../inputfile.txt
$ hdfs dfs -put inputfile.txt
- Run one of the applications using spark-submit:
- Scala - Run in a local process with threshold 2:
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
    --master local --deploy-mode client --executor-memory 1g \
    --name wordcount --conf "spark.app.id=wordcount" \
    sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
If you use the example input file, the output should look something like:
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
- Java - Run in a local process with threshold 2:
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount \
    --master local --deploy-mode client --executor-memory 1g \
    --name wordcount --conf "spark.app.id=wordcount" \
    sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
If you use the example input file, the output should look something like:
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
- Python - Run on YARN with threshold 2:
$ spark-submit --master yarn --deploy-mode client --executor-memory 1g \
    --name wordcount --conf "spark.app.id=wordcount" wordcount.py hdfs://namenode_host:8020/path/to/inputfile.txt 2
In this case, the output should look something like:
[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]
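The Scala and Java output above lists the letter counts in a different order from the Python output, so a line-by-line comparison is not meaningful. One way to check that two runs agree is to parse each output into a dictionary and compare those. The sketch below does this for the sample outputs shown above; the helper names `parse_scala_output` and `parse_python_output` are hypothetical and assume exactly the output formats shown.

```python
import ast
import re

# Sample outputs copied from the runs shown above.
scala_output = ("(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), "
                "(o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)")
python_output = ("[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), "
                 "(u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), "
                 "(u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]")


def parse_scala_output(text):
    """Turn '(e,6), (p,2), ...' into {'e': 6, 'p': 2, ...}."""
    return {ch: int(n) for ch, n in re.findall(r"\((.),(\d+)\)", text)}


def parse_python_output(text):
    """Turn "[(u'a', 4), ...]" into {'a': 4, ...}."""
    return dict(ast.literal_eval(text))


scala_counts = parse_scala_output(scala_output)
python_counts = parse_python_output(python_output)
print(scala_counts == python_counts)  # True
```

Sorting the parsed items, for example with `sorted(scala_counts.items())`, gives a stable listing that is easier to compare by eye.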