Spark is an analytics engine used for large-scale data processing. It lets you spread both data and computations over clusters to achieve a substantial performance increase.
Since it is getting easier and less expensive to collect and store data, we are likely to have huge amounts of data when working on a real-life problem. Thus, distributed engines like Spark are becoming the predominant tools in the data science ecosystem.
PySpark is the Python API for Spark. It combines the simplicity of Python with the efficiency of Spark, a combination that both data scientists and data engineers appreciate.
In this article, we will go over several examples to introduce the SQL module of PySpark, which is used for working with structured data.
We first need to create a SparkSession which serves as an entry point to Spark SQL.
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
sc.sparkContext.setLogLevel("WARN")
print(sc)
<pyspark.sql.session.SparkSession object at 0x7fecd819e630>
We will use this SparkSession object to interact with the functions and methods of Spark SQL. Let’s create a Spark data frame by reading a csv file. We will use the Melbourne housing dataset available on Kaggle, and we enable schema inference so that numeric columns such as Price, Distance, and Landsize are loaded as numbers rather than strings.
# inferSchema lets Spark detect numeric column types instead of reading everything as strings
df = sc.read.option("header", "true").option("inferSchema", "true").csv(
    "/home/sparkuser/Desktop/melb_housing.csv"
)
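Since we rely on schema inference, it is worth a quick sanity check that the columns came in with the expected types; a minimal sketch:
# print the column names and inferred data types
df.printSchema()
# number of rows in the data frame
print(df.count())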
We will make the data frame smaller by selecting only 5 columns. The list of columns can be viewed using the "df.columns" attribute.
df = df.select("Type", "Landsize", "Distance", "Regionname", "Price")
df.show(5)
As we will see in the examples, PySpark syntax reads like a mixture of Pandas and SQL. Thus, if you are already working with these tools, it will be relatively easy for you to learn PySpark.
We can sort the rows in the data frame using the orderBy function. Let’s check the 5 most expensive houses in the dataset.
df.orderBy("Price", ascending=False).show(5)
The SQL module of PySpark has numerous functions available for data analysis and manipulation. We can either import the whole functions module under an alias or import individual functions separately.
# import the functions module; individual functions are then accessed as F.<name>
from pyspark.sql import functions as F
There are 3 different types of houses in the dataset. We can calculate the average price of each type using the groupby function.
df.groupby("Type").agg(F.round(F.mean("Price")).alias("Avg_price")).show()
Let’s elaborate on the syntax. We group the observations (i.e. rows) by the type column and then calculate the average price for each group. The round function, called without a number of decimal places, rounds the average to the nearest whole number. The alias method changes the name of the aggregated column; it is similar to the "as" keyword in SQL.
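Since the alias method plays the role of SQL’s "as" keyword, it may help to see the same aggregation written as an actual SQL query. Here is a sketch that registers the data frame as a temporary view (the view name housing is our own choice):
df.createOrReplaceTempView("housing")
sc.sql(
    "SELECT Type, ROUND(AVG(Price)) AS Avg_price FROM housing GROUP BY Type"
).show()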
The distance column shows the distance to the central business district in kilometres. House prices usually decrease as we move away from the city centre. We can check this expectation by calculating the average price for houses that are more than 3 kilometres away and comparing it with the overall averages above.
df.filter(F.col("Distance") > 3).groupby("Type").agg(
    F.round(F.mean("Price")).alias("Avg_price")
).show()
We use the filter function to apply a condition on the distance column. The results match our expectation: the average price decreases for each type once the houses closest to the centre are excluded.
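The filter function can also combine several conditions with the & (and) and | (or) operators, each wrapped in parentheses. A small sketch with arbitrary thresholds:
df.filter(
    (F.col("Distance") > 3) & (F.col("Landsize") > 100)
).groupby("Type").agg(
    F.round(F.mean("Price")).alias("Avg_price")
).show()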
Two other useful functions are count and countDistinct which count the number of total and distinct observations (i.e. rows) in a group, respectively. For instance, the following code returns the number of houses for each type along with the number of distinct prices.
df.groupby("Type").agg(
F.count("Price").alias("n_houses"),
F.countDistinct("Price").alias("n_distinct_price")
).show()
A typical task in data analysis is to derive new columns based on existing ones; it may be part of your feature engineering process. For the housing dataset, we can create a new feature that represents the price per unit of land size.
To accomplish this task, the withColumn function can be used as follows:
df = df.withColumn(
    "Price_per_size",
    F.round(F.col("Price") / F.col("Landsize"), 2)
)
The name of the new column is "Price_per_size". The second argument specifies how the values are derived: we divide the house price by the land size and round the result to two decimal places.
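As a quick follow-up of our own (not part of the original walkthrough), the derived column can be used like any other, for instance to compare the average price per unit of land size across regions:
df.groupby("Regionname").agg(
    F.round(F.mean("Price_per_size"), 2).alias("Avg_price_per_size")
).show()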
Conclusion
We have covered an introduction to data analysis and manipulation with PySpark. The SQL module of PySpark offers many more functions and methods to perform efficient data analysis.
It is important to note that Spark is optimized for large-scale data. Thus, you may not see any performance increase when working with small-scale data. In fact, Pandas might outperform PySpark when working with small datasets.
Thank you for reading. Please let me know if you have any feedback.