Spark + Scala + CSV

Getting Spark Running Locally

August 23, 2019



Note: This article is unedited

Spark is a cluster computing platform. Even though it is intended to run on a cluster in a production environment, it can prove useful for developing proof-of-concept applications locally. Below are mostly snippets of code and commands to get up and running quickly using sbt and Scala.

Downloading Requirements

First you’ll need to download and install:

  • sbt
  • Apache Spark

Once both sbt and Spark have been downloaded, it is usually convenient to add the bin directory from both downloads to your environment’s PATH so that the commands are available everywhere on the host system; that is also what the remainder of this post assumes.

Creating a Project

First let us create a new folder to keep our project files. Let’s call it poc for proof-of-concept. We’ll also be starting off with a build.sbt file and a simple directory structure for our Scala code. See below for some helpful commands and notes.

# Set up the project directory and make it the current working directory
mkdir ~/poc && cd ~/poc

# Create build file where we list our dependencies:
touch build.sbt 

# Create directory where our scala code will live
mkdir -p src/main/scala
touch src/main/scala/Driver.scala

Populate build.sbt with the following:

// build.sbt
scalaVersion := "2.11.12"
name := "poc"
organization := "com.btellez" // Change this to match your domain/org
version := "0.0.1"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided"
)

The Spark dependencies are marked provided because they only need to be on the compile classpath; spark-submit supplies Spark itself at runtime. Next, populate Driver.scala with the following:

package com.btellez

object Driver {
  def main(args: Array[String]): Unit = {
    // ...spark goodness goes here...
  }
}

At this point we have the skeleton to start developing our proof of concept Spark application.

Scala and Spark

Let’s start by looking at a complete example of working code and input data.

Assume the input data looks something like this: (crypto_market_data.csv)

local_date,currency,amount
2019-01-01,BTC,0.00001
2019-01-01,BTC,0.00021
2019-01-01,BTC,0.00042
2019-01-02,BTC,0.00003
2019-01-01,ETH,10.0001
2019-01-01,LTC,20.0021
2019-01-02,ETH,42.0003
...etc for a long time...

And we’ve written the following code to compute a rollup per day for the BTC currency.

// src/main/scala/Driver.scala
package com.btellez

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object Driver {
  def main(args: Array[String]): Unit = {
    // Set up Spark:
    val conf = new SparkConf().setMaster("local[*]").setAppName("poc")
    val session = SparkSession.builder.config(conf).appName("poc").getOrCreate

    // Load up data:
    val cryptoMarketData: Dataset[Row] = session.read.option("header", "true").csv("/Users/poc/crypto_market_data.csv")

    // Aggregate Market Data:
    val rollup = cryptoMarketData
      .filter(cryptoMarketData("currency") === "BTC")
      .rollup("local_date").agg(Map(
        "amount" -> "sum"
       ))

    // Write the result back to the filesystem:
    rollup.write.csv("/Users/poc/output_crypto_market_data")
  }
}
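
A quick note on types: reading the CSV with only the header option leaves every column as a string. The sum in the rollup still works because Spark coerces the values when aggregating, but if you prefer explicit types you can supply a schema up front. Below is a minimal sketch, assuming the column types shown in the sample data and reusing the session from the example above:

// Sketch: reading the CSV with an explicit schema instead of all-string columns
import org.apache.spark.sql.types.{DateType, DoubleType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("local_date", DateType),
  StructField("currency", StringType),
  StructField("amount", DoubleType)
))

val typedMarketData = session.read
  .option("header", "true")
  .schema(schema)
  .csv("/Users/poc/crypto_market_data.csv")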

Quick Breakdown of the Program

The program is divided into essentially 4 parts:

  • Setting up the Spark Context/Session
  • Reading Input
  • Describing the computation (since Spark computations are lazy)
  • Writing the result to the filesystem (forcing the calculation)

Setting up the Spark Context/Session, reading the input, and writing the result are largely boilerplate code that will likely be common among all your Spark jobs and highly dependent on the underlying storage chosen. For this proof-of-concept project the filesystem is good enough.
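
Since that boilerplate tends to repeat from job to job, it can be convenient to pull it into small helpers so that each job only has to describe its computation. Below is a minimal sketch; the helper names are just illustrative:

// Sketch: isolating the I/O boilerplate so only the computation varies per job
import org.apache.spark.sql.{Dataset, Row, SparkSession}

def readInput(session: SparkSession, path: String): Dataset[Row] =
  session.read.option("header", "true").csv(path)

def writeOutput(result: Dataset[Row], path: String): Unit =
  result.write.mode("overwrite").csv(path)

Swapping the local filesystem for another sink later on would then only touch writeOutput, leaving the aggregation code untouched.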

The interesting part is describing the computation. Once a Dataset/DataFrame/RDD instance is obtained, the operations that can be used are limited by the API of that data structure.

In our example we are using a Dataset, which allows us to write SQL-esque calculations.
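
To make that concrete, the same BTC-per-day aggregation can also be expressed as a SQL query by registering the Dataset as a temporary view. This is a minimal sketch, assuming the session and cryptoMarketData values from the example above:

// Sketch: the same aggregation written as SQL over a temporary view
cryptoMarketData.createOrReplaceTempView("crypto_market_data")

val rollupSql = session.sql(
  """SELECT local_date, SUM(amount) AS total_amount
    |FROM crypto_market_data
    |WHERE currency = 'BTC'
    |GROUP BY local_date""".stripMargin)

One difference worth noting: rollup in the Dataset API also emits a grand-total row (with a null local_date), which the plain GROUP BY above does not.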

Running the program

Lastly, we have arrived at how to get the program running. We’ve set up our SBT project and our dependencies are listed in build.sbt. SBT will build our project and package a jar that we can submit to Spark to run on its workers in the cluster, or in our case, our local “cluster”.

By running the package task in sbt, a jar file will be output to our project’s target directory.

sbt clean package
# ... following is output from sbt...
[info] Compiling 1 Scala source to /Users/poc/poc/target/scala-2.11/classes ...
[info] Done compiling.
[info] Packaging /Users/poc/poc/target/scala-2.11/poc_2.11-0.0.1.jar ...
[info] Done packaging.

We can now submit this jar file to our Spark cluster via:

spark-submit --class com.btellez.Driver /Users/poc/poc/target/scala-2.11/poc_2.11-0.0.1.jar

This will kick off a Spark job that we can track at http://localhost:4040 (or whatever host is running the job).

The spark-submit process will also stay alive while the job is in progress and surface any exceptions if errors are encountered in our data or if any operations are ambiguous.

Conclusion

This post is about quickly getting a proof-of-concept Spark project going using SBT and Scala. Deploying to production or a cluster environment comes with many other considerations; however, this post should be enough to get comfortable with some of the basics.

Further Reading