For the most part, I found my transition from primarily working in SQL to primarily working in Spark to be smooth. Because I was already familiar with ORMs like SQLAlchemy and Django’s, it wasn’t hard to adapt. Selects, filters, joins, groupbys and things like that all work more or less the way they do in SQL. I think there’s a logic to structuring a data query, and that getting a feel for that logic is the hardest part of the battle – the rest is just implementation.
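For example, a filter-and-aggregate query maps almost one-to-one onto the DataFrame API. Here’s a rough sketch of the correspondence (the data frame and column names are made up for illustration):

import pyspark.sql.functions as f

# SQL: SELECT col1, SUM(col2) AS total FROM my_table WHERE col2 > 0 GROUP BY col1
# Roughly the same query against a hypothetical Spark data frame sdf:
totals_sdf = (
    sdf
    .filter(f.col('col2') > 0)
    .groupBy('col1')
    .agg(f.sum('col2').alias('total'))
)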
The difficult part of Spark, in my opinion, is resource management. Figuring out the right balance of cores, memory (both on heap and off heap), and executors is more art than science, and the error messages Spark gives you are often only minimally informative. And I’ve found very few good training materials for Spark. Maybe I was a little spoiled by coming from mostly using the PyData and SQL stack: when I have a question about Pandas or Postgres I Google it, which nearly always gets me some fairly complete documentation with working examples, often gets me a handful of informative blog posts, and usually gets me at least one helpful Stack Exchange question. When I have a question about Spark, I Google it, which more often than not gets me a documentation page that simply confirms that the method technically exists. And that’s it.
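To give a sense of the knobs involved, here’s a sketch of an executor configuration; the values are placeholders for illustration, not recommendations:

from pyspark.sql import SparkSession

# Placeholder numbers: the right balance depends on your cluster and workload.
spark = (
    SparkSession.builder
    .appName('example')
    .config('spark.executor.instances', '10')       # number of executors
    .config('spark.executor.cores', '4')            # concurrent tasks per executor
    .config('spark.executor.memory', '8g')          # on-heap memory per executor
    .config('spark.executor.memoryOverhead', '2g')  # off-heap overhead (Python worker memory comes out of this)
    .getOrCreate()
)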
I’ve found resource management to be particularly tricky when it comes to PySpark user-defined functions (UDFs). Python UDFs are a convenient and often necessary way to do Data Science in Spark, even though they are not as efficient as using built-in Spark functions or even Scala UDFs. When using a Python UDF, it’s important to understand how Spark evaluates it. Consider the following example, which assumes a Spark data frame sdf with two numeric columns col1 and col2:
import pyspark.sql.functions as f
import pyspark.sql.types as t

def my_function(arg1, arg2):
    argsum = arg1 + arg2
    argdiff = arg1 - arg2
    argprod = arg1 * arg2
    return argsum, argdiff, argprod

schema = t.StructType([
    t.StructField('sum', t.FloatType(), False),
    t.StructField('difference', t.FloatType(), False),
    t.StructField('product', t.FloatType(), False),
])

my_function_udf = f.udf(my_function, schema)

results_sdf = (
    sdf
    .select(
        my_function_udf(
            f.col('col1'), f.col('col2')
        ).alias('metrics'))  # call the UDF
    .select(f.col('metrics.*'))  # expand into separate columns
)
If you call results_sdf.explain() after executing the above code block, you should see a line that reads something like this:
BatchEvalPython [my_function(col1#87, col2#88), my_function(col1#87, col2#88), my_function(col1#87, col2#88)]
That means that in order to do the star expansion on your metrics field, Spark will call your UDF three times, once for each field in your schema. In other words, you end up running an already inefficient function multiple times.
You can trick Spark into evaluating the UDF only once by making a small change to the code:
results_sdf = (
    sdf
    .select(
        f.explode(
            f.array(
                my_function_udf(f.col('col1'), f.col('col2'))
            )
        ).alias('metrics')
    )
    .select(f.col('metrics.*'))
)
Wrapping the results in an array and then exploding that array incurs some expense, but that expense is trivial compared to the resources saved by evaluating the UDF only once:
BatchEvalPython [my_function(col1#87, col2#88)]
You can achieve the same result by persisting or checkpointing the data frame before doing the star expansion, but that uses up memory and disk space you might prefer not to spend.
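For reference, that alternative looks something like this sketch, with persist() standing in for either option (checkpointing also requires a checkpoint directory to be set):

intermediate_sdf = (
    sdf
    .select(
        my_function_udf(f.col('col1'), f.col('col2')).alias('metrics')
    )
    .persist()  # or .checkpoint(); once materialized, the star expansion
                # reads the stored rows instead of re-running the UDF
)

results_sdf = intermediate_sdf.select(f.col('metrics.*'))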