Data Engineering with Postgres
Goals:
- Set up an ETL pipeline using Postgres and Python!
SQL Queries to Create the Database
sql_queries.py
# DROP TABLES
songplay_table_drop = "DROP TABLE IF EXISTS songplays"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS songs"
artist_table_drop = "DROP TABLE IF EXISTS artists"
time_table_drop = "DROP TABLE IF EXISTS time"
# CREATE TABLES
# song_id and artist_id have no NOT NULL constraints because the sample data used to test the ETL
# does not contain every song/artist combination, so both columns are NULL in most records.
songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays
(songplay_id SERIAL PRIMARY KEY,
start_time bigint NOT NULL,
user_id int NOT NULL,
level varchar,
song_id varchar,
artist_id varchar,
session_id int,
location text,
user_agent text)
""")
user_table_create = ("""
CREATE TABLE IF NOT EXISTS users
(user_id int PRIMARY KEY,
first_name varchar,
last_name varchar,
gender varchar(1),
level varchar)
""")
song_table_create = ("""
CREATE TABLE IF NOT EXISTS songs
(song_id varchar PRIMARY KEY,
title text,
artist_id varchar,
year int,
duration numeric);
""")
artist_table_create = ("""
CREATE TABLE IF NOT EXISTS artists
(artist_id varchar PRIMARY KEY,
name varchar,
location text,
latitude decimal,
longitude decimal)
""")
time_table_create = ("""
CREATE TABLE IF NOT EXISTS time
(start_time bigint PRIMARY KEY,
hour int,
day int,
week int,
month int,
year int,
weekday varchar)
""")
# INSERT RECORDS
songplay_table_insert = ("""
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
VALUES ( %s, %s, %s, %s, %s, %s, %s, %s)
""")
user_table_insert = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id)
DO UPDATE
SET level = EXCLUDED.level
""")
song_table_insert = ("""
INSERT INTO songs (song_id, title, artist_id, year, duration)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (song_id)
DO NOTHING
""")
artist_table_insert = ("""
INSERT INTO artists (artist_id, name, location, latitude, longitude)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (artist_id)
DO NOTHING
""")
time_table_insert = ("""
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (start_time)
DO NOTHING
""")
# FIND SONGS
song_select = ("""
SELECT S.song_id, A.artist_id FROM songs S
JOIN artists A ON S.artist_id = A.artist_id
WHERE S.title = %s AND A.name = %s AND S.duration = %s
""")
# QUERY LISTS
create_table_queries = [songplay_table_create, user_table_create, song_table_create,
artist_table_create, time_table_create]
drop_table_queries = [songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop,
time_table_drop]
Python Script to Connect to the Database and Create Tables
- Connects to the database and creates all of the tables inside it!
import psycopg2
from sql_queries import create_table_queries, drop_table_queries
def create_database():
    """
    Connects to the sparkifydb database (which must already exist).
    Returns: cursor and connection.
    """
    conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
    cur = conn.cursor()
    return cur, conn
def drop_tables(cur, conn):
    """
    Drop all the tables.
    :param cur: cursor.
    :param conn: connection.
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()
def create_tables(cur, conn):
    """
    Create all the tables.
    :param cur: cursor.
    :param conn: connection.
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()
def main():
    cur, conn = create_database()
    drop_tables(cur, conn)
    create_tables(cur, conn)
    conn.close()

if __name__ == "__main__":
    main()
After this, the tables will be created inside our database.
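Quick check: you can list the tables Postgres now knows about. This is just an optional sanity check (not part of the project scripts), assuming the same local connection settings used above:

import psycopg2

# Optional sanity check: list the tables that now exist in sparkifydb
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
cur = conn.cursor()
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
for (table_name,) in cur.fetchall():
    print(table_name)
conn.close()

You should see songplays, users, songs, artists and time.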
Inserting Data into our Sparkify Database
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
# Connect to our database
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
cur = conn.cursor()
# Return the files from our data folder
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
    return all_files
# Let's process the song data
song_files = get_files("./data/song_data")
filepath = song_files[0]
df = pd.read_json(filepath, lines=True)
song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0].tolist()
# Insert the song record into the songs table
cur.execute(song_table_insert, song_data)
conn.commit()
# Artist table
artist_data = df[['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude']].values[0].tolist()
cur.execute(artist_table_insert, artist_data)
conn.commit()
# Process log_data
log_files = get_files("./data/log_data")
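The log files get the same exploratory treatment before everything is wrapped up in the final script. A minimal sketch of the first steps (the cleaned-up version lives in process_log_file further down):

filepath = log_files[0]
df = pd.read_json(filepath, lines=True)

# keep only the actual song-play events
df = df.loc[df['page'] == 'NextSong']

# ts is a UNIX timestamp in milliseconds; convert it to a datetime
df['datetime'] = pd.to_datetime(df['ts'], unit='ms')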
GOOD CODE!! Here is the full ETL script, with the steps above wrapped into reusable functions:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
def insert_record(cur, insert_query, df, fields):
    """
    Insert a single record into a DB table.
    :param cur: connection cursor to insert the data in DB.
    :param insert_query: SQL INSERT query.
    :param df: dataframe with the record.
    :param fields: list of fields of the data to insert.
    """
    record = df[fields].values[0].tolist()
    cur.execute(insert_query, record)
def insert_dataframe(cur, df, insert_query):
    """
    Insert a pandas dataframe into a DB table, one row at a time.
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with the records.
    :param insert_query: SQL INSERT query.
    """
    for i, row in df.iterrows():
        cur.execute(insert_query, list(row))
def process_song_file(cur, filepath):
    """
    Process a song file and insert data into the dimension tables: songs and artists.
    :param cur: connection cursor to insert the data in DB.
    :param filepath: path/to/the/song/file.
    """
    # open song file
    df = pd.read_json(filepath, lines=True)

    # insert song record
    insert_record(cur, song_table_insert, df, ['song_id', 'title', 'artist_id', 'year', 'duration'])

    # insert artist record
    insert_record(cur, artist_table_insert, df,
                  ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])
def expand_time_data(df, ts_field):
    """
    Add more time columns to a dataframe from a UNIX timestamp in milliseconds.
    :param df: pandas dataframe to add time fields to.
    :param ts_field: name of the timestamp field.
    :return: pandas dataframe with the extra time fields.
    """
    df['datetime'] = pd.to_datetime(df[ts_field], unit='ms')
    t = df
    t['year'] = t['datetime'].dt.year
    t['month'] = t['datetime'].dt.month
    t['day'] = t['datetime'].dt.day
    t['hour'] = t['datetime'].dt.hour
    t['weekday_name'] = t['datetime'].dt.day_name()
    # Series.dt.week was removed in newer pandas versions; isocalendar().week is the replacement
    t['week'] = t['datetime'].dt.isocalendar().week
    return t
def get_songid_artistid(cur, song, artist, length):
    """
    Gets the song_id and the artist_id from the song title, artist name and song duration.
    :param cur: connection cursor to query the data in DB.
    :param song: song title
    :param artist: artist name
    :param length: song duration
    :return: song_id and artist_id (both None if no match is found)
    """
    # get songid and artistid from song and artist tables
    cur.execute(song_select, (song, artist, length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    return songid, artistid
def insert_facts_songplays(cur, df):
    """
    Insert rows into the songplays fact table.
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with song play data.
    """
    # insert songplay records
    for index, row in df.iterrows():
        song_id, artist_id = get_songid_artistid(cur, row.song, row.artist, row.length)

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, song_id, artist_id,
                         row.itemInSession, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)
def process_log_file(cur, filepath):
    """
    Process a log file and insert data into the dimension tables: time and users.
    Also insert data into the songplays fact table.
    :param cur: connection cursor to insert the data in DB.
    :param filepath: path/to/the/log/file.
    """
    # open log file
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    df = df.loc[df['page'] == 'NextSong']

    # convert timestamp column to datetime and add the derived time columns
    t = expand_time_data(df, 'ts')

    # insert time data records
    time_df = t[['ts', 'hour', 'day', 'week', 'month', 'year', 'weekday_name']]
    insert_dataframe(cur, time_df, time_table_insert)

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    insert_dataframe(cur, user_df, user_table_insert)

    # insert songplay records
    insert_facts_songplays(cur, df)
def get_all_files_matching_from_directory(directorypath, match):
    """
    Get all the files in a directory (recursively) that match a pattern.
    :param directorypath: path/to/directory.
    :param match: glob pattern to match.
    :return: list of all the matching files.
    """
    # get all files matching the pattern from the directory
    all_files = []
    for root, dirs, files in os.walk(directorypath):
        files = glob.glob(os.path.join(root, match))
        for f in files:
            all_files.append(os.path.abspath(f))
    return all_files
def process_data(cur, conn, filepath, func):
    """
    Process all the data, either song files or log files.
    :param cur: connection cursor to insert the data in DB.
    :param conn: connection to the database to do the commit.
    :param filepath: path/to/data/type/directory
    :param func: function that processes, transforms and inserts the data into the DB.
    """
    all_files = get_all_files_matching_from_directory(filepath, '*.json')

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))
def main():
    # conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=postgres")
    conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
    cur = conn.cursor()

    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)

    conn.close()

if __name__ == "__main__":
    main()
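Once the ETL script has run, a quick row count on the fact table is a nice sanity check. This snippet is optional and not part of the script above; it assumes the same local connection settings:

import psycopg2

# Optional: confirm the songplays fact table was populated
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM songplays")
print("songplays rows:", cur.fetchone()[0])
conn.close()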
- Learn about Slicing and Dicing in SQL
- Learn about Roll Up and Drill Down (a quick Roll Up sketch against this schema is below)
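As a first taste of Roll Up, Postgres supports GROUP BY ROLLUP, which aggregates at several levels in a single query. This is only an illustrative sketch against the schema above, not part of the project code:

# Plays per year and level, with ROLLUP adding subtotal and grand-total rows
songplays_rollup = ("""
SELECT t.year, sp.level, COUNT(*) AS plays
FROM songplays sp
JOIN time t ON sp.start_time = t.start_time
GROUP BY ROLLUP (t.year, sp.level)
ORDER BY t.year, sp.level
""")

Drilling down is then just grouping by a finer set of dimensions (for example year, month and level) instead of the coarser one.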