
Spark + Scala + CSV
Getting Spark Running Locally
August 23, 2019
Note: This article is unedited
Spark is a cluster computing platform. Even though it is intended to run on a
cluster in a production environment, it can prove useful for developing
proof-of-concept applications locally. Below are mostly snippets of code and
commands to get up and running quickly using sbt and Scala.
Downloading Requirements
First you’ll need to download and install:
- java (JDK & JRE): https://openjdk.java.net/install/
- sbt: https://www.scala-sbt.org/download.html - Note: SBT will download the appropriate Scala version for the project
- spark: https://spark.apache.org/downloads.html - “Pre-built for Apache Hadoop 2.7 and later” was used at time of writing.
Once both sbt and spark have been downloaded, it is usually convenient to add
the bin directory from both downloads to your environment’s PATH so that
the commands are available everywhere on the host system; that is also what the
remainder of this post assumes.
Creating a Project
First let us create a new folder to keep our project files. Let’s call it
poc for proof-of-concept. We’ll also be starting off with a build.sbt
file and a simple directory structure for our Scala code. See below for
some helpful commands and notes.
# Setup Project Directory and set it as the current working directory
mkdir ~/poc && cd ~/poc
# Create build file where we list our dependencies:
touch build.sbt
# Create directory where our scala code will live
mkdir -p src/main/scala
touch src/main/scala/Driver.scala
Populate build.sbt with the following:
// build.sbt
scalaVersion := "2.11.12"
name := "poc"
organization := "com.btellez" // Change this to match your domain/org
version := "0.0.1"

libraryDependencies ++= Seq(
  // "provided" scope: Spark itself is supplied by spark-submit at runtime,
  // so these jars are not bundled into the packaged jar.
  "org.apache.spark" %% "spark-core" % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided"
)
Populate Driver.scala with the following:
package com.btellez

object Driver {
  def main(args: Array[String]): Unit = {
    // ...spark goodness goes here...
  }
}
At this point we have the skeleton to start developing our proof of concept Spark application.
Scala and Spark
Let’s start by looking at a complete example of working code and input data.
Assume the input data looks something like this (crypto_market_data.csv):
local_date,currency,amount
2019-01-01,BTC,0.00001
2019-01-01,BTC,0.00021
2019-01-01,BTC,0.00042
2019-01-02,BTC,0.00003
2019-01-01,ETH,10.0001
2019-01-01,LTC,20.0021
2019-01-02,ETH,42.0003
...etc for a long time...
And we’ve written the following code to compute a rollup per day for the BTC currency.
// src/main/scala/Driver.scala
package com.btellez

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object Driver {
  def main(args: Array[String]): Unit = {
    // Setup Spark:
    val conf = new SparkConf().setMaster("local[*]").setAppName("poc")
    val session = SparkSession.builder.config(conf).appName("poc").getOrCreate

    // Load up data:
    val cryptoMarketData: Dataset[Row] = session.read.option("header", "true").csv("/Users/poc/crypto_market_data.csv")

    // Aggregate Market Data:
    val rollup = cryptoMarketData
      .filter(cryptoMarketData("currency") === "BTC")
      .rollup("local_date")
      .agg(Map("amount" -> "sum"))

    // Write the result back to the filesystem:
    rollup.write.csv("/Users/poc/output_crypto_market_data")
  }
}
Quick Breakdown of the Program
The program is divided into essentially 4 parts:
- Setting up the Spark Context/Session
- Reading Input
- Describing the computation (since Spark computations are lazy; see the sketch after this list)
- Writing the result to the filesystem (forcing the calculation)
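To make the laziness point concrete, here is a small sketch reusing the cryptoMarketData value from the listing above: transformations only build an execution plan, and nothing actually runs until an action is invoked.
// Transformations such as filter, rollup, and agg are lazy: this line only builds a plan.
val btcOnly = cryptoMarketData.filter(cryptoMarketData("currency") === "BTC")
// Actions such as count, show, or write trigger the actual work.
println(btcOnly.count())
btcOnly.show(5)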
Setting up the Spark Context/Session, reading input, and writing the result are largely boilerplate code that will likely be common among all your Spark jobs and highly dependent on the underlying storage chosen. For this proof-of-concept project the filesystem is good enough.
The interesting part is describing the computation. Once a Dataset/DataFrame/RDD instance is obtained, the operations that can be used are limited by the API of that data structure. In our example we are using Dataset, which allows us to write some SQL-esque calculations.
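Since we are working with a Dataset, the same aggregation can also be expressed directly in SQL. Below is a minimal sketch, again assuming the session and cryptoMarketData values from Driver.scala above; the view name crypto_market_data is chosen purely for illustration, and it uses a plain GROUP BY, so unlike rollup it will not add a grand-total row.
// Register the Dataset as a temporary view so it can be queried with SQL:
cryptoMarketData.createOrReplaceTempView("crypto_market_data")

// The same per-day BTC aggregation, written as a SQL query:
val dailyBtc = session.sql(
  """SELECT local_date, SUM(amount) AS total_amount
    |FROM crypto_market_data
    |WHERE currency = 'BTC'
    |GROUP BY local_date
    |ORDER BY local_date""".stripMargin)

// show() is an action, so it forces the (otherwise lazy) computation and prints a preview:
dailyBtc.show()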
Running the Program
Lastly we have arrived at how to get the program running. We’ve set up our SBT project
and our dependencies are listed in our build.sbt. SBT will build our project and package
a jar that we can submit to Spark to run on its workers in the cluster, or in our
case, our local “cluster”.
By running the package task in sbt, a jar file will be output to our project’s target directory.
sbt clean package
# ... following is output from sbt...
[info] Compiling 1 Scala source to /Users/poc/poc/target/scala-2.11/classes ...
[info] Done compiling.
[info] Packaging /Users/poc/poc/target/scala-2.11/poc_2.11-0.0.1.jar ...
[info] Done packaging.
We can now submit this jar file to our Spark cluster via:
spark-submit --class com.btellez.Driver /Users/poc/poc/target/scala-2.11/poc_2.11-0.0.1.jar
This will kick off a Spark job that we can track at http://localhost:4040
(or whatever host is running the job).
The spark-submit process will also stay alive while the job is in progress
and surface any exceptions if errors are encountered with our data, or
any operations are ambiguous.
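Note that Spark writes the output as a directory of part files rather than a single CSV file. One quick way to sanity-check the result is from spark-shell, which ships with the Spark download and pre-defines a SparkSession named spark; the path below assumes the output location used in Driver.scala.
// Inside spark-shell; `spark` is the pre-defined SparkSession.
val result = spark.read.csv("/Users/poc/output_crypto_market_data")
result.show()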
Conclusion
This post is about quickly getting a proof-of-concept Spark project going using SBT and Scala. Deploying to production or a cluster environment comes with many other considerations; however, this post should be enough to get comfortable with some of the basics.
Further Reading
- RDDs vs DataFrames vs Datasets
- Resolving Weird Spark Errors
- Dataset Javadoc (contains lots of examples within the doc.)