ETL Pipeline
Big Data: Pipeline -> ETL (Extract, Transform, Load)
Use the Twitter API to pull tweets and load them into an SQL table.
- I will use MySQL since I already know it!
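Before the script below will run, the twitterdb database and its tweets table need to exist. Here is a minimal sketch of the setup; the column names mirror the INSERT statement used later, but the exact column types are an assumption, adjust as needed:

import mysql.connector

# Assumed schema -- column names match the INSERT in the streaming script;
# the types here are a reasonable guess, not necessarily the exact DDL used.
con = mysql.connector.connect(host='localhost', user='root', password='password')
cursor = con.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS twitterdb CHARACTER SET utf8")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS twitterdb.tweets (
        username VARCHAR(255),
        created_at DATETIME,
        tweet TEXT,
        retweet_count INT,
        place VARCHAR(255),
        location VARCHAR(255)
    )
""")
cursor.close()
con.close()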
import mysql.connector
from mysql.connector import Error
import tweepy
import json
from dateutil import parser
import time

# Twitter API credentials (fill in your own)
consumer_key = ''
consumer_secret = ''
access_token = ''
access_token_secret = ''

# MySQL root password
password = 'password'
def connect(username, created_at, tweet, retweet_count, place, location):
    """
    Connect to the MySQL database and insert twitter data.
    """
    con = None
    try:
        con = mysql.connector.connect(host='localhost', database='twitterdb',
                                      user='root', password=password, charset='utf8')
        if con.is_connected():
            print("Connected!!")
            cursor = con.cursor()
            query = ("INSERT INTO tweets (username, created_at, tweet, "
                     "retweet_count, place, location) "
                     "VALUES (%s, %s, %s, %s, %s, %s)")
            cursor.execute(query, (username, created_at, tweet,
                                   retweet_count, place, location))
            con.commit()
            cursor.close()
    except Error as e:
        print(e)
    finally:
        # close the connection even if the insert failed
        if con is not None and con.is_connected():
            con.close()
# Tweepy class to access the Twitter API
class Streamlistener(tweepy.StreamListener):

    def on_connect(self):
        print("You are connected to the Twitter API")

    def on_error(self, status_code):
        if status_code != 200:
            print("Error found: {}".format(status_code))
            # returning False disconnects the stream
            return False

    def on_data(self, data):
        """
        This method reads in tweet data as JSON
        and extracts the fields we want.
        """
        try:
            raw_data = json.loads(data)
            if 'text' in raw_data:
                username = raw_data['user']['screen_name']
                created_at = parser.parse(raw_data['created_at'])
                tweet = raw_data['text']
                retweet_count = raw_data['retweet_count']
                if raw_data['place'] is not None:
                    place = raw_data['place']['country']
                    print(place)
                else:
                    place = None
                location = raw_data['user']['location']
                # insert the data we just collected into the MySQL database
                connect(username, created_at, tweet, retweet_count, place, location)
                print("Tweet collected at: {}".format(str(created_at)))
        except Exception as e:
            print(e)
if __name__ == '__main__':

    """
    # Allow user input
    track = []
    while True:
        input1 = input("What do you want to collect tweets on?: ")
        track.append(input1)
        input2 = input("Do you wish to enter another word? y/n ")
        if input2 == 'n' or input2 == 'N':
            break
    print("You want to search for {}".format(track))
    print("Initialising connection to the Twitter API....")
    time.sleep(2)
    """

    # authentication so we can access Twitter
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    # create an instance of Streamlistener
    listener = Streamlistener(api=api)
    stream = tweepy.Stream(auth, listener=listener)

    track = ['NBA', 'Lebron', 'Jordan', 'Lakers']
    #track = ['nba', 'cavs', 'celtics', 'basketball']
    # choose what we want to filter by
    stream.filter(track=track, languages=['en'])
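A quick note: the listener above is written against tweepy 3.x. In tweepy 4.0+ StreamListener was removed and folded into tweepy.Stream, so a rough equivalent (a sketch, assuming tweepy >= 4.0 and the same credentials as above) looks like:

import json
import tweepy

class Listener(tweepy.Stream):
    # on_data still receives the raw JSON payload, as in 3.x
    def on_data(self, data):
        raw_data = json.loads(data)
        print(raw_data.get('text'))

stream = Listener(consumer_key, consumer_secret,
                  access_token, access_token_secret)
stream.filter(track=['NBA'], languages=['en'])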
OK, here I will upload some photos describing how we set up the MySQL server on our computer and then stream tweets into the database.
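While the stream is running, a quick sanity check that rows are actually landing in the table (a sketch reusing the same connection settings as above):

import mysql.connector

con = mysql.connector.connect(host='localhost', database='twitterdb',
                              user='root', password='password')
cursor = con.cursor()
cursor.execute("SELECT COUNT(*) FROM tweets")
print("Rows loaded so far:", cursor.fetchone()[0])
cursor.close()
con.close()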
Part 2: Performing some analysis on the raw data
- Clean the data and then perform some simple analysis (a quick illustration of the cleaning regex follows).
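The cleaning step below joins an exclusion list into a single regex and substitutes every match with a space. A quick illustration on a made-up tweet (hypothetical input):

import re

exclusions = '|'.join(['[^a-zA-Z]', 'rt', 'http', 'co', 'RT'])
sample = "RT @user: LeBron is unreal tonight!! https://t.co/xyz"  # hypothetical tweet
# non-letters, 'rt', 'http', 'co' and 'RT' all become spaces;
# the .split() later on collapses the extra gaps
print(re.sub(exclusions, ' ', sample).lower())

One caveat: because 'rt' and 'co' are plain alternatives, the pattern also clips those letter pairs inside words ('sport' becomes 'spo t'); word boundaries such as r'\brt\b' would be stricter.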
import mysql.connector
from mysql.connector import Error
import os
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
nltk.download('stopwords')
from wordcloud import WordCloud, STOPWORDS
from textblob import TextBlob
class TweetObject():

    def __init__(self, host, database, user):
        self.password = 'password'
        self.host = host
        self.database = database
        self.user = user

    def MySQLConnect(self, query):
        """
        Connects to the database and extracts
        raw tweets and any other columns we need.

        Parameters:
        ----------------
        arg1: string: SQL query

        Returns: Pandas dataframe
        ----------------
        """
        con = None
        df = None
        try:
            con = mysql.connector.connect(host=self.host, database=self.database,
                                          user=self.user, password=self.password,
                                          charset='utf8')
            if con.is_connected():
                print("Successfully connected to database")
                cursor = con.cursor()
                cursor.execute(query)
                data = cursor.fetchall()
                # store the result in a dataframe
                df = pd.DataFrame(data, columns=['date', 'tweet'])
                cursor.close()
        except Error as e:
            print(e)
        finally:
            if con is not None and con.is_connected():
                con.close()
        return df
    def clean_tweets(self, df):
        """
        Takes raw tweets and cleans them
        so we can carry out analysis:
        remove stopwords, punctuation,
        lower case, html, emoticons.
        This is done with regex; ? means
        optional, so colou?r matches both
        color and colour.
        """
        # Do some text preprocessing
        stopword_list = stopwords.words('english')
        ps = PorterStemmer()
        df["clean_tweets"] = None
        df['len'] = None
        exclusion_list = ['[^a-zA-Z]', 'rt', 'http', 'co', 'RT']
        exclusions = '|'.join(exclusion_list)
        for i in range(len(df['tweet'])):
            # get rid of anything that isn't a letter, plus URL fragments and RT markers
            text = re.sub(exclusions, ' ', df['tweet'][i])
            text = text.lower()
            words = text.split()
            words = [word for word in words if word not in stopword_list]
            # only use the stem of each word
            #words = [ps.stem(word) for word in words]
            df.at[i, 'clean_tweets'] = ' '.join(words)

        # Create a column with the length of each cleaned tweet
        df['len'] = np.array([len(tweet) for tweet in df["clean_tweets"]])
        return df
    def sentiment(self, tweet):
        """
        Calculates sentiment on our cleaned tweets.
        Uses TextBlob to calculate polarity and
        buckets it into positive (1), neutral (0)
        or negative (-1).

        Parameters:
        ----------------
        arg1: a tweet (row of the dataframe)
        """
        # crude bucketing; needs improving
        analysis = TextBlob(tweet)
        if analysis.sentiment.polarity > 0:
            return 1
        elif analysis.sentiment.polarity == 0:
            return 0
        else:
            return -1
    def save_to_csv(self, df):
        """
        Saves the cleaned data to a csv for
        further analysis.

        Parameters:
        ----------------
        arg1: Pandas dataframe
        """
        try:
            df.to_csv("clean_tweets.csv")
            print("\ncsv successfully saved.\n")
        except Exception as e:
            print(e)
    def word_cloud(self, df):
        # build a word cloud from all cleaned tweets
        plt.subplots(figsize=(12, 10))
        wordcloud = WordCloud(
            background_color='white',
            width=1000,
            height=800).generate(" ".join(df['clean_tweets']))
        plt.imshow(wordcloud)
        plt.axis('off')
        plt.show()
if __name__ == '__main__':

    t = TweetObject(host='localhost', database='twitterdb', user='root')
    data = t.MySQLConnect("SELECT created_at, tweet FROM `twitterdb`.`tweets`;")
    data = t.clean_tweets(data)
    data['Sentiment'] = np.array([t.sentiment(x) for x in data['clean_tweets']])
    t.word_cloud(data)
    t.save_to_csv(data)

    pos_tweets = [tweet for index, tweet in enumerate(data["clean_tweets"]) if data["Sentiment"][index] > 0]
    neg_tweets = [tweet for index, tweet in enumerate(data["clean_tweets"]) if data["Sentiment"][index] < 0]
    neu_tweets = [tweet for index, tweet in enumerate(data["clean_tweets"]) if data["Sentiment"][index] == 0]

    # Print results
    print("Percentage of positive tweets: {:.2f}%".format(100 * len(pos_tweets) / len(data['clean_tweets'])))
    print("Percentage of negative tweets: {:.2f}%".format(100 * len(neg_tweets) / len(data['clean_tweets'])))
    print("Percentage of neutral tweets: {:.2f}%".format(100 * len(neu_tweets) / len(data['clean_tweets'])))