Data Science over the Movies Dataset with Spark, Scala and some SQL. And some Python.(Part 1).

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. Let us use it on Databricks to perform queries over the movies dataset.

Borja González
Towards Data Science

--

Photo by Jake Hills on Unsplash

One day, a friend of mine who works as a SQL developer told me that she was interested on big data processing engines but she has not any experience on other programming languages apart from SQL. I wanted to demonstrate her that knowing SQL is a good starting point to learn big data because thanks to Spark it is possible to perform plane SQL queries on tables and also its code sintax is similar to SQL. I hope you enjoy this story.

To prove it I have performed some queries and descriptive statistics to extract insights from a fancy dataset, the movie lens dataset, which is available on https://grouplens.org/datasets/movielens/and contains lots of rates of different users over more almost 30000 movies. This report might be useful to learn how to make aggregations and perform basics spark queries. I am not an expert on Spark neither on Scala so the code might not be implemented on the most efficient way but I do not stop learning and I am very open to suggestions and comments.

One of the biggest problem we face when we want to learn big data and to use Spark with Scala on the Hadoop ecosystem is always installing and integrating all the tools from these frameworks. However, Databricks community edition will save us from that problem. It has literally, everything we need to use: Its own file system, and all the APIs installed (and working properly) that we are going to use ( Hive well integrated with Spark and Scala and the rest of Spark libraries such as MLlib or Graphx). In order not to make this article too long I will not cover those techologies but good documentation about can be found on their website: https://databricks.com/spark/about.

The main topic of this article is not Databricks usage but scala-Spark coding over the movies datset (statistics, queries, aggregations…) . Some queries will be shown with their equivalent in SQL. Databricks will also allow us to manage this huge dataset that might fit in the memory of our local machine. Spark will provide us an efficient way to process the data.

1) Importing the data

The first and necessary step will be to download the two long format datasets that are on the recommended for new research section. After that, we have to import them on the databricks file system and then load them into Hive tables. Now we can perform some basic queries on both datasets/tables, the one with information about the movies and the one with the rates on the movies. Describe and printSchema methods are always a good entry point:

val movies = table("movies")
val ratings = sql("select userId, movieId, rating from ratingsB")
ratings.describe().show
movies.printSchema
ratings.printSchema

2) Performing some queries and knowledge extraction.

To improve our spark coding we can perform whatever query we can think of with learning purposes. Firstly, let us check the users that have rated the most and the least number of movies:

import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions.{desc,asc}
ratings.groupBy("userId").agg(count("*").alias("number_cnt"))
.orderBy(desc("number_cnt"))
ratings.groupBy("userId").agg(count("*").alias("number_cnt"))
.orderBy(asc("number_cnt"))

The equivalent SQL query (It is important to note that as long as we can we should write this queries with spark since it will give us the errors at compile time and not on running time as the pure sql queries, to avoid wasting time in the future specially on large processes):

sql("select userId, count(*) from ratingsB group by userId order by count(*) desc")

There are some users that have rated more than 5000 movies!! That is crazy!

That made me wonder how long they’ve spent watching movies considering an average time for the length of the movies of 100 minutes. This result is worth to be shown!! Some queries will be packed into functions to be used whenever we want/need them and even on different cells of the notebook or parts of the code:

ratings.groupBy("userId").agg(count("*").alias("number_cnt")).withColumn("years_Watching", round($"number_cnt" * 100/ 60 / 24 / 365,3)).orderBy(desc("years_Watching")).show(10)

As we can see, user number 123100 has spent more than 4.5 years watching movies and watched more than 20000 films. What a cinephile!! Regarding the code, it has some useful methods such as a proper gropuBy agregation with a count with the round function applied on the new field created with .withColumn (since we do not need that much decimals Spark displays). Now, we will join both dataframes to have all the information in just 1 df to toy around:

val df_full = ratings.join(movies, $"ratingsb.movieId" === $"movies.movieId").drop($"ratingsb.movieId")

Are you interested on knowing which movies have been rated with 3 or 5 stars by the cinephile? We can know that with this function that receives as input a scala Seq of integers, together with the number of stars we want to query and the number of the user we are asking for or. All the movies rated by that user will be displayed if the seq comes empty (this is checked with the scala .isEmpty method):

