A Foray into Spark and Scala

Apache Spark is a new wave in Big Data computing, an alternative to technologies such as Hadoop. I was recently watching someone analyze log files of image URL requests using shell scripts to create a MySQL database and thought it might be an interesting exercise to try it in Spark as well.

Hadoop versus Spark

So what is Spark and how is it different from Hadoop?

Hadoop is an older, more mature technology. It is really a collection of technologies such as a distributed resilient file system (HDFS), job tracking, and a parallelized map-reduce engine. Hadoop has very much a batch processing feel. I often think of it as a distributed grep command – it is a great way to bulk process all the input data without trying to be clever about what to read. Just read and process all the data (like grep) rather than using indexes to speed up searches (like a database).

Spark is a newer parallel processing platform, but only really replaces the map-reduce engine of Hadoop. Spark supports map-reduce, but has other processing models as well. This gives you more flexibility in designing your applications. A key concept is an RDD (Resilient Distributed Dataset), which you can think of like a database table. An RDD consists of a number of rows where each row can have arbitrary structure. For example, a row could be an array, a Scala tuple (like a relational database row), a JSON object (like MongoDB), or any other serializable class. The map() function is like the SELECT line in SQL, filter() is like WHERE, and there are group-by functions, join operations, and more. There is even an SQL parser that converts SQL queries into Spark jobs, allowing you to mix SQL queries and arbitrary processing logic in the one processing chain.
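To make the SQL analogy concrete, here is a minimal sketch that uses ordinary Scala collections rather than a real RDD, since the collection methods share their names with the RDD ones. The Request class and the sample rows are made up for illustration.

```scala
// Hypothetical row type and sample data, invented for this illustration.
case class Request(url: String, format: String, width: Int)

val rows = List(
  Request("/a/s-l500.webp", "webp", 500),
  Request("/b/s-l160.jpg",  "jpg",  160),
  Request("/c/s-l160.webp", "webp", 160)
)

// WHERE width < 200
val small = rows.filter(r => r.width < 200)

// SELECT format, width
val projected = small.map(r => (r.format, r.width))

// GROUP BY format, with COUNT(*)
val countsByFormat = rows.groupBy(r => r.format).map { case (fmt, rs) => (fmt, rs.size) }
```

On an RDD the same calls would be spread across partitions, but the shape of the code stays much the same.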

If I had to pick an analogy, designing a Spark application feels a lot like defining a series of SQL views, one built on top of another. Each view can enrich or restructure the data a bit more, ready for the next level of processing. My example below uses this approach of defining a series of new RDDs from a previous RDD, cleaning up the data step by step. Each intermediate step can be reused multiple times if desired. You can also ask for intermediate results to be cached (in memory or on disk) for faster processing.
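The "stack of views" idea can be sketched with plain Scala collections. This is only an analogy: .view on a List stands in for Spark's lazy transformations, and the sample strings are made up.

```scala
// Plain Scala collections stand in for RDDs here; the data is made up.
val raw = List("500x500", "298x298", "bad", "160x160")

// Each step is defined in terms of the previous one, like stacked SQL views.
// Using .view keeps the steps lazy: nothing runs until a result is requested.
val wellFormed = raw.view.filter(s => s.contains("x"))
val widths     = wellFormed.map(s => s.substring(0, s.indexOf('x')).toInt)
val smallOnes  = widths.filter(w => w < 400)

// Asking for a concrete result triggers the whole chain.
val result = smallOnes.toList
```

In Spark itself, calling cache() or persist() on an intermediate RDD is how you ask for that step to be kept around for reuse.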

My Sample Project

The project I am going to use in this blog is a CSV log file of image URL requests. The goal is to work out the distribution of different file size and file format requests, to help work out the likely cache hit rates for a CDN.

The log file I am using has 16 columns, but only some are relevant. So the task is to parse the CSV file, extract the desired columns, then count the number of rows per combination of image size and format.

Installation

My first step was to install Spark. I have written a bit on Docker recently, so I went off looking for a Docker image. I was a bit surprised that it was harder to find than I expected, and similarly for Vagrant boxes. I found the Spark download pretty easily (spark-1.3.0-bin-hadoop2.4.tgz, 246MB), but was expecting some installation pain so wanted someone to do that work for me. (The Hortonworks demo image for example is 4GB!)

Eventually, in frustration, I just grabbed and extracted the archive. In it I found a bin directory with a spark-shell command. So I just tried to run it on my Windows laptop. It just worked! This may be why there are not lots of virtual images around – the download was self-sufficient (I already had Java installed). This was for a sandbox environment – I am sure deploying to a real cluster would require more configuration. But it was a promising start.

Start up the Spark Interactive Shell

Next I started up the Spark interactive shell.

C:\spark-play> C:\spark\spark-1.3.0-bin-hadoop2.4\bin\spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
…lots of logging information…
Welcome to
   ____              __
  / __/__  ___ _____/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.0
   /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
…lots of logging information…
Spark context available as sc.
SQL context available as sqlContext.

scala>

While easy, there was a lot of extra logging information being displayed. (I pruned most of it out above.) Looking in the Spark conf directory I found a log4j.properties.template file. I copied this to log4j.properties (in the same directory) and changed the log4j.rootCategory from ‘INFO’ to ‘FATAL’.  Restarting the shell reduced the amount of noise on the screen.
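For reference, the edit amounts to a single line. This sketch assumes the root logger line in the template reads log4j.rootCategory=INFO, console; check your own copy, as the exact contents may differ between releases.

```properties
# conf/log4j.properties (copied from log4j.properties.template)
# Assumed original line: log4j.rootCategory=INFO, console
log4j.rootCategory=FATAL, console
```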

You may have noticed from some of the messages above that Spark is written in the Scala programming language. The Spark shell actually is just the Scala interactive shell with Spark pre-loaded. Spark can be used with Java and Python as well, but I stick to Scala in this post, introducing the language a little as I go for those not familiar with Scala.

Parsing a CSV File

My first challenge was finding the best way to parse a CSV file. The Spark book I was reading recommended reading the input as a text file and then doing a CSV parse of each line. I had a large input file, but for testing I created a smaller file from the first hundred lines of the big file. The textFile() function worries about parallel reading of the input file, and map() then calls the supplied code on each line.

import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

val input = sc.textFile("small.txt")
val csv1 = input.map{ line =>
  val reader = new CSVReader(new StringReader(line));
  reader.readNext();
}

Copying and pasting the above directly into the interactive shell came out as follows. (The pasted text is what follows the scala> and | prompts.)

scala> import java.io.StringReader
import java.io.StringReader

scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader

scala>

scala> val input = sc.textFile("small.txt")
input: org.apache.spark.rdd.RDD[String] = small.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> val csv1 = input.map{ line =>
     |   val reader = new CSVReader(new StringReader(line));
     |   reader.readNext();
     | }
csv1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:25

Ok, so great! It ran my code creating a variable csv1.

So, a quick crash course on the Scala programming language.

  • The val keyword declares a value variable (it cannot be assigned to more than once).
  • The code in braces { to } declares an inline function (a closure) with a single parameter called line (before the => symbol).
  • The return value of the function is the result of the last expression in the function, which is reader.readNext() in this example.

In Scala, lots of things like parentheses for function calls can be left out. So the above code is calling a map() function on the input file, which will apply the supplied inline function (closure) to each line of input one by one.
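Here is a tiny standalone sketch of those points, with no Spark involved. The names greeting, shout, and lines are made up for the example.

```scala
// val binds a name once; reassigning it is a compile error.
val greeting = "hello"
// greeting = "bye"   // would not compile: reassignment to val

// A closure with one parameter; the last expression is the return value.
val shout = (line: String) => {
  val upper = line.toUpperCase
  upper + "!"
}

val lines = List("a", "b")

// These three calls are equivalent; braces and parentheses are interchangeable
// when passing a single function argument.
val r1 = lines.map(shout)
val r2 = lines.map(line => shout(line))
val r3 = lines.map { line => shout(line) }
```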

So while I have a csv1 variable now declared, how to work out if it worked? After a bit of hunting I found csv1.first() which returns the first line from the csv1 RDD. The result is as follows:

scala> csv1.first()
res0: Array[String] = Array(Timestamp, Pool, Machine, Type, Command, Status, Duration, UserId, Item, Agent, Referrer, UserIP, Extract1, Extract2, Extract3, Extract4, LogViewURL)

This shows the first row is returned as an array of strings with values such as ‘Timestamp’ – that is, the first row is the column headers. This is not useful in my case, so my next job was to remove the first line of the file.

Remove First Line

Well, removing the first line of the file was not quite as easy as I expected. There is a first() function, but no function I could find to return everything but the first line. Instead, the most efficient approach I could find was the following. Spark supports the concept of partitioning data for parallel processing. The mapPartitionsWithIndex() function calls the supplied inline function per partition. Each invocation is passed the partition number and an iterator to iterate over values in the partition (the idx and iter arguments to the closure in the sample below). The return value is the iterator to use, which in this case for the first partition (index 0) drops the first value from the iterator. Hence the first row is dropped. Nothing is dropped from other partitions.

scala> val csv2 = csv1.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(1) else iter }
scala> csv2.first()
res1: Array[String] = Array(03/27/2015 12:00:39.730, r1zmedia, phx7b02c-7bae, URL, Images.m, 0, 251, "", "", eBayiPad/3.4.1, "", 111.11.1.11, /images/m/mIwwqNfXn5nm5oVgbDIBuOA/s-l500.webp, 500x500, Images.m.Clients, eBayiPad/3.4.1)

As can be seen, the new first row returned now has values rather than column headings.
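To see why only partition 0 loses a row, here is a small simulation in plain Scala. The two "partitions" and their contents are made up, and the zipWithIndex/flatMap plumbing is just a hand-rolled stand-in for what Spark does when it calls the closure once per partition.

```scala
// Two made-up "partitions" of a small file; the header lives in partition 0.
val partitions: List[List[String]] = List(
  List("Timestamp,Pool", "03/27/2015,r1zmedia"),
  List("03/28/2015,r1zmedia", "03/29/2015,r1zmedia")
)

// Same shape as the closure passed to mapPartitionsWithIndex above.
def dropHeader(idx: Int, iter: Iterator[String]): Iterator[String] =
  if (idx == 0) iter.drop(1) else iter

// Call the function once per partition and stitch the results back together.
val withoutHeader = partitions.zipWithIndex.flatMap { case (part, idx) =>
  dropHeader(idx, part.iterator).toList
}
```

Note that this trick relies on the header being the first line of the first partition, which holds when reading a single text file from the start.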

Extract Interesting Columns

Next, in my case the data file I am reading from has a number of columns that are not interesting. I only need 4 of the columns. In particular, from the example above, /images/m/mIwwqNfXn5nm5oVgbDIBuOA/s-l500.webp, 500x500, Images.m.Clients, eBayiPad/3.4.1. This is where I find the concept of SQL views to be a good analogy. I created a new RDD from the old RDD using the map() function, which is like a SELECT line in a SQL query. It takes an input row (an array of strings, as returned by the CSV parser) and generates a tuple containing the 4 requested columns.

scala> val csv3 = csv2.map{ (x) => (x(12), x(13), x(14), x(15)) }
scala> csv3.first()
res2: (String, String, String, String) = (/images/m/mIwwqNfXn5nm5oVgbDIBuOA/s-l500.webp,500x500,Images.m.Clients,eBayiPad/3.4.1)

In this case, the variable x is bound to the array of strings (the type of rows returned by csv2) and a new tuple is formed from indexes 12, 13, 14, and 15 of the array.

Complex Restructuring

So far so good. I was able to break down the processing of input into logical steps. Spark worries about how to efficiently evaluate the computation in parallel as required. The next step however is more complicated. First, here is the full code.

val csv4 = csv3.map{ (tuple) =>

  // Break up the input tuple.
  val (url, wxh, cmd, client) = tuple

  // First, return the base URL without query params and with file extension separate
  val Re = """^(.*)\.([^?]*)(\?.*)?""".r
  val Re(baseUrl, fileExt, queryParams) = url

  // Split width and height (separated by an 'x')
  //val (w, h) = wxh splitAt (wxh indexOf 'x') -- includes 'x' in second value.
  val xIdx = wxh indexOf 'x'
  val w = wxh slice(0, xIdx)
  val h = wxh slice(xIdx + 1, wxh.length)

  // Clean the command up a bit.
  val shortCmd = cmd stripSuffix ".Clients"

  // Split client information "foo/1.1" into client type and version
  val slashIdx = client indexOf '/'
  val clientType = client slice(0, slashIdx)
  val clientVersion = client slice(slashIdx + 1, client.length)

  // Return new improved tuple.
  (baseUrl, fileExt.toLowerCase, w, h, shortCmd, clientType, clientVersion)
}

Let’s take this one step by step. Again, this code is calling map() to transform the 4 columns of input. The function to compute the result per row is however larger than the previous steps.

val csv4 = csv3.map{ (tuple) =>

  // Break up the input tuple.
  val (url, wxh, cmd, client) = tuple

The val declaration here binds variables onto the 4 columns of the tuple. Without this, tuple columns can be accessed using tuple._1, tuple._2, tuple._3, and tuple._4. Note that another approach would have been to create a class instead of a tuple. For my case it did not feel worth the effort.
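For comparison, here is roughly what the class-based alternative might look like. The ImageRequest class and its field names are hypothetical, and the URL is shortened for the example.

```scala
// Hypothetical class; the field names are my own choice, not from the log file.
case class ImageRequest(url: String, wxh: String, cmd: String, client: String)

val asTuple = ("/images/m/abc/s-l500.webp", "500x500", "Images.m.Clients", "eBayiPad/3.4.1")

// Positional access on a tuple...
val urlFromTuple = asTuple._1

// ...versus named access on a case class built from the same values.
val asClass = ImageRequest(asTuple._1, asTuple._2, asTuple._3, asTuple._4)
val urlFromClass = asClass.url
```

The named fields read better once there are more than a handful of columns, at the cost of one extra declaration.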

  // First, return the base URL without query params and with file extension separate
  val Re = """^(.*)\.([^?]*)(\?.*)?""".r
  val Re(baseUrl, fileExt, queryParams) = url

The declaration of Re here is a regular expression. Using a triple-quoted string allows a string literal to span lines, but also turns off backslash escaping, making the regular expression slightly more readable. The .r method compiles the string into a regular expression. The second val line uses pattern matching to bind three variables to the three parenthesized groups in the regular expression. This allows the URL in the url variable to be split into the base URL, the file extension (after the '.') and optional query parameters at the end of the URL. Funky!
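One thing to be aware of is that binding with val throws a MatchError if the URL does not match, and an optional group that did not take part in the match comes back as null. A more defensive sketch, using the same pattern inside a match expression, could look like this. The splitUrl helper and the sample URLs are made up.

```scala
val Re = """^(.*)\.([^?]*)(\?.*)?""".r

// Returns None instead of throwing when the URL does not match.
// Option(...) turns the null from an unmatched optional group into None.
def splitUrl(url: String): Option[(String, String, Option[String])] =
  url match {
    case Re(baseUrl, fileExt, queryParams) => Some((baseUrl, fileExt, Option(queryParams)))
    case _ => None
  }

val plain     = splitUrl("/images/m/abc/s-l500.webp")
val withQuery = splitUrl("/d/l298/pict/1_3.jpg?set_id=7")
val noDot     = splitUrl("no-extension")
```

Also worth noting: because (.*) is greedy, a URL whose query string itself contains a dot (say ?v=1.2) would be split at that later dot instead, so the pattern suits URLs like the ones in this log rather than arbitrary ones.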

  // Split width and height (separated by an 'x')
  //val (w, h) = wxh splitAt (wxh indexOf 'x')
  val xIdx = wxh indexOf 'x'
  val w = wxh slice(0, xIdx)
  val h = wxh slice(xIdx + 1, wxh.length)

The CSV file has a column with strings such as 500x500. The commented-out line above tried to take advantage of the splitAt() function to bind width and height variables in a single statement. But it did not work, as the 'x' was included in the height value. So instead I used the slice() function to extract the relevant substrings of the width-x-height string. Note that wxh indexOf 'x' in Scala is the same as wxh.indexOf('x'). Even arithmetic operators in Scala are actually method calls written this way.
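A quick standalone check of the splitAt() behaviour, plus one alternative that is not used in the code above: splitting on the separator character. Treat it as a sketch; it assumes the string always contains exactly one 'x'.

```scala
val wxh = "500x500"

// splitAt cuts at an index and keeps the character at that index in the second half.
val (left, right) = wxh.splitAt(wxh.indexOf('x'))

// An alternative: split on the separator and pick out the two parts.
val parts = wxh.split('x')
val w = parts(0)
val h = parts(1)

// Infix and dot notation are the same call; operators are methods too.
val sameIndex = (wxh indexOf 'x') == wxh.indexOf('x')
val sameSum   = (1 + 2) == (1).+(2)
```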

  // Clean the command up a bit.
  val shortCmd = cmd stripSuffix ".Clients"

The stripSuffix() function is a useful little function to remove .Clients from the input string. In this example, every row ended in .Clients, which just added overhead with no benefit.

The next part split the eBay/1.1 style strings in half, like width and height above, so I won’t repeat that again here.

  // Return new improved tuple.
  (baseUrl, fileExt.toLowerCase, w, h, shortCmd, clientType, clientVersion)
}

This is the final part of the function, which is the return value. In this case a tuple is formed from the extracted variables, resulting in the RDD holding tuples with 7 columns instead of 4. The idea is to allow more options for aggregation of results – for example, by width or height separately.

Printing just the first row was a bit limiting, so the following can be used to print out the first 5 rows. The take() function constructs an array from the first N rows of the RDD.

scala> csv4.take(5) foreach println
(/images/m/mIwwqNfXn5nm5oVgbDIBuOA/s-l500,webp,500,500,Images.m,eBayiPad,3.4.1)
(/d/l298/m/moCAAt2GTeOwgD1KYqpph7A,jpg,298,298,Thumbs.DIP,eBay,2.3.1)
(/d/l298/pict/221288685672_3,jpg,298,298,Thumbs.DIP,eBayiPhone,3.2.1)
(/images/m/mRSw6xwQCc8dJEb7FkM3GHg/s-l400,webp,400,400,Images.m,eBayAndroid,2.9.0.25)
(/images/g/HhAAAOSwBahVECnU/s-l500,webp,500,500,Images.g,eBayiPad,3.4.0)

The above code calls the foreach function, passing it the println function as a closure, which is then invoked for each entry in the array. So how to print all values in the RDD?

scala> csv4 foreach println

Group by Width, Height, and Image Type

So far we have removed one row from an RDD (similar to a WHERE clause in SQL) and performed several stages of transformation (similar to SELECT clauses in SQL). In my case I want to get a count of how many times a selected image dimension is requested per file type. This can be helpful for cache hit rate estimations.

I decided to do this in two steps: first massage the rows into key/value pairs where the key is the string to group values by. Then use Spark’s groupByKey() function to coalesce rows with the same key.

val csv5 = csv4.map{ tuple =>
  val (baseUrl, fileExt, w, h, shortCmd, clientType, clientVersion) = tuple

  // Key is width, height, and file extension. Returned tuple is a key/value pair.
  (w + "x" + h + "." + fileExt, tuple)
}

This is similar to previous examples. It binds variables to the columns of the input tuple, then returns a new tuple consisting of a key/value pair, as required by the grouping function. The key is a string such as 500x500.jpg.

So how to group the results and count each group?

scala> val csv6 = csv5.groupByKey().mapValues{(tuples) => tuples.size}

The groupByKey() function takes the key/value input tuples and groups all rows with the same key, producing a collection of the tuples that share that key. The mapValues() function then returns the size of each collection (being the count of values with that key). Viewing the final results gives us:

scala> csv6 foreach println
(225x225.jpg,3)
(500x500.jpg,1)
(200x200.jpg,1)
(400x400.jpg,7)
(298x298.jpg,4)
(640x640.jpg,2)
(500x375.jpg,1)
(80x80.jpg,2)
(160x160.jpg,3)
(160x160.webp,7)
(1600x1600.webp,2)
...
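The group-then-count step can be tried out on a plain Scala list, which is handy for checking the logic before running it on an RDD. The pairs below are made up, and groupBy on the key plays the role of groupByKey().

```scala
// Made-up key/value pairs in the same shape as csv5: (key, originalRow).
val pairs = List(
  ("160x160.webp", "row1"),
  ("400x400.jpg",  "row2"),
  ("160x160.webp", "row3")
)

// Group rows by key, then replace each group with its size to get the count.
val grouped = pairs.groupBy { case (key, _) => key }
val counts  = grouped.map { case (key, rows) => (key, rows.size) }
```

Spark also offers reduceByKey(), which can count without first collecting every row of a group together; that variant is not shown here.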

Using Spark SQL

My next experiment was to try the Spark SQL engine instead of manually writing the code to group by key and count array sizes. Spark feels useful for defining the complex logic that shapes the data to be processed, but SQL is easier to understand when manipulating the extracted, cleaned-up data. The SQL engine needs to know the names of table columns. An easy way to do this is to use an RDD of classes. Here is the definition of a 'case class' (a simple class that just remembers the constructor argument list).

case class LogEntry(key:String, baseUrl:String, fileExt:String, width:Int, height:Int,
                    shortCmd:String, clientType:String, clientVersion:String)

The names of the constructor arguments here become the column names. Next, importing some more functions is required followed by code to create a new RDD with a LogEntry instance created per row.

import sqlContext.implicits._

val logEntries = csv4.map( t =>
    LogEntry((t._3 + "x" + t._4 + "." + t._2), t._1, t._2, t._3.toInt, t._4.toInt, t._5, t._6, t._7)
).toDF()

This example creates a LogEntry instance per row, passing a key such as 500x500.jpg as the first column, then each column of the tuple as the rest of the arguments to the LogEntry constructor. The toInt() function converts strings to integer values. The .toDF() function call is important. It is what converts the RDD into a DataFrame – which is basically an RDD where you can query the schema of returned rows. (DataFrame used to be called SchemaRDD in Spark releases prior to 1.3.)
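One caveat with toInt is that it throws a NumberFormatException on a non-numeric string, which would fail the job on a single bad row. A defensive sketch in plain Scala, without the Spark-specific toDF() step, might wrap the conversion in Try. The toLogEntry helper, the Row7 alias, and the sample rows are made up.

```scala
import scala.util.Try

case class LogEntry(key: String, baseUrl: String, fileExt: String, width: Int, height: Int,
                    shortCmd: String, clientType: String, clientVersion: String)

// Alias for the 7-column tuple produced by the earlier restructuring step.
type Row7 = (String, String, String, String, String, String, String)

// Wrap the conversion in Try so a bad width or height yields None instead of an exception.
def toLogEntry(t: Row7): Option[LogEntry] =
  Try {
    LogEntry(t._3 + "x" + t._4 + "." + t._2, t._1, t._2, t._3.toInt, t._4.toInt, t._5, t._6, t._7)
  }.toOption

val good = toLogEntry(("/images/m/abc/s-l500", "webp", "500", "500", "Images.m", "eBayiPad", "3.4.1"))
val bad  = toLogEntry(("/images/m/abc/s-l500", "webp", "", "500", "Images.m", "eBayiPad", "3.4.1"))
```

With an RDD, using flatMap with a helper like this would silently drop the rows that fail to convert, which may or may not be what you want.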

logEntries.registerTempTable("log")

This line gives the RDD (DataFrame) a table name that can be used in SQL queries within the current session. With this done, we can try our first SQL query!

sqlContext.sql("""select *
                  from log
                  where width < 200
                  order by width desc
                  """).collect.foreach(println)

The above is a pretty straightforward SQL query to return rows with widths less than 200. The show() function prints the output in columns with column headings, which is a bit easier to read.

sqlContext.sql("""select *
                  from log
                  where width < 200
                  order by width desc
                  """).show

And finally, let's see how to get the grouped results that we previously built by hand with groupByKey().

scala> sqlContext.sql("select key, count(*) as freq from log group by key").show
key            freq
1600x1600.webp 2
298x298.jpg    4
500x500.jpg    1
200x200.webp   3
640x640.webp   5
400x400.webp   6
298x298.webp   17
225x225.webp   4
160x160.jpg    3
80x80.webp     3
500x375.jpg    1
80x80.jpg      2
640x640.jpg    2
250x250.webp   8
225x225.jpg    3
960x960.webp   2
200x200.jpg    1
500x500.webp   18
160x160.webp   7
400x400.jpg    7

Pretty nice! Spark SQL also supports joins and other operations (although it is not a 100% complete implementation of SQL).

Conclusions

So how did the above compare to the approach using shell scripts to load up a MySQL database? The shell script was a few sed commands to reformat the input and perform the same operations as above. The script was much shorter than the above, but much less readable and not parallelized. The output was a MySQL database load script. SQL queries were used to do the grouping. For example, the SQL query to work out how many images use the same width was

SELECT width, count(*)
FROM logdata
GROUP BY width

Pretty easy query, pretty easy to try out new groupings.

So when would I use Spark? Well, even though Spark is not a part of Hadoop, it can still read directly from HDFS (Hadoop distributed file system) files. So it can read directly from log files being written to an HDFS cluster. As a result there is no need to extract files from Hadoop to a text file, transform the text file using sed scripts to another text file, then load that into MySQL. That is convenient, and processing is easily parallelized. This is particularly convenient with very large data sets.

Going further, it appears Spark can be exposed as an ODBC / JDBC driver, meaning any application that can talk SQL can potentially talk to a Spark RDD.

Do I prefer Spark over Hadoop? The above was pretty easy to do ad-hoc processing to experiment. Spark also has the feel of being a lot more interactive than Hadoop. It is more like performing queries than running batch jobs on Hadoop. It is pretty flexible – it does not force you to turn every problem into a map-reduce job. I find the processing model more natural and intuitive, although I only did a relatively simple task above.

Spark SQL looks particularly interesting in terms of mixing the above extraction code with SQL query processing, hopefully making it easier to write various aggregation queries without having to translate such operations into various partitioning operations by hand. For example, my code above was pretty simple. You could imagine if the actual images were available, maybe a column of data would be produced from an OCR job run over the image. Parallelizing such expensive computations can be a real benefit.

Another whitepaper I saw recently combined Spark with an in memory database. I can see the logic in such a combination. It makes it easy to combine the structured data management discipline of a relational database with the arbitrary processing logic of Spark, parallelized across commodity hardware. The in-memory database makes it easier to ensure the database is not the bottleneck.

All up, interesting technology. Not a solution to every problem, but certainly worth understanding. I quite like it, but it's certainly not going to put relational database systems out of a job.
