If you know SQL but need to work in PySpark, this post is for you!

Spark is rapidly becoming one of the most widely adopted frameworks for big data processing. But why work in native PySpark instead of SQL?
Well, you don’t have to. PySpark lets you create a temp view that doesn’t sacrifice runtime performance. On the backend, Spark runs the same transformations regardless of the language, in the exact same way. So, if you want to stick to SQL, your code won’t execute any differently.
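For example (a minimal sketch, assuming you already have a SparkSession named spark and a DataFrame named df):
# register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("df")
# this builds the same query plan that the equivalent DataFrame-API code would
result = spark.sql("SELECT column_1, COUNT(*) AS n FROM df GROUP BY column_1")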
However, when working in the DataFrame API many errors are raised as soon as the query plan is built, whereas with raw SQL strings you’ll often only see them at runtime. If you’re working with large data, the same error can surface much earlier in native PySpark.
In this post we will lean on Spark: The Definitive Guide, walk through each clause of a basic SQL query in turn, and explain how to duplicate that logic in PySpark.
Without further ado, let’s dive in…
SELECT
Any good SQL query starts with a SELECT statement – it determines what columns will be pulled and whether they should be transformed or renamed.
SQL
SELECT
column_1,
CASE WHEN column_2 IS NULL THEN 0 ELSE 1 END AS is_not_null,
SUM(column_3) OVER (PARTITION BY column_1) AS sum_column_3_over_partition
PySpark
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = (
    df.withColumn(
        "is_not_null",
        F.when(F.col("column_2").isNull(), 0).otherwise(1)
    ).withColumn(
        "sum_column_3_over_partition",
        F.sum("column_3").over(Window.partitionBy("column_1"))
    )
)
As shown above, SQL and PySpark have a very similar structure. The df.select() method takes column names or column expressions as positional arguments. Each SQL keyword has an equivalent in PySpark via dot notation (e.g. df.method()), pyspark.sql, or pyspark.sql.functions.
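For instance, the SELECT list above could also be written with df.select() directly – a sketch using the same placeholder column names:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = df.select(
    "column_1",
    F.when(F.col("column_2").isNull(), 0).otherwise(1).alias("is_not_null"),
    F.sum("column_3").over(Window.partitionBy("column_1")).alias("sum_column_3_over_partition"),
)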
Pretty much any SQL SELECT structure is easy to duplicate with some googling of the SQL keywords.
Tip: use [df.selectExpr()](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.selectExpr.html) to run SQL expressions passed as strings.
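For example, a short sketch of the CASE expression above written with selectExpr() (same placeholder columns):
df = df.selectExpr(
    "column_1",
    "CASE WHEN column_2 IS NULL THEN 0 ELSE 1 END AS is_not_null",
)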
FROM
Now our SELECT statements are worthless without a good FROM clause.
SQL
FROM df
PySpark
df
Pretty complex, right?
As shown above, the FROM table is simply the DataFrame you call the methods on.
If you’re accustomed to using CTEs in your SQL code, you can duplicate CTE logic by assigning a set of transformations to a variable.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = spark.read.table(my_table)

cte_1 = df.withColumn(
    "is_not_null",
    F.when(F.col("column_2").isNull(), 0).otherwise(1)
)

cte_2 = cte_1.withColumn(
    "sum_column_3_over_partition",
    F.sum("column_3").over(Window.partitionBy("column_1"))
)
WHERE
The WHERE clause is an underrated clause that can dramatically improve query time. In PySpark, there are two interchangeable methods that let you filter data: df.where() and df.filter().
SQL
WHERE column_2 IS NOT NULL
AND column_1 > 5
PySpark
df.where("column_2 IS NOT NULL and column_1 > 5")
As you’ll note above, both support SQL strings and native PySpark, so leveraging SQL syntax helps smooth the transition to PySpark. But, for readability and error-raising purposes, completely native PySpark should (probably) be the end goal.
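A sketch of the fully native version of the same filter (same placeholder columns):
import pyspark.sql.functions as F

# note the parentheses around the comparison – & binds tighter than >
df = df.where(F.col("column_2").isNotNull() & (F.col("column_1") > 5))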
JOIN
JOINs are another very underrated clause – if you can get really good at joins, the number of bugs in your code decreases dramatically. According to Spark: The Definitive Guide, there are 8 broad categories of joins, including INNER and LEFT OUTER.
We won’t be covering each, but in general PySpark joins follow the below syntax:
<LEFT>.join(<RIGHT>, <JOIN_EXPRESSION>, <JOIN_TYPE>)
- <LEFT> and <RIGHT> are PySpark DataFrames
- <JOIN_EXPRESSION> is a boolean comparison between columns in the two DataFrames
- <JOIN_TYPE> is a string that determines the join type
SQL
FROM table_1
INNER JOIN table_2
ON table_1.x = table_2.y
PySpark
table_1.join(table_2, table_1["x"] == table_2["y"], "inner")
Tip: use <DF>.dropDuplicates().count() == <DF>.count() to check whether you have duplicates in the left, right, or joined tables. These errors are sometimes hard to spot if you’re not looking for them.
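A rough sketch of that check wrapped around the join above (table_1, table_2, and the join columns are the same placeholders):
# confirm neither input has duplicate rows before joining
assert table_1.dropDuplicates().count() == table_1.count()
assert table_2.dropDuplicates().count() == table_2.count()

joined = table_1.join(table_2, table_1["x"] == table_2["y"], "inner")

# if this fails, the join expression is probably fanning out rows
assert joined.dropDuplicates().count() == joined.count()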
GROUP BY
Moving on to the more complex SQL concept of grouping, PySpark has very similar syntax to pandas in this realm.
SQL
SELECT
column_1,
SUM(column_3) AS col_3_sum
FROM df
GROUP BY 1
PySpark
import pyspark.sql.functions as F
df.groupBy("column_1").agg(F.sum("column_3").alias("col_3_sum"))
There are many different ways to group data in PySpark, but the above syntax is the most versatile. We leverage .agg() and pass positional arguments that define how each column is transformed. Note that we can chain .alias() to rename the output column to something more usable than sum(column_3).
If you memorize this syntax you’ll always be able to make any transformation you want. To be perfectly clear, the syntax is…
df.groupBy(['<col_1>', '<col_2>', ...]).agg(
    F.<agg_func>('<col_3>').alias('<name_3>'),
    F.<agg_func>('<col_4>').alias('<name_4>'),
    ...
)
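For example, a concrete sketch with the placeholder columns used earlier (the aggregation functions are just illustrative):
import pyspark.sql.functions as F

agg_df = df.groupBy("column_1").agg(
    F.sum("column_3").alias("col_3_sum"),
    F.avg("column_3").alias("col_3_avg"),
    F.countDistinct("column_2").alias("col_2_distinct"),
)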
For a list of aggregation functions and examples of each, check out sparkbyexamples.
Conclusion
Here we covered the very basics of moving from SQL to PySpark. With the above structure and some help from google, you can write pretty much any SQL query in native PySpark.
Note that there are lots of SELECT statement keywords, such as CASE, COALESCE, or NVL, all of which can be written using df.selectExpr(). If you want to move to native PySpark, their equivalents are very straightforward to google.
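For instance, a sketch of COALESCE written both ways (placeholder column, with 0 as an arbitrary fallback):
import pyspark.sql.functions as F

# SQL string via selectExpr
df.selectExpr("column_1", "COALESCE(column_2, 0) AS column_2_filled")

# the native equivalent
df.withColumn("column_2_filled", F.coalesce(F.col("column_2"), F.lit(0)))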
Hope this helps and good luck!
Thanks for reading! I’ll be writing 13 more posts that bring academic research to the DS industry. Check out my comment for links to the main source for this post and some useful resources.