Zoom Data Engineering Part 2
In this section I will extract data from the source and upload it to our Data Warehouse in Google, covering topics such as Data Lakes and pipeline orchestration with Airflow.
Data Lake
What is a Data Lake?
A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.
The main goal behind a Data Lake is being able to ingest data as quickly as possible and make it easily available to all members of a data team!
Data Lake (DL) vs. Data Warehouse (DW)
Basic Differences:
- Data Processing:
- DL: Data is raw and has had little to no cleaning; it is generally unstructured.
- DW: Data is refined; it has been cleaned, pre-processed and structured for use
- Size:
- DL: Data Lakes are LARGE and contain a lot of data
- DW: Data Warehouses are SMALLER in comparison, since the data has been cleaned and structured
- Users:
- DL: Data Scientists, Data Analysts, Data Engineers generally have access
- DW: Business Analysts (sorry, we don’t trust you with the data lake)
- Use Cases:
- DL: Stream Processing, Machine Learning, Real-Time Analytics
- DW: Batch Processing, Reporting, Business Intelligence
Why did Data Lakes start coming into existence?
- As companies started to realize the importance of data, they soon found out that they couldn’t ingest it right away into their DWs and didn’t want to waste any of it.
ETL vs. ELT
When ingesting data, DWs use the Extract, Transform and Load (ETL) model whereas DLs use Extract, Load and Transform (ELT).
With ELT (schema on read), the data is stored directly without any transformations, and any schemas are derived when reading the data back from the DL.
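To make "schema on read" concrete, here is a minimal sketch, assuming a hypothetical raw file already sitting in the lake (the path and file are made up for illustration): the file is stored untouched, and a schema is only inferred when we read it back.

```python
import pandas as pd

# Schema on write (DW style): the table schema is defined up front and the
# data has to be cleaned/conformed before it can be loaded.

# Schema on read (DL style): dump the raw file into the lake as-is.
raw_path = "data_lake/raw/trips_2021_01.csv.gz"  # hypothetical lake path

# The schema (column names and dtypes) is only derived at read time.
df = pd.read_csv(raw_path, compression="gzip", nrows=1000)
print(df.dtypes)   # inferred schema
print(df.head())
```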
Data Swamp
Data Lakes that have gone wrong.
Data Lake Cloud Providers
- Google Cloud Platform: Cloud Storage
- Amazon Web Services: Amazon S3
- Microsoft Azure: Azure Blob Storage
For this Project we will use Google Cloud Storage.
Airflow Orchestration
What is Airflow?
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. So Airflow provides us with a platform where we can create and orchestrate our workflows or pipelines. In Airflow, these workflows are represented as DAGs.
Here is the Airflow architecture:
Here are a few notes on the components above:
- Metadata Database: Airflow uses a SQL database to store metadata about the data pipelines being run. In the diagram above, this is represented as Postgres, which is extremely popular with Airflow. Alternate databases supported by Airflow include MySQL.
- Web Server and Scheduler: The Airflow web server and scheduler are separate processes run (in this case) on the local machine and interact with the database mentioned above.
- Executor: The executor is shown separately above since it is commonly discussed within Airflow and in the documentation, but in reality it is NOT a separate process; it runs within the scheduler.
- Worker(s): The workers are separate processes which also interact with the other components of the Airflow architecture and the metadata repository.
- airflow.cfg is the Airflow configuration file, which is accessed by the Web Server, Scheduler, and Workers.
- DAGs (Directed Acyclic Graphs) refers to the DAG files containing Python code, representing the data pipelines to be run by Airflow. The location of these files is specified in the Airflow configuration file, but they need to be accessible by the Web Server, Scheduler, and Workers. They are usually in a separate dags folder within the project.
Core Ideas and Additional Definitions:
A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG describes how you want to carry out your workflow.
- Task Dependencies (control flow: >> or <<)
I have tons of notes on DAGs here: Notes on Graphs
Task: a defined unit of work. Tasks describe what to do (e.g. fetch data, run analysis, trigger something, etc.). The most common types of tasks are listed below; a minimal example DAG follows the list.
- Operators
- Sensors
- TaskFlow Decorators
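To tie these pieces together, here is a minimal sketch of a DAG with two tasks chained with the >> dependency operator. The DAG id, schedule, and echoed messages are placeholders, not part of the course code.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A DAG groups tasks and describes the order in which they run.
with DAG(
    dag_id="example_minimal_dag",      # placeholder name
    schedule_interval="@daily",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    download = BashOperator(
        task_id="download",
        bash_command="echo 'pretend we downloaded a file'",
    )

    process = BashOperator(
        task_id="process",
        bash_command="echo 'pretend we processed it'",
    )

    # Control flow: download runs before process.
    download >> process
```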
In this lesson we will create a more complex pipeline using Airflow:
(web)
↓
DOWNLOAD
↓
(csv)
↓
PARQUETIZE
↓
(parquet)
↓
UPLOAD TO GCS
↓
(parquet in GCS)
↓
UPLOAD TO BIGQUERY
↓
(Table in BQ)
Quick Note: What is Parquet? Parquet is a columnar storage file format which is more efficient than CSV.
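Since the PARQUETIZE step comes up repeatedly, here is a rough sketch of a CSV-to-parquet conversion with pyarrow (the library we install later). The file names are placeholders rather than the exact course script.

```python
import pyarrow.csv as pv
import pyarrow.parquet as pq

src_file = "yellow_tripdata_2021-01.csv"          # placeholder input file
parquet_file = src_file.replace(".csv", ".parquet")

# Read the CSV into an Arrow table, then write it back out as Parquet.
table = pv.read_csv(src_file)
pq.write_table(table, parquet_file)
```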
Airflow Configuration for Setting up on Docker
Pre-requisites
- Assume you have a Google Service Account credentials JSON file named google_credentials.json stored in $HOME/.google/credentials/. Copy and rename your credentials file to the required path.
Once you’ve downloaded the key you can set the environment variable to point to the auth key:
i. The environment variable name is GOOGLE_APPLICATION_CREDENTIALS.
ii. The value for the variable is the path to the JSON authentication file you downloaded previously.
iii. Check how to assign environment variables in your system and shell. In bash, the command should be:
export GOOGLE_APPLICATION_CREDENTIALS="<path/to/authkeys>.json"
iv. You can refresh the token and verify the authentication with the GCP SDK:
gcloud auth application-default login
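As a quick sanity check that the credentials are picked up, here is a small sketch assuming the google-cloud-storage package is installed; it simply lists your buckets and fails fast if authentication is broken.

```python
import os

from google.cloud import storage  # assumes: pip install google-cloud-storage

# The client reads GOOGLE_APPLICATION_CREDENTIALS automatically.
print("Using key file:", os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

client = storage.Client()
# Listing buckets raises an error quickly if the credentials are wrong.
for bucket in client.list_buckets():
    print(bucket.name)
```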
Setup
- Setting Up Airflow with Docker
- Create a new airflow subdirectory in your work directory.
- Download the official docker-compose YAML file for the latest Airflow version:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.3/docker-compose.yaml'
- The official docker-compose.yaml file is quite complex and contains several service definitions.
- For a refresher on how docker-compose works, you can check out this lesson from the ML Zoomcamp.
- We now need to set up the Airflow user. For MacOS (if you’re cool), create a new .env file in the same folder as the docker-compose.yaml file with the content below:
AIRFLOW_UID=50000
- The base Airflow Docker image won’t work with GCP, so we need to customize it to suit our needs. You may download a GCP-ready Airflow Dockerfile here. A few things of note:
- We use the official Apache Airflow image as the base.
- We install the GCP SDK CLI tool so that Airflow can communicate with our GCP project.
- We also need to provide a requirements.txt file to install Python dependencies. The dependencies are:
  - apache-airflow-providers-google so that Airflow can use the GCP SDK.
  - pyarrow, a library to work with parquet files.
- Alter the x-airflow-common service definition inside the docker-compose.yaml file as follows:
  - We need to point to our custom Docker image. At the beginning, comment or delete the image field and uncomment the build line, or alternatively, use the following (make sure you respect YAML indentation):
    build:
      context: .
      dockerfile: ./Dockerfile
- Add a volume and point it to the folder where you stored the credentials json file. Assuming you complied with the pre-requisites and moved and renamed your credentials, add the following line after all the other volumes:
```yaml
- ~/.google/credentials/:/.google/credentials:ro
```
- Add 2 new environment variables right after the others: GOOGLE_APPLICATION_CREDENTIALS and AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT:
GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'
- Add 2 additional environment variables for your GCP project ID and the GCP bucket that Terraform created in the previous post. You can find this info in your GCP project’s dashboard. Note that the Terraform infrastructure has to be set up already.
GCP_PROJECT_ID: '<your_gcp_project_id>'
GCP_GCS_BUCKET: '<your_bucket_id>'
Here are my Project Id and GCS Bucket that I got from Terraform.
GCP_PROJECT_ID: 'new-try-zoom-data'
GCP_GCS_BUCKET: 'dtc_data_lake_new-try-zoom-data'
- Change the AIRFLOW__CORE__LOAD_EXAMPLES value to 'false'. This will prevent Airflow from populating its interface with DAG examples.
- You may find a modified docker-compose.yaml file in this link.
  - The only thing you need to change in the given docker-compose.yaml file is your GCP_PROJECT_ID and GCP_GCS_BUCKET.
- Additional notes:
  - The YAML file uses CeleryExecutor as its executor type, which means that tasks will be pushed to workers (external Docker containers) rather than running locally (as regular processes). You can change this setting by modifying the AIRFLOW__CORE__EXECUTOR environment variable under the x-airflow-common environment definition.
Execution Steps
- These are the main execution steps, whether you build the image locally or use the default one:
- Build the image (only the first time, or when there’s any change in the Dockerfile; takes ~15 mins the first time):
docker-compose build
- Initialize the Airflow scheduler, DB, and other configurations
docker-compose up airflow-init
- Run Airflow
docker-compose up -d
- You may now access the Airflow GUI by browsing to localhost:8080. Username and password are both airflow.
Creating a DAG
- To add later: notes about creating DAGs.
Running Dags
DAG management is carried out via Airflow’s web UI on our localhost:8080
There are 2 main ways to run DAGs:
- Triggering them manually via the web UI or programmatically via the API (a small sketch of the API approach follows below)
- Scheduling them
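For the programmatic option, the Airflow 2.x stable REST API can trigger a run. Here is a hedged sketch using the requests library, assuming the default airflow/airflow credentials and the basic-auth API backend that the official docker-compose setup enables; the DAG id is a placeholder.

```python
import requests

AIRFLOW_URL = "http://localhost:8080/api/v1"
DAG_ID = "example_minimal_dag"   # placeholder DAG id

# Trigger a new DAG run; basic auth works with the default docker-compose user.
resp = requests.post(
    f"{AIRFLOW_URL}/dags/{DAG_ID}/dagRuns",
    auth=("airflow", "airflow"),
    json={"conf": {}},
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```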
Airflow in Action for this Project
We will run two different scripts, one for Ingesting data to our Local Postgres with Airflow and the second for Ingesting data to GCP
Ingesting data to local Postgres with Airflow
- We have to use an extra .yaml file here, which I’ll link below.
Steps:
- Prepare an ingestion script. Here is mine; put it in the /dags subdirectory in my work folder.
- Create a DAG for the local ingestion. Here is my DAG file; copy it to the /dags subdirectory in my work folder like above.
- Some modifications have been made here since the AWS bucket containing the original files is gone.
  - Updated to the new dataset_url and added the .gz extension, since that’s the format the files are in!
  - Updated the parquet_file variable as well to account for the .gz extension.
- Modify the .env file to include the following (a rough sketch of how the ingestion script uses these variables follows after the block):
AIRFLOW_UID=50000
PG_HOST=pgdatabase
PG_USER=root
PG_PASSWORD=root
PG_PORT=5432
PG_DATABASE=ny_taxi
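Roughly speaking, the ingestion script reads these PG_* variables and loads the downloaded file into Postgres in chunks. Here is a sketch of that idea (not the exact course script) using pandas and SQLAlchemy, which is why a later step adds pandas, sqlalchemy, and psycopg2-binary to the image; the file name is a placeholder.

```python
import os

import pandas as pd
from sqlalchemy import create_engine

# Connection details come from the .env values passed into the container.
user = os.environ["PG_USER"]
password = os.environ["PG_PASSWORD"]
host = os.environ["PG_HOST"]
port = os.environ["PG_PORT"]
db = os.environ["PG_DATABASE"]

engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{db}")

csv_file = "output.csv.gz"          # placeholder for the downloaded file
table_name = "yellow_taxi_2021_01"  # matches the table we query later

# Load in chunks so we don't hold the whole file in memory at once.
chunks = pd.read_csv(csv_file, compression="gzip", chunksize=100_000)
for i, chunk in enumerate(chunks):
    chunk.to_sql(
        table_name,
        engine,
        if_exists="replace" if i == 0 else "append",
        index=False,
    )
    print(f"inserted chunk {i}")
```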
- Make sure the Airflow docker-compose.yaml file includes the environment variables from the .env file.
) -
Modify the Airflow
Dockerfile
to include theingest_script.py
(could take time to complie first run)
- Add this right after installing the requirements.txt file: RUN pip install –no-cache-dir pandas sqlalchemy psycopg2-binary
- Rebuild the Airflow image with (it might take a few minutes when first built):
docker-compose build
- Initialize the Airflow config with:
docker-compose up airflow-init
- Start Airflow by running (will take 5 or so minutes)
docker-compose up
And in a separate terminal, find out what network it is running on by using:
docker network ls
It most likely will be something like airflow_default.
- Now we’re going to MODIFY the docker-compose.yaml file from Post 1 (which includes Dockerizing our database). We will also rename the file to docker-compose-lesson2.yaml. Here we will add the shared virtual network and include it in our .yaml file as shown below:
networks:
  airflow:
    external:
      name: airflow_better_default
Here’s a link to my docker-compose-lesson2.yaml file.
Run the docker-compose-lesson2.yaml now:
docker-compose -f docker-compose-lesson2.yaml up
We pass the file explicitly with -f because it no longer uses the default docker-compose.yaml name.
- Open the Airflow dashboard via localhost:8080 and run the LocalIngestionDag by clicking on the Play icon.
- When both the download and ingest tasks are finished, we will log into the database:
pgcli -h localhost -p 5431 -u root -d ny_taxi
- Run some queries on our database now to test that it worked!
select count(*) from yellow_taxi_2021_01;
Output:
+--------+
| count |
|--------|
| 300000 |
+--------+
- When finished, we will run
docker-compose down
on both the Airflow and Postgres terminals!!
Ingesting Data to GCP
Here we will run a more complex DAG that will download the NYC taxi trip data, convert it to parquet, upload it to a GCP bucket, and ingest it into GCP’s BigQuery. Here’s a step-by-step visualization, followed by a rough sketch of the DAG’s final two tasks:
(web)
↓
DOWNLOAD
↓
(csv.gz)
↓
PARQUETIZE
↓
(parquet)
↓
UPLOAD TO GCS
↓
(parquet in GCS)
↓
UPLOAD TO BIGQUERY
↓
(Table in BQ)
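Here is a hedged sketch of how the last two steps of that DAG might look: a PythonOperator that uploads the parquet file to GCS and a BigQueryCreateExternalTableOperator that registers it in BigQuery. The DAG id, file name, and local path are placeholders; the dataset and table names follow the trips_data_all / external_table ones mentioned later, and the exact course DAG may differ.

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)
from google.cloud import storage

# These come from the environment variables set in docker-compose.yaml.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")
parquet_file = "yellow_tripdata_2021-01.parquet"  # placeholder file name


def upload_to_gcs(bucket_name: str, object_name: str, local_file: str) -> None:
    """Upload a local parquet file to the GCS data lake bucket."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.blob(object_name).upload_from_filename(local_file)


with DAG(
    dag_id="gcs_ingestion_sketch",       # placeholder, not the course DAG id
    schedule_interval="@monthly",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    upload_task = PythonOperator(
        task_id="upload_to_gcs",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket_name": BUCKET,
            "object_name": f"raw/{parquet_file}",
            "local_file": f"/opt/airflow/{parquet_file}",  # placeholder path
        },
    )

    bq_external_table_task = BigQueryCreateExternalTableOperator(
        task_id="bigquery_external_table",
        table_resource={
            "tableReference": {
                "projectId": PROJECT_ID,
                "datasetId": "trips_data_all",
                "tableId": "external_table",
            },
            "externalDataConfiguration": {
                "sourceFormat": "PARQUET",
                "sourceUris": [f"gs://{BUCKET}/raw/{parquet_file}"],
            },
        },
    )

    upload_task >> bq_external_table_task
```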
Steps for Uploading to Google Cloud!!!
First we have to make sure the infrastructure is set up (Terraform)!
Steps to run Terraform:
My Project ID is: new-try-zoom-data
# Refresh service-account's auth-token for this session
# Make sure it returns the current project that is setup on this website!
gcloud auth application-default login
# Initialize state file (.tfstate)
terraform init
# Check changes to new infra plan
#terraform plan -var="project=<your-gcp-project-id>"
terraform plan
# Create new infra
terraform apply
Terraform will print out:
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Then just enter yes. And then our infrastructure is all set up, which includes a data lake bucket in Google Cloud Storage. See below:
google_storage_bucket.data-lake-bucket: Creating...
google_storage_bucket.data-lake-bucket: Creation complete after 1s [id=dtc_data_lake_new-try-zoom-data]
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
In order to destroy the Terraform infrastructure, we run the command:
# Delete infra after your work, to avoid costs on any running services
terraform destroy
Now that we have our data lake bucket, we can start the ingestion to Google!
Video about Terraform (in 100 seconds): https://www.youtube.com/watch?v=tomUWcQ0P3k
How different is this from the steps above to import data locally?
We don’t need to run that extra docker-compose .yaml file.
Make sure these variables in our docker-compose.yaml file are set to the Google project ID and data lake bucket we set up earlier:
GCP_PROJECT_ID: 'new-try-zoom-data'
GCP_GCS_BUCKET: 'dtc_data_lake_new-try-zoom-data'
Steps to Run:
- Make sure Terraform is set up and the infrastructure has been created!!!
- Create/prepare the DAG for working with Google here.
  - I’ve edited some of the DAG in order to work with the data!
- If Airflow is not already started, we need to run the commands below (it takes around 5 minutes to execute everything):
docker-compose up airflow-init
And then run the actual Airflow Webserver, which takes more than 5 minutes…. 🥲
docker-compose up
- Open Airflow at http://localhost:8080/ and run the DAG data_ingestion_gcs_dag.
- Once the DAG finishes, you can go to your GCP project’s dashboard and search for BigQuery. You should see your project ID; expand it and you should see a new trips_data_all dataset with an external_table table.
- Let’s explore the data!!
- We can also go to the Cloud Storage service and look at the parquet files.
- Once done, we can shut down the Docker containers:
docker-compose down
- Lastly, TEAR DOWN OUR Terraform infrastructure!
# Delete infra after your work, to avoid costs on any running services
terraform destroy
Homework Section
In this section we will upload the remaining data into our GCS account.
Upload: Green Taxi, Yellow Taxi, and FHV Files into Google Data Lake
Dags:
data_ingestion_gcs_dag_yellowtaxis
data_ingestion_gcs_dag_greentaxis
Files from the Green Taxi, Yellow Taxi, and FHV (For-Hire Vehicle) datasets are uploaded into our data lake bucket named dtc_data_lake_new-try-zoom-data, into separate folders named:
- fhv
- raw
- yellow
Then from here we can run another DAG called gcs_2_bq_dag that will move the files into a BigQuery warehouse and organize them into tables.
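For reference, here is a hedged sketch of what one task in such a GCS-to-BigQuery DAG might look like, using BigQueryInsertJobOperator to build a partitioned table from the external table created earlier. The DAG id, the query, and the tpep_pickup_datetime column are assumptions for illustration and may differ from the real gcs_2_bq_dag.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

# Builds a partitioned BigQuery table from the external table registered earlier.
# The pickup column name is assumed (yellow taxi schema) for this sketch.
CREATE_PARTITIONED_TABLE_QUERY = """
CREATE OR REPLACE TABLE trips_data_all.yellow_tripdata_partitioned
PARTITION BY DATE(tpep_pickup_datetime) AS
SELECT * FROM trips_data_all.external_table;
"""

with DAG(
    dag_id="gcs_2_bq_sketch",            # placeholder, not the course DAG id
    schedule_interval="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    create_partitioned_table = BigQueryInsertJobOperator(
        task_id="create_partitioned_table",
        configuration={
            "query": {
                "query": CREATE_PARTITIONED_TABLE_QUERY,
                "useLegacySql": False,
            }
        },
    )
```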