Getting Started

Have you ever wondered how Spotify puts together a list of songs every week in "Discover Weekly", and why you end up adding some of them to your playlists because you like them? What about the shows Netflix recommends because you watched a particular show yesterday? How are these tech giants so smart? The answer is Recommender Systems.
A Recommender System makes predictions based on users’ historical behaviours such as their view, search or purchase histories. Two common approaches to recommender systems are Content-Based Filtering and Collaborative Filtering, and this article will dive deeper into the latter.
What is Collaborative Filtering?
Collaborative Filtering is a method of predicting how a user would rate a particular item based on the ratings of other, similar users. A typical Collaborative Filtering pipeline involves 4 stages:
- Data Collection – Collecting user behaviours and associated data items
- Data Processing – Processing the collected data
- Recommendation Calculation – Computing recommendations based on the processed data
- Result Derivation – Extracting the most similar items and returning the top N results
Explicit & Implicit Feedback
So what type of data is collected in the first stage of Collaborative Filtering? There are two categories of data (referred to as feedback): Explicit and Implicit.
An example of Explicit Feedback is the ratings given by users, which Netflix collects to make recommendations after customers rate the movies they have watched. Implicit Feedback is less straightforward, as it is based on users’ interactions with the platform, ranging from clicks and views to likes and purchases. Spotify makes use of Implicit Feedback to implement its Recommendation System.
Calculating similarity
Once the data has been collected and processed, a mathematical measure is needed to calculate the similarity between users. The two most common measures are listed below, with a small sketch of each after the list:
- Euclidean Distance – The distance between two users’ preference vectors. If the distance is small, the similarity between both users is high
- Pearson Correlation – Measures how strongly two users’ ratings rise and fall together (equivalent to the cosine similarity of their mean-centred rating vectors). A value close to 1 means the similarity between both users is high
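To make these measures concrete, here is a minimal pure-Python sketch of both, for two users who rated the same four items (the rating values are invented purely for illustration):
from math import sqrt
# hypothetical ratings of two users over the same four items
user_a = [4.0, 2.0, 5.0, 3.0]
user_b = [5.0, 1.0, 4.0, 3.0]
def euclidean_similarity(a, b):
    # small distance -> high similarity; map the distance into (0, 1]
    distance = sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
    return 1 / (1 + distance)
def pearson_correlation(a, b):
    # how strongly the two rating vectors rise and fall together, between -1 and 1
    n = len(a)
    mean_a, mean_b = sum(a) / n, sum(b) / n
    cov = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b))
    std_a = sqrt(sum((x - mean_a) ** 2 for x in a))
    std_b = sqrt(sum((y - mean_b) ** 2 for y in b))
    return cov / (std_a * std_b)
print(euclidean_similarity(user_a, user_b))  # ~0.37
print(pearson_correlation(user_a, user_b))   # ~0.83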

Implementation in PySpark
The library package spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to make predictions. It uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.
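To make "latent factors" a little more concrete, here is a toy sketch (all numbers invented for illustration): each user and each artist is described by a short vector of hidden features, and the predicted preference is simply the dot product of the two vectors. ALS learns these vectors by alternately solving for the user side and the item side.
# toy latent factors: one user and one artist, each described by 3 hidden features
user_vec = [0.9, 0.1, 0.4]    # hypothetical user factor vector
artist_vec = [0.8, 0.0, 0.3]  # hypothetical artist factor vector
# the predicted preference of this user for this artist is the dot product
predicted_preference = sum(u * a for u, a in zip(user_vec, artist_vec))
print(predicted_preference)  # 0.84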
We will use the dataset from https://www.last.fm/api/, which contains 3 files:
- user_artist_data.txt – 3 columns: userid artistid playcount
- artist_data.txt – 2 columns: artistid artist_name
- artist_alias.txt – 2 columns: badid goodid (known incorrectly spelt artist ids and the corresponding correct artist id)
Firstly, we set up Spark with the following.
# import libraries
from pyspark import SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession ,Row
appName="Collaborative Filtering with PySpark"
# initialize the spark session
spark = SparkSession.builder.appName(appName).getOrCreate()
# get sparkcontext from the sparksession
sc = spark.sparkContext
Then, we define the data structures and convert the Resilient Distributed Datasets (RDDs) into DataFrames.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType
from pyspark.sql.functions import col
# define the schema for the datasets (numeric types, since ALS requires numeric columns)
schema_artist = StructType([StructField("artistId", IntegerType(), True), StructField("artistName", StringType(), True)])
schema_user_artist = StructType([StructField("userId", IntegerType(), True), StructField("artistId", IntegerType(), True), StructField("playCount", LongType(), True)])
schema_alias = StructType([StructField("badId", IntegerType(), True), StructField("goodId", IntegerType(), True)])
# load the raw text files as RDDs (file paths assumed; adjust them to where the dataset lives)
rawArtistData = sc.textFile("artist_data.txt")
rawUserArtistData = sc.textFile("user_artist_data.txt")
rawArtistAlias = sc.textFile("artist_alias.txt")
# convert the RDDs into DataFrames, casting ids and counts to numbers
artistRDD = rawArtistData.map(lambda k: k.split("\t")).map(lambda x: (int(x[0]), x[1]))
artist_df = spark.createDataFrame(artistRDD, schema_artist)
userArtistRDD = rawUserArtistData.map(lambda k: k.split()).map(lambda x: (int(x[0]), int(x[1]), int(x[2])))
user_artist_df = spark.createDataFrame(userArtistRDD, schema_user_artist)
aliasRDD = rawArtistAlias.map(lambda k: k.split()).map(lambda x: (int(x[0]), int(x[1])))
alias_df = spark.createDataFrame(aliasRDD, schema_alias)
# for convenience, create an alias for each dataframe
ua = user_artist_df.alias('ua')
ub = artist_df.alias('ub')
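One thing worth noting before training: the article loads artist_alias.txt but never applies it. A minimal sketch of how the alias mapping could be folded in (a left join that swaps each misspelt id for its canonical one and keeps the original id where no alias exists; column names as defined above):
from pyspark.sql.functions import coalesce
# replace badId with goodId where an alias exists; keep the artistId as-is otherwise
ua_clean = (user_artist_df.join(alias_df, user_artist_df.artistId == alias_df.badId, 'left')
                          .select(user_artist_df.userId,
                                  coalesce(alias_df.goodId, user_artist_df.artistId).alias('artistId'),
                                  user_artist_df.playCount))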
Once the datasets have been processed, we can proceed to train our ALS model. Before that, we should split our dataset into training and testing sets so that we can tell how well our model performs.
# dataset split into training and testing set
(training, test) = ua.randomSplit([0.8, 0.2])
# training the model
als = ALS(maxIter=5, implicitPrefs=True, userCol="userId", itemCol="artistId", ratingCol="playCount", coldStartStrategy="drop")
model = als.fit(training)
# predict using the testing datatset
predictions = model.transform(test)
predictions.show()
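We split off a test set above but have not actually scored the model against it. As a rough sanity check, here is a sketch using Spark's RegressionEvaluator, with the caveat that with implicitPrefs=True the model predicts a confidence-like score rather than a play count, so RMSE against raw play counts is only a crude indicator:
from pyspark.ml.evaluation import RegressionEvaluator
# compare the predicted preference against the observed play count
evaluator = RegressionEvaluator(metricName="rmse", labelCol="playCount", predictionCol="prediction")
rmse = evaluator.evaluate(predictions.withColumn("playCount", col("playCount").cast("double")))
print("RMSE = " + str(rmse))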

We can try using the model to find the top artists recommended for each user, via the recommendForAllUsers method available on the ALS model.
The function below takes a userId and a limit as input. For the given userId, it returns the user's current most-liked artists (based on playCount). Let's display the top liked artists for user 2062243.
def currentLikes(ua, ub, userId, limit):
    # join plays with artist names, keep only this user, and sort by play count
    df = (ua.join(ub, ua.artistId == ub.artistId)
            .filter(ua.userId == userId)
            .sort(ua.playCount.desc())
            .select(ua.userId, ua.playCount, ub.artistName)
            .limit(limit))
    return df
# display top 10 liked artists for user 2062243
currentLikes(ua,ub,2062243,10).show(truncate=False)

The following function then uses the model to produce the top recommended artists for a given user. Let's display the recommended artists for the same user (2062243).
def recommendedArtists(userId, limit):
    # get the top `limit` recommendations for this user from the trained model
    recs = model.recommendForAllUsers(limit).filter(col('userId') == userId).select("recommendations").collect()
    topArtists = []
    for item in recs[0][0]:
        topArtists.append(item.artistId)
    # build a single-column dataframe of the recommended artist ids and join in the names
    artists = spark.createDataFrame(topArtists, IntegerType())
    final = artists.join(ub, artists.value == ub.artistId).select(ub.artistId, ub.artistName)
    return final
# display top 10 recommended artists for user 2062243
recommendedArtists(2062243,10).show(truncate=False)
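A side note on efficiency: recommendForAllUsers scores every user in the dataset even though we only need one. On Spark 2.3 and later, the fitted ALS model also exposes recommendForUserSubset, which scores only the users you pass in:
# score only the users we care about instead of the whole user base
subset = spark.createDataFrame([(2062243,)], ["userId"])
model.recommendForUserSubset(subset, 10).show(truncate=False)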

Summary
In this article, we introduced what Collaborative Filtering is about and its 4 stages, the two categories of data collected for Collaborative Filtering, namely Implicit and Explicit Feedback, and how to calculate similarity using Euclidean Distance or Pearson Correlation. Finally, we walked through a brief implementation of Collaborative Filtering in PySpark using Spark's built-in Alternating Least Squares (ALS) algorithm. Hope you now have an idea of how Recommender Systems work, and don't be surprised to see good recommendations the next time you surf the net and use Spotify or Amazon, cheers!
