
Unmarshalling ERDDAP data into Akka/Scala with Spray

Science data is available at your fingertips. Here's a way to ingest it into your Akka/Scala projects.


ERDDAP is a tool produced by NOAA to provide scientific data sets in extensible formats for broad public use. If you have ever worked with scientific data before, you know it can be enormously challenging: the data formats are built for very specific scientific uses, with complex requirements such as multidimensional arrays, and the surrounding tooling is often written in programming languages like Fortran that are rarely used outside the science world. Since it’s unlikely long-standing organizations like NASA or CERN will change their practices overnight, ERDDAP is an amazing service to the public, even if it’s a little lacking on the aesthetic end. It ingests data in a wide variety of common scientific data formats and provides easy-access APIs in a range of useful output formats. A lot of organizations use ERDDAP to handle and distribute their data for open access (and other) purposes.

Most links in an ERDDAP-based data service lead to the ERDDAP form, which is a bit intimidating on its own.

The ERDDAP form – a bit ugly, but given the amount of data you can extract from an ERDDAP server, the ugliness is forgiven.

The default submit screen will provide you with an HTML table. I suppose you could try to parse this, but it makes more sense to work with the raw JSON data. There is a wide range of other formats you can use as well, including MATLAB binaries and NetCDF files. To access the JSON data, you simply change the file type.

Get some good old JSON data!

We will work with the plain JSON for now, but we will also look at the .jsonlKVP (key-value-pair stream) format a bit later. If you hit Submit, ERDDAP will download the JSON to your computer – but what if you want to access the data from a server or programmatically? For that, you can use the "just generate the URL" service. Alternatively, you can learn how ERDDAP constructs its URLs and skip the form entirely.

Maybe it wasn’t necessary to use the form after all.
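
The generated URL follows a predictable pattern, so once you have seen one you can write the rest by hand. As a rough sketch – the host and dataset ID below are placeholders, not the real Pool’s Cove endpoint:

https://<erddap-host>/erddap/tabledap/<datasetID>.json?station_name,time,water_pressure,water_temperature,battery_voltage,latitude,longitude&time>=2020-09-29T00:00:00Z

The file extension picks the output format, the comma-separated list picks the columns, and everything after the & constrains the rows.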

For this work, I decided to gather data from Canada’s Department of Fisheries and Oceans – a water temperature and pressure gauge in Pool’s Cove, Newfoundland, Canada. The .json output looks something like this:

{
  "table": {
    "columnNames": ["station_name", "time", "water_pressure", "water_temperature", "battery_voltage", "latitude", "longitude"],
    "columnTypes": ["String", "String", "float", "float", "float", "double", "double"],
    "columnUnits": [null, "UTC", "mbar", "degree_C", "V", "degrees_north", "degrees_east"],
    "rows": [
      ["POOLC_20200929", "2020-09-29T15:18:51Z", null, null, 0, 47.679933, -55.43001667],
      ["POOLC_20200929", "2020-09-29T15:18:51Z", null, null, 0, 47.679933, -55.43001667],
      ["POOLC_20200929", "2020-09-29T15:18:51Z", null, null, 0, 47.679933, -55.43001667],
      [...]
      ["POOLC_20200929", "2020-09-29T16:08:51Z", 83.5167, 14.4, 12.73, 47.679933, -55.42001667],
      [...]
    ]
  }
}

If you know anything about Scala, you may already have had a heart attack. You see, unlike the more popular Python, Scala cares very much about the kind of data going in and out of its programs. As a strongly typed language, it wants to know exactly what data types are in play. As an object-oriented language, it wants to put the data into objects using classes. Finally, as a relatively modern JVM language, it wants to avoid null values as much as possible.

The good news is that Spray was designed for putting all those data types into nice, clean Scala objects. Another great thing about Spray is that it is supported by the Akka framework. While I will not discuss the inner workings of Akka too much here, the main gist is that with Akka you can build streaming pipelines that operate concurrently, without blocking, inside a distributed system. So, if you wanted to take all this lovely ERDDAP data and launch it through a huge firehose quickly and reliably, Akka is a great way to work. Akka also has a sister library called Alpakka that lets you hook these streams into other services: into Apache Kafka or Amazon Kinesis streams, into Cassandra or MongoDB, through serverless apps like Lambda, and into the latest and greatest in big data formats like Parquet (which is something the ERDDAP people might consider down the road, although I haven’t found anyone asking for it yet).

But enough of justifying Akka, let’s get into the process of coding. The first thing we need is an object to store all of our classes in. This requires a few imports.

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._
import DefaultJsonProtocol._

The first import allows us to use Spray in our Akka code. The second is the spray-json library itself. Finally, spray-json’s DefaultJsonProtocol contains formats for all the basic data types that can appear in JSON, such as strings, arrays, objects, ints, floats and so on.
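
To get a feel for what those imports provide, here is a tiny spray-json round trip using only the default protocol – no custom classes yet:

import spray.json._
import DefaultJsonProtocol._

val parsed: JsValue = """{"a": 1}""".parseJson                        // string -> JsValue
val numbers: List[Int] = "[1, 2, 3]".parseJson.convertTo[List[Int]]  // JsValue -> Scala
val printed: String = numbers.toJson.compactPrint                    // Scala -> "[1,2,3]"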

Next we build the object and create some case classes to store the data.

object erddapConstants extends SprayJsonSupport {

  //(implicit val will go here eventually)
  final case class Root(table: Table) // the root class
  final case class Table(
    columnNames: List[String],
    columnTypes: List[String],
    columnUnits: List[Option[String]],
    rows: ???) // placeholder – we still need a type for the rows (more on this below)
  abstract class ErddapDataFormat {}
  final case class PoolCoveSensorRow(
    station_name: String,
    time: String,
    water_pressure: Option[Float],
    water_temperature: Option[Float],
    battery_voltage: Float,
    latitude: Double,
    longitude: Double) extends ErddapDataFormat
}

The object extends SprayJsonSupport so that Akka HTTP can use our JSON formats directly when unmarshalling responses. We have a Root class that will carry all the data and then a Table class to correspond to the rest of the available data. Fortunately, we do not have to create formats to handle the List type because Spray’s DefaultJsonProtocol already does that for us. We have also solved our null problem by asking Spray to produce an Option[T] anytime it encounters null data; again, the DefaultJsonProtocol does the heavy lifting there. Finally, there is a PoolCoveSensorRow case class to handle the row data itself.
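
You can check that null handling in the REPL. With the default protocol in scope, a JSON null converts to None and a real value to Some:

JsNull.convertTo[Option[Float]]             // None
JsNumber(83.5167).convertTo[Option[Float]]  // Some(83.5167)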

However, we have a big problem in figuring out how to manage the rows. Spray does not support building a List whose elements can be of any type. Furthermore, Spray will not be able to unmarshal each row directly into the PoolCoveSensorRow class because there are no key-value pairs that Spray can use to match the row values to the case class fields. We will cover this problem later.

First we must look at how Spray will automatically take JSON and put it into our case classes as we desire. The answer is to include implicit values to handle the conversion. Spray provides jsonFormat functions that convert JSON to and from our case classes; we only need to know how many parameters each case class has.

// order matters: each format must be able to find the formats it depends on
implicit val poolcSensorRowJsonFormat: JsonFormat[PoolCoveSensorRow] = jsonFormat7(PoolCoveSensorRow)
implicit val tableFormat: JsonFormat[Table] = jsonFormat4(Table)
implicit val rootFormat: RootJsonFormat[Root] = jsonFormat1(Root)

Now for the last part. How do we handle the rows? My first attempt tried to use Spray’s custom formatting code. To do this, you build an implicit object with both a write and a read method. Something like this:

implicit object PoolCoveSensorRowFormat extends RootJsonFormat[PoolCoveSensorRow] {
  // Writing is not so important here, as you can simply
  // create a JsObject out of the case class.
  // How you want to handle it is up to you.
  def write(item: PoolCoveSensorRow): JsValue = JsObject() // do nothing useful
  def read(json: JsValue): PoolCoveSensorRow = {
    val jsObject = json.asJsObject
    jsObject.getFields("rows") match {
      case Seq(JsArray(row)) if row.length == 7 => ??? // put the row items into a PoolCoveSensorRow
      case _ => deserializationError("rows did not match the expected format")
    }
  }
}

After working through a number of possible formatting options, I decided to go simpler. Instead of fighting the data format, we can just unmarshal the rows into Spray’s generic JsValue type and then convert things as we need them. This involves changing the Table case class.

final case class Table(
  columnNames: List[String],
  columnTypes: List[String],
  columnUnits: List[Option[String]],
  rows: List[List[JsValue]]) {

  def poolCoveSensorTest: Boolean =
    rows(0).length == 7 && rows(0)(0).convertTo[String] == "POOLC_20200929"

  def convertedRows[T <: ErddapDataFormat]: List[T] = (rows match {
    case _ if poolCoveSensorTest => rows.map { (row: List[JsValue]) =>
      PoolCoveSensorRow(
        row(0).convertTo[String],
        row(1).convertTo[String],
        row(2).convertTo[Option[Float]],  // null-safe: water_pressure can be null
        row(3).convertTo[Option[Float]],  // null-safe: water_temperature can be null
        row(4).convertTo[Float],
        row(5).convertTo[Double],
        row(6).convertTo[Double])
    }
    case _ => throw new Exception("Rows did not match any known ErddapDataSet")
  }).asInstanceOf[List[T]] // safe as long as the matching test implies T
}
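
With the formats and the finished Table in place, unmarshalling the whole document is a one-liner. Assuming the sample .json payload from earlier is sitting in a String named jsonString (a name I am making up here):

import erddapConstants._

val root: Root = jsonString.parseJson.convertTo[Root]
val readings: List[PoolCoveSensorRow] = root.table.convertedRows[PoolCoveSensorRow]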

One benefit of this approach is that you can create tests for other kinds of data as well and then include the test case in the convertedRows definition for data handling.
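
For example, if you later add a second station, the pattern might extend like this – the HalifaxSensorRow class, its test and the two row-builder helpers are made up for illustration:

final case class HalifaxSensorRow(/* its own columns */) extends ErddapDataFormat

// inside Table:
def halifaxSensorTest: Boolean = rows(0)(0).convertTo[String].startsWith("HFX")

def convertedRows[T <: ErddapDataFormat]: List[T] = (rows match {
  case _ if poolCoveSensorTest => rows.map(toPoolCoveRow)  // hypothetical helper
  case _ if halifaxSensorTest  => rows.map(toHalifaxRow)   // hypothetical helper
  case _ => throw new Exception("Rows did not match any known ErddapDataSet")
}).asInstanceOf[List[T]]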

Finally, you just import this object into another class for data ingestion.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.ExecutorService
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

class ProcessErddapData {
  implicit val system: ActorSystem = ActorSystem()
  val execution: ExecutorService = java.util.concurrent.Executors.newSingleThreadExecutor()
  implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(execution)

  import erddapConstants._

  // Get the source from the url
  val poolCoveUrl: String = ??? // the url

  def jsonDataSource(url: String): Future[HttpResponse] = {
    val http = Http(system)
    http.singleRequest(HttpRequest(uri = url))
  }

  val poolCoveData: Source[PoolCoveSensorRow, NotUsed] = {
    val data = jsonDataSource(poolCoveUrl)
      .flatMap(resp => Unmarshal(resp.entity).to[Root])
      .map(_.table.convertedRows[PoolCoveSensorRow])
    Source.future(data).flatMapConcat(iter => Source(iter))
  }
}

Now you can use the poolCoveData value as a streaming source to pump into a Kafka producer, map over, merge with other data sources, filter, and ultimately run with an Akka Streams Sink for output.
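
For example, a minimal pipeline run from inside the class, printing only the rows with an actual temperature reading (Sink.foreach is the simplest possible terminal stage):

import akka.stream.scaladsl.Sink

val done = poolCoveData
  .filter(_.water_temperature.isDefined)
  .runWith(Sink.foreach(row => println(s"${row.time}: ${row.water_temperature.get} C")))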

Now, there is another approach that could be used to build a streaming source. Instead of using ERDDAP’s plain JSON format, you can use the .jsonlKVP format, which is simply a series of JSON objects (key-value pairs), one per line. It looks like this:

{"station_name":"POOLC_20200929", "time":"2020-09-29T15:18:51Z", "water_pressure":null, "water_temperature":null, "battery_voltage":0, "latitude":47.679933, "longitude":-55.43001667}
{"station_name":"POOLC_20200929", "time":"2020-09-29T15:38:51Z", "water_pressure":null, "water_temperature":null, "battery_voltage":0, "latitude":47.679933, "longitude":-55.43001667}
{"station_name":"POOLC_20200929", "time":"2020-09-29T16:18:51Z", "water_pressure":83.5167, "water_temperature":null, "battery_voltage":12.73, "latitude":47.679933, "longitude":-55.43001667}
[...] and so on...

Why didn’t I use this all along? Live and learn!

Needless to say, collecting the jsonl data using Akka and Spray is much more straightforward and barely worth a tutorial. The disadvantage, of course, is that you do not have access to the column data types and units, so you will have to decide on the conversions yourself. The main remaining task is telling Akka how the data is framed – newline-delimited (as in this case) or something else, like a JSON list.

import akka.stream.scaladsl.{Flow, JsonFraming}
import akka.util.ByteString

class ProcessErddapStreamData {
  // This part is the same as before. Set up your actor system
  // and the context for your concurrent operations (a single thread
  // in this case).
  implicit val system: ActorSystem = ActorSystem()
  val execution: ExecutorService = java.util.concurrent.Executors.newSingleThreadExecutor()
  implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(execution)

  import erddapConstants._

  // Add a Flow to handle the framing of your JSON, with a maximum object size in bytes
  val jsonStream: Flow[ByteString, ByteString, NotUsed] = JsonFraming.objectScanner(1000)

  val poolCoveUrl: String = ??? // the url ending with '.jsonlKVP'

  def jsonDataSource(url: String): Future[ByteString] = {
    // You need to fully consume the http response before
    // processing the stream. If you do not, Akka will throw an
    // error to avoid having an http request open too long.
    Http()
      .singleRequest(HttpRequest(uri = url))
      .flatMap(resp => resp.entity.dataBytes.runReduce(_ ++ _))
  }

  val poolCoveData: Source[PoolCoveSensorRow, NotUsed] =
    Source.future(jsonDataSource(poolCoveUrl))
      .via(jsonStream)
      .map(_.utf8String.parseJson.convertTo[PoolCoveSensorRow])
}

Using JsonFraming’s objectScanner method, you can split the incoming bytes into individual JSON objects and set a maximum size for each one. Because the data here is newline-delimited and objectScanner handles that case out of the box, there is not much to do beyond creating a Flow for the stream. After that, you can convert each framed JSON object into the appropriate PoolCoveSensorRow class using Spray’s .convertTo[T] method.
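
If you just want to sanity-check the stream, you can materialize it into a collection – Sink.seq gathers every element into a Future:

import akka.stream.scaladsl.Sink
import scala.concurrent.Future

val allRows: Future[Seq[PoolCoveSensorRow]] = poolCoveData.runWith(Sink.seq)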

Cleaning data is never easy, and working with unfamiliar data can produce many challenges. As always, it does not hurt to find a solution that works and improve on it over time as you learn more and more about what’s going on inside the data source and how your users would like to interact with it.

Here’s to learning more about data and finding new ways to work with it!

