We’ll be using version 1.4.0. Select that version from the releases page, and select Pre-built for Hadoop 2.6 and later (unless you currently have another Hadoop / HDFS instance at a different version).
Go ahead and download / unpack that into the directory of your choice, and cd into it.
Getting our wordlist
We’ll be using an English wordlist from SIL for the following exercises. Make sure to save wordsEn.txt somewhere where you can load it later.
Spark REPL
The last tutorial mentioned the concept of a REPL as a way to play with code interactively. Handily enough, Spark implements its own REPLs on top of Scala and Python (but not Java).
For Scala that would be bin/spark-shell
For Python, it’s bin/pyspark
You should see something like this (snipped for length):
spark-repl-scala
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
or this:
spark-repl-python
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/
Using Python version 2.7.5 (default, Mar 9 2014 22:15:05)
SparkContext available as sc, HiveContext available as sqlContext.
There will be a considerable amount of other debugging and logging output around those banners, but for our purposes they’re the main thing to look for.
Spark Context
In the Spark shell we’re given a SparkContext as sc, our entry point for interacting with Spark. We can use it to load in our text file:
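For example, in the Python shell (the path here is just where I keep wordsEn.txt; point it at wherever you saved yours):

>>> wordList = sc.textFile("/Users/lemur/dev/wordlist/wordsEn.txt")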
WARNING - Remember last time, when I mentioned Spark was lazy? If you type that path in wrong, it’s not going to tell you anything until you try to run commands on it. The same goes for a lot of functions in Spark: you won’t know something is broken until you run it.
Now we have our file loaded as an RDD (Resilient Distributed Dataset), Spark’s abstraction for distributed data, ready to experiment with.
Let’s try a basic one to start: how many lines are in the file? (I’m going to be trimming output so we don’t fill the page with debugger info.)
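In the Python shell that’s a single call on the RDD; count() forces the work to actually run and returns the number of lines:

>>> wordList.count()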
With that you’ve just run a Spark job. Simple as that, and not much different than how you’d interact with anything else.
Starts with
Now, since this is a dictionary, each word is in there once. That makes a wordcount a bit pointless, so instead let’s get a list of what letters they start with:
letterCounts = wordList \
    .filter(lambda w: w != "") \
    .map(lambda w: (w[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()  # Force the result to run

>>> for count in letterCounts:
...     print count
...
(u'a', 6541)
(u'c', 10324)
(u'e', 4494)
(u'g', 3594)
(u'i', 4382)
(u'k', 964)
(u'm', 5806)
(u'o', 2966)
(u'q', 577)
(u's', 12108)
(u'u', 3312)
(u'w', 2714)
(u'y', 370)
(u'b', 6280)
(u'd', 6694)
(u'f', 4701)
(u'h', 3920)
(u'j', 1046)
(u'l', 3363)
(u'n', 2475)
(u'p', 8448)
(u'r', 6804)
(u't', 5530)
(u'v', 1825)
(u'x', 79)
(u'z', 265)
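Since collect() hands the results back as a plain Python list, you can poke at it with ordinary Python from there. For instance, a quick sketch that sorts the same counts to see the most common first letters:

>>> for letter, count in sorted(letterCounts, key=lambda kv: kv[1], reverse=True)[:5]:
...     print letter, count
...
s 12108
c 10324
p 8448
r 6804
d 6694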
Spark SQL
On occasion we’ll have the niceties of structured data such as JSON, and Spark has just the thing to deal with it: Spark SQL.
WARNING - The Spark guide has been quoted as saying:
Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.
…and it will indeed fall over if you hand it an ordinary, pretty-printed JSON file, even though that’s perfectly valid JSON. If any reader knows the reasoning behind this particularly confounding piece of work, I’d love to know.
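In other words, it wants one complete JSON object per line rather than one object spread across many lines. A made-up illustration, using a few of the fields from people.json:

{"name": "Dolores Weaver", "age": 34, "isActive": false, "balance": 2514.6}
{"name": "Sam Hodge", "age": 27, "isActive": true, "balance": 1203.45}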
Let’s go ahead and load it up using the sqlContext:
scala-sql-load
scala> val people = sqlContext.jsonFile("/Users/lemur/dev/wordlist/people.json")
people: org.apache.spark.sql.DataFrame = [_id:string, address:string, age:bigint, balance:double, company:string, email:string, eyeColor:string, gender:string, guid:string, index:bigint, isActive:boolean, latitude:double, longitude:double, name:string, phone:string, picture:string, registered:string]

// Make SURE to register it as a table
scala> people.registerTempTable("people")
python-sql-load
>>> people = sqlContext.jsonFile("/Users/lemur/dev/wordlist/people.json")
>>> people
DataFrame[_id:string,address:string,age:bigint,balance:double,company:string,email:string,eyeColor:string,gender:string,guid:string,index:bigint,isActive:boolean,latitude:double,longitude:double,name:string,phone:string,picture:string,registered:string]

# Make SURE to register it as a table
>>> people.registerTempTable("people")
Let’s start with something fairly basic on the SQL side: getting the index of people who are inactive and have a balance greater than $2000:
scala-sql-basic
// Note I'm calling on SQL Context here
scala> sqlContext.sql("""
     | SELECT index
     | FROM people
     | WHERE isActive == false AND
     | balance > 2000.00
     | """).count()
res1: Long = 75
python-sql-basic
>>> sqlContext.sql("""
...     SELECT index
...     FROM people
...     WHERE isActive == false AND
...     balance > 2000.00
...     """).count()
75
Triple quotes are a life saver when making larger SQL-like strings.
Like SQL proper, you can join, count, group, and run various other operations, all in a big data context. It’s a shame it won’t play nicely with regular JSON, but the features are handy nonetheless.
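For example, a GROUP BY against one of the columns in the schema above works just as you’d expect. A sketch in the Python shell, using the people table we registered (the counts will depend on your data):

>>> sqlContext.sql("""
...     SELECT eyeColor, COUNT(*) AS total
...     FROM people
...     GROUP BY eyeColor
...     """).collect()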
Spark even comes with its own Machine Learning libraries, but for the sake of brevity we’re only going to look into some of the basic statistical options. Later tutorials will address this in some depth.
We’ll be looking into the column stats of our wordList from earlier:
scala-statistics-basics
// Make SURE to import it
scala> import org.apache.spark.mllib.stat.Statistics
scala> import org.apache.spark.mllib.linalg.Vectors

scala> val wordList = sc.textFile("/Users/lemur/dev/wordlist/wordsEn.txt")

scala> val wordLengths = wordList.map(w => Vectors.dense(w.length))
wordLengths: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[6] at map at <console>:32

scala> val summaryStatistics = Statistics.colStats(wordLengths)
summaryStatistics: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4377e40a

// Let's take a look inside shall we?
scala> summaryStatistics.mean
res22: org.apache.spark.mllib.linalg.Vector = [8.533905806557591]

scala> summaryStatistics.max
res23: org.apache.spark.mllib.linalg.Vector = [28.0]

scala> summaryStatistics.min
res24: org.apache.spark.mllib.linalg.Vector = [0.0]

scala> summaryStatistics.variance
res25: org.apache.spark.mllib.linalg.Vector = [6.448337984119102]
python-statistics-basics
# Make SURE to import it
>>> from pyspark.mllib.stat import Statistics

>>> wordList = sc.textFile("/Users/lemur/dev/wordlist/wordsEn.txt")

# Python will take a standard list in
>>> wordLengths = wordList.map(lambda w: [len(w)])

>>> summaryStatistics = Statistics.colStats(wordLengths)

# Let's take a look inside shall we?
>>> summaryStatistics.mean()
array([8.53390581])
>>> summaryStatistics.max()
array([28.])
>>> summaryStatistics.min()
array([0.])
>>> summaryStatistics.variance()
array([6.44833798])
We’ve taken a cursory look at some of the features and basic operations of Spark. Here’s the question, though: what do you as readers want to know more about? Vote on Strawpoll to let me know: http://strawpoll.me/4701594
Think of it as a choose-your-own-adventure of sorts. I’ll be writing about all of the above in more detail, but in the order you want to see it happen.