import org.apache.spark.sql.functions.{asc, desc}def movie_query (df: org.apache.spark.sql.DataFrame, stars: Seq[Double], user: Int ) : Unit = 
{
if (stars.isEmpty)
{
println(s"All movies rated by user $user")
df.filter(df("userId") ===user).orderBy(desc("rating")).show(20, false)
}
else
stars.foreach(starsNum =>{
println(s"All movies rated by user $user with $starsNum stars:")
df.filter(df("userId") === user).filter(df("rating") === starsNum).orderBy(asc("movieId")).show(7, false)})
}
movie_query(df_full, Seq((3.0, 4.5), 21)

Usually, we all got lots of movie recommendations from friends and family and we make a list out of those movies. Right after that, when I am hesitant about which movie should I watch first I always go to filmaffinity or imdb to check the best rated movies out of my list. It turns out that we can do something similar. To do so, we need to obtain the mean rating for the movies and retrieve them orderedBy their rate from best to worst. It is important to note that a good movie searcher might not recieve the exact name of the movie and that can be solved with the scala contains method:

import org.apache.spark.sql.functions._def movie_grade (df: org.apache.spark.sql.DataFrame, movie: String ) : Unit = df.filter((df("title").contains(movie))).groupBy("title").agg((round(avg($"rating"),3)).as("averageStars")).orderBy(desc("averageStars")).show(false)

We will test our function on two famous sagas:

movie_grade(df_full, "Wild Bunch")
movie_grade(df_full, "Lord of the Rings")

Yay! We love Lord of the Rings! The lowest rated movie corresponds to the animated version which I have yet to see.

Aragorn with 2 hobbits on the Lord of the Rings animated movie.

I am sure that there also a lot of fans of Star Wars saga so let us get the score of all the movies with its similar SQL query:

sql("Select title, avg(rating) as averageRate from full_movies where title like '%Star Wars%' group by title order by averageRate desc").show(false)
Photo by Daniel Cheung on Unsplash

As we can see, the old movies are the best rated but in my opinion the first 3 episodes are not that bad to get that low score below 3.5 stars. But that is just opinion!! We have also obtained some spin offs and parallel movies that also could be interesting for Star Wars freaks.

The next thing I have wondered is which are the most and the least rated movies. Since this is rather extense dataset, it contains lots of movies that have poor number of visualizations, some of them with less than 5 rates. Of course, if we want to make a consistent study we should neglect those movies. I am just going to pay attention at the movies that have more than 600 rates that represents a 1% of the total movies rates (a totally not mainstream movie). To do that, we will create a new dataframe with the total rates of each movie and their average grade. After that, we will filter the movies with more than 600 rates and display the by their average rate in an ascending and descending way to check the worst and the best movies.

val df_count = df_full.groupBy("title").agg(avg("rating").as("averageRating"), 
count("title").as("RatesCount"))
df_count.withColumn("roundedAverageRating", round(df_count("averageRating"),2)).filter(df_count("RatesCount") > 600).orderBy(asc("averageRating")).drop("averageRating").show(20, false) df_count.withColumn("roundedAverageRating", round(df_count("averageRating"),2)).filter(df_count("RatesCount") > 600).orderBy(desc("averageRating")).drop("averageRating").show(20, false)

The lowest ranked movies with more than 600 rates.

As you can see, Glitter is the worst movie. I have searched for it on imbd and indeed it looks horrible:

And now it comes the good part, paying attention to the head of the list:

Which is again consequent to its rating on imdb:

3) Recommender System (I)

This dataset is perfect to improve and understand how do recommender systems work. I am working on machine learing models for recommender systems using the spark machine learning library MLlib and they will be shown on next articles. I have just come up with a simple solution for it just performing a big query. It is a simple model that could work as a dummy solution. The solution is about retrieving the categories an user has watched the most and output the movies that are best rated from those categories that the user has not watched yet. That is a more complex query packed into the easy_recommender fuction that will receive the number of the user we want the recommendations for, the number or rates as a minimun threshold we are going to use for a movie (remember that from a statistical point of view is not a good practice to retrieve movies with just a bit of rates) and the number of movies we want to display. These are the steps we are going to follow:

  1. Obtain the categories that user X has seen the most. To achieve this, we need to filter by that user, groubBy the genres, make a count and then order by that count. Once we have that, we will just select the genres column and map the column with map to then perform a collect operation and then convert it into a Scala list using: .map(r => r.getString(0)).collect.toList.
  2. After that, we will mark the movies that user X has watched with a new column “ToDelete” containing a simple string such as “DELETE” to be easily found once we perform the join. With this we will have the movies user X has seen well identified.
  3. We will make the join of that dataframe with the big dataframe. The movies we want to neglect are marked with “DELETE” (so we will filter out the ones that have that column empty with .filter(“ToDelete is null”)).
  4. To conclude, we will loop over the categories we want to filter with the foreach scala method now tht we have selected the movies that user X has not watched yed. Now we just have to groupBy title, get the average grade, filter once again to have more than Y rates by movie (remember, for statistical reasons) and order them in a descending way by their average rating.

