Data Engineering with Postgres

6 minute read

Goals:

Set up an ETL pipeline using Postgres and Python!


SQL Queries to Create Database

sql_queries.py

# DROP TABLES

songplay_table_drop = "DROP TABLE IF EXISTS songplays"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS songs"
artist_table_drop = "DROP TABLE IF EXISTS artists"
time_table_drop = "DROP TABLE IF EXISTS time"

# CREATE TABLES

# song_id and artist_id have no NOT NULL constraints because the sample data used to
# test the ETL doesn't cover all combinations, and both are NULL in most rows.
songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays
    (songplay_id SERIAL PRIMARY KEY,
    start_time bigint NOT NULL,
    user_id int NOT NULL,
    level varchar,
    song_id varchar,
    artist_id varchar,
    session_id int,
    location text,
    user_agent text)
""")

user_table_create = (""" 
CREATE TABLE IF NOT EXISTS users 
    (user_id int PRIMARY KEY, 
    first_name varchar, 
    last_name varchar, 
    gender varchar(1), 
    level varchar)
""")

song_table_create = (""" 
CREATE TABLE IF NOT EXISTS songs 
    (song_id varchar PRIMARY KEY, 
    title text, 
    artist_id varchar, 
    year int, 
    duration numeric)
""")

artist_table_create = (""" 
CREATE TABLE IF NOT EXISTS artists 
    (artist_id varchar PRIMARY KEY, 
    name varchar, 
    location text, 
    latitude decimal, 
    longitude decimal)
""")

time_table_create = (""" 
CREATE TABLE IF NOT EXISTS time 
    (start_time bigint PRIMARY KEY, 
    hour int, 
    day int, 
    week int, 
    month int, 
    year int, 
    weekday varchar)
""")

# INSERT RECORDS

songplay_table_insert = (""" 
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent) 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""")

user_table_insert = (""" 
INSERT INTO users (user_id, first_name, last_name, gender, level) 
    VALUES (%s, %s, %s, %s, %s) 
    ON CONFLICT (user_id) 
        DO UPDATE
        SET level = EXCLUDED.level
""")

song_table_insert = (""" 
INSERT INTO songs (song_id, title, artist_id, year, duration) 
    VALUES (%s, %s, %s, %s, %s) 
    ON CONFLICT (song_id) 
        DO NOTHING
""")

artist_table_insert = (""" 
INSERT INTO artists (artist_id, name, location, latitude, longitude) 
    VALUES (%s, %s, %s, %s, %s) 
    ON CONFLICT (artist_id) 
        DO NOTHING
""")


time_table_insert = ("""
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (start_time)
    DO NOTHING
""")

# FIND SONGS
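# The ETL uses this lookup to recover the song_id and artist_id for each log
# event, since the log files only carry the song title, artist name, and duration.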

song_select = (""" 
SELECT S.song_id, A.artist_id FROM songs S 
    JOIN artists A ON S.artist_id = A.artist_id
    WHERE S.title = %s AND A.name = %s AND S.duration = %s
""")

# QUERY LISTS

create_table_queries = [songplay_table_create, user_table_create, song_table_create,
                        artist_table_create, time_table_create]
drop_table_queries = [songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop,
                      time_table_drop]

Python Script to Connect to Database and Create Tables

  • Connects to the database and creates the tables inside it:

import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def create_database():
    """
    This function  connects to the Database to insert the data
    Return: cursor an connection
    """

    conn = psycopg2.connect(host = "localhost",user = "devinpowers", port="5432", dbname = "sparkifydb")
    
    cur = conn.cursor()
    
    return cur, conn

def drop_tables(cur, conn):
    """
    Drop all the tables.
    :param cur: cursor.
    :param conn: connection.
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """
    Create all the tables.
    :param cur: cursor.
    :param conn: connection.
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

def main():
    cur, conn = create_database()
    
    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main() 

After this, the tables will be created inside our database.
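
To double-check, we can list the new tables straight from Python (a minimal sketch, reusing the same connection parameters as above):

import psycopg2

# Quick sanity check: list the tables that now exist in sparkifydb
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
cur = conn.cursor()
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
print([row[0] for row in cur.fetchall()])
conn.close()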

Inserting Data into our Sparkify Database

import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

# Connect to our database
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")

cur = conn.cursor()

# Return the files from our data folder
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
    
    return all_files

# Let's process the song data

song_files = get_files("./data/song_data")


filepath = song_files[0]

df = pd.read_json(filepath, lines=True)


song_data = df[['song_id','title','artist_id', 'year', 'duration']].values[0].tolist()

# Inserting data into our tables

cur.execute(song_table_insert, song_data)
conn.commit()

# Artist table

artist_data = df[['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude']].values[0].tolist()

cur.execute(artist_table_insert, artist_data)
conn.commit()

# Process log_data

log_files = get_files("./data/log_data")
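
# Take a peek at the first log file (mirroring what the full script below does)
filepath = log_files[0]
df = pd.read_json(filepath, lines=True)

# Keep only actual song plays
df = df.loc[df['page'] == 'NextSong']
print(df[['ts', 'userId', 'level', 'song', 'artist', 'length']].head())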


The Complete ETL Script

Now let's refactor the exploratory steps above into a single, reusable ETL script:

import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *


def insert_record(cur, insert_query, df, fields):
    """
    Insert a record into a DB table.
    :param cur: connection cursor to insert the data in DB.
    :param insert_query: query SQL for Insert.
    :param df: dataframe with the record.
    :param fields: array of fields of the data to insert.
    """
    record = df[fields].values[0].tolist()
    cur.execute(insert_query, record)


def insert_dataframe(cur, df, insert_query):
    """
    Insert a pandas dataframe into a DB table
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with the record.
    :param insert_query: query SQL for Insert.
    """
    for i, row in df.iterrows():
        cur.execute(insert_query, list(row))
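
# An optional, batched alternative to insert_dataframe (a sketch, not part of
# the original project): psycopg2's execute_batch groups the INSERTs into
# fewer server round trips, which helps on larger log files. The import would
# normally live at the top of the file.
from psycopg2.extras import execute_batch


def insert_dataframe_batched(cur, df, insert_query, page_size=100):
    """
    Insert a pandas dataframe into a DB table in batches.
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with the records.
    :param insert_query: query SQL for Insert.
    :param page_size: number of statements grouped per round trip.
    """
    execute_batch(cur, insert_query, df.values.tolist(), page_size=page_size)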


def process_song_file(cur, filepath):
    """
    Process the songs files and insert data into dimension tables: songs and artists.
    :param cur: connection cursor to insert the data in DB.
    :param filepath: path/to/the/song/file.
    """

    # open song file
    df = pd.read_json(filepath, lines=True)

    # insert song record
    insert_record(cur, song_table_insert, df, ['song_id', 'title', 'artist_id', 'year', 'duration'])
    
    # insert artist record
    insert_record(cur, artist_table_insert, df,
                  ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])


def expand_time_data(df, ts_field):
    """
    Add more time elements to a dataframe from a UNIX timestamp in milliseconds.
    :param df: pandas dataframe to add time fields.
    :param ts_field: name of the timestamp field.
    :return: pandas dataframe with more time fields.
    """

    # Work on a copy so the caller's dataframe isn't mutated
    t = df.copy()
    t['datetime'] = pd.to_datetime(t[ts_field], unit='ms')
    t['year'] = t['datetime'].dt.year
    t['month'] = t['datetime'].dt.month
    t['day'] = t['datetime'].dt.day
    t['hour'] = t['datetime'].dt.hour
    t['weekday_name'] = t['datetime'].dt.day_name()
    # Series.dt.week was removed in newer pandas versions; use isocalendar() instead
    t['week'] = t['datetime'].dt.isocalendar().week.astype(int)

    return t


def get_songid_artistid(cur, song, artist, length):
    """
    Get the song_id and artist_id from the song title, artist name, and song duration.
    :param cur: connection cursor to query the data in DB.
    :param song: song title
    :param artist: artist name
    :param length: song duration
    :return: returns song_id and artist_id
    """

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (song, artist, length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    return songid, artistid


def insert_facts_songplays(cur, df):
    """
    Insert songplays fact table
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with song plays data.
    """

    # insert songplay records
    for index, row in df.iterrows():
        song_id, artist_id = get_songid_artistid(cur, row.song, row.artist, row.length)

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, song_id, artist_id,
                         row.itemInSession, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)


def process_log_file(cur, filepath):
    """
    Process the log files and insert data into dimension tables: time and users.
    Insert data into the facts table songplays.
    :param cur: connection cursor to insert the data in DB.
    :param filepath: path/to/the/log/file.
    """

    # open log file
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    df = df.loc[df['page'] == 'NextSong']

    # convert timestamp column to datetime
    t = expand_time_data(df, 'ts')

    # insert time data records
    time_df = t[['ts', 'hour', 'day', 'week', 'month', 'year', 'weekday_name']]

    insert_dataframe(cur, time_df, time_table_insert)

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    insert_dataframe(cur, user_df, user_table_insert)

    # insert songplay records
    insert_facts_songplays(cur, df)


def get_all_files_matching_from_directory(directorypath, match):
    """
    Get all the files that match into a directory recursively.
    :param directorypath: path/to/directory.
    :param match: match expression.
    :return: array with all the files that match.
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(directorypath):
        files = glob.glob(os.path.join(root, match))
        for f in files:
            all_files.append(os.path.abspath(f))

    return all_files


def process_data(cur, conn, filepath, func):
    """
    Process all the data, either songs files or logs files
    :param cur: connection cursor to insert the data in DB.
    :param conn: connection to the database to do the commit.
    :param filepath: path/to/data/type/directory
    :param func: function to process, transform and insert into DB the data.
    """

    all_files = get_all_files_matching_from_directory(filepath, '*.json')

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))


def main():

    conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
    cur = conn.cursor()

    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)

    conn.close()


if __name__ == "__main__":
    main()

Next up:

  • Learn about Slicing and Dicing in SQL (see the example query below)
  • Learn about Roll Up and Drill Down
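
For a first taste of slicing, the star schema we just built can already answer questions like "what hours do people listen?" (a minimal sketch against the tables created above):

import psycopg2

# Slice the songplays fact table by the hour column of the time dimension
conn = psycopg2.connect(host="localhost", user="devinpowers", port="5432", dbname="sparkifydb")
cur = conn.cursor()
cur.execute("""
    SELECT t.hour, count(*) AS plays
    FROM songplays sp
    JOIN time t ON sp.start_time = t.start_time
    GROUP BY t.hour
    ORDER BY t.hour
""")
for hour, plays in cur.fetchall():
    print(hour, plays)
conn.close()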