Zoom Data Engineering Part 1
Introduction to Data Engineering
Data Engineering is the design and development of systems for collecting, storing and analyzing data at scale.
Architecture
During this course we will replicate the following architecture:
Data and Tools we will use in this course
- NYC Taxi and Limousine Commission (TLC) Trip Records dataset
- Spark: analytics engine for large-scale, distributed data processing
- Google BigQuery: serverless data warehouse
- Apache Airflow: workflow management platform for data engineering pipelines
- Kafka: high-throughput, low-latency streaming platform for handling real-time data
What is a Data Pipeline?
A data pipeline is a service that receives data as an input and outputs more data. An example is shown below:
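The example diagram from the course isn't reproduced in these notes. As a rough sketch of the idea (the file names and the filter rule are hypothetical; pandas plus pyarrow are assumed for the Parquet output), one pipeline step might look like:

import pandas as pd

# input: a raw CSV of taxi trips (hypothetical file name)
df = pd.read_csv('raw_trips.csv')

# processing step: keep only trips with a positive fare (hypothetical rule)
df = df[df['fare_amount'] > 0]

# output: cleaned data handed to the next stage of the pipeline
df.to_parquet('clean_trips.parquet')  # needs pyarrow or fastparquet installed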
Docker and Postgres
Terminology:
What is Docker? Docker is a containerization software that allows us to isolate software in a similar way to virtual machines but in a much leaner way!
What is a Docker Image? A Docker image is a blueprint of a container that we can define to run our software, or in this case our data pipelines. We can take (export) our Docker images to cloud providers such as AWS or Google Cloud and run our containers there (since we have the blueprint)!
Why use Docker?
- Reproducibility
- Local experimentation
- Integration tests (CI/CD)
- Running pipelines on the cloud (AWS Batch, Kubernetes jobs)
- Spark (analytics engine for large-scale data processing)
- Serverless (AWS Lambda, Google Cloud Functions)
Docker containers are stateless: any changes done inside a container will NOT be saved when the container is killed and started again.
Example of creating a Container in Docker
Here is a quick Python script that, when run as python3 pipeline.py <some_number>, will print:
['pipeline.py', '<some_number>']
job finished successfully for day = <some_number>
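The notes don't include pipeline.py itself; a minimal sketch that produces the output above (the pandas import isn't used yet, it just mirrors the dependency installed in the Dockerfile below) could be:

import sys

import pandas as pd  # unused for now; proves the dependency from the Dockerfile is available

# print every command-line argument; argv[0] is the script name itself
print(sys.argv)

# treat the first real argument as the "day" this toy pipeline processes
day = sys.argv[1]

print(f'job finished successfully for day = {day}')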
Let's containerize it! (Dockerfile)
# base Docker image that we will build on
FROM python:3.9.1
# set up our image by installing prerequisites; pandas in this case
RUN pip install pandas
# set up the working directory inside the container
WORKDIR /app
# copy the script to the container. 1st name is source file, 2nd is destination
COPY pipeline.py pipeline.py
# define what to do first when the container runs
# in this example, we will just run the script
ENTRYPOINT ["python3", "pipeline.py"]
Let's build the image:
docker build -t test_pipe:dog .
- The image name will be test_pipe and its tag will be dog; if the tag isn't specified, it defaults to latest. The trailing dot tells Docker to use the current directory as the build context.
Now we can run the container and pass an argument to it, so that our pipeline will receive it:
docker run -it test_pipe:dog 13
Output:
['pipeline.py', '13']
job finished successfully for day = 13
Note: pipeline.py and the Dockerfile are both in the same directory.
Running Postgres in a Container
We will use a volume for storing data. What is a volume in Docker? A Docker volume is a file system used to preserve data generated by a running container. Volumes are stored on the host (our computer in this example), independent of the container life cycle. This lets us back up data and share file systems between containers easily.
In this course we will run a containerized version of Postgres! To set it up, we only need a few environment variables and a volume for storing data.
Here's how we run the container (on macOS):
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5431:5432 \
postgres:13
The container needs 3 environment variables (-e): the user, the password and the database name.
The -v flag mounts the volume. The colon : separates the path to the folder on the host computer from the path to the folder inside the container.
The -p flag is for port mapping: host port first, container port second.
The last argument is the image name and tag; we run the official postgres image at version 13.
Note: we map to host port 5431 because Postgres is already installed locally and listening on the default port 5432.
Once the container is running, we can log into our database with pgcli with the following command:
pgcli -h localhost -p 5431 -u root -d ny_taxi
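As an optional sanity check, we can also test the connection from Python instead of pgcli (a sketch, assuming sqlalchemy and psycopg2 are installed on the host):

from sqlalchemy import create_engine, text

# same credentials as the container, through the host-mapped port 5431
engine = create_engine('postgresql://root:root@localhost:5431/ny_taxi')

with engine.connect() as conn:
    # ask the server for its version string to confirm the connection works
    print(conn.execute(text('SELECT version();')).fetchone())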
Ingesting Data to Postgres with Python
We will now create a Python script that reads a CSV file and exports it to Postgres.
upload-data.py
## Add script/code here
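The notes leave this script as a placeholder. Below is a minimal sketch of what upload-data.py might look like (the file name yellow_tripdata_2021-01.csv and the table name yellow_taxi_data are placeholders; pandas, sqlalchemy and psycopg2 are assumed to be installed). The full, parameterised version appears later as ingest_data.py.

import pandas as pd
from sqlalchemy import create_engine

# placeholder file name: a locally downloaded copy of the trip data CSV
csv_name = 'yellow_tripdata_2021-01.csv'

# connect through the host-mapped port 5431 set up above
engine = create_engine('postgresql://root:root@localhost:5431/ny_taxi')

# read the CSV in chunks so we never hold millions of rows in memory at once
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

first_chunk = True
for df in df_iter:
    # the timestamp columns arrive as plain strings, so parse them
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    if first_chunk:
        # create the table schema (columns only, no rows) on the first pass
        df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
        first_chunk = False

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    print(f'inserted a chunk of {len(df)} rows')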
Connecting pgAdmin and Postgres with Docker Networking
We can run both pgAdmin and Postgres in containers and have them share a virtual network.
Let's create a virtual Docker network called pg-network:
docker network create pg-network
Now we will re-run our Postgres container with the network name and a container name added, so the two containers can find each other. We will use pg-database as the container name.
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5431:5432 \
--network=pg-network \
--name pg-database \
postgres:13
We will now run the pgAdmin container in another terminal (note that both containers use the same network, pg-network):
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
--network=pg-network \
--name pgadmin \
dpage/pgadmin4
- Just like with the Postgres container, we specify a network and a name. The name isn't strictly necessary here, though, because no other container needs to reach this one.
- The actual image name is dpage/pgadmin4.
Once logged into pgAdmin, we can add a new server and connect to our Postgres database using pg-database as the host name/address.
Using the Ingestion Script with Docker
Now we can use our Python script and have Docker run it!
Clean up the script by removing everything we don't need. We will also rename it to ingest_data.py and add a few modifications:
- We will use argparse to handle the following command line arguments:
- Username
- Password
- Host
- Port
- Database name
- Table name
- URL for the CSV file
import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine

def main(params):
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url
    csv_name = 'output.csv'

    os.system(f"wget {url} -O {csv_name}")

    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    # compression='gzip' because the downloaded file is gzipped,
    # even though we saved it as output.csv
    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000, compression='gzip')

    df = next(df_iter)

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    # create the table with the right columns, then insert the first chunk
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
    df.to_sql(name=table_name, con=engine, if_exists='append')

    while True:
        try:
            t_start = time()

            df = next(df_iter)

            df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
            df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

            df.to_sql(name=table_name, con=engine, if_exists='append')

            t_end = time()
            print('inserted another chunk, took %.3f second' % (t_end - t_start))
        except StopIteration:
            # the iterator is exhausted: all chunks have been ingested
            print('finished ingesting all the data')
            break


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')

    parser.add_argument('--user', help='user name for postgres')
    parser.add_argument('--password', help='password for postgres')
    parser.add_argument('--host', help='host for postgres')
    parser.add_argument('--port', help='port for postgres')
    parser.add_argument('--db', help='database name for postgres')
    parser.add_argument('--table_name', help='name of the table where we will write the results to')
    parser.add_argument('--url', help='url of the csv file')

    args = parser.parse_args()

    main(args)
With pgAdmin and Postgres already running as containers on pg-network (if not, start them again), we can run ingest_data.py on the host:
python ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5431 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz"
Note: we connect to port 5431 here because the script runs on the host and goes through the host-to-container port mapping (5431:5432), not through the Docker network.
Note: the URL differs from the original Zoomcamp materials, which pointed to the dataset hosted on Amazon S3.
pgcli -h localhost -p 5431 -u root -d ny_taxi
We can log in (password: root) and explore the database:
select count(*) from yellow_taxi_trips;
Output
+---------+
| count |
|---------|
| 5200000 |
+---------+
SELECT 1
Time: 0.401s
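The same count can also be checked from Python with pandas (a sketch, reusing the connection settings from above):

import pandas as pd
from sqlalchemy import create_engine

# same credentials and host-mapped port as before
engine = create_engine('postgresql://root:root@localhost:5431/ny_taxi')

# read the result of the count query straight into a DataFrame
print(pd.read_sql('SELECT COUNT(*) AS row_count FROM yellow_taxi_trips;', con=engine))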
Dockerizing the Script
Now we can modify the Dockerfile we had before to include our ingest_data.py script and create a new image:
FROM python:3.9.1
# We need to install wget to download the csv.gz file
# We need wget to download the csv.gz file
RUN apt-get update && apt-get install -y wget
# psycopg2 is a postgres db adapter for python: sqlalchemy needs it
RUN pip install pandas sqlalchemy psycopg2
WORKDIR /app
COPY ingest_data.py ingest_data.py
ENTRYPOINT [ "python", "ingest_data.py" ]
Now build the Image:
docker build -t taxi_ingest:dog .
Run it:
docker run -it \
--network=pg-network \
taxi_ingest:dog \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz"
- We need to provide the network so Docker can find the Postgres container (pg-network). It goes before the name of the image (taxi_ingest:dog).
- Since Postgres runs in a separate container, the host argument must point to the Postgres container name (pg-database), and the port is the container's internal port 5432, not the host-mapped 5431.
- You can drop the table in pgAdmin beforehand if you want, but the script will replace any pre-existing table anyway.
Running Postgres and pgAdmin with Docker-Compose
docker-compose allows us to launch multiple containers from a single configuration file, so we don't have to run several long docker run commands separately like we have been doing.
Here's a docker-compose.yaml file for running the Postgres and pgAdmin containers:
services:
  pgdatabase:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      - "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5431:5432"
  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    volumes:
      - "./data_pgadmin:/var/lib/pgadmin"
    ports:
      - "8080:80"
- We don't have to specify a network because docker-compose takes care of it: every container (or "service", as the file calls them) runs within the same network and can find the others by name (pgdatabase and pgadmin in this example).
- We've added a volume for pgAdmin to save its settings, so you don't have to re-create the connection to Postgres every time you rerun the container. Make sure you create a data_pgadmin directory in the work folder you run docker-compose from.
We can now run Docker compose by running the following command from the same directory where docker-compose.yaml is found. Make sure that all previous containers aren’t running anymore:
docker-compose up
Homework Week 1
Run the two containers (Postgres and pgAdmin) with docker-compose. The default network docker-compose creates is named after the folder it runs from, homework_default in this case, so we use that network name when running the ingestion container. Build the homework image:
docker build -t taxi_ingest:homework .
After editing the ingestion script to load two tables (trips and zones), run it on that network:
docker run -it \
--network=homework_default \
taxi_ingest:homework \
--user=root \
--password=root \
--host=pgdatabase \
--port=5432 \
--db=ny_taxi \
--table_name_1=trips \
--table_name_2=zones
After running the above, we can explore the database:
pgcli -h localhost -p 5431 -u root -d ny_taxi
select count(*) from trips;
Output:
+---------+
| count |
|---------|
| 1369765 |
+---------+
Note: host port 5431 only works while the containers are running and the port mapping is active; inside the Docker network, containers talk to Postgres on 5432.
Week One: Part 2: Terraform and Google Cloud Platform
What is Terraform? Infrastructure as code: a tool for provisioning and managing cloud resources from declarative configuration files.
We will now create a project and a service account, and we will download the authentication keys to our computer. A service account is like a user account but for apps and workloads; you may authorize or limit what resources are available to your apps with service accounts.
Remember: run Terraform first to set up the infrastructure (the data lake and the BigQuery warehouse) before running Airflow.