Zoom Data Engineering Part 2

11 minute read

In this section I will extract data from the source and upload it to our data warehouse in Google Cloud. This covers topics such as Data Lakes and pipeline orchestration with Airflow.

Data Lake

What is a Data Lake?

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes differ from data warehouses in that they allow data to be kept in its rawest form, without needing to be converted and analyzed first.

The main goal behind a Data Lake is being able to ingest data as quickly as possible and making it easily available to all members of a data team!

Data Lake (DL) vs. Data Warehouse (DW)

Basic Differences:

  • Data Processing:
    • DL: Data is raw and hasn’t been cleaned (or only lightly), and is generally unstructured.
    • DW: Data is refined; it has been cleaned, pre-processed and structured for use.
  • Size:
    • DL: Data Lakes are LARGE and contain a lot of data.
    • DW: Data Warehouses in comparison are SMALLER, as data has been cleaned and structured.
  • Users:
    • DL: Data Scientists, Data Analysts, Data Engineers generally have access
    • DW: Business Analysts (sorry, we don’t trust you with the data lake)
  • Use Cases:
    • DL: Stream Processing, Machine Learning, Real-Time Analytics
    • DW: Batch Processing, Reporting, Business Intelligence

Why did Data Lakes start coming into existence?

  • As companies started to realize the importance of data, they soon found that they couldn’t ingest data right away into their DWs and didn’t want to waste any of it.


When ingesting data, DWs use the Extract, Transform and Load (ETL) model, whereas DLs use Extract, Load and Transform (ELT).

With ELT (schema on read), the data is stored directly without any transformations, and schemas are derived only when reading the data from the DL.
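To make “schema on read” concrete, here is a minimal stdlib sketch (the field names are made up): raw JSON records are dumped into the lake untouched, and a schema is only derived when the data is read back.

```python
import json

# Ingest: write records to the "lake" as-is; no schema is enforced up front.
raw_records = [
    '{"vendor_id": 1, "fare": 12.5}',
    '{"vendor_id": 2, "fare": 8.0, "tip": 1.5}',  # extra field is fine; nothing validates it
]

# Read: derive a schema (field -> type name) only at query time.
def infer_schema(lines):
    schema = {}
    for line in lines:
        for key, value in json.loads(line).items():
            schema[key] = type(value).__name__
    return schema

print(infer_schema(raw_records))
# {'vendor_id': 'int', 'fare': 'float', 'tip': 'float'}
```

Note how the second record’s extra `tip` field simply shows up in the derived schema; a schema-on-write warehouse would have rejected or dropped it at load time.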

Data Swamp

A data lake gone wrong: data has been dumped in without organization or metadata, making it hard to find and use.

Data Lake Cloud Providers

  • Google Cloud Platform: Cloud Storage
  • Amazon Web Services: Amazon S3
  • Microsoft Azure: Azure Blob Storage

For this Project we will use Google Cloud Storage.

Airflow Orchestration

What is Airflow?

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. So Airflow provides us with a platform where we can create and orchestrate our workflow or pipelines. In Airflow, these workflows are represented as DAGs.

Here is the Airflow architecture:

Here are a few notes on the components above:

  • Metadata Database: Airflow uses a SQL database to store metadata about the data pipelines being run. In the diagram above, this is represented as Postgres which is extremely popular with Airflow. Alternate databases supported with Airflow include MySQL.

  • Web Server and Scheduler: The Airflow web server and Scheduler are separate processes run (in this case) on the local machine, and interact with the database mentioned above.

  • Executor: The Executor is shown separately above, since it is commonly discussed within Airflow and in the documentation, but in reality it is NOT a separate process; it runs within the Scheduler.

  • Worker(s): The Workers are separate processes which also interact with the other components of the Airflow architecture and the metadata repository.

  • airflow.cfg: The Airflow configuration file, which is accessed by the Web Server, Scheduler, and Workers.

  • DAGs (Directed Acyclic Graphs): The DAG files containing Python code, representing the data pipelines to be run by Airflow. The location of these files is specified in the Airflow configuration file, but they need to be accessible by the Web Server, Scheduler, and Workers. They usually live in a separate dags folder within the project.

Core Ideas and Additional Definitions:

A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. The DAG describes how you want to carry out your workflow.

  • Task Dependencies (control flow: >> or <<)

I have tons of notes on DAGs here: Notes on Graphs
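The dependency idea behind a DAG can be sketched with Python’s stdlib graphlib: given tasks and their predecessors (the analogue of Airflow’s `>>` control flow), a valid execution order falls out. The task names here are made up for illustration.

```python
from graphlib import TopologicalSorter

# Each add() call says: this task depends on the tasks listed after it
# (the equivalent of download >> convert >> upload >> notify in a DAG file).
dag = TopologicalSorter()
dag.add("convert", "download")   # convert runs after download
dag.add("upload", "convert")     # upload runs after convert
dag.add("notify", "upload")

print(list(dag.static_order()))
# ['download', 'convert', 'upload', 'notify']
```

Airflow’s Scheduler does essentially this, plus scheduling, retries, and state tracking per task.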

Task: a defined unit of work. Tasks describe what to do (e.g. fetch data, run analysis, trigger something, etc.). The most common types of tasks are:

  • Operators
  • Sensors
  • TaskFlow Decorators

In this lesson we will create a more complex pipeline using Airflow:

(parquet in GCS) → (table in BQ)

Quick Note: What is Parquet? Parquet is a columnar storage file format, which is more efficient than CSV.
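A toy stdlib sketch of why columnar formats like Parquet help (the column names are made up): storing values column-by-column means a query that needs one column doesn’t touch the rest, and the per-row repetition of field names disappears.

```python
import json

# Row-oriented storage (like CSV/JSON lines): field names repeat on every record.
rows = [{"vendor_id": 1, "fare": 10.0 + i % 3} for i in range(1000)]
row_store = "\n".join(json.dumps(r) for r in rows)

# Column-oriented storage (the idea behind Parquet): each column stored contiguously.
col_store = {key: [r[key] for r in rows] for key in rows[0]}

# Reading just one column only touches that column's values:
fares = col_store["fare"]

print(len(row_store), len(json.dumps(col_store)))  # columnar is smaller: no repeated keys
```

Real Parquet goes much further (typed binary encoding, per-column compression, predicate pushdown), but the layout idea is the same.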

Airflow Configuration for Setting up on Docker


Video: Setting up GCS

  1. Assume your Google service account credentials JSON file is named google_credentials.json and stored in $HOME/.google/credentials/. Copy and rename your credentials file to that path.

Once you’ve downloaded the key you can set the environment variables to point to the auth keys: i. The environment variable name is GOOGLE_APPLICATION_CREDENTIALS

ii. The value for the variable is the path to the json authentication file you downloaded previously.

iii. Check how to assign environment variables in your system and shell. In bash, the command should be:

export GOOGLE_APPLICATION_CREDENTIALS="<path/to/authkeys>.json"

iv. You can refresh the token and verify the authentication with the GCP SDK:

gcloud auth application-default login
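As a convenience, here is a small hypothetical stdlib helper (not part of the course setup) to sanity-check that the environment variable points at an existing key file:

```python
import os

def credentials_ok(env_var: str = "GOOGLE_APPLICATION_CREDENTIALS") -> bool:
    """True if the env var is set and points at an existing .json key file."""
    path = os.environ.get(env_var, "")
    return path.endswith(".json") and os.path.isfile(path)

print(credentials_ok())
```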


  • Setting Up Airflow with Docker
  1. Create a new airflow subdirectory in your work directory.
  2. Download the official Docker-compose YAML file for the latest Airflow version.

     curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.3/docker-compose.yaml'
  3. We now need to set up the Airflow user. For MacOS (if you’re cool), create a new .env file in the same folder as the docker-compose.yaml file with the content below:

  4. The base Airflow Docker image won’t work with GCP, so we need to customize it to suit our needs. You may download a GCP-ready Airflow Dockerfile here. A few things of note:
    • We use the base Apache Airflow image as the base.
    • We install the GCP SDK CLI tool so that Airflow can communicate with our GCP project.
    • We also need to provide a requirements.txt file to install Python dependencies. The dependencies are:
      • apache-airflow-providers-google so that Airflow can use the GCP SDK.
      • pyarrow , a library to work with parquet files.
  5. Alter the x-airflow-common service definition inside the docker-compose.yaml file as follows:
    • We need to point to our custom Docker image. At the beginning, comment out or delete the image field and uncomment the build line, or alternatively, use the following (make sure you respect YAML indentation):
           build:
             context: .
             dockerfile: ./Dockerfile
    • Add a volume and point it to the folder where you stored the credentials JSON file. Assuming you complied with the pre-requisites and moved and renamed your credentials, add the following line after all the other volumes:
      - ~/.google/credentials/:/.google/credentials:ro
    • Add 2 new environment variables right after the others: GOOGLE_APPLICATION_CREDENTIALS and AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT:
      GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json
      AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'
    • Add 2 new additional environment variables for your GCP project ID and the GCP bucket that Terraform should have created in the previous post. You can find this info in your GCP project’s dashboard. Note that the infrastructure must already be provisioned with Terraform (see below).

      GCP_PROJECT_ID: '<your_gcp_project_id>'
      GCP_GCS_BUCKET: '<your_bucket_id>'

      Here are my Project ID and GCS bucket, which I got from Terraform:

      GCP_PROJECT_ID: 'new-try-zoom-data'
      GCP_GCS_BUCKET: 'dtc_data_lake_new-try-zoom-data'
    • Change the AIRFLOW__CORE__LOAD_EXAMPLES value to 'false'. This will prevent Airflow from populating its interface with DAG examples.
  6. You may find a modified docker-compose.yaml file in this link.

    • The only thing you need to change in the given docker-compose.yaml file is your GCP_PROJECT_ID and GCP_GCS_BUCKET
  7. Additional notes:

    • The YAML file uses CeleryExecutor as its executor type, which means that tasks will be pushed to workers (external Docker containers) rather than running them locally (as regular processes). You can change this setting by modifying the AIRFLOW__CORE__EXECUTOR environment variable under the x-airflow-common environment definition.

Execution Steps

  • Main execution steps (the same whether you built the image locally or not)
  1. Build the image (only the first time, or when there’s any change in the Dockerfile; takes ~15 mins the first time):
  docker-compose build
  2. Initialize the Airflow scheduler, DB, and other configurations:
  docker-compose up airflow-init
  3. Run Airflow:
  docker-compose up -d
  4. You may now access the Airflow GUI by browsing to localhost:8080. Username and password are both airflow.
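Instead of refreshing the browser until the GUI loads, you can poll the webserver’s /health endpoint (a real Airflow endpoint that reports metadatabase and scheduler status) with a small stdlib helper; the function name is my own:

```python
import json
from urllib import error, request

def airflow_healthy(base_url: str = "http://localhost:8080") -> bool:
    """Return True once the Airflow webserver answers /health with a healthy metadatabase."""
    try:
        with request.urlopen(f"{base_url}/health", timeout=5) as resp:
            payload = json.load(resp)
    except (error.URLError, ValueError):
        return False  # not up yet, or not answering with JSON
    return payload.get("metadatabase", {}).get("status") == "healthy"

print(airflow_healthy())
```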

Creating a DAG

  • Notes about creating DAGs to be added later.

Running Dags

DAG management is carried out via Airflow’s web UI on our localhost:8080

There are 2 main ways to run DAGs:

  1. Triggering them manually via the web UI or programmatically via the API
  2. Scheduling them
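The “programmatically via API” option uses Airflow’s stable REST API (POST /api/v1/dags/{dag_id}/dagRuns). Here is a stdlib sketch that builds such a request; the airflow/airflow credentials match the defaults in the compose setup, but treat the details as assumptions about your deployment:

```python
import base64
import json
from typing import Optional
from urllib import request

def build_trigger_request(base_url: str, dag_id: str, conf: Optional[dict] = None) -> request.Request:
    """Build the POST request that triggers a DAG run via Airflow's stable REST API."""
    url = f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
    token = base64.b64encode(b"airflow:airflow").decode()  # default web UI credentials
    return request.Request(
        url,
        data=json.dumps({"conf": conf or {}}).encode(),
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )

req = build_trigger_request("http://localhost:8080", "data_ingestion_gcs_dag")
# request.urlopen(req)  # uncomment with Airflow up and running
print(req.full_url)
```

Note the REST API only accepts basic auth if the basic_auth backend is enabled (the official compose file sets AIRFLOW__API__AUTH_BACKEND accordingly).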

Airflow in Action for this Project

We will run two different scripts, one for Ingesting data to our Local Postgres with Airflow and the second for Ingesting data to GCP

Ingesting data to local Postgres with Airflow

  • We have to use an extra .yaml file here, which I’ll link below.


  1. Prepare an ingestion script. Here is mine; put it in the /dags subdirectory in your work folder.

  2. Create a DAG for the local ingestion. Here is my DAG file; copy it to the /dags subdirectory like above.

  • Some modifications have been made here since the AWS bucket containing the original files is gone.
  • Updated to the new dataset_url and added the .gz extension, since that’s the format the files are in!
  • Updated the parquet_file variable as well to account for the .gz extension.
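Since the files now come gzip-compressed, here is a small stdlib sketch (a hypothetical helper, not part of the DAG) showing that a .gz CSV can be read transparently; pandas’ read_csv similarly infers gzip compression from the extension:

```python
import csv
import gzip
import tempfile

def count_rows(path: str) -> int:
    """Count data rows in a gzipped CSV, excluding the header row."""
    with gzip.open(path, "rt", newline="") as f:
        return sum(1 for _ in csv.reader(f)) - 1

# Demo with a tiny fake taxi file.
with tempfile.NamedTemporaryFile(suffix=".csv.gz", delete=False) as tmp:
    demo_path = tmp.name
with gzip.open(demo_path, "wt", newline="") as f:
    csv.writer(f).writerows([["vendor_id", "fare"], [1, 12.5], [2, 8.0]])

print(count_rows(demo_path))  # → 2
```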
  3. Modify the .env file to include:

  4. Make sure the Airflow docker-compose.yaml file includes the environment variables (.env).

  5. Modify the Airflow Dockerfile to include the ingest_script.py (could take time to compile on the first run).

  • Add this right after installing the requirements.txt file: RUN pip install --no-cache-dir pandas sqlalchemy psycopg2-binary
  6. Rebuild the Airflow image with (might take a while when first built):
  docker-compose build
  7. Initialize the Airflow config with:
  docker-compose up airflow-init
  8. Start Airflow by running (will take 5 or so minutes):
  docker-compose up

And in a separate terminal, find out what network it is running on by using:

  docker network ls

It most likely will be something like airflow_default.

  9. Now we’re going to MODIFY the docker-compose.yaml file from Post 1 (which includes Dockerizing our database). We will also rename the file to docker-compose-lesson2.yaml. Here we will add the shared virtual network and include it in our .yaml file as shown below:
        name: airflow_better_default

Here’s a link to my docker-compose-lesson2.yaml file.

Run the docker-compose-lesson2.yaml now:

  docker-compose -f docker-compose-lesson2.yaml up

We run it with the -f flag because the file has a non-default name.

  10. Open the Airflow dashboard at localhost:8080 and run the LocalIngestionDag by clicking on the Play icon.

  11. When both the download and ingest tasks are finished, we will log into the database:

pgcli -h localhost -p 5431 -u root -d ny_taxi
  • Run some queries on our database now to test that it worked!
select count(*) from yellow_taxi_2021_01;


| count  |
|--------|
| 300000 |
  12. When finished, we will run:
docker-compose down

On both the Airflow and Postgres terminals!

Ingesting Data to GCP

Here we will run a more complex DAG that will download the NYC taxi trip data, convert it to parquet, upload it to a GCP bucket, and ingest it into GCP’s BigQuery. Here’s a step-by-step visualization:

(parquet in GCS) → (table in BQ)
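The BigQuery step of such a DAG creates an external table over the parquet file in GCS. The shape of that table resource (as passed to BigQuery’s tables API, and to the BigQueryCreateExternalTableOperator’s table_resource argument) can be sketched with a plain dict; the helper name and object path are my own illustrations:

```python
def external_table_resource(project: str, dataset: str, table: str, bucket: str, object_path: str) -> dict:
    """Build the table resource for a BigQuery external table backed by parquet in GCS (sketch)."""
    return {
        "tableReference": {"projectId": project, "datasetId": dataset, "tableId": table},
        "externalDataConfiguration": {
            "sourceFormat": "PARQUET",
            "sourceUris": [f"gs://{bucket}/{object_path}"],
        },
    }

resource = external_table_resource(
    "new-try-zoom-data", "trips_data_all", "external_table",
    "dtc_data_lake_new-try-zoom-data", "raw/output.parquet",  # object path is illustrative
)
print(resource["externalDataConfiguration"]["sourceUris"])
```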

Steps: For Uploading to Google Cloud!!!

First we have to make sure the infrastructure is set up (Terraform)!

Steps to run Terraform:

My Project ID is: new-try-zoom-data

# Refresh service-account's auth-token for this session
# Make sure it returns the current project that is setup on this website!
gcloud auth application-default login

# Initialize state file (.tfstate)
terraform init

# Check changes to new infra plan
#terraform plan -var="project=<your-gcp-project-id>"
terraform plan
# Create new infra
terraform apply 

Terraform will print out:

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

Then just enter yes, and our infrastructure is all set up! This includes a data lake bucket in Google Cloud Storage. See below:

google_storage_bucket.data-lake-bucket: Creating...
google_storage_bucket.data-lake-bucket: Creation complete after 1s [id=dtc_data_lake_new-try-zoom-data]

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

In order to destroy the Terraform infrastructure, we run the command:

# Delete infra after your work, to avoid costs on any running services
terraform destroy

Now that we have our lake, we can start the ingestion to Google!

Video about Terraform (in 100 seconds): https://www.youtube.com/watch?v=tomUWcQ0P3k

How different is this from the steps above to import data locally?

We don’t need to run the extra docker-compose .yaml file.

Make sure these variables in our docker-compose.yaml file are set to the Google project ID and data lake bucket we set earlier:

  GCP_PROJECT_ID: 'new-try-zoom-data'
  GCP_GCS_BUCKET: 'dtc_data_lake_new-try-zoom-data'

Steps to Run:

  1. Make sure the Terraform infrastructure is set up!!!!!

  2. Create/Prepare DAG for working with Google here

  • I’ve edited some of the DAG in order to work with the data!
  3. If Airflow is not started, we need to run it (takes ~5 minutes to execute everything):
  docker-compose up airflow-init

And then run the actual Airflow webserver, which takes more than 5 minutes…. 🥲

  docker-compose up
  4. Open Airflow at http://localhost:8080/ and run the DAG data_ingestion_gcs_dag.

  5. Once the DAG finishes, you can go to your GCP project’s dashboard and search for BigQuery. You should see your project ID; expand it and you should see a new trips_data_all database with an external_table table.

  6. Let’s explore the data!!

  7. We can also go to the Cloud Storage service and look at the parquet files.

  8. Once done, we can close the Docker containers!

  docker-compose down
  9. Lastly, TEAR DOWN OUR Terraform infrastructure!
  # Delete infra after your work, to avoid costs on any running services
  terraform destroy

Homework Section

In this section we will upload the remaining data into our GCS account.

Upload: Green Taxi, Yellow Taxi, and FHV Files into Google Data Lake




Files from Green Taxi, Yellow Taxi, and FHV (For-Hire Vehicle) are uploaded to our data lake bucket named dtc_data_lake_new-try-zoom-data, into separate folders named:

  • fhv
  • raw
  • yellow

Then from here we can run another DAG, called gcs_2_bq_dag, that will move the files into a BigQuery warehouse and organize them.
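A gcs_2_bq_dag that loops over those folders would need one wildcard GCS URI per folder; a tiny sketch (the function name is mine):

```python
def folder_uris(bucket: str, folders=("fhv", "raw", "yellow")) -> dict:
    """Map each data-lake folder to the wildcard GCS URI BigQuery would read from."""
    return {folder: f"gs://{bucket}/{folder}/*" for folder in folders}

print(folder_uris("dtc_data_lake_new-try-zoom-data"))
```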