Zoom Data Engineering Part 5

Introduction to Batch Processing

Batch vs Streaming

Two different ways of processing data:

  • Batch Processing: processes chunks of data at regular intervals
    • Example: processing taxi trips each month

  • Streaming: processes data in real time
    • Example: processing taxi trip data as soon as it’s generated (in real time)

Types of Batch Jobs

  • A Batch Job is a job that processes data in batches over a set period of time. Batch jobs can be scheduled in many ways:

  • Weekly
  • Monthly
  • Daily
  • etc..

Orchestrating Batch Jobs

You can orchestrate batch jobs using tools such as our favorite, Airflow.
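
For instance, a minimal sketch of scheduling a monthly batch job with Airflow might look like this (the DAG id, dates and command below are placeholders, not from this lesson):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Placeholder DAG: runs once a month and would hold the batch-processing tasks
with DAG(
    dag_id="monthly_taxi_batch",
    schedule_interval="@monthly",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    process_trips = BashOperator(
        task_id="process_trips",
        bash_command="echo 'process last month of taxi trips here'",
    )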

Pros and Cons of Batch Jobs

Pros:

  • Easy to manage
  • Scalable

Cons:

  • Delay. Each task of the workflow in the previous section may take a few minutes; assuming the whole workflow takes 20 minutes, we would need to wait those 20 minutes until the data is ready for work.

Introduction to Spark

What is Spark?

Apache Spark is an open-source, multi-language, unified analytics engine for large-scale data processing.

Spark can be run in clusters with multiple nodes, each pulling and transforming data (distributed computing).

In this section we will use PySpark, which is the Python wrapper (API) for Spark.

Spark can deal with both batch and streaming data. The technique for streaming data is seeing a stream of data as a sequence of small batches and then applying similar techniques on them to those used on regular batches. We will cover streaming in detail in the next lesson (Kafka).

Why do we need to use Spark?

Spark is used for transforming data in a Data Lake.

There are tools such as Hive, Presto or Athena (an AWS-managed Presto) that allow you to express jobs as SQL queries. However, there are times when you need to apply more complex manipulations that are very difficult or even impossible to express with SQL (such as ML models); in those instances, Spark is the tool to use.

A typical workflow may combine both tools (Spark and Athena). Here’s an example of a workflow involving Machine Learning:

Creating a PySpark Session

Using Jupyter Notebook to run PySpark

I installed PySpark and Java (the JVM) on my MacBook M1 following this page.

Import the libraries and packages needed to run PySpark.

import findspark
findspark.init()  # locate the local Spark installation and add it to sys.path

import pyspark
from pyspark.sql import SparkSession

Create a Spark Session, which is an object that we can work with.

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('testApp') \
    .getOrCreate()
  • SparkSession is the class of the object that we instantiate. builder is the builder method.
  • master() sets the Spark master URL to connect to. The local string means that Spark will run on a local cluster. [*] means that Spark will run with as many CPU cores as possible.
  • appName() defines the name of our application/session. This will show in the Spark UI.
  • getOrCreate() will create the session or recover the object if it was previously created.

We can run the Object to make sure it is working:

spark

Output:

SparkSession - in-memory

SparkContext

Spark UI

Version: v3.3.0
Master: local[*]
AppName: test

We can go to localhost:4040 on our local machine to open the Spark UI and monitor our Spark jobs.

“Insert PySpark”

It is working now!

Different Things Spark (PySpark) Can Do

Reading CSV Files

Just like the Pandas library in Python, PySpark can read .csv files and load them into dataframes. The difference between Pandas and PySpark when working with .csv files is that Spark can handle much bigger datasets, but it doesn’t infer the datatypes of each column like Pandas does (every column is read as a string unless we provide a schema).

Example:

# Download file from Amazon
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

Create a dataframe and load the file into Spark. Here:

  • read gives us Spark’s DataFrameReader
  • option() sets options for the read; here we’re specifying that the first line of the CSV file contains the column names
  • csv() reads the file as a CSV
df = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

Let’s see if the dataframe loaded:

df.show()

Output:

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brooklyn|           Bay Ridge|   Boro Zone|
|        15|       Queens|Bay Terrace/Fort ...|   Boro Zone|
|        16|       Queens|             Bayside|   Boro Zone|
|        17|     Brooklyn|             Bedford|   Boro Zone|
|        18|        Bronx|        Bedford Park|   Boro Zone|
|        19|       Queens|           Bellerose|   Boro Zone|
|        20|        Bronx|             Belmont|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 20 rows

We can convert the dataframe into parquet format by using write.parquet, which saves the files to a zones folder in our directory.

df.write.parquet('zones')
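
As a quick check (not part of the original notes), we can read the parquet folder back and inspect it:

df_zones = spark.read.parquet('zones')
df_zones.printSchema()  # every column is a string, since we didn't provide a schema
df_zones.show(5)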

Partitions

A Spark cluster is composed of multiple executors. Each executor can process data independently in order to parallelize and speed up work. (Distributed Computing!!)

In the previous example we read a single large CSV file. A file can only be read by a single executor, which means that the code we’ve written so far isn’t parallelized and thus will only be run by a single executor rather than many at the same time.

In order to solve this issue, we can split a file into multiple parts so that each executor can take care of a part and have all executors working simultaneously. These splits are called partitions.

We will now read the CSV file, partition the dataframe and parquetize it. This will create multiple files in parquet format.
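
Note that the snippet below uses a schema object that isn’t defined anywhere in these notes. As a minimal sketch, it could be built with Spark’s types module; the column names follow the NYC TLC high-volume FHV files, but treat the exact fields and types as assumptions:

from pyspark.sql import types

# Assumed schema for fhvhv_tripdata_2021-01.csv; adjust the fields to match your file
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])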

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

We can create 24 partitions in our dataframe using the repartition() function in PySpark, then parquetize and write them to a folder (fhvhv/2021/01/) that will be created in our directory using write.parquet.

df = df.repartition(24)

df.write.parquet('fhvhv/2021/01/')

You may check the Spark UI at any time on localhost:4040 and see the progress of the current job, which is divided into stages which contain tasks. The tasks in a stage will not start until all tasks on the previous stage are finished.

When creating a dataframe, Spark creates as many partitions as CPU cores available by default, and each partition creates a task. Thus, assuming that the dataframe was initially partitioned into 6 partitions, the write.parquet() method will have 2 stages: the first with 6 tasks and the second one with 24 tasks.

Besides the 24 parquet files, you should also see a _SUCCESS file, which should be empty. This file is created when the job finishes successfully. Here is what the directory layout looks like:

(base) devinpowers@Devins-MacBook-Pro fhvhv % tree
.
└── 2021
    └── 01
        ├── _SUCCESS
        ├── part-00000-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00001-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00002-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00003-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00004-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00005-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00006-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00007-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00008-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00009-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00010-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00011-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00012-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00013-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00014-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00015-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00016-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00017-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00018-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00019-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00020-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00021-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        ├── part-00022-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet
        └── part-00023-abb4b105-717b-4767-963c-c479e84583d0-c000.snappy.parquet

2 directories, 25 files

Trying to write the files again will output an error because Spark will not write to a non-empty folder. You can force an overwrite with the mode argument:

df.write.parquet('fhvhv/2021/01/', mode='overwrite')

Spark Dataframes

Just like Pandas, Spark works with dataframes. There are many Pandas-like operations that we can do on Spark dataframes, which you can find here.
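
For example, here is a small sketch using the fhvhv dataframe from earlier (the column names and the HV0003 filter value come from that dataset, so double-check them against your file):

# read the partitioned parquet files back into a dataframe
df = spark.read.parquet('fhvhv/2021/01/')

# filter rows and select a few columns (both are lazy transformations)
df.filter(df.hvfhs_license_num == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .show(5)  # show() is an action, so this triggers the computation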

Transformations and Actions

Some Spark methods are “lazy”, meaning that they are not executed right away. These lazy commands are called transformations and the eager commands are called actions. Computations only happen when an action is triggered, for example by calling the .show() method:

df.select(...).filter(...).show()

List of Transformations:

  • Selecting columns
  • Filtering
  • Joins
  • Group by
  • Partitions

List of Actions:

  • Show, take, head
  • Write, read

Functions and User Defined Functions (UDFs)

Spark provides additional built-in functions that allow for more complex data manipulation; these functions are imported like a normal Python library.
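
Here is a small sketch of both, continuing with the fhvhv dataframe (the derived columns and the UDF itself are just illustrations, not from my notebook):

from pyspark.sql import functions as F
from pyspark.sql import types

# built-in function: derive a date column from the pickup timestamp
df = df.withColumn('pickup_date', F.to_date(df.pickup_datetime))

# user-defined function (UDF): wrap a plain Python function so Spark can apply it to a column
def first_letter(base_num):
    return base_num[:1] if base_num else None

first_letter_udf = F.udf(first_letter, returnType=types.StringType())

df.withColumn('base_letter', first_letter_udf(df.dispatching_base_num)) \
    .select('pickup_date', 'base_letter') \
    .show(5)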

Spark SQL

Spark can also run SQL queries, which can come in handy if you have a Spark cluster running.

Let’s work through some examples of using Spark SQL.

[Add more from Jupyter notebook here]
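
In the meantime, here is a minimal sketch (illustrative queries, not the ones from my notebook): we register a dataframe as a temporary view and then query it with plain SQL.

# expose the dataframe to Spark SQL under a table-like name
df.createOrReplaceTempView('fhvhv_trips')

spark.sql("""
SELECT
    PULocationID,
    COUNT(1) AS number_of_trips
FROM
    fhvhv_trips
GROUP BY
    PULocationID
ORDER BY
    number_of_trips DESC
""").show(5)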

Spark Internals

A ‘lower-level’ look at Spark’s architecture.

Until now we’ve used a local cluster (on our computer) to run our Spark code, but Spark clusters often contain multiple computers that behave as executors; remember, this is distributed computing.

Here’s a diagram to help you visualize a Spark cluster:

Spark clusters are managed by a master node. A driver (an Airflow DAG, a computer running a local script, etc.) that wants to execute a Spark job will send the job to the master, which in turn will divide the work among the cluster’s executors. If any executor fails and goes offline for any reason, the master will reassign its tasks to another executor.

Each executor will fetch a dataframe partition stored in a Data Lake (e.g. GCS), do something with it and then store it somewhere, which could be the same Data Lake or somewhere else.

Note: This is in contrast to Hadoop, another data analytics engine, whose executors locally store the data they process. Partitions in Hadoop are duplicated across several executors for redundancy, in case an executor fails for whatever reason (Hadoop is meant for clusters made of commodity hardware computers). However, data locality has become less important as storage and data transfer costs have dramatically decreased and nowadays it’s feasible to separate storage from computation, so Hadoop has fallen out of style.

More PySpark SQL Functions

  • GROUP BY in Spark
  • Joins in Spark
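
A quick sketch of both, assuming the fhvhv and zones dataframes from earlier (the join keys are my assumption):

# GROUP BY: count trips per pickup location
trips_per_zone = df.groupBy('PULocationID').count()

# JOIN: enrich the counts with zone names from the lookup dataframe
df_zones = spark.read.parquet('zones')

trips_per_zone \
    .join(df_zones, trips_per_zone.PULocationID == df_zones.LocationID, how='inner') \
    .select('Zone', 'Borough', 'count') \
    .show(5)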

Resilient Distributed Datasets (RDDs)

  • RDDs: Map and Reduce
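
A tiny sketch of the RDD API (illustrative only):

# access the lower-level RDD API through the SparkContext
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# map is a transformation, reduce is an action that triggers the computation
squares = rdd.map(lambda x: x * x)
print(squares.reduce(lambda a, b: a + b))  # 55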

Running Spark in the Cloud (GCS)

Connecting to Google Cloud Storage

Configuring Spark with the GCS connector
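
A rough sketch of that configuration is below; the connector jar path, credentials file and bucket name are assumptions, so adjust them to your setup:

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# assumed locations of the service account key and the Cloud Storage connector jar
credentials_location = './google_credentials.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('gcsTest') \
    .set('spark.jars', './lib/gcs-connector-hadoop3-latest.jar') \
    .set('spark.hadoop.google.cloud.auth.service.account.enable', 'true') \
    .set('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_location)

sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.AbstractFileSystem.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS')
hadoop_conf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
hadoop_conf.set('fs.gs.auth.service.account.json.keyfile', credentials_location)
hadoop_conf.set('fs.gs.auth.service.account.enable', 'true')

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# read parquet files straight from a GCS bucket (bucket/path are placeholders)
df = spark.read.parquet('gs://my-bucket/fhvhv/2021/01/')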

Creating a Local Spark Cluster
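
A short sketch of what this involves: start a standalone master and worker from the Spark installation directory, then point the session at the master URL instead of local[*] (the hostname below is a placeholder):

# From the Spark install directory, in a terminal:
#   ./sbin/start-master.sh                          # prints the master URL, e.g. spark://<hostname>:7077
#   ./sbin/start-worker.sh spark://<hostname>:7077  # attach a worker to that master
spark = SparkSession.builder \
    .master("spark://<hostname>:7077") \
    .appName('testCluster') \
    .getOrCreate()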

Spark and Docker

  • How to use Spark and Docker!!