Zoom Data Engineering Part 3

10 minute read


Data Warehouses commonly use the ETL model: transformation takes place before the data is loaded into the warehouse!

A DW receives data from different data sources, which is then processed in a staging area before being ingested into the actual warehouse (a database) and arranged as needed. DWs may then feed data to separate Data Marts: smaller database systems which end users may use for different purposes.
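To make the transform-before-load ordering concrete, here is a minimal ETL sketch in Python. All names here (the `vendor` field, the toy sources) are illustrative, not part of the course code:

```python
# Minimal ETL sketch: data is transformed in a staging step BEFORE it is
# loaded into the warehouse (all names and data here are illustrative).

def extract(sources):
    """Pull raw records from each source system."""
    return [row for source in sources for row in source]

def transform(raw_rows):
    """Staging area: clean and normalize before the warehouse sees anything."""
    return [
        {"trip_distance": float(r["trip_distance"]), "vendor": r["vendor"].strip().upper()}
        for r in raw_rows
        if r.get("trip_distance") is not None
    ]

def load(warehouse, rows):
    """Ingest the already-transformed rows into the warehouse table."""
    warehouse.setdefault("trips", []).extend(rows)

sources = [
    [{"trip_distance": "1.5", "vendor": " cmt "}],
    [{"trip_distance": None, "vendor": "vts"}],   # bad record, dropped in staging
]
warehouse = {}
load(warehouse, transform(extract(sources)))
print(warehouse)  # only the cleaned row reaches the warehouse
```

In an ELT setup the `transform` step would instead run inside the warehouse, after `load`.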

BigQuery: Cloud Data Warehouse

BigQuery (BQ) is a Data Warehouse solution offered by Google Cloud Platform.

Notes on BigQuery:

  • BQ is serverless. There are no servers to manage or database software to install; this is managed by Google and it’s transparent to the customers.

  • BQ is scalable and has high availability. Google takes care of the underlying software and infrastructure for us.

  • BQ has built-in features like Machine Learning, Geospatial Analysis and Business Intelligence among others.

  • BQ maximizes flexibility by separating the compute engine that analyzes the data from the storage, thus allowing customers to budget accordingly and reduce costs.

An alternative to BQ from another cloud provider would be Amazon Web Services' Redshift.

External Tables in BigQuery

BigQuery supports a few external data sources: you may query these sources directly from BigQuery even though the data itself isn’t stored in BQ.

An external table is a table that acts like a standard BQ table. The table metadata (such as the schema) is stored in BQ storage but the data itself is external.

You may create an external table from a CSV or Parquet file stored in a Cloud Storage Bucket (gs which stands for Google Storage):

CREATE OR REPLACE EXTERNAL TABLE `new-try-zoom-data.trips_data_all.external_yellow_tripdata`
OPTIONS (
  format = 'CSV',
  uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv', 'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);

This query will create an external table based on the CSV files matched by the two URI wildcards. BQ will figure out the table schema and the datatypes from the contents of the files.

This assumes our .csv files are located under nyc-tl-data/trip data/ in our Google Cloud Storage bucket.


Partitioning in BigQuery

BigQuery tables can be partitioned into multiple smaller tables. A partitioned table is a special table that is divided into segments, called partitions, that make it easier to manage and query your data.

[Image: BigQuery table partitioning]

Here’s an example query for creating a partitioned table:

CREATE OR REPLACE TABLE `stackoverflow.questions_2018_partitioned`
PARTITION BY DATE(creation_date) AS
-- source: the BigQuery public Stack Overflow dataset
SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE creation_date BETWEEN '2018-01-01' AND '2018-07-01';
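Why does this help? A Python sketch of partition pruning, illustrative only (BigQuery does all of this transparently): rows are bucketed by date, and a query with a date filter only scans the matching buckets.

```python
# Sketch of partition pruning: rows are stored bucketed by date, so a query
# with a date filter only scans the matching partitions.
from collections import defaultdict

table = defaultdict(list)          # partition key (date) -> rows
rows = [
    {"creation_date": "2018-01-03", "title": "q1"},
    {"creation_date": "2018-01-03", "title": "q2"},
    {"creation_date": "2018-06-20", "title": "q3"},
    {"creation_date": "2018-09-01", "title": "q4"},
]
for row in rows:
    table[row["creation_date"]].append(row)   # PARTITION BY DATE(creation_date)

def query(table, start, end):
    """Scan only partitions whose key falls inside the filter range."""
    scanned = [p for p in table if start <= p <= end]
    results = [r for p in scanned for r in table[p]]
    return results, scanned

results, scanned = query(table, "2018-01-01", "2018-07-01")
print(len(results), sorted(scanned))  # 3 rows; the 2018-09 partition is never touched
```

Because the partitions to read are known from the filter alone, BQ can also estimate the bytes processed before the query runs, which is why partitioned-table costs are predictable.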


Clustering in BigQuery

Clustering consists of rearranging a table based on the values of its columns so that the table is ordered according to the chosen criteria.

Clustering can improve the performance of certain types of queries, such as those using filter clauses and queries aggregating data.

Here's an example query for creating a clustered table:

CREATE OR REPLACE TABLE `stackoverflow.questions_2018_clustered`
CLUSTER BY tags AS
-- source: the BigQuery public Stack Overflow dataset
SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE creation_date BETWEEN '2018-01-01' AND '2018-07-01';
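A Python sketch of why clustering speeds up filtered queries (illustrative only; BigQuery manages its storage blocks internally): once rows are sorted by the cluster column, each block carries min/max metadata, and a filter can skip whole blocks.

```python
# Sketch of block skipping on a clustered table: rows sorted by `tags`,
# split into blocks that record their min/max values.

rows = [{"tags": t} for t in ["airflow", "bigquery", "docker", "gcp", "python", "sql"]]
BLOCK_SIZE = 2
blocks = [rows[i:i + BLOCK_SIZE] for i in range(0, len(rows), BLOCK_SIZE)]
stats = [(b[0]["tags"], b[-1]["tags"]) for b in blocks]   # (min, max) per block

def filter_tags(target):
    """Read only blocks whose [min, max] range can contain the target."""
    read = [b for b, (lo, hi) in zip(blocks, stats) if lo <= target <= hi]
    hits = [r for b in read for r in b if r["tags"] == target]
    return hits, len(read)

hits, blocks_read = filter_tags("docker")
print(blocks_read, len(blocks))  # only 1 of 3 blocks is scanned
```

Unlike partition pruning, which blocks get skipped depends on the actual data layout, which is why BQ cannot estimate the cost savings of clustering up front.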

Partitioning vs Clustering

| Clustering | Partitioning |
|---|---|
| Cost benefit unknown. BQ cannot estimate the reduction in cost before running a query. | Cost known upfront. BQ can estimate the amount of data to be processed before running a query. |
| High granularity. Multiple criteria can be used to sort the table. | Low granularity. Only a single column can be used to partition the table. |
| Clusters are "fixed in place". | Partitions can be added, deleted, modified or even moved between storage options. |
| Benefits queries that commonly use filters or aggregation against multiple particular columns. | Benefits when you filter or aggregate on a single column. |
| Unlimited amount of clusters; useful when the cardinality of the number of values in a column or group of columns is large. | Limited to 4000 partitions; cannot be used in columns with larger cardinality. |


Internals and Architecture

While it’s not strictly necessary to understand how the internals of BigQuery work, knowing them helps explain the reasoning behind the best practices.

BigQuery Architecture

Column-Oriented vs Record-Oriented Storage

Traditional methods for tabular data storage are record-oriented (also known as row-oriented). Data is read sequentially row by row and then the columns are accessed per row.

BigQuery uses a columnar storage format. Data is stored by the columns of the table rather than by rows. This is beneficial when dealing with massive amounts of data because it allows us to discard right away the columns we’re not interested in when performing queries, thus reducing the amount of processed data.
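The difference can be sketched in a few lines of Python (toy data, not BigQuery's actual storage engine): with a columnar layout, a query touching one of four columns only reads that column's values.

```python
# Row-oriented vs column-oriented storage sketch: a query that needs one
# of four columns reads far less data in the columnar layout.

rows = [
    {"passenger_count": 1, "trip_distance": 2.5, "fare_amount": 10.0, "tip_amount": 2.0},
    {"passenger_count": 2, "trip_distance": 0.8, "fare_amount": 5.0, "tip_amount": 1.0},
]

# Record-oriented: every query walks whole rows, touching every column.
values_scanned_rows = sum(len(r) for r in rows)

# Column-oriented: the same data pivoted into one array per column.
columns = {k: [r[k] for r in rows] for k in rows[0]}
# SELECT AVG(trip_distance) only needs to read a single column.
avg = sum(columns["trip_distance"]) / len(columns["trip_distance"])
values_scanned_cols = len(columns["trip_distance"])

print(avg, values_scanned_cols, values_scanned_rows)  # 1.65, 2 values read vs 8
```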

This topic is closely related to how database management systems are built at a low level. I have notes and material about this subject.

Machine Learning with BigQuery

Here is an example of training a linear regression model in BQ on the dataset we uploaded to GCS and BQ using Airflow in the previous lesson (Part 2), along with how to do hyperparameter tuning.

  • BigQuery ML is commonly used to do Machine Learning directly on all the data in the warehouse.

Create a Machine Learning Table

CREATE OR REPLACE TABLE `new-try-zoom-data.trips_data_all.yellow_tripdata_ml` (
  `passenger_count` FLOAT64,
  `trip_distance` FLOAT64,
  `PULocationID` STRING,
  `DOLocationID` STRING,
  `payment_type` STRING,
  `fare_amount` FLOAT64,
  `tolls_amount` FLOAT64,
  `tip_amount` FLOAT64
) AS (
  SELECT passenger_count, trip_distance,
    CAST(PULocationID AS STRING), CAST(DOLocationID AS STRING),
    CAST(payment_type AS STRING), fare_amount, tolls_amount, tip_amount
  FROM `new-try-zoom-data.trips_data_all.yellow_tripdata`
  WHERE fare_amount != 0
);

This will create a new table named yellow_tripdata_ml.

Let's create a model with default settings, called tip_model:

  • BigQuery ML increases the speed of model development and innovation by removing the need to export data from the data warehouse. Instead, BigQuery ML brings ML to the data.

CREATE OR REPLACE MODEL `new-try-zoom-data.trips_data_all.tip_model`
OPTIONS (
  model_type = 'linear_reg',
  input_label_cols = ['tip_amount'],
  data_split_method = 'AUTO_SPLIT'
) AS
SELECT *
FROM `new-try-zoom-data.trips_data_all.yellow_tripdata_ml`
WHERE tip_amount IS NOT NULL;

  • This will create a model in BigQuery named tip_model


Check the features of the trained model with ML.FEATURE_INFO:

SELECT * FROM ML.FEATURE_INFO(MODEL `new-try-zoom-data.trips_data_all.tip_model`);


| Row | input | min | max | mean | median | stddev | category_count | null_count | dimension |
|---|---|---|---|---|---|---|---|---|---|
| 1 | passenger_count | 0.0 | 9.0 | 1.5713114623695559 | 1.0 | 1.2270713937973978 | null | 91264 | null |
| 2 | trip_distance | 0.0 | 831.8 | 2.924479001109217 | 1.6 | 3.8415366910854285 | null | 0 | null |
| 3 | PULocationID | null | null | null | null | null | 263 | 0 | null |
| 4 | DOLocationID | null | null | null | null | null | 262 | 0 | null |
| 5 | payment_type | null | null | null | null | null | 5 | 0 | null |
| 6 | fare_amount | -447.0 | 943274.8 | 12.912979364787876 | 9.0 | 292.16502049238812 | null | 0 | null |
| 7 | tolls_amount | -70.0 | 3288.0 | 0.34772420656045427 | 0.0 | 1.8066459071038299 | null | 0 | null |

Evaluate the Model

SELECT *
FROM ML.EVALUATE(MODEL `new-try-zoom-data.trips_data_all.tip_model`, (
  SELECT *
  FROM `new-try-zoom-data.trips_data_all.yellow_tripdata_ml`
  WHERE tip_amount IS NOT NULL
));


Evaluation results:

| mean_absolute_error | mean_squared_error | mean_squared_log_error | median_absolute_error | r2_score | explained_variance |
|---|---|---|---|---|---|
| 0.935111557019694 | 479.42476744541432 | 0.39864870660870216 | 0.57709302471312185 | 0.462852276471617 | 0.46285227647163496 |
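To make these metrics concrete, here is a tiny hand computation of MAE, MSE, and R² in Python. The numbers are toy values, not the course data:

```python
# Recomputing the main ML.EVALUATE regression metrics by hand on a toy sample.

actual    = [2.0, 3.0, 1.0, 4.0]
predicted = [2.5, 2.5, 1.5, 3.0]

n = len(actual)
errors = [p - a for p, a in zip(predicted, actual)]
mae = sum(abs(e) for e in errors) / n          # mean_absolute_error
mse = sum(e * e for e in errors) / n           # mean_squared_error
mean_a = sum(actual) / n
ss_res = sum(e * e for e in errors)            # residual sum of squares
ss_tot = sum((a - mean_a) ** 2 for a in actual)
r2 = 1 - ss_res / ss_tot                       # r2_score: 1 is perfect, 0 is "predict the mean"

print(mae, mse, r2)
```

The r2_score of ~0.46 above means the model explains roughly 46% of the variance in tip_amount.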

Predict with the Model

Use ML.PREDICT to generate predictions:

SELECT *
FROM ML.PREDICT(MODEL `new-try-zoom-data.trips_data_all.tip_model`, (
  SELECT *
  FROM `new-try-zoom-data.trips_data_all.yellow_tripdata_ml`
  WHERE tip_amount IS NOT NULL
));


| predicted_tip_amount | passenger_count | trip_distance | PULocationID | DOLocationID | payment_type | fare_amount | tolls_amount | tip_amount |
|---|---|---|---|---|---|---|---|---|
| 7.50063555197471 | 1.0 | 12.54 | 88 | 138 | 1 | 35.5 | 5.76 | 8.91 |
| 2.5834528033492461 | 1.0 | 3.2 | 80 | 79 | 1 | 12.0 | 0.0 | 3.95 |
| 2.064685938112234 | 4.0 | 1.7 | 74 | 75 | 1 | 7.5 | 0.0 | 2.08 |
| 4.6820637899561461 | 1.0 | 10.95 | 65 | 41 | 1 | 34.0 | 0.0 | 8.7 |
| 2.7916576465349863 | 1.0 | 10.5 | 71 | 7 | 1 | 39.0 | 0.0 | 6.04 |
| 3.0505674288324371 | 1.0 | 3.9 | 181 | 255 | 1 | 14.5 | 0.0 | 4.59 |
| 3.5220206371827771 | 2.0 | 7.61 | 243 | 263 | 1 | 25.0 | 0.0 | 5.26 |
| 2.4762222355116137 | 1.0 | 2.56 | 74 | 239 | 1 | 11.0 | 0.0 | 3.69 |
| 1.9052375837331965 | 1.0 | 0.0 | 223 | 223 | 1 | 2.5 | 0.0 | 22.0 |

Predict and Explain

  • Use ML.EXPLAIN_PREDICT to also see the top feature attributions for each prediction:

SELECT *
FROM ML.EXPLAIN_PREDICT(MODEL `new-try-zoom-data.trips_data_all.tip_model`, (
  SELECT *
  FROM `new-try-zoom-data.trips_data_all.yellow_tripdata_ml`
  WHERE tip_amount IS NOT NULL
), STRUCT(3 AS top_k_features));


| Row | predicted_tip_amount | top 3 features (attribution) | passenger_count | trip_distance | PULocationID | DOLocationID | payment_type | fare_amount | tolls_amount | tip_amount |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 3.6459 | DOLocationID: -1262.47, payment_type: 527.86, PULocationID: -409.52 | 1.0 | 5.8 | 88 | 68 | 1 | 20.5 | 0.0 | 5.45 |
| 2 | 2.0717 | DOLocationID: -1262.48, payment_type: 527.86, PULocationID: -409.70 | 1.0 | 1.45 | 146 | 179 | 1 | 6.5 | 0.0 | 2.08 |
| 3 | -0.9826 | DOLocationID: -1262.66, payment_type: 525.55, PULocationID: -409.50 | 1.0 | 0.0 | 226 | 226 | 4 | -2.5 | 0.0 | -0.11 |
| 4 | 2.5389 | DOLocationID: -1262.07, payment_type: 527.86, PULocationID: -409.90 | 1.0 | 1.66 | 80 | 112 | 1 | 10.0 | 0.0 | 3.39 |
| 5 | 2.5322 | DOLocationID: -1262.51, payment_type: 527.86, PULocationID: -409.85 | 1.0 | 3.01 | 41 | 143 | 1 | 13.5 | 0.0 | 3.46 |

(Values rounded for readability. In every row, prediction_value equals predicted_tip_amount, approximation_error is 0.0, and baseline_prediction_value is 1147.0080772412966.)

Hyperparameter Tuning

CREATE OR REPLACE MODEL `new-try-zoom-data.trips_data_all.tip_hyperparam_model`
OPTIONS (
  model_type = 'linear_reg',
  input_label_cols = ['tip_amount'],
  num_trials = 5,
  max_parallel_trials = 2,
  l1_reg = hparam_range(0, 20),
  l2_reg = hparam_candidates([0, 0.1, 1, 10])
) AS
SELECT *
FROM `new-try-zoom-data.trips_data_all.yellow_tripdata_ml`
WHERE tip_amount IS NOT NULL;
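Conceptually, hparam_candidates tries each listed value and keeps the one with the best validation error. Here is a toy Python sketch of that search, using a 1-feature ridge regression with made-up data (none of this is BigQuery's actual tuner):

```python
# Toy grid search mirroring hparam_candidates([0, 0.1, 1, 10]): fit a model
# per l2 value, keep the one with the lowest validation MSE.

train = [(1.0, 1.1), (2.0, 1.9), (3.0, 3.2)]   # (x, y) pairs, made up
valid = [(4.0, 3.9), (5.0, 5.1)]

def fit_ridge(data, l2):
    """Closed-form 1-D ridge regression (no intercept): w = sum(xy) / (sum(x^2) + l2)."""
    sxy = sum(x * y for x, y in data)
    sxx = sum(x * x for x, y in data)
    return sxy / (sxx + l2)

def val_mse(w, data):
    return sum((w * x - y) ** 2 for x, y in data) / len(data)

candidates = [0, 0.1, 1, 10]
best_l2 = min(candidates, key=lambda l2: val_mse(fit_ridge(train, l2), valid))
print(best_l2)  # → 0.1 wins on this validation set
```

BigQuery ML does the same kind of search over num_trials trials, also sampling l1_reg from the continuous hparam_range.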

We can see the new hyperparameter-tuned model under the Models folder, as shown below:

"insert hyper model pic"

After training the model, we need to deploy it. Let's set that up.

Deploying the BigQuery Machine Learning Model

  • Steps for extracting the BigQuery ML model into a Docker container:

Note: On a Mac with the M1 chip I need the emacski/tensorflow-serving Docker image instead.

Step 1: Log into gcloud in the command-line terminal:

gcloud auth login

Step 2: Export the model into our Google Cloud Storage:

Note: I had to create the Data Lake bucket in GCS, named taxi_ml_model_devin_test, and I set the region to us-central1 (Iowa) to match my other bucket.

bq --project_id new-try-zoom-data extract -m trips_data_all.tip_model gs://taxi_ml_model_devin_test/tip_model

Here is a photo of the new GCS bucket we created in Google Cloud.

"insert model pic"

Step 3: Create a temporary directory for the model on the local machine:

 mkdir /tmp/model

Step 4: Copy our model from GCS into the local folder we created above:

gsutil cp -r gs://taxi_ml_model_devin_test/tip_model /tmp/model

Step 5: Creating a serving directory

mkdir -p serving_dir/tip_model/1

Step 6: Copy the model components into our serving directory:

 cp -r /tmp/model/tip_model/* serving_dir/tip_model/1

"insert  model pic"

Step 7: Pull the Docker image. The image below is for the Mac M1 chip; on other Macs/systems use docker pull tensorflow/serving:

docker pull emacski/tensorflow-serving:latest

Step 8: Run the Docker container:

docker run \
  -p 8501:8501 \
  --mount type=bind,source=`pwd`/serving_dir/tip_model,target=/models/tip_model \
  -e MODEL_NAME=tip_model \
  -t emacski/tensorflow-serving &

Step 9: With the container running, run a prediction with curl, providing values for the features used for the predictions:

  curl \
    -d '{"instances": [{"passenger_count":1, "trip_distance":12.2, "PULocationID":"193", "DOLocationID":"264", "payment_type":"2","fare_amount":20.4,"tolls_amount":0.0}]}' \
    -X POST http://localhost:8501/v1/models/tip_model:predict


    "predictions": [[0.57611719958651975]

Exporting a BigQuery ML model for online prediction

We can write a shell script to automate this process here:

Integrating BigQuery with Airflow (Part 3 starts here!)

We will now use Airflow to automate the creation of BQ tables, both normal and partitioned, from the files we stored in our Data Lake in lesson 2 (Green Taxi, Yellow Taxi, and FHV)!

DAGs for the Yellow and Green Taxis

Airflow Setup

Files Needed:

  • Dockerfile for creating a custom Airflow image with the gcloud SDK.
  • docker-compose.yaml for deploying Airflow with Docker.
  • Our .env file with the AIRFLOW_UID.
  • The dags folder.

Now we can deploy Airflow:

Build the Image:

  • The first build might take up to 10 minutes.
docker-compose build

Initialize the Airflow configuration:

docker-compose up airflow-init

Start Airflow:

docker-compose up

Now we can open up localhost:8080 and explore the Airflow interface.

Creating a Cloud Storage to BigQuery DAG

Now we can create a new DAG in our dags folder called gcs_2_bq_dag.py.

Define the task dependencies at the end of the DAG. We will use the following structure:

Here is a link to our DAG!

Homework for Part 3

  • Run SQL Queries on our BQ Data Warehouse

Question 1:

What is the count of FHV vehicle records for the year 2019? We can load the data from Cloud Storage and run a COUNT(*):

SELECT COUNT(*) FROM `new-try-zoom-data.trips_data_all.fhv_tripdata`;



Question 2:

How many distinct dispatching_base_num values do we have in the FHV data for 2019? We can run a DISTINCT query on the table from question 1:

SELECT COUNT(DISTINCT dispatching_base_num) FROM `new-try-zoom-data.trips_data_all.fhv_tripdata`;




This week we focused on data warehouses and reviewed the Google BigQuery and Amazon Redshift services. We learned how to run normal SQL queries and also how to do machine learning in these warehouses using SQL.

Next Lesson we will work on Analytics!!! (using dbt)