Data Transformations in Scala with Gallia: Version 0.4.0 Is Out

Presenting new features available with this supercharged release

Anthony Cros
Towards Data Science

Photo by Shubham Dhage on Unsplash

This is a follow-up article to my previous introduction to Gallia, a schema-aware data transformation library in Scala. This article will focus on the most important changes that were included in the latest release: 0.4.0 (available for Scala 2.12 and 2.13).

Reading/Writing from Apache Avro/Parquet

Recall from the previous article that a typical processing pipeline in Gallia looks something like:

Which produces:

<root>
title _String
air_date _String
doctor _Int

title | air_date | doctor
----------------------- | ---------------- | ------
THE ELEVENTH HOUR | 3 April 2010 | 11
THE DOCTOR'S WIFE | 14 May 2011 | 11
...

Note: A TSV version of the episodes dataset is available as a gist here.

Apache Avro

Similarly, an Avro file can now be consumed via the following:

Which produces the exact same result.

Conversely, one can write results to an Avro file with .writeAvro("./mydata.avro").

Notes:

  • It does not matter whether the origin was itself an Avro file, as inputs and outputs in Gallia are completely independent
  • A direct consequence of the above note is that one could use Gallia purely for conversion purposes: Avro->JSON, Avro->TSV, Avro->Parquet, Parquet->Avro, etc. One caveat however: the data must comply with Gallia’s data model (for instance, no use of Avro’s map, no 2+D arrays, etc).
  • In the future, the streamAvro/writeAvro methods will be replaced with generic stream/write methods, with the extension acting as reference (see I/O section)

Apache Parquet

To process Parquet files instead of Avro, the code is exactly the same except:

  • The module name is gallia-parquet instead of gallia-avro
  • The import is import gallia.parquet._ instead of import gallia.avro._
  • The method names are streamParquet/writeParquet instead of streamAvro/writeAvro

Reading from case classes

Let’s consider the following case class:

    case class Person(name: String, age: Int, income: Int)

It can now be ingested as a single entity:

    val peter = gallia.aobjFromDataClass(
      Person(name = "Peter", age = 29, income = 80000))

Meanwhile, a collection thereof can be ingested as follows:

So for instance, the following:

Will produce the expected:

<root>
name _String
age _Int
hourly_rate _Double

name | age | hourly_rate
------ | --- | ------------------
Peter | 29 | 38.31417624521073
Joanna | 29 | 14.367816091954023
Samir | 28 | 38.31417624521073
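
The figures in this table can be cross-checked outside Gallia. The sketch below uses plain Scala only (no Gallia calls) and rests on two assumptions inferred from the outputs shown in this article rather than stated in it: Joanna's and Samir's incomes are 30000 and 80000, and the hourly rate is the income divided by 261 working days of 8 hours each.

```scala
// Plain-Scala sketch; this is not the Gallia code that produced the table.
object HourlyRateSketch {
  case class Person(name: String, age: Int, income: Int)

  // Assumption: incomes other than Peter's are inferred from the article's outputs.
  val people: Seq[Person] = Seq(
    Person("Peter",  29, 80000),
    Person("Joanna", 29, 30000),
    Person("Samir",  28, 80000))

  // Assumption: 261 working days of 8 hours each, i.e. 2088 hours per year.
  val HoursPerYear: Double = 261 * 8

  def hourlyRate(p: Person): Double = p.income / HoursPerYear

  def main(args: Array[String]): Unit =
    people.foreach { p => println(s"${p.name} | ${p.age} | ${hourlyRate(p)}") }
}
```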

Note: While Gallia does not allow writing to case classes yet, this is scheduled for the next release, via both reflection and macros (see NEXT_RELEASES).

Union Types

Partial support for union types was also added in 0.4.0.

An example of usage is as follows:

This results in the following output:

<root> [... schema unchanged]

name  | age
----- | ------------
Peter | 29
Samir | TWENTY-EIGHT

Since toUpperCase is a String-only operation, the entity with an integer value for age is left untouched.
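
The same idea can be pictured with plain Scala pattern matching. This is only an analogy for the behavior described above, not how Gallia implements union types, and the lower-case input spelling for Samir's age is an assumption.

```scala
// Plain-Scala analogy for a String-only operation applied to a mixed-type field.
object UnionAgeSketch {
  // Each row pairs a name with an age that is either an Int or a String.
  val rows: Seq[(String, Any)] = Seq(
    "Peter" -> 29,
    "Samir" -> "twenty-eight") // assumed input spelling

  // Apply the String-only operation where it fits; leave other values as they are.
  def upperCaseIfString(value: Any): Any = value match {
    case s: String => s.toUpperCase
    case other     => other
  }

  val result: Seq[(String, Any)] =
    rows.map { case (name, age) => name -> upperCaseIfString(age) }

  def main(args: Array[String]): Unit =
    result.foreach { case (name, age) => println(s"$name | $age") }
}
```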

The above example presents a trivial case, but there can of course be more complex cases. An example would be when multiple nested entities are part of a union:

It produces the following output:

<root> [... schema unchanged]

{ "data": "Michael (27yo)" }
{ "data": { "name": "PETER", "age": 29 } }
{ "data": "Joanna (29 years old)" }
{ "data": { "name": "Samir", "dob": "March 14, 1971" } }

Only the value "Peter" was upper-cased, because it belongs to the only nested entity with an age entry.

Notes:

  • See union_types.md
  • More examples can be seen in UnionTypeTest.scala
  • Union types in Gallia are still considered experimental at this point, and not all operations support them (the essential ones do however).
  • One of the main reasons for supporting union types is to help data cleaning/feature engineering efforts. Indeed, cases where fields are captured with different types are sadly very common in legacy datasets (think true mixed with "Yes" and the like)
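
To make the data-cleaning motivation concrete, here is a minimal plain-Scala sketch (no Gallia calls) that normalizes a field captured sometimes as a Boolean and sometimes as a String. The accepted spellings and the sample values are illustrative assumptions.

```scala
// Plain-Scala sketch of cleaning a mixed Boolean/String field.
object MixedBooleanSketch {
  // Returns None for values that cannot be interpreted as a Boolean.
  def toBoolean(value: Any): Option[Boolean] = value match {
    case b: Boolean => Some(b)
    case s: String  =>
      s.trim.toLowerCase match {
        case "yes" | "true" => Some(true)
        case "no" | "false" => Some(false)
        case _              => None
      }
    case _ => None
  }

  // Hypothetical raw values, as they might appear in a legacy dataset.
  val raw: Seq[Any] = Seq(true, "Yes", "no", 42)

  val cleaned: Seq[Option[Boolean]] = raw.map(toBoolean)

  def main(args: Array[String]): Unit =
    raw.zip(cleaned).foreach { case (in, out) => println(s"$in -> $out") }
}
```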

Metaschema

A result of the added support for union types is the ability for Gallia to provide its own metaschema:

This means Gallia could dogfood itself for schema transformations. For instance, here’s what a nested field renaming would look like (from f to F here):

Notes:

  • Gallia does not actually use the metaschema in this manner internally
  • A schema being a special case of data, namely metadata (or “data about data”), Gallia’s metaschema is therefore also metametadata. And since Gallia’s metaschema can also be used to model itself, it is also its own metametaschema. Hence it is also metametametadata. Obviously.

Complex aggregations

Let’s reuse the people handle from earlier. A very simple way to aggregate data would be:

    people
      .group("name" ~> "names").by("age")
      .printJsonl()

It produces the following:

{ "age": 29, "names": [ "Peter", "Joanna" ] }
{ "age": 28, "names": [ "Samir" ] }

It could also (unnecessarily) be achieved via:

    people
      .aggregateBy("age").as("names")
      .using { _.strings("name") }
      .printJsonl()

Which produces the same result as above, but shows a trivial use of aggregateBy.

And while there are more built-in aggregators readily available (SUM BY, MEAN BY, …) in Gallia, the aggregateBy construct will be necessary to achieve processing like this in a single action:

    people
      .aggregateBy("age").as("names")
      .using { _.strings("name").concatenateStrings }
      .printJsonl()

Which produces:

{ "age": 29, "names": "PeterJoanna" }
{ "age": 28, "names": "Samir" }

And is actually a shorthand for:

    people
      .aggregateBy("age").as("names")
      .using { _.strings("name")
        .mapV { _.reduceLeft(_ + _) } }
      .printJsonl()

It could therefore be customized as required, for instance:

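    people
      .aggregateBy("age").as("names")
      .using { _.strings("name")
        // upper-case each name before joining, so that single-name groups
        // (for which reduceLeft never calls its function) are upper-cased too
        .mapV { _.map(_.toUpperCase).reduceLeft(_ + "|" + _) } }
      .printJsonl()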

To produce:

{ "age": 29, "names": "PETER|JOANNA" }
{ "age": 28, "names": "SAMIR" }
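
One detail of reduceLeft in the Scala standard library is worth keeping in mind when customizing such aggregations: the combining function is only invoked when there are at least two elements, so a one-element group is returned as-is. The sketch below uses plain Scala collections only (no Gallia calls) to compare upper-casing inside the combining function with upper-casing each element first.

```scala
// Plain-Scala sketch of how reduceLeft treats single-element groups.
object ReduceLeftSketch {
  // Upper-casing inside the combining function: skipped for one-element groups.
  def upperCaseWhileReducing(names: Seq[String]): String =
    names.reduceLeft((acc, next) => acc.toUpperCase + "|" + next.toUpperCase)

  // Upper-casing each element first: applies to every group size.
  def upperCaseThenReduce(names: Seq[String]): String =
    names.map(_.toUpperCase).reduceLeft(_ + "|" + _)

  def main(args: Array[String]): Unit = {
    println(upperCaseWhileReducing(Seq("Peter", "Joanna")))
    println(upperCaseWhileReducing(Seq("Samir")))
    println(upperCaseThenReduce(Seq("Samir")))
  }
}
```

The second form gives "SAMIR" for a group holding only "Samir", whereas the first leaves it as "Samir".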

But the real power of the aggregateBy construct lies in its ability to express more custom aggregations, such as:

    people
      .aggregateBy("age").using { group =>
        ("names"       -> group.strings("name"),
         "mean_income" -> group.ints("income").mean) }
      .printJsonl()

Which results in:

{"age": 29, "names": [ "Peter", "Joanna" ], "mean_income": 55000.0}
{"age": 28, "names": [ "Samir" ], "mean_income": 80000.0}

Note: The tuple-based entity creation used above is just a shorthand for the more explicit gallia.headO("names" -> ...), available for up to 5 entries
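
For comparison, a roughly equivalent aggregation can be written with plain Scala collections. This is a sketch for readers who want to check the numbers, not a description of how Gallia computes them. The AgeGroup case class is hypothetical, and the incomes for Joanna and Samir are inferred from the outputs shown in this article.

```scala
// Plain-Scala sketch of grouping by age and computing names plus mean income.
object AggregateSketch {
  case class Person(name: String, age: Int, income: Int)

  // Hypothetical holder for one aggregated row.
  case class AgeGroup(age: Int, names: Seq[String], meanIncome: Double)

  // Assumption: incomes other than Peter's are inferred from the article's outputs.
  val people: Seq[Person] = Seq(
    Person("Peter",  29, 80000),
    Person("Joanna", 29, 30000),
    Person("Samir",  28, 80000))

  val byAge: Seq[AgeGroup] =
    people
      .groupBy(_.age)
      .map { case (age, group) =>
        AgeGroup(
          age        = age,
          names      = group.map(_.name),
          meanIncome = group.map(_.income).sum.toDouble / group.size) }
      .toSeq
      .sortBy(g => -g.age) // groupBy does not guarantee an order, so sort explicitly

  def main(args: Array[String]): Unit = byAge.foreach(println)
}
```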

Complex transformations/cotransformations

Let’s switch over to a more serious domain to highlight these new features. Consider the following dataset:

Note: As a reminder, the bobj and bobjs constructs are a convenience mechanism for constructing entities whose schema can be easily inferred. Therefore bobj("f" -> 1) is equivalent to the more explicit aobj("f".int)(obj("f" -> 1)).

Transformation via data class (for nested entities)

Gallia now offers the ability to transform nested entities via case classes (“data classes”). Consider for instance:

    case class Change(
        chromosome: String,
        position  : Int,
        from      : String,
        to        : String) {

      def shorthand: String =
        s"${chromosome}:${position};${from}>${to}"

    }

Which models the change nested entity in the mutations dataset above, and encapsulates an operation that produces a shorthand notation for the genetic change (e.g. "3:14532127;C>GG").
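
Since the logic lives in an ordinary case class, it can be exercised on its own before being wired into a Gallia pipeline. A minimal sketch, using the values from the example notation above:

```scala
// Stand-alone check of the shorthand method; no Gallia calls involved.
object ChangeSketch {
  case class Change(chromosome: String, position: Int, from: String, to: String) {
    def shorthand: String = s"${chromosome}:${position};${from}>${to}"
  }

  // Values taken from the example notation quoted in the text.
  val example: Change = Change(chromosome = "3", position = 14532127, from = "C", to = "GG")

  def main(args: Array[String]): Unit = println(example.shorthand)
}
```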

The following code will transform the change entity to its shorthand counterpart:

    mutations
      .transformDataClass[Change]("change")
      .using(_.shorthand)
      .display()

Which produces:

[...]

patient_id | age | change
---------- | --- | ----------------
p1 | 23 | 3:14532127;C>GG
p2 | 22 | 4:1554138;C>T
p3 | 21 | Y:16552149;AA>GT

Notes:

  • This would apply to any other combination of optional/required and single/multiple for the nested entity, via .transformDataClass[Option[Change]], .transformDataClass[Seq[Change]], and .transformDataClass[Option[Seq[Change]]]
  • Subsequent versions of Gallia will leverage macros to render this mechanism more efficient (currently relies on reflection)

Cotransformation via a data class (for current level)

Let’s now consider the following case class which models a subset of the fields at the current level (as opposed to a nested entity this time):

    import java.time.Year

    case class Demographics(
        age: Int,
        sex: String) {

      def toNewDemographics =
        NewDemographics(
          year_of_birth    = Year.now().getValue - age,
          has_Y_chromosome = sex == "male")

    }

Along with the following case class which models a desired model change:

    case class NewDemographics(
        year_of_birth   : Int,
        has_Y_chromosome: Boolean)

The following code co-transforms the two fields by using the encapsulated method in the origin case class:

    mutations
      .cotransformViaDataClass[Demographics]
      .usingWithErasing(_.toNewDemographics)
      .display()

Which results in the expected:

<root>
patient_id _String
year_of_birth _Int
has_Y_chromosome _Boolean
change ...

{ "patient_id" : "p1",
"change" : { ... },
"year_of_birth" : 1999,
"has_Y_chromosome": true }
...
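
Because toNewDemographics reads the current year, the year_of_birth value depends on when the code runs (1999 for a 23-year-old suggests the output above was generated in 2022). The variant sketched below makes the result reproducible by accepting an explicit reference year. The referenceYear parameter is hypothetical and not part of the code shown in this article, and p1 being male is inferred from the has_Y_chromosome value in the output.

```scala
import java.time.Year

// Plain-Scala variant of the case classes with an explicit reference year.
object DemographicsSketch {
  case class NewDemographics(year_of_birth: Int, has_Y_chromosome: Boolean)

  case class Demographics(age: Int, sex: String) {
    // referenceYear is a hypothetical parameter; it defaults to the current year,
    // which is what the version in the article always uses.
    def toNewDemographics(referenceYear: Int = Year.now().getValue): NewDemographics =
      NewDemographics(
        year_of_birth    = referenceYear - age,
        has_Y_chromosome = sex == "male")
  }

  // Assumption: patient p1 is a 23-year-old male and the reference year is 2022.
  val p1: NewDemographics =
    Demographics(age = 23, sex = "male").toNewDemographics(referenceYear = 2022)

  def main(args: Array[String]): Unit = println(p1)
}
```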

Notes:

  • As it stands it would make more sense to transform age and sex separately, but the goal here is to highlight a co-transformation where the fields could be mixed and matched at will
  • .usingWithErasing removes the original entries, while .using would preserve them
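
The difference between the two variants can be pictured with a plain Scala Map standing in for an entity. This is only a mental model of the behavior described in the note above, not Gallia's implementation, and the field values are assumptions based on the output shown earlier.

```scala
// Map-based mental model of preserving vs. erasing the original entries.
object ErasingSketch {
  type Entity = Map[String, Any]

  // Assumed values for patient p1.
  val original: Entity = Map("patient_id" -> "p1", "age" -> 23, "sex" -> "male")

  // Entries produced by the co-transformation.
  val derived: Entity = Map("year_of_birth" -> 1999, "has_Y_chromosome" -> true)

  // Fields consumed by the co-transformation.
  val consumed: Set[String] = Set("age", "sex")

  // Like .using: original entries are kept alongside the new ones.
  val preserving: Entity = original ++ derived

  // Like .usingWithErasing: consumed entries are dropped.
  val erasing: Entity = (original -- consumed) ++ derived

  def main(args: Array[String]): Unit = {
    println(preserving.keySet)
    println(erasing.keySet)
  }
}
```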

Cotransformation via custom processing

Gallia also now offers an improved mechanism for truly custom processing, although using it is generally a bad idea (we lose the guarantee that the schema and data will co-evolve accordingly). For instance, to reproduce the co-transformation above, we could create the following object, extending gallia.ObjToObj:

And the following code would make use of it:

    mutations
      .custom(CustomDemographicsTransformation)
      .display()

This would yield the same output.

Newly supported types

Gallia now supports additional types:

  • Enums
  • Binary data
  • Temporal types

Enums

Enums can be created/used like so:

Which produces:

<root>
choice _Enm(List(rock, paper, scissors, spock))

choice
------
paper

Binary data

Binary data can be created/used like so:

Which produces:

bin
-----------
base64:Ym9v

Notes:

  • "Ym9v" encodes "boo" in Base64, since we put a 'b' byte (0x62) where the 'f' byte (0x66) was.
  • The Base64 output is only used for serialization; the in-memory representation remains java.nio.ByteBuffer throughout processing
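
The byte replacement and its Base64 encoding can be reproduced with the Java standard library alone. The sketch below involves no Gallia calls and assumes the original bytes were those of the string "foo".

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

// Stand-alone check that replacing 'f' with 'b' in "foo" encodes to "Ym9v".
object Base64Sketch {
  // Assumption: the buffer initially holds the bytes of "foo".
  val buffer: ByteBuffer = ByteBuffer.wrap("foo".getBytes(UTF_8))

  // Overwrite the first byte ('f', 0x66) with 'b' (0x62).
  buffer.put(0, 'b'.toByte)

  // Encode the backing bytes only when a textual form is needed.
  val encoded: String = Base64.getEncoder.encodeToString(buffer.array())

  def main(args: Array[String]): Unit = println(s"base64:$encoded")
}
```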

Temporal types

The temporal types supported in Gallia match their Java counterparts: java.time.{LocalDate, LocalTime, LocalDateTime, OffsetDateTime, ZonedDateTime, Instant}

They can be created/used like so (for instance with LocalDateTime):

Which produces:

<root>
hungover _LocalDateTime

hungover
-------------------
2022-01-01T00:00:00
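
On the Java side, note that LocalDateTime.toString omits the seconds when they are zero, while the standard ISO formatter keeps them, which matches the rendering above. Which formatter Gallia uses internally is not stated here, so the sketch below only illustrates the java.time behavior.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Stand-alone illustration of two ways to render the same LocalDateTime.
object LocalDateTimeSketch {
  val newYear: LocalDateTime = LocalDateTime.of(2022, 1, 1, 0, 0, 0)

  // toString drops a zero seconds field.
  val compact: String = newYear.toString

  // The ISO formatter always writes the seconds.
  val iso: String = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(newYear)

  def main(args: Array[String]): Unit = {
    println(compact)
    println(iso)
  }
}
```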

Conclusion

This concludes our tour of the main changes brought by 0.4.0, at least the ones that will improve the client code experience.

Other noteworthy additions include:

For a more complete list of changes and improvements that came with this release, see CHANGELOG.md.

Feedback is always welcome!

Independent software engineer/architect with a focus on big data processing, domain modelling, software architecture, and bioinformatics.