
Using Python and Spark to Research Climate Change, Part 2

Create your own Insights on Global Warming using publicly available Data.

Photo by NOAA on Unsplash

This is the second part of a small series dedicated to performing some analytics on publicly available weather data in order to find significant indicators of climate change, specifically of global warming.

I am by no means an expert in meteorology or climate models, but I have lots of experience working with data. Many of us are in a similar boat, since there are probably far more mathematicians, data scientists and data engineers than meteorologists. Performing an analysis based on trustworthy and publicly available data is thrilling in precisely this situation: anyone interested who has some development skills can follow all the steps and create their own insights, simply by applying their knowledge of working with data combined with common sense.

Specifically, the capability of a broad audience to reproduce scientific insights with commonly available methodologies becomes more and more important in a world where important decisions are increasingly based on data and mathematical models, while at the same time social media makes it ever easier to spread false information. With that idea in mind, this article series is my take on researching climate change.

Source Code

Many details of the processing steps are omitted in this article to keep the focus on the general approach. You can find a Jupyter notebook containing the complete working code on GitHub.


Outline

The whole journey, from downloading the data to creating some visualizations, is split into three separate stories, since each step already contains lots of information.

  1. Getting the data. The first part is about getting publicly available weather data and about extracting relevant metrics from this data. Depending on the data source, this turns out to be more complicated than one might guess.
  2. Preparing the data (the part you are reading right now). This step focuses on preparing the data in such a way that we can easily answer questions about average measures like temperature or wind speed per weather station and per country.
  3. Generating insights. Finally, in the last part we will perform some analysis on the prepared data that reveals signs of climate change.

1. Retrospective and Next Steps

The first part of this series focused on getting some raw weather data from the National Oceanic and Atmospheric Administration (NOAA) and on transforming the data into a more convenient file format. This is one of many possible sources, but I decided to use it for several reasons:

  • It contains (more or less) raw weather measurements from weather stations. Some basic processing has been performed, but (AFAIK) no aggregation has been done.
  • The data is recorded hourly and goes back to 1901.
  • The data contains measurements from weather stations all around the world.
  • Among the measurements are air temperature, wind speed, precipitation, dew point, atmospheric pressure, cloudiness and much more.
Weather stations map – source NOAA

Since we are talking about 100 GB of compressed raw data, all processing so far has been performed using PySpark, the Python binding for Apache Spark, a very popular Big Data framework for building typical data processing pipelines.

This time we will be focusing on making sense of the data, not so much in a technical sense (since that was addressed in the previous part), but more in a semantic sense. As we will see, weather data (and probably sensor data in general) poses some unique challenges, which you will (hopefully) not encounter in, for example, financial data.

So what will we learn this time?

  • How to aggregate data with PySpark
  • How to clean data

2. Prerequisites

I assume that you already followed the first part of the series, since we will be building upon the resulting Parquet files. Again, you need some free space left on your computer, since we will create new derived data sets. Those data sets will be smaller in size, but they will still occupy a couple of gigabytes.
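The code in this article also assumes a running SparkSession named spark and a few variables pointing to the storage locations of the data sets. The following snippet is only a minimal sketch with placeholder paths and settings (not taken from the original notebook); adapt it to your environment and to where you stored the results of part 1.

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Minimal setup sketch - adjust master, memory and paths to your machine
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("weather-climate-analysis") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# Placeholder locations for the data sets used below
hourly_weather_location = "data/hourly_weather"                  # output of part 1
stations_location = "data/stations"                              # station master data from part 1
daily_weather_location = "data/daily_weather"                    # created in this part
daily_country_weather_location = "data/daily_country_weather"    # created in this part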

3. Daily Preaggregation

Let’s pick up where we stopped last time. You should now have a set of directories (one per year from 1901 until 2020) filled with Parquet files, which have been created from the original raw weather data.

3.1 Inspect Data

To warm up again, let’s start by reading in the data from last time and inspecting its schema:

# Read in weather data from Parquet files
weather = spark.read.parquet(hourly_weather_location)
# Inspect the schema
weather.printSchema()
The data schema of the converted weather data

The data contains all measurements of all weather stations from all years. Let me explain again what all these columns actually mean:

  • usaf and wban are two different identifiers for each weather station. You might wonder where these names come from: USAF is the abbreviation of "United States Air Force" and WBAN is the abbreviation of "Weather Bureau Army Navy". So both identifier schemes have their origins in the US military, but I don’t know if all weather stations actually belong to it. Keep in mind that not all stations have valid identifiers from both institutions, which means that either usaf or wban can contain an invalid value (99999). This in turn implies that only the combination of both fields creates a truly unique identifier (see the small code sketch below this list). We will come back to that later.

  • ts simply is the timestamp when the measurement was recorded. Although the data is called hourly, the timestamp has a resolution of seconds. We will also discuss some important details later.
  • date simply is derived from the timestamp and contains the date of the measurement. I admit that I introduced this column rather early, but we will be interested mostly in average daily temperatures.
  • report_type denotes the type of geophysical surface observation, for example if it is an aerological report, a surface report or if it comes from a specific partner.
  • wind_direction provides the angle, measured in a clockwise direction, between true north and the direction from which the wind is blowing.
  • wind_direction_qual denotes a quality status of a reported wind direction angle.
  • wind_observation contains the report type of the wind. For example it may contain a 5 minute average speed, a 60 minute average speed or if it was calm.
  • wind_speed contains the rate of horizontal travel of air past a fixed point, measured in meters per second.
  • wind_speed_qual denotes a quality status of the reported wind speed.
  • air_temperature contains the temperature of the air, measured in degrees Celsius.
  • air_temperature_qual denotes a quality status of the reported air temperature.
  • precipitation_... denotes various fields devoted to precipitation – but I don’t fully understand them yet.
  • year contains the year (automatically derived by PySpark from the name of each subdirectory)

Remember that this is only a small subset of all the information contained in the original files, but these columns are sufficient for our purpose.
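As a small sketch (not part of the original processing), we can verify the remark about the station identifiers by comparing the number of distinct values per identifier column with the number of distinct identifier combinations:

# Sketch: neither usaf nor wban alone is unique - only their combination is.
# Comparing the distinct counts per column with the distinct count of the
# combination illustrates this.
weather.select("usaf").distinct().count()
weather.select("wban").distinct().count()
weather.select("usaf", "wban").distinct().count()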

We will discuss most of these fields in more detail later, and I again invite you to study the official documentation of the original raw data format, which provides many details on the meaning of these fields. But I’d already like to mention one important aspect, which might be somewhat unique to sensor data: in addition to the metrics themselves, the data also contains quality indicators for each measurement. These indicators tell us whether each metric of each measurement is valid or not. There are multiple different scenarios which result in partially invalid measurements:

  • A weather station might not have all sensors. For example a certain weather station might only measure temperature but not wind speed.
  • One of the sensors of a weather station might be broken while the other is still working fine.
  • Different sensors are read out at different time intervals.

Of course we can also peek inside the data:

weather.limit(10).toPandas()
Weather data as stored in the Parquet files

Here we already see that the quality indicators are really important: a wind speed of 999.9 meters per second still seems very unrealistic, even with climate change in mind.
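We can check this directly. The following is only a small sketch (the threshold of 500 m/s is an arbitrary assumption) that lists the quality codes attached to such implausible wind speed values:

import pyspark.sql.functions as f

# Sketch: implausible wind speeds should not carry a "passed" quality code
weather.filter(f.col("wind_speed") > 500.0) \
    .groupBy("wind_speed", "wind_speed_qual") \
    .count() \
    .show()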

Just out of curiosity, let’s count the total number of records:

weather.count()

As you will see, we have almost 3.5 billion records. That is really a non-trivial amount of data for a single machine, but still manageable with the right tools.

3.2 Cleaning Data

As I explained above, the data contains many invalid measurements which we’d like to ignore. You might wonder why these records are present in the original data in the first place. There are many good reasons for their presence:

  • Whenever an individual metric of a weather station is invalid, that doesn’t mean that other metrics of the same record are also invalid.
  • The world of sensor data is more complex than a binary decision between "valid" and "invalid". This is reflected by the quality codes (which can be different for different metrics), as we will see.
  • For an operator of a weather station, collecting information on invalid measurements might also be interesting, for example to draw conclusions about the reliability of different components.
  • The decision of how an invalid record is to be handled is not made by the data provider; instead, each consumer can come up with a strategy that fits the specific problem to solve.

As I said, hopefully the financial systems of the bank you trust do not produce data which needs to be flagged as "erroneous" or "suspect".

We will be mainly concerned about the air temperature, which (according to the official format documentation) can have the following quality codes:

0 = Passed gross limits check
1 = Passed all quality control checks
2 = Suspect
3 = Erroneous
4 = Passed gross limits check, data originate from an NCEI data source
5 = Passed all quality control checks, data originate from an NCEI data source
6 = Suspect, data originate from an NCEI data source
7 = Erroneous, data originate from an NCEI data source
9 = Passed gross limits check if element is present
A = Data value flagged as suspect, but accepted as a good value
C = Temperature and dew point received from Automated Weather Observing System (AWOS) are reported in whole degrees Celsius. Automated QC flags these values, but they are accepted as valid.
I = Data value not originally in data, but inserted by validator
M = Manual changes made to value based on information provided by NWS or FAA
P = Data value not originally flagged as suspect, but replaced by validator
R = Data value replaced with value computed by NCEI software
U = Data value replaced with edited value

Well, these are actually more cases than I really understand, but this shows how much care has been taken in classifying each measurement. You will find similar descriptions for other metrics like wind speed, precipitation and so on; each metric has its own logic for determining its quality.
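Before settling on a strategy, it can be helpful to check which of these quality codes actually occur in the data and how often. A quick sketch:

import pyspark.sql.functions as f

# Sketch: distribution of the air temperature quality codes over all records
weather.groupBy("air_temperature_qual") \
    .count() \
    .orderBy(f.col("count").desc()) \
    .show()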

Now we still need a strategy for handling the invalid values. Since dropping complete records (i.e. rows in a table) would also remove valid values in other columns, we will use a different approach: we will simply replace all invalid or suspect values with NULL values. This has two advantages over dropping records:

  • We still keep all valid values for different metrics in the same record
  • NULL values are ignored by all SQL aggregation functions, which is exactly what we will rely on in the next steps

This replacement can be done with PySpark as follows (some columns are omitted in the example below):

import pyspark.sql.functions as f

# Derive date and hour from the timestamp and mask invalid or suspect
# measurements by replacing them with NULL
valid_weather = weather \
    .withColumn("date", f.to_date(weather["ts"])) \
    .withColumn("hour", f.hour(weather["ts"])) \
    .withColumn("valid_wind_speed",
        f.when(weather["wind_speed_qual"].isin('1', '5'),
               weather["wind_speed"])) \
    .withColumn("valid_air_temperature",
        f.when(weather["air_temperature_qual"].isin('1', '5', 'R'),
               weather["air_temperature"]))

Note that we keep all wind speeds with a quality code of 1 or 5 and all air temperatures with a quality code of 1, 5 or R.
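As a quick sanity check (again only a sketch), we can compare how many air temperature readings are present before and after masking; COUNT ignores NULL values:

import pyspark.sql.functions as f

# Sketch: comparing non-NULL counts shows how many temperature readings
# were masked out as invalid or suspect
valid_weather.select(
    f.count("air_temperature").alias("all_temperatures"),
    f.count("valid_air_temperature").alias("valid_temperatures")
).show()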

3.3 Hourly Aggregation

In the next step, we aggregate all measurements into hourly values per weather station and per date/hour. You might wonder why this makes sense, given that the data set is already called an hourly data set. The reason is that it may actually contain multiple measurements per weather station and hour, for example when the different sensors (for wind and temperature) are read out at slightly different time intervals. This can be seen by closely inspecting the timestamp column ts of individual weather stations, and it also explains why the timestamps are recorded with a precision of seconds instead of a precision of one hour.

By performing an hourly aggregation, all records within the same hour are merged into a single record per weather station and per hour.

We always use the AVG aggregation function for temperature and wind speed in order to obtain the average values within a specific hour. If we were to collect precipitation, we probably would use a SUM function in order to collect the total amount of rainfall within a specific hour.

Remember that all aggregation functions ignore any NULL values, which we used to mask suspect or invalid data.

# Average all measurements of a station within the same hour into one record
hourly_weather = valid_weather \
    .withColumn("date", f.to_date(valid_weather["ts"])) \
    .withColumn("hour", f.hour(valid_weather["ts"])) \
    .groupBy("usaf", "wban", "date", "hour").agg(
        f.avg(valid_weather["valid_wind_speed"])
             .alias("wind_speed"),
        f.avg(valid_weather["valid_air_temperature"])
             .alias("temperature")
    )

3.4 Daily Aggregation

In a second step, we now create a daily preaggregate of the data, which we will again store as Parquet files. While the first step guaranteed that we have precisely one record per weather station and per hour, this step now creates daily summaries out of the hourly records.

Note that we not only collect average values using AVG but also minimum and maximum values.

# Aggregate the hourly records into daily summaries per weather station
daily_weather = hourly_weather.groupBy("usaf", "wban", "date") \
    .agg(
        f.min("temperature").alias("min_temperature"),
        f.max("temperature").alias("max_temperature"),
        f.avg("temperature").alias("avg_temperature"),
        f.min("wind_speed").alias("min_wind_speed"),
        f.max("wind_speed").alias("max_wind_speed"),
        f.avg("wind_speed").alias("avg_wind_speed"),
    )
daily_weather.write.parquet(daily_weather_location)

Again this step might take a while (possibly a couple of hours), depending on the CPU power of your system.

3.5 Read Back

In order to use the preaggregated data set (which will speed up further processing significantly), we read the data back into a PySpark DataFrame.

daily_weather = spark.read.parquet(daily_weather_location)
daily_weather.limit(10).toPandas()
Aggregated daily weather data

A simple daily_weather.count() should tell us that the daily aggregates still contain about 200 million records. But since the data is stored in Parquet files, working with it will be far less painful than working with the original 3.5 billion records.

4. Country Level Aggregation

Now we have a nice data set containing daily aggregates for each weather station. In this section, we try to derive a new data set containing aggregated weather information per day and per country instead of per day and per weather station.

Actually, if we wanted to do this correctly, this would be much more difficult than what I will be doing here. Let me try to explain some problems that normally need to be addressed:

  • The active weather stations introduce a bias that changes over time. For example, a weather station in the mountains will report much lower temperatures than a weather station in the valley, so every change in the set of active weather stations changes the bias.
  • Especially huge countries like the USA, China or Russia are far from homogeneous with regard to temperature, wind or rainfall. A single average value for such a country might not make much sense. On top of that, the bias introduced by new or vanishing weather stations in extreme places is even stronger.
  • Simply averaging over all weather stations doesn’t account for the distance between the stations. If two stations are located close to each other, they will report very similar weather, but with twice the weight of a single station.

In order to address these issues, a global model would probably be required that describes the weather on a uniform grid over the whole globe. This by far exceeds my skills and knowledge and is therefore out of scope for this article. We will have to follow a much simpler approach.

4.1 Joining Master Data

The measurements themselves only contain the two columns identifying the weather station, but no information on the country is provided. If you remember from the first part of this series, NOAA also provides a CSV file with the master data of all weather stations, containing information like each station’s geo location, its lifespan and the country it is located in.

We therefore need to join the aggregated daily weather measurements with the stations master data, which contains the relevant information. The join can easily be performed with PySpark, and we use both ID columns usaf and wban, which only in combination uniquely identify each weather station.

daily_weather = spark.read.parquet(daily_weather_location)
stations = spark.read.parquet(stations_location)
joined_data = daily_weather.join(stations, ["usaf", "wban"], "left")
joined_data.printSchema()
The schema of the joined data
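Since we used a left join, daily records whose station is missing from the master data simply end up with NULL in the country column. As a small sketch, we can check how often that happens:

import pyspark.sql.functions as f

# Sketch: number of daily records that could not be matched to a station entry
joined_data.filter(f.col("CTRY").isNull()).count()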

4.2 Country Aggregation

I reiterate that a country level aggregation of weather data (and probably of many other geo data sets) is not the most appropriate way to go. But for the sake of simplicity, we ignore all these concerns and now aggregate all the data by country (and state) and date. We will see later how good or bad this approach is.

# Aggregate daily station records per country (and state) and date
daily_country_weather = joined_data \
    .groupBy("CTRY", "STATE", "date").agg(
        f.min(f.col("min_temperature")).alias("min_temperature"),
        f.max(f.col("max_temperature")).alias("max_temperature"),
        f.avg(f.col("avg_temperature")).alias("avg_temperature"),
        f.min(f.col("min_wind_speed")).alias("min_wind_speed"),
        f.max(f.col("max_wind_speed")).alias("max_wind_speed"),
        f.avg(f.col("avg_wind_speed")).alias("avg_wind_speed"),
    )
daily_country_weather.printSchema()
The schema of the aggregated data

This now really looks like a neat schema for our purpose, so we save the transformed data for further investigation, which we will carry out in the next (and final) part of this article series.

daily_country_weather.write.parquet(daily_country_weather_location)

Since the data set daily_country_weather is much smaller than its predecessors, this step should be much faster than all the previous transformations.

Finally, let’s inspect the resulting data set by picking some country:

# Inspect the aggregated weather data for a single country (Finland)
daily_country_weather.filter("CTRY = 'FI'") \
    .orderBy("date", "CTRY", "STATE") \
    .toPandas()
The weather in Finland from 1901 until 2020

Conclusion

In this second part of the series, we achieved two important goals: we cleaned up the data by masking invalid values, and we created a semantically compressed aggregate; both steps simplify and speed up downstream analytics. These two steps are very common in many Data Science projects and are most often performed by a team of data scientists (who should understand the semantics of the data) and data engineers (who know the best tools for efficient processing).

Outlook

Now that we have a compact aggregate containing some minimal (but important) weather metrics per country and per day, we will dig deeper and research the change of temperature over time.

