Recommendation System Using PySpark, Kafka, and Spark Streaming

Induraj
Sep 20, 2020

This guide walks through the full pipeline, from setting up the data producer to building and evaluating the model. For illustration, the MovieLens dataset is used with Kafka as the producer.

Using PySpark, a machine learning model based on the Alternating Least Squares (ALS) method is built, and its performance is compared with a deep learning model built using the TensorFlow framework in Databricks.

The dataset can be found on Kaggle here.

Intro to Collaborative Filtering

Collaborative filtering (CF) is a popular recommendation approach that bases its predictions and recommendations on the ratings or behavior of other users in the system. In simple terms: if user “Andrew” likes both product A and product B, and user “Britany” already likes product B, what is the likelihood that she also likes product A? This guide implements a recommendation system based on collaborative filtering because it is not as complex as content-based filtering, which primarily focuses on finding items similar to those a user has liked, using textual similarity in the metadata.

Alternating Least Squares (ALS) is the model we’ll use to fit our data and find similarities so as to give recommendations.

What is Alternating Least Squares (ALS)?

ALS works best on sparse datasets like the one at hand. Essentially, it leverages the similarity between users in the sparse ratings matrix and fills in the missing ratings with the values the users would most likely have given themselves. If the predicted rating for a movie is high, that movie is recommended to the user.

For example, say you are an 18-year-old male who has liked and rated action movies starring Arnold Schwarzenegger and Sylvester Stallone. If other users of your age, gender, and country have liked and rated action movies starring Jason Statham, then movies starring Jason Statham will most likely be recommended to you.
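
To make the intuition concrete, here is a minimal NumPy sketch of the alternating-least-squares idea on a tiny, made-up ratings matrix. The matrix, rank, and regularization values below are purely illustrative and are not part of the actual pipeline.

# Toy illustration of the ALS idea (hypothetical data, not from the MovieLens pipeline).
# Rows are users, columns are movies; 0 marks a missing rating to be predicted.
import numpy as np

R = np.array([
    [5, 4, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [0, 1, 5, 4],
], dtype=float)
observed = R > 0                      # mask of known ratings
n_users, n_items = R.shape
rank, reg, n_iters = 2, 0.1, 20       # illustrative hyperparameters

rng = np.random.default_rng(42)
U = rng.normal(size=(n_users, rank))  # user factors
V = rng.normal(size=(n_items, rank))  # item factors

for _ in range(n_iters):
    # Fix V, solve a regularized least-squares problem for each user's factors
    for u in range(n_users):
        idx = observed[u]
        A = V[idx].T @ V[idx] + reg * np.eye(rank)
        b = V[idx].T @ R[u, idx]
        U[u] = np.linalg.solve(A, b)
    # Fix U, solve the analogous problem for each item's factors
    for i in range(n_items):
        idx = observed[:, i]
        A = U[idx].T @ U[idx] + reg * np.eye(rank)
        b = U[idx].T @ R[idx, i]
        V[i] = np.linalg.solve(A, b)

predictions = U @ V.T                 # filled-in matrix; missing cells are the model's guesses
print(np.round(predictions, 2))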

Since this guide focuses on the practical implementation, we will not dive into the mathematics behind ALS, but the mathematical details can be found here.

Setting up the Kafka producer and the Spark Streaming consumer:

You need to install the Kafka package and import KafkaConsumer and KafkaProducer. Then we load our data, which is originally in CSV format, and convert it to JSON.

pip install kafka-python

from kafka import KafkaConsumer, KafkaProducer
import json
import pandas as pd

movie_csv = pd.read_csv("movie_lens_integrated.csv", delimiter="|", engine='python')
movie_json_convert = movie_csv.to_json("movies.json")

To stream the data as Kafka messages, we configure the necessary settings and then send each movie from the JSON file to the Kafka topic.

from kafka import KafkaConsumer, KafkaProducer
import json
import time
import pandas as pd

KAFKA_TOPIC_NAME = "movielence"
KAFKA_BOOTSTRAP_SERVER_CONN = "192.168.99.100:9092"

kafka_producer_object = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER_CONN,
                                      value_serializer=lambda x: json.dumps(x).encode('utf-8'))

movies_json = pd.read_json("movies.json")
movie_list = movies_json.to_dict(orient="records")

# Send one movie record per second to the Kafka topic
for movie in movie_list:
    print("Message to be sent: ", movie)
    kafka_producer_object.send(KAFKA_TOPIC_NAME, movie)
    time.sleep(1)
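
Before wiring up Spark, you can optionally confirm that messages are arriving with a plain kafka-python consumer. This is a quick sanity-check sketch, not part of the original pipeline.

# Optional sanity check: consume a few messages from the topic with kafka-python.
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(KAFKA_TOPIC_NAME,
                         bootstrap_servers=KAFKA_BOOTSTRAP_SERVER_CONN,
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for i, message in enumerate(consumer):
    print(message.value)
    if i >= 4:          # stop after a handful of messages
        break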

Spark Streaming consumer setup

Now that the Kafka producer is set up, we configure Spark Structured Streaming to act as the consumer.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_TOPIC_NAME_CONS = "movielence"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "outputtopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = '192.168.99.100:9092'

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Application Started ...")
    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .config("spark.jars", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraClassPath", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraLibrary", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
        .config("spark.driver.extraClassPath", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    print("Kafka consumer started ...")

    # Construct a streaming DataFrame that reads from the movielence topic
    transaction_detail_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
        .option("startingOffsets", "latest") \
        .load()

    print("Printing schema of transaction_detail_df: ")
    transaction_detail_df.printSchema()

    # Write the streaming result to the console for debugging purposes
    trans_detail_write_stream = transaction_detail_df \
        .writeStream \
        .trigger(processingTime='1 seconds') \
        .outputMode("update") \
        .option("truncate", "false") \
        .format("console") \
        .start()

    trans_detail_write_stream.awaitTermination()
    spark.stop()
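
The raw Kafka value arrives as bytes, so before feeding the stream into a model it must be parsed into columns. The following is a brief sketch of how that could be done with from_json right after the .load() call above (indented inside the main block); the field names userId, movieId, and rating are assumed to match the JSON messages produced earlier.

# Hedged sketch (not in the original post): parse the Kafka value into rating columns.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

rating_schema = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
])

ratings_stream_df = (transaction_detail_df
    .selectExpr("CAST(value AS STRING) AS json_value")
    .select(from_json(col("json_value"), rating_schema).alias("data"))
    .select("data.*"))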

Machine learning:

We use ALS for training and a RegressionEvaluator for measuring performance. Here we train the ALS-based model and evaluate it on the test data using mean squared error (MSE) as the performance metric.

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

seed = 42  # any fixed seed works; set for reproducibility

als = (ALS()
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
       .setPredictionCol("predictions")
       .setMaxIter(2)
       .setSeed(seed)
       .setRegParam(0.1)
       .setColdStartStrategy("drop")
       .setRank(12))

alsModel = als.fit(trainingDF)

regEval = RegressionEvaluator(predictionCol="predictions", labelCol="rating", metricName="mse")

predictedTestDF = alsModel.transform(testDF)
testMse = regEval.evaluate(predictedTestDF)
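
Once the model is trained, per-user recommendations can be generated directly from the fitted model; recommendForAllUsers is part of the PySpark ALS API. This short usage example is added here for completeness.

# Produce the top 10 recommended movies for every user from the fitted ALS model.
user_recs = alsModel.recommendForAllUsers(10)
user_recs.show(5, truncate=False)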

This ALS-based model gives a mean squared error of 0.802690. That error is fairly high compared to the deep learning model trained and tested using HorovodEstimator: the deep learning model achieves a lower error of 0.706399, suggesting that deep learning models are better at learning the complex nonlinear relations present in the data.

Building a Deep Learning Model

As this guide demonstrates the pipeline through which the data passes from start to end, the deep learning model's implementation is not covered in detail here, but it can be found here.

import tensorflow as tf
import horovod.tensorflow as hvd

tf.set_random_seed(seed=40)

def model_fn(features, labels, mode, params):
    print("HVD Size: ", hvd.size())
    features_with_shape = tf.reshape(features["features"], [-1, 24])  # Explicitly specify dimensions

    hidden_layer1 = tf.layers.dense(inputs=features_with_shape, units=params["hidden_layer1"], activation=tf.nn.relu)
    hidden_layer2 = tf.layers.dense(inputs=hidden_layer1, units=params["hidden_layer2"], activation=tf.nn.relu)
    predictions = tf.squeeze(tf.layers.dense(inputs=hidden_layer2, units=1, activation=None), axis=-1)

    # If the estimator is running in PREDICT mode, we can stop building our model graph here
    # and simply return our model's inference outputs
    serving_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    export_outputs = {serving_key: tf.estimator.export.PredictOutput({"predictions": predictions})}
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)

    # Calculate loss (for both TRAIN and EVAL modes)
    loss = tf.losses.mean_squared_error(labels, predictions)
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"])
        optimizer = hvd.DistributedOptimizer(optimizer)

        train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op,
                                          export_outputs=export_outputs)

    # If running in EVAL mode, add a model evaluation metric (RMSE) to our EstimatorSpec
    # so that it is logged when model evaluation runs
    eval_metric_ops = {"rmse": tf.metrics.root_mean_squared_error(labels=labels, predictions=predictions)}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops, export_outputs=export_outputs)

To test the performance of the deep learning model we built, a HorovodEstimator is used.

from sparkdl.estimators.horovod_estimator.estimator import HorovodEstimator

est = HorovodEstimator(modelFn=model_fn,
                       featureMapping={"features": "features"},
                       modelDir=model_dir,
                       labelCol="rating",
                       batchSize=128,
                       maxSteps=20000,
                       isValidationCol="isVal",
                       modelFnParams={"hidden_layer1": 30, "hidden_layer2": 20, "learning_rate": 0.0001},
                       saveCheckpointsSecs=30)
transformer = est.fit(trainValDF)
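
The fitted estimator returns a standard Spark ML transformer, so it can be applied to a held-out DataFrame like any other transformer. This is a hedged usage sketch; the exact names of the output columns depend on how the estimator is configured.

# Apply the fitted transformer to the test DataFrame to obtain predictions.
predicted_test_df = transformer.transform(testDF)
predicted_test_df.show(5)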

Conclusion

We see that the deep learning model performs noticeably better than the machine learning model. The full code is available in the GitHub repository linked below.