It has been a bit complex process and I am sure there are better ways to do it. Here you can see the code:

def easy_recommender(nUser: Int, nRates: Int, nMovies: Int) : Unit = {
val mostSeenGenresList = df_full.filter(df_full("userId") === nUser).groupBy("genres").agg(count("*").alias("cuenta")).orderBy(desc("cuenta")).limit(3).select("genres").map(r => r.getString(0)).collect.toList
println(s"List of genres user $nUser has seen the most : $mostSeenGenresList")
val movies_watched_by_userX = df_full.filter($"userId" === nUser).withColumn("ToDelete", lit("DELETE")).select($"ToDelete", $"title".as("title2")) var df_filt = df_full.join(movies_watched_by_userX, $"title" === $"title2", "left_outer") df_filt = df_filt.filter("ToDelete is null").select($"title", $"rating", $"genres") mostSeenGenresList.foreach(e => {
println(s"Top $nMovies movies user number $nUser has not seen from category $e with more than $nRates rates: ")
df_filt.filter($"genres" === e).groupBy("title").agg(avg("rating").as("avgByRating"), count("*").alias("nRates")).filter($"nRates" > nRates).orderBy(desc("avgByRating")).show(nMovies, false)
})
}
easy_recommender(134596, 1000, 5)

The results we obtain from our “recommender system” are printed out with scala string interpolation:

You can see that firstly we print out the categories user X has seen the most. We can control the number of categories to be recommended with the .limit(3) method. As you can see we can control most of the parameters we want to include in our model with the inputs of the function.

4) The visualization moment

To conclude, this is not a good Data Science work without its proper visualization/plot. To accomplish this, Python is always a good choice and together with it I am going to show you another wonderful feature of databricks that allows us to run Python and Scala code on the same notebook.

The first step will be saving our df_full with just the column that is important for our visualization into a temporary Hive table (which will only persists during the current session):

val ratesDF = df_full.select("rating").groupBy("rating").agg(count("rating").as("NumberOfRates")).orderBy(asc("rating"))

ratesDF.createOrReplaceTempView("ratedDF")
spark.sql("select * from ratedDF").show

And now it is the moment when the magic comes. Just by typing %python at the top of a cell we ca execute python code with all the Spark (pyspark) advantages ad features. We load the table into a pyspark dataframe and convert both columns into Python lists using these Python oneliners:

%pythondf = table("ratedDF")
rating_list = df.select("rating").collect()
number_rates = df.select("NumberOfRates").collect()
rate_category = [float(row.rating) for row in rating_list]
n_rates = [int(row.NumberOfRates) for row in number_rates]

And to conclude, we will make our visualization. This is a total freestyle process and some plots will be more prettier than others. Remember that a categorical variable should be represented with a barplot:

%python
import matplotlib.pyplot as plt
import numpy as np
fig, ax = plt.subplots()
ax.bar(rate_category, n_rates, align='center', width=0.4, facecolor='b', edgecolor='b', linewidth=3, alpha=.3)
plt.title('Number of Rates vs Stars')
plt.xlabel('Stars (*)')
plt.xlim(0,5.5)
plt.ylim(0,8000000)
ax.set_xticks(rate_category)
display(fig)

Not much to comment about it except that there are no movies rated with 0 stars and that this does not look as a normal distribution as one could expect.

5) To conclude

And that’s all folks. I hope you have enjoyed this article as much as I have done it learning about scala, spark and Databricks and thinking about insights on the movies dataset. Now I am implementing and improving the perfomance of recommender systems on this dataset using the machine learning library from spark Spark MLlib. These models might be shown along with more complex queries and descriptive statistics over this dataset on future articles. I have played a bit with the genres column and obtained deeper statistics but I did not want this article to super dense.

Since this is my very first article, once again, any feedback and comments are well appreciated.

Photo by Gwen Ong on Unsplash

Originally published at https://medium.com on July 23, 2019.

--

--

Data scientist. I love big data processing technologies, artificial intelligence and data science.