With the increasing demand for high-speed querying and analysis of large datasets, Apache Spark has stood out as one of the most popular analytical engines in recent years. Its strength in distributed data processing comes from its master-worker architecture: a driver program coordinates with the cluster manager (master) and distributes smaller tasks to the worker nodes for execution. In addition, Spark is designed as an in-memory data processing engine, so it primarily uses RAM to store and manipulate data rather than relying on disk storage. Together, these design choices make overall task execution faster.

Apache Spark: From low-level to high-level
At the lower level, its architecture is designed based on two main abstractions:
- Resilient Distributed Dataset (RDD) – A low-level data abstraction in which each dataset can be divided into logical portions and executed on cluster worker nodes, thus aiding in parallel programming.
- Directed Acyclic Graph (DAG) – A representation of task dependencies and execution order, which facilitates optimization and scheduling.
At the higher level, we can leverage a rich set of high-level tools through Scala, Python, or R. Examples include Spark SQL for SQL and DataFrames, the Pandas API on Spark for Pandas workloads, and Structured Streaming for stream processing.
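To make the contrast between the two levels concrete, here is a minimal sketch (assuming an active SparkSession named spark, as in a Databricks notebook; the data is illustrative only) of the same aggregation expressed against the low-level RDD API and then the high-level DataFrame API.
readings = [("kitchen", 1.2), ("kitchen", 0.8), ("laundry", 2.5)]
# Low-level: RDD transformations, divided into partitions and executed on worker nodes
rdd_totals = spark.sparkContext.parallelize(readings).reduceByKey(lambda a, b: a + b).collect()
print(rdd_totals)
# High-level: the same aggregation with the DataFrame API, optimized by Spark's query planner
df_totals = spark.createDataFrame(readings, ["meter", "kwh"]).groupBy("meter").sum("kwh")
df_totals.show()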
Before enjoying these functionalities, however, self-managing a Spark cluster can take considerable effort: setting up the infrastructure and wiring together a collection of complex tools can quickly become a headache.
PySpark on Databricks
To address these challenges, PySpark on Databricks has become one of the popular high-level solutions in the industry. PySpark is the Python API for Spark, while Databricks is a full software platform built on top of Spark. It includes notebooks, infrastructure orchestration (auto-provisioning and scaling), process orchestration (job submission and scheduling), managed clusters, and even source control.
Using the PySpark APIs in Databricks, we will demonstrate a feature engineering project on time series data. In this hands-on journey, we will mirror how the Pandas library typically behaves for data processing, with the extra benefits of scalability and parallelism.
Note: If you want to know more about how to dynamically orchestrate this Databricks notebook (written with PySpark APIs) in Azure, you can click here.

Consider a scenario in which you have the electric power consumption data for your household on hand, sampled at a one-minute rate from December 2006 to November 2010. Our objective is to ingest and manipulate data, extract features, and generate visualizations.
The dataset (license: Database: Open Database, Contents: Database Contents), obtained from Kaggle, includes fields such as date, time, global power (active and reactive), voltage, global intensity, and sub-metering (1, 2, and 3). We can begin our analysis now.
The initial setup
To begin, we create a user account for Databricks Community Edition, which provides a suitable Databricks environment for our proof-of-concept purposes. Afterward, we can upload the input data file to the FileStore, a dedicated Databricks path. By clicking "Create Table in Notebook", we are given a code template to initiate data ingestion.
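As a quick, optional sanity check (assuming the default DBFS FileStore location), we can list the uploaded file from a notebook cell before writing any ingestion code:
# List the FileStore directory to confirm the uploaded file is present
display(dbutils.fs.ls("/FileStore/tables/"))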


Create a feature engineering project
#1 Ingest data
- Static data
We use spark.read to read our data source and return a DataFrame, which behaves like a relational table. It supports various data sources such as CSV, JSON, Parquet, and more. In this case, we read the power consumption data in CSV format with a defined schema, where the first row serves as the header and ";" is the delimiter.
# File location and type
file_location = "/FileStore/tables/household_power_consumption.csv"
file_type = "csv"
# CSV options
schema = "Date STRING, Time STRING, Global_active_power DOUBLE, Global_reactive_power DOUBLE, Voltage DOUBLE, Global_intensity DOUBLE, Sub_metering_1 DOUBLE, Sub_metering_2 DOUBLE, Sub_metering_3 DOUBLE"
first_row_as_header = "true"
delimiter = ";"
# Read CSV files
org_df = (spark.read.format(file_type)
    .schema(schema)
    .option("header", first_row_as_header)
    .option("delimiter", delimiter)
    .load(file_location))
display(org_df)
The output of the DataFrame with the first several rows:
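Besides displaying the first rows, we can also confirm that the defined schema was applied as expected:
# Print the column names and data types applied from the defined schema
org_df.printSchema()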

- Streaming data
In scenarios where data is continuously generated, we use stream processing techniques to read it incrementally. To demonstrate Spark's behavior, I partitioned the original dataset into 10 subsets beforehand and stored them at the path "/FileStore/tables/stream/". We then use spark.readStream for streaming data.
sourceStream = (spark.readStream.format("csv")
    .option("header", True)
    .schema(schema)
    .option("mode", "dropMalformed")
    .option("maxFilesPerTrigger", 1)
    .option("ignoreLeadingWhiteSpace", True)
    .load("dbfs:/FileStore/tables/stream"))
It is worth mentioning that setting mode to "dropMalformed" means we discard corrupted records, whether the corruption is due to structural inconsistencies or other factors that make them unusable. We also choose to process only one file per trigger event.
By starting to receive data and checking the record count every ten seconds, we can observe the continuous arrival of streaming data.
import time
# Stream the content of the DataFrame into an in-memory table
query = (sourceStream.writeStream
    .queryName("count")
    .format("memory")
    .outputMode("append")
    .start())
# Display the count of rows every ten seconds
for _ in range(10):
    spark.sql("SELECT COUNT(*) AS no_of_rows FROM count").show()
    time.sleep(10)
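Once we have confirmed the incremental arrival of records, we can stop the in-memory stream via the StreamingQuery handle returned by start():
# Stop the streaming query after the demonstration
query.stop()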
#2 Manipulate and explore data
- Data transformation
Since the number of rows with missing values is relatively negligible, we choose to drop them. We also extract time-related features so that patterns can potentially be observed in higher dimensions later.
from pyspark.sql.functions import col, concat_ws, to_date, year, month, weekofyear, hour
# Drop rows with missing values
df = org_df.na.drop()
# Convert the "Date" string column into a date type
df = df.withColumn("Date", to_date(col("Date"), "d/M/y"))
# Combine "Date" and "Time" into a new column "DateTime" and cast it to a timestamp
df = df.select(concat_ws(" ", col("Date"), col("Time")).alias("DateTime"), "*")
df = df.withColumn("DateTime", df["DateTime"].cast("timestamp"))
# Add time-related features
df = df.withColumn("year", year("DateTime"))
df = df.withColumn("month", month("DateTime"))
df = df.withColumn("week_num", weekofyear("DateTime"))
df = df.withColumn("hour", hour("DateTime"))
- Data exploration
We can explore data with various basic PySpark methods.
(1) Select
The "’select" method allows us to create a subset of the data frame column-wise. In this example, we select columns in descending order of global active power.
df.select("DateTime", "Global_active_power", "Global_intensity").sort("Global_active_power", ascending=False).show(5)

(2) Filter
This filters data points based on column values. In this example, we filter by two columns: "year" and "Global_intensity".
df.filter(
    (col("year") == 2009) &
    (col("Global_intensity") > 40)
).count()
# Output: 10
(3) groupby
We can also perform some aggregations. In our dataset, we calculate the average of global active power and sub-meterings for different months.
df.groupby("month").agg(
round(mean("Global_active_power"), 2).alias("Avg_global_active_power"),
round(mean("Sub_metering_1"), 2).alias("Avg_sub_metering_1"),
round(mean("Sub_metering_2"), 2).alias("Avg_sub_metering_2"),
round(mean("Sub_metering_3"), 2).alias("Avg_sub_metering_3"),
).sort(["month"]).show(5)

#3 Extract features using Window functions
In addition to the basic PySpark methods and functions above, we can leverage Window functions to generate features that capture temporal dependencies and relationships in time series data. Assume we have a transformed dataset ("df2") in which the total global active power is aggregated by day from the one-minute samples (a sketch of one possible aggregation is shown below). Let's explore how we can obtain these features.
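For reference, here is a minimal sketch of how such a daily aggregate could be produced from the minute-level DataFrame. The exact aggregation behind "df2" is an assumption on my part; the column names follow the earlier steps.
from pyspark.sql.functions import to_date, round, sum as spark_sum
# Hypothetical daily aggregation: sum the one-minute samples per calendar day
df2 = (df.groupBy(to_date("DateTime").alias("Date"))
    .agg(round(spark_sum("Global_active_power"), 2).alias("Total_global_active_power"))
    .orderBy("Date"))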
(1) Lag features
These represent the metric values from previous days, which helps our models learn from historical data and identify trends.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, round
# Create a Window specification based on the 'Date' column
windowSpec = Window.orderBy("Date")
# Calculate the lagged value of 'Total_global_active_power'
df2 = df2.withColumn("power_lag1", round(lag(col("Total_global_active_power"), 1).over(windowSpec), 2))
display(df2)

(2) Delta features
These take a subsequent step to capture short-term changes or variations over time, by calculating the difference between the lag features and the original data fields.
# Calculate the difference between columns
df2 = df2.withColumn("power_lag1_delta", round(col("power_lag1") - col("Total_global_active_power"), 2))
display(df2)

(3) Window average features
These features calculate the average value of our target data fields within a sliding window, enabling us to capture the smoothed patterns and relatively long-term trends. Here I pick the window sizes as 14 (2 weeks) and 30 (roughly 1 month).
from pyspark.sql.functions import avg
# Add window average fields to the DataFrame for the specified window sizes
def add_window_avg_features(df, window_sizes):
    for window_size in window_sizes:
        window_col_name = f"avg_power_l{window_size}"
        windowSpec = Window.orderBy("Date").rowsBetween(-window_size, 0)
        df = df.withColumn(window_col_name, round(avg(col("Total_global_active_power")).over(windowSpec), 2))
    return df
window_sizes = [14, 30]
df2 = add_window_avg_features(df2, window_sizes)
df2.select("Date", "Total_global_active_power", "avg_power_l14", "avg_power_l30").sort("Date", ascending=False).show(5)

(4) Exponentially weighted moving average (EWMA) features
EWMA features are a refined version of window average features that assign more weight to recent data and less to older data. A higher weight (alpha) means the EWMA features track the original time series more closely. Here I pick two separate weights: 0.2 and 0.8.
import pyspark.pandas as ps
# Add EWMA features to the pandas-on-Spark DataFrame for the specified alpha values
def add_ewma_features(df, alphas):
    for alpha in alphas:
        ewma_col_name = f"ewma_power_w{str(alpha).replace('.', '')}"
        df[ewma_col_name] = df.Total_global_active_power.ewm(alpha=alpha).mean().round(2)
    return df
alphas = [0.2, 0.8]
# Convert into a pandas-on-Spark DataFrame, to use EWM function
df2_pd = df2.pandas_api()
df2_pd = add_ewma_features(df2_pd, alphas)
# Convert back to a Spark DataFrame
df2 = df2_pd.to_spark()
df2.select("Date", "Total_global_active_power", "ewma_power_w02", "ewma_power_w08").sort("Date", ascending=False).show(5)

#4 Generate visualizations on Notebook
After extracting time-related data and features using various PySpark functions and methods, we can use the built-in support from Databricks to create visualizations efficiently. This works by dragging and dropping data fields and configuring visual settings in the visualization editor. Some examples are shown below.
- Scatter plot: Relationship between global active power and global intensity
Interpretation: There is a highly positive correlation between the two fields.

- Box plot: The distribution of global active power across hours
Interpretation: There are relatively large variations of global active power from 7:00 to 21:00.

- Line chart: The changes in total global active power, EWMA with alpha 0.2, and EWMA with alpha 0.8 from Jan 2008 to Mar 2008
Interpretation: EWMA with alpha 0.8 sticks more closely to the original time series than EWMA with alpha 0.2.

In addition, we can generate the default data profiles to display summary statistics such as counts, percentages of missing values, and data distributions. This helps ensure data quality throughout the feature engineering process. These visualizations can alternatively be created from query output using Databricks SQL.
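The data profile can also be triggered programmatically; for example (assuming a Databricks Runtime where the dbutils.data utility is available):
# Generate a data profile (summary statistics and distributions) for the feature DataFrame
dbutils.data.summarize(df2)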
Wrapping it up
In our hands-on exploration, we used PySpark for feature engineering of time-series data on the Databricks platform:
- Ingesting static and streaming data, using the spark.read and spark.readStream methods respectively.
- Manipulating and exploring data, using a range of basic PySpark functions available in pyspark.sql.functions, as well as DataFrame methods.
- Extracting trend-related features, by calculating relationships within groups of data using pyspark.sql.Window.
- Generating visualizations, using the built-in features of Databricks Notebook.
When dealing with a massive dataset, PySpark is often preferred over Pandas due to its scalability and performance. PySpark's support for lazy evaluation also means that computations are only performed when necessary, which reduces overhead (see the small example below). That said, Scala can sometimes be a better option: because Spark itself is written in Scala, the latest features arrive there first, and its emphasis on immutable objects can make systems less error-prone. Different languages and libraries have their own edges; ultimately, the choice depends on enterprise requirements, developers' learning curves, and integration with other systems.
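To illustrate lazy evaluation with the DataFrame from this project: a transformation such as filter only builds the query plan, and nothing is computed until an action such as count is called.
# The transformation is lazy: this line only defines the query plan
high_intensity = df.filter(col("Global_intensity") > 40)
# The action triggers the actual distributed computation
print(high_intensity.count())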
Before you go
If you enjoyed this reading, I invite you to follow my Medium page and LinkedIn page. By doing so, you can stay updated with exciting content related to Data Science side projects and Machine Learning Operations (MLOps) demonstrations.