Boston Housing Prediction Using PySpark

6 minute read

Housing price prediction using the Boston Housing dataset from Kaggle.

* Using PySpark (part of the Hadoop ecosystem)
* Distributed computing
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Boston').getOrCreate()

# Read the CSV directly through the SparkSession (SQLContext is deprecated in Spark 3.x)
house_df = spark.read.csv('BostonHousing.csv', header=True, inferSchema=True)
house_df.take(1)
[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0)]
#house_df.cache()
house_df.show()
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2|386.63|29.93|16.5|
|0.17004|12.5| 7.87|   0|0.524|6.004| 85.9|6.5921|  5|311|   15.2|386.71| 17.1|18.9|
|0.22489|12.5| 7.87|   0|0.524|6.377| 94.3|6.3467|  5|311|   15.2|392.52|20.45|15.0|
|0.11747|12.5| 7.87|   0|0.524|6.009| 82.9|6.2267|  5|311|   15.2| 396.9|13.27|18.9|
|0.09378|12.5| 7.87|   0|0.524|5.889| 39.0|5.4509|  5|311|   15.2| 390.5|15.71|21.7|
|0.62976| 0.0| 8.14|   0|0.538|5.949| 61.8|4.7075|  4|307|   21.0| 396.9| 8.26|20.4|
|0.63796| 0.0| 8.14|   0|0.538|6.096| 84.5|4.4619|  4|307|   21.0|380.02|10.26|18.2|
|0.62739| 0.0| 8.14|   0|0.538|5.834| 56.5|4.4986|  4|307|   21.0|395.62| 8.47|19.9|
|1.05393| 0.0| 8.14|   0|0.538|5.935| 29.3|4.4986|  4|307|   21.0|386.85| 6.58|23.1|
| 0.7842| 0.0| 8.14|   0|0.538| 5.99| 81.7|4.2579|  4|307|   21.0|386.75|14.67|17.5|
|0.80271| 0.0| 8.14|   0|0.538|5.456| 36.6|3.7965|  4|307|   21.0|288.99|11.69|20.2|
| 0.7258| 0.0| 8.14|   0|0.538|5.727| 69.5|3.7965|  4|307|   21.0|390.95|11.28|18.2|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
only showing top 20 rows
house_df.cache()
house_df.printSchema()
root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)
## Perform Predictive Analytics
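Before computing summary statistics, it is worth confirming that the CSV loaded cleanly with no missing values. A minimal sketch using pyspark.sql.functions (not part of the original notebook):

from pyspark.sql import functions as F

# Count the nulls in every column; a clean load should return 0 for each one
house_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)
                 for c in house_df.columns]).show()

Assuming that comes back all zeros, the next step is to look at the summary statistics: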
house_df.describe().toPandas().transpose()

| feature | count | mean | stddev | min | max |
|---------|-------|------|--------|-----|-----|
| crim | 506 | 3.6135235573122535 | 8.601545105332491 | 0.00632 | 88.9762 |
| zn | 506 | 11.363636363636363 | 23.32245299451514 | 0.0 | 100.0 |
| indus | 506 | 11.136778656126504 | 6.860352940897589 | 0.46 | 27.74 |
| chas | 506 | 0.0691699604743083 | 0.2539940413404101 | 0 | 1 |
| nox | 506 | 0.5546950592885372 | 0.11587767566755584 | 0.385 | 0.871 |
| rm | 506 | 6.284634387351787 | 0.7026171434153232 | 3.561 | 8.78 |
| age | 506 | 68.57490118577078 | 28.148861406903595 | 2.9 | 100.0 |
| dis | 506 | 3.795042687747034 | 2.10571012662761 | 1.1296 | 12.1265 |
| rad | 506 | 9.549407114624506 | 8.707259384239366 | 1 | 24 |
| tax | 506 | 408.2371541501976 | 168.53711605495903 | 187 | 711 |
| ptratio | 506 | 18.455533596837967 | 2.1649455237144455 | 12.6 | 22.0 |
| b | 506 | 356.67403162055257 | 91.29486438415782 | 0.32 | 396.9 |
| lstat | 506 | 12.653063241106723 | 7.141061511348571 | 1.73 | 37.97 |
| medv | 506 | 22.532806324110698 | 9.197104087379815 | 5.0 | 50.0 |

A scatter matrix is a quick way to roughly determine whether there are linear relationships between the independent variables.

import pandas as pd
from pandas.plotting import scatter_matrix

# Keep only the numeric columns, take an ~80% sample, and convert to pandas for plotting
numeric_features = [t[0] for t in house_df.dtypes if t[1] in ('int', 'double')]
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
axs = scatter_matrix(sampled_data, figsize=(10, 10))

# Rotate the axis labels and drop the ticks so the grid stays readable
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

*(Scatter matrix plot of the numeric features.)*

The scatter matrix is hard to read at this size, so let's compute the correlation between each independent variable and the target variable instead.

# Print the correlation of each numeric column with the target (medv)
for col_name, dtype in house_df.dtypes:
    if dtype != 'string':
        print("Correlation to MV for ", col_name, house_df.stat.corr('medv', col_name))

Correlation to MV for  crim -0.38830460858681154
Correlation to MV for  zn 0.3604453424505433
Correlation to MV for  indus -0.4837251600283728
Correlation to MV for  chas 0.1752601771902987
Correlation to MV for  nox -0.4273207723732821
Correlation to MV for  rm 0.6953599470715401
Correlation to MV for  age -0.3769545650045961
Correlation to MV for  dis 0.249928734085904
Correlation to MV for  rad -0.38162623063977735
Correlation to MV for  tax -0.46853593356776674
Correlation to MV for  ptratio -0.5077866855375622
Correlation to MV for  b 0.3334608196570661
Correlation to MV for  lstat -0.7376627261740145
Correlation to MV for  medv 1.0
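As an aside, the full pairwise correlation matrix can also be computed in one pass with pyspark.ml.stat.Correlation on an assembled feature vector. A sketch, not part of the original notebook (the corr_features column name is only for this example):

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Assemble every numeric column into one vector and compute the Pearson correlation matrix
corr_assembler = VectorAssembler(inputCols=house_df.columns, outputCol='corr_features')
corr_vec_df = corr_assembler.transform(house_df).select('corr_features')
corr_matrix = Correlation.corr(corr_vec_df, 'corr_features').head()[0]
print(corr_matrix.toArray().round(2))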

The correlation coefficient ranges from –1 to 1. When it is close to 1, there is a strong positive correlation; for example, the median value tends to go up when the number of rooms goes up. When it is close to –1, there is a strong negative correlation; the median value tends to go down as the percentage of the lower-status population goes up. Coefficients close to zero mean there is no linear correlation. We are going to keep all the variables for now. Next, prepare the data for machine learning: we need only two columns, features and the label (medv).

from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)
+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
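Note that randomSplit is non-deterministic unless a seed is passed, so the exact numbers below will vary from run to run. A minimal tweak for reproducibility (the seed value 42 is arbitrary and not from the original run):

# Fix the seed so the train/test split, and hence the metrics, are reproducible
train_df, test_df = vhouse_df.randomSplit([0.7, 0.3], seed=42)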

## Linear Regression

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.012919886620891899,0.007801587120031393,0.0,1.2758077912365353,-4.349376356510117,5.54550994615256,-0.005812151077554293,-0.5245463964553757,0.0,-0.000963617211548077,-0.8030395455495745,0.00636300202224636,-0.4840071938333394]
Intercept: 11.399447972103305
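The coefficients are in the same order as the VectorAssembler's inputCols, so pairing them up makes the output easier to read. A small sketch reusing the vectorAssembler defined above:

# Pair each coefficient with the column it belongs to
for name, coef in zip(vectorAssembler.getInputCols(), lr_model.coefficients):
    print(f"{name}: {coef:.4f}")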

Summarize the model over the training set and print out some metrics:

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
RMSE: 4.738763
r2: 0.751399
train_df.describe().show()
+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               334|
|   mean|23.075748502994017|
| stddev| 9.518417986703056|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.85587118199514|22.0|[0.01096,55.0,2.2...|
|  33.0604561523744|32.7|[0.01301,35.0,1.5...|
|28.043288598760007|24.5|[0.01501,80.0,2.0...|
| 42.21418097115222|50.0|[0.01501,90.0,1.2...|
| 27.39884102621034|33.0|[0.01951,17.5,1.3...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.590412
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
Root Mean Squared Error (RMSE) on test data = 5.40253

Sure enough, we achieved a worse RMSE and R squared on the test set than on the training set.

print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
numIterations: 10
objectiveHistory: [0.5000000000000004, 0.43105053530827003, 0.22202903300987303, 0.1957452408198365, 0.16306502507251586, 0.15980313355494838, 0.15923222585645128, 0.1586981916050444, 0.15827980238357606, 0.15800122748304885, 0.15794252467842634]
+-------------------+
|          residuals|
+-------------------+
| -6.680595226784874|
|0.33460117634318465|
|  3.941988505651004|
| 2.1969729464171692|
| 10.028493101958027|
|-0.2034997568715866|
|-1.7682837210723648|
|   7.84881657563016|
| 2.6059952146062173|
| 0.7881451923566445|
|-3.2989538093189203|
|-0.9143852651515445|
|  8.916247336419922|
|-1.4878044572054847|
|  4.370498274312752|
|-10.399196901650118|
| -4.181554914939888|
|  2.435140179098134|
|  1.576489199623719|
| -1.592334351601238|
+-------------------+
only showing top 20 rows
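To see more than the first 20 residuals, one option is to pull them into pandas and plot a histogram. A minimal sketch, assuming matplotlib is installed (not part of the original notebook):

import matplotlib.pyplot as plt

# The training residuals come back as a DataFrame with a single 'residuals' column
residuals_pd = trainingSummary.residuals.toPandas()
residuals_pd['residuals'].hist(bins=30)
plt.xlabel('residual (medv - prediction)')
plt.ylabel('count')
plt.show()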



predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()
+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.85587118199514|22.0|[0.01096,55.0,2.2...|
|  33.0604561523744|32.7|[0.01301,35.0,1.5...|
|28.043288598760007|24.5|[0.01501,80.0,2.0...|
| 42.21418097115222|50.0|[0.01501,90.0,1.2...|
| 27.39884102621034|33.0|[0.01951,17.5,1.3...|
|31.798301742075743|31.1|[0.02187,60.0,2.9...|
|25.477254332205305|21.6|[0.02731,0.0,7.07...|
|26.298944817518098|28.7|[0.02985,0.0,2.18...|
| 32.76984367476659|34.9|[0.03359,75.0,2.9...|
| 27.66634815941965|24.1|[0.03445,82.5,2.0...|
|27.884536154563712|22.0|[0.03537,34.0,6.0...|
| 24.15823047871639|22.9|[0.03551,25.0,4.8...|
|25.915841617050788|23.2|[0.03871,52.5,5.3...|
| 21.50849298237092|21.1|[0.03961,0.0,5.19...|
|16.501753862659243|18.2|[0.04301,80.0,1.9...|
|23.050593203824203|19.4|[0.04379,80.0,3.3...|
|24.161496441345186|19.8|[0.04544,0.0,3.24...|
| 23.50119399087348|23.3|[0.0456,0.0,13.89...|
|26.197815707785804|22.3|[0.0459,52.5,5.32...|
|23.868445429269094|22.2|[0.05083,0.0,5.19...|
+------------------+----+--------------------+
only showing top 20 rows
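The regularization settings used above (regParam=0.3, elasticNetParam=0.8) were fixed by hand. Before moving on to tree models, a natural extension is to tune them with k-fold cross-validation; a sketch (the grid values here are illustrative, not from the original run):

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Search a small grid of regularization settings with 5-fold CV on the training set
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 0.3])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8])
              .build())
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=RegressionEvaluator(labelCol='medv', metricName='rmse'),
                    numFolds=5)
cv_model = cv.fit(train_df)
print(cv_model.bestModel.extractParamMap())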
## Decision Tree Regression

from pyspark.ml.regression import DecisionTreeRegressor

# Fit a single decision tree regressor on the same training split
dt = DecisionTreeRegressor(featuresCol='features', labelCol='medv')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 4.45627

## Feature Importance

dt_model.featureImportances
SparseVector(13, {0: 0.0188, 2: 0.0051, 4: 0.0245, 5: 0.657, 6: 0.0093, 7: 0.0414, 9: 0.0081, 10: 0.0311, 11: 0.0037, 12: 0.2012})
house_df.take(1)
[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0)]

Apparently, the number of rooms (rm) is the most important feature for predicting the median house price in our data, followed by lstat.
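Reading the SparseVector by position is error-prone, so here is a small sketch (not in the original notebook) that maps the importances back to column names using the vectorAssembler input columns defined earlier:

# Sort the features by their importance in the fitted decision tree
importances = zip(vectorAssembler.getInputCols(), dt_model.featureImportances.toArray())
for name, score in sorted(importances, key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {score:.4f}")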

## Gradient-Boosted Tree Regression

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'medv', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'medv', 'features').show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|20.709846484903586|22.0|[0.01096,55.0,2.2...|
|30.765907955396884|32.7|[0.01301,35.0,1.5...|
|28.143802115510642|24.5|[0.01501,80.0,2.0...|
| 41.97964129751568|50.0|[0.01501,90.0,1.2...|
| 30.85729974970605|33.0|[0.01951,17.5,1.3...|
+------------------+----+--------------------+
only showing top 5 rows
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 4.46242


On this split, the tree-based models clearly outperform linear regression, and gradient-boosted tree regression performs about as well as the single decision tree; the exact numbers vary from run to run because the split is random.
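To compare the three fitted models side by side on the same test split, a short loop works; a sketch reusing the models fitted above:

# Evaluate every fitted model on the held-out test set with RMSE and R2
metrics = ('rmse', 'r2')
for name, model in [('Linear regression', lr_model),
                    ('Decision tree', dt_model),
                    ('Gradient-boosted trees', gbt_model)]:
    preds = model.transform(test_df)
    scores = {m: RegressionEvaluator(labelCol='medv', metricName=m).evaluate(preds)
              for m in metrics}
    print(name, scores)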
