Apache Spark™ is a unified analytics engine for large-scale data processing. It can be used for a variety of tasks, such as big data processing, machine learning, and stream processing.
Environment
I have tested this on macOS and Ubuntu. It may or may not work on Windows.
Prerequisites
Java 8 installed and available as java on the command line.
Set up
Download
- Download the Spark package from the Apache Spark website. The download link points to Spark 2.2.0, which I used. You can choose the latest version from the website; if you do, adjust the file names in the commands that follow.
- Download the movies dataset from GroupLens. The download link points to a small dataset. You can choose a larger one if you have the infrastructure for it.
Unpack
Move both files to the same directory.
unzip ml-latest-small.zip
tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz
Spark Shell
Move into the extracted Spark directory and start the shell.
cd spark-2.2.0-bin-hadoop2.7
./bin/spark-shell
It will take a few seconds to boot up, so be patient. Once it is up, you will see:
Spark context Web UI available at http://172.16.3.139:4040
Spark context available as 'sc' (master = local[*], app id = local-1543057723300).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
You can see the scala> prompt. We will enter commands at this prompt.
Define data loading function
def loadDF(filepath:String) : org.apache.spark.sql.DataFrame = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(filepath)
You will see the following console output:
loadDF: (filepath: String)org.apache.spark.sql.DataFrame
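Since Spark 2.0, CSV support is built in, so the same loader can also be written with the csv method of the reader. Here is a minimal sketch of that variant. The names session, loadCsv, and the two-row temporary sample file are made up for illustration, so the snippet does not depend on the downloaded dataset; inside spark-shell, getOrCreate() simply reuses the session that is already available as spark.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("csv-sketch").getOrCreate()

// Hypothetical variant of loadDF that relies on the built-in CSV reader.
def loadCsv(filepath: String): DataFrame = {
  // header: the first line holds column names; inferSchema: guess column types from the data
  val reader = session.read.option("header", "true").option("inferSchema", "true")
  reader.csv(filepath)
}

// Write a tiny made-up sample file so the sketch is self-contained.
val sample = Files.createTempFile("movies-sample", ".csv")
val content = "movieId,title,genres\n1,Toy Story (1995),Adventure|Animation\n2,Jumanji (1995),Adventure|Children\n"
Files.write(sample, content.getBytes("UTF-8"))

val sampleDF = loadCsv(sample.toString)
sampleDF.printSchema()
```

With the real dataset you would call loadCsv("../ml-latest-small/movies.csv") instead of pointing it at the temporary file.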
Load Data into Dataframe
A data frame is similar to a table in the SQL world.
val moviesFile = "../ml-latest-small/movies.csv"
val ratingsFile = "../ml-latest-small/ratings.csv"
val tagsFile = "../ml-latest-small/tags.csv"
val moviesDF = loadDF(moviesFile)
// You can load other files into dataframe as well.
Let’s start the journey
Now we have moviesDF, which holds the data loaded from the movies.csv file we downloaded earlier.
Refer to the Dataset API documentation, listed under References, for the various functions available on a data frame.
Number of records
It is simple: just call the count method.
In Scala, the parentheses are optional when calling a method that takes no arguments.
moviesDF.count
It will show the number of records as follows
res1: Long = 9742
Schema of the Dataframe
moviesDF.printSchema
The output will show field names and data types.
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
Data frames support complex data types such as map, array, and struct, so a schema can be nested as well.
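To make the nested types concrete, here is a small sketch that builds a one-row data frame with an array, a map, and a struct column and then reads a value out of each. The row contents and the names session, nestedDF, and picked are made up for illustration; they are not part of the MovieLens files.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("nested-sketch").getOrCreate()
import session.implicits._

// One made-up row: a Seq becomes an array, a Map becomes a map, a tuple becomes a struct.
val nestedDF = Seq(
  (1, Seq("Adventure", "Animation"), Map("year" -> 1995), ("Toy Story", 81))
).toDF("movieId", "genres", "attributes", "details")

nestedDF.printSchema()

// getItem reads an array position or a map key; getField reads a struct field.
// Fields of a struct created from a tuple are named _1, _2, and so on.
val picked = nestedDF.select(
  $"genres".getItem(0).alias("firstGenre"),
  $"attributes".getItem("year").alias("year"),
  $"details".getField("_1").alias("name")
)
picked.show()
```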
Show records from the data frame
moviesDF.show
By default, it shows the first 20 records in the data frame and truncates values longer than 20 characters.
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sense and Sensibi...|       Drama|Romance|
|     18|   Four Rooms (1995)|              Comedy|
|     19|Ace Ventura: When...|              Comedy|
|     20|  Money Train (1995)|Action|Comedy|Cri...|
+-------+--------------------+--------------------+
only showing top 20 rows
We can also specify the number of records to show.
moviesDF.show(4)
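Long titles are cut off with "..." in the output of show. If you want the full values, show also accepts a second argument that switches truncation off. A minimal sketch on a made-up one-row data frame (the names session and sampleDF are only for illustration):

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("show-sketch").getOrCreate()
import session.implicits._

// A single sample row whose title is longer than 20 characters.
val sampleDF = Seq(
  (3, "Grumpier Old Men (1995)", "Comedy|Romance")
).toDF("movieId", "title", "genres")

sampleDF.show(1)         // default: values longer than 20 characters are truncated
sampleDF.show(1, false)  // second argument false: print the full values
```

The same call works on the real data, for example moviesDF.show(4, false).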
Creating new data frames
Operations such as select that we perform on a data frame create another data frame.
val movieTitlesDF = moviesDF.select($"title")
$"title" refers to the column title. select also accepts plain column names such as "title", but the arguments cannot be a mix of columns and strings.
Console output,
movieTitlesDF: org.apache.spark.sql.DataFrame = [title: string]
Now we have created a new data frame called movieTitlesDF from moviesDF.
movieTitlesDF.show(3)
+--------------------+
|               title|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
+--------------------+
only showing top 3 rows
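The rule about not mixing columns and strings in select is easier to see side by side. This is a small sketch on a made-up one-row data frame; session, sampleDF, byName, and byColumn are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("select-sketch").getOrCreate()
import session.implicits._

val sampleDF = Seq(
  (1, "Toy Story (1995)", "Adventure|Animation")
).toDF("movieId", "title", "genres")

// All strings: column names.
val byName = sampleDF.select("title", "genres")

// All Column objects: $"..." and col("...") can be combined freely.
val byColumn = sampleDF.select($"title", col("genres"))

// Mixing the two does not compile, because select has no such overload:
// sampleDF.select($"title", "genres")

byColumn.show()
```

Column objects are the more flexible choice, because functions such as length or alias can only be applied to a Column, not to a plain string.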
Creating new fields.
In machine learning applications, we often derive new fields as part of feature engineering.
Let us create a new data frame that also holds the length of each movie title.
val titleDF = movieTitlesDF.select($"title", length($"title").alias("length"))
titleDF.show(2)
+----------------+------+
|           title|length|
+----------------+------+
|Toy Story (1995)|    16|
|  Jumanji (1995)|    14|
+----------------+------+
only showing top 2 rows
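Another way to derive a field is withColumn, which keeps all existing columns and appends the new one, so you do not have to list the columns again. A minimal sketch using two titles from the dataset (session, titles, and withLength are illustrative names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.length

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("withcolumn-sketch").getOrCreate()
import session.implicits._

val titles = Seq("Toy Story (1995)", "Jumanji (1995)").toDF("title")

// Append a "length" column holding the number of characters in each title.
val withLength = titles.withColumn("length", length($"title"))
withLength.show()
```

On the data frame from this post, the equivalent would be movieTitlesDF.withColumn("length", length($"title")).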
Let us find the movie with the longest title:
titleDF.sort($"length".desc).head
res2: org.apache.spark.sql.Row = [Dragon Ball Z the Movie: The World's Strongest (a.k.a. Dragon Ball Z: The Strongest Guy in The World) (Doragon bôru Z: Kono yo de ichiban tsuyoi yatsu) (1990),158]
So the movie title is Dragon Ball Z the Movie: The World's Strongest (a.k.a. Dragon Ball Z: The Strongest Guy in The World) (Doragon bôru Z: Kono yo de ichiban tsuyoi yatsu) (1990), which has 158 characters.
We used sort to order the rows of the data frame by length in descending order (desc), and head to take the first row.
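There is more than one way to get at the longest title. The sketch below shows two alternatives on three sample titles: orderBy with the desc function (orderBy is an alias of sort), and an aggregation with max when only the number is needed. The names session, titles, longest, and maxLength are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{desc, length, max}

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("sort-sketch").getOrCreate()
import session.implicits._

val titles = Seq("Heat (1995)", "Toy Story (1995)", "Jumanji (1995)").toDF("title")
val titleDF = titles.select($"title", length($"title").alias("length"))

// Same idea as sort($"length".desc).head, written with orderBy and desc("...").
val longest = titleDF.orderBy(desc("length")).first()

// If only the maximum length is needed, an aggregation avoids sorting all rows.
val maxLength = titleDF.agg(max($"length")).first().getInt(0)

println(s"${longest.getString(0)} has $maxLength characters")
```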
Let’s do pattern matching.
Find movies that have a year in the title. The pattern used here matches any four consecutive digits.
titleDF.where($"title".rlike("[0-9]{4}")).count
res3: Long = 9730
So, in total, we have 12 movies (9742 - 9730) without a year in their titles.
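Keep in mind that [0-9]{4} matches any four digits in a row, not only a year in parentheses. A stricter pattern, and regexp_extract to pull the year into its own column, might look like the sketch below. The three titles are made up to show the difference, and the names session, titles, and withYear are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_extract

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("regex-sketch").getOrCreate()
import session.implicits._

// Made-up sample: only the first title carries a year in parentheses.
val titles = Seq("Toy Story (1995)", "2001: A Space Odyssey", "Babylon 5").toDF("title")

// Any four consecutive digits: also matches the "2001" in the second title.
val anyFourDigits = titles.where($"title".rlike("[0-9]{4}")).count()            // 2

// Four digits wrapped in literal parentheses: matches only the first title.
val parenthesizedYear = titles.where($"title".rlike("\\([0-9]{4}\\)")).count()  // 1

// Capture group 1 holds the digits; rows without a match get an empty string.
val withYear = titles.select(
  $"title",
  regexp_extract($"title", "\\(([0-9]{4})\\)", 1).alias("year")
)
withYear.show(false)
```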
Let’s work with Array
If you looked closely at the moviesDF.show output, genres holds multiple values separated by |.
First, we will split the genres column on | and turn it into an array<string>:
val genresDF = moviesDF.select(split($"genres","\\|").alias("genres"))
genresDF: org.apache.spark.sql.DataFrame = [genres: array<string>]
genresDF.show(2)
+--------------------+
|              genres|
+--------------------+
|[Adventure, Anima...|
|[Adventure, Child...|
+--------------------+
only showing top 2 rows
As you can see, genres is an array now.
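Once genres is an array, array functions such as size and array_contains become available. The sketch below tries both on two sample rows and also shows why the pipe has to be escaped: the second argument of split is a regular expression, and a bare | means "or" there. The names session, movies, genreCounts, and crimeTitles are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, size, split}

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("array-sketch").getOrCreate()
import session.implicits._

val movies = Seq(
  ("Toy Story (1995)", "Adventure|Animation|Children"),
  ("Heat (1995)", "Action|Crime|Thriller")
).toDF("title", "genres")

// "\\|" is the regular expression for a literal pipe character.
val genresDF = movies.select($"title", split($"genres", "\\|").alias("genres"))

// size: number of elements in the array.
val genreCounts = genresDF.select($"title", size($"genres").alias("genreCount"))

// array_contains: keep rows whose array holds the given value.
val crimeTitles = genresDF.where(array_contains($"genres", "Crime")).select("title").as[String].collect()

// Plain Scala strings use the same regex rules: without escaping, the pattern
// matches the empty string and the text is broken apart character by character.
val unescaped = "Comedy|Romance".split("|")
val escaped = "Comedy|Romance".split("\\|")

genreCounts.show()
```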
Find out top 3 genres in our records
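Next, we will explode the array so that each genre gets its own row, and count the rows per genre. Note that show(3) prints three rows of the result in no particular order; to get the actual top three, sort by count in descending order before calling show.
genresDF.select(explode($"genres").alias("genre")).groupBy($"genre").count().show(3)
+--------+-----+
|   genre|count|
+--------+-----+
|   Crime| 1199|
| Romance| 1596|
|Thriller| 1894|
+--------+-----+
only showing top 3 rows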
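A grouped count comes back in no guaranteed order, so a real "top N" needs an explicit sort before the rows are taken. Here is a minimal sketch on five made-up genre strings; session, movies, genreCounts, and top2 are illustrative names, and the second sort key only makes ties deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{desc, explode, split}

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("topn-sketch").getOrCreate()
import session.implicits._

// Made-up sample: Comedy appears 3 times, Drama twice, Romance and Action once.
val movies = Seq("Comedy|Romance", "Comedy|Drama", "Comedy", "Drama", "Action").toDF("genres")

// One row per (movie, genre) pair, then one row per genre with its count.
val exploded = movies.select(explode(split($"genres", "\\|")).alias("genre"))
val genreCounts = exploded.groupBy($"genre").count()

// Sort by count, highest first, and only then keep the first two rows.
val top2 = genreCounts.orderBy(desc("count"), $"genre").limit(2)
top2.show()
```

On the real data, the same pattern would be genresDF.select(explode($"genres").alias("genre")).groupBy($"genre").count().orderBy(desc("count")).show(3).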
Let’s find the top 3 movies with the most tags
val tagsDF = loadDF(tagsFile)
tagsDF.groupBy($"movieId").count.sort($"count".desc).limit(3).
  join(moviesDF, "movieId").select("title","count").show()
+--------------------+-----+
|               title|count|
+--------------------+-----+
| Pulp Fiction (1994)|  181|
|   Fight Club (1999)|   54|
|2001: A Space Ody...|   41|
+--------------------+-----+
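One thing to be aware of: a join does not promise to keep the order of its inputs, so the sorted output above is not guaranteed. Sorting once more after the join makes the order explicit. A minimal sketch on made-up movies and tags (all names and rows here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import session.implicits._

val movies = Seq((1, "Movie A"), (2, "Movie B"), (3, "Movie C")).toDF("movieId", "title")

// Movie 2 has three tags, movie 3 has two, movie 1 has one.
val tags = Seq((2, "x"), (2, "y"), (2, "z"), (3, "x"), (3, "y"), (1, "x")).toDF("movieId", "tag")

// Count tags per movie and keep the two most tagged movies.
val tagCounts = tags.groupBy($"movieId").count()
val topTagged = tagCounts.orderBy($"count".desc).limit(2)

// Join on the shared movieId column, then sort again so the order is explicit.
val joined = topTagged.join(movies, "movieId").select("title", "count")
val result = joined.orderBy($"count".desc)
result.show()
```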
That is not all
We have gone through some basic functions available in data frames.
- count
- printSchema
- show
- select
- length
- alias
- desc
- sort
- split
- rlike
- where
- head
- groupBy
- explode
- limit
- join
Spark data frames support a large number of other operations and functions.
Enjoy learning Spark
Explore more and play with the data frames. You can use ratings.csv as well.
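As a starting point for ratings.csv, here is a sketch that computes the average rating and the number of ratings per movie. It assumes the file has userId, movieId, and rating columns (the real file also carries a timestamp column); the three rows below are made up, and session, ratings, and stats are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count}

// Reuses the running session inside spark-shell; creates a local one otherwise.
val session = SparkSession.builder().master("local[*]").appName("ratings-sketch").getOrCreate()
import session.implicits._

// Made-up stand-in for ratings.csv: (userId, movieId, rating).
val ratings = Seq((1, 1, 4.0), (2, 1, 5.0), (1, 2, 3.0)).toDF("userId", "movieId", "rating")

// agg computes several aggregates per group in one pass.
val stats = ratings.groupBy($"movieId").agg(
  avg($"rating").alias("avgRating"),
  count($"rating").alias("numRatings")
).orderBy($"movieId")

stats.show()
```

With the real data you would start from loadDF(ratingsFile) and could join the result with moviesDF on movieId to see the titles.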
References
- https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
- https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset
This post originally appeared here. Republished with permission.