This is a follow-up article to my previous introduction to Gallia, a schema-aware data transformation library in Scala. This article will focus on the most important changes that were included in the latest release: 0.4.0 (available for Scala 2.12 and 2.13).
Table Of Contents
- Reading/Writing from Apache Avro/Parquet
  - Apache Avro
  - Apache Parquet
- Reading from case classes
- Union Types
- Metaschema
- Complex aggregations
- Complex transformations/cotransformations
  - Transformation via data class (for nested entities)
  - Cotransformation via a data class (for current level)
  - Cotransformation via custom processing
- Newly supported types
  - Enums
  - Binary data
  - Temporal types
- Conclusion
Reading/Writing from Apache Avro/Parquet
Recall from the previous article that a typical processing pipeline in Gallia looks something like:
Which produces:
<root>
title _String
air_date _String
doctor _Int
title | air_date | doctor
----------------------- | ---------------- | ------
THE ELEVENTH HOUR | 3 April 2010 | 11
THE DOCTOR'S WIFE | 14 May 2011 | 11
...
Note: A TSV version of the episodes dataset is available as a gist here.
Apache Avro
Similarly, an Avro file can now be consumed via the following:
Which produces the exact same result.
Conversely, one can write results to an Avro file with .writeAvro("./mydata.avro").
Notes:
- It does not matter whether the origin was itself an Avro file, as inputs and outputs in Gallia are completely independent.
- A direct consequence of the above is that one could use Gallia purely for format conversion: Avro->JSON, Avro->TSV, Avro->Parquet, Parquet->Avro, etc. One caveat, however: the data must comply with Gallia’s data model (for instance, no use of Avro’s map type, no 2+D arrays, etc.).
- In the future, the streamAvro/writeAvro methods will be replaced with generic stream/write methods, with the file extension determining the format (see I/O section).
Apache Parquet
To process Parquet files instead of Avro, the code is exactly the same except:
- The module name is gallia-parquet instead of gallia-avro
- The import is import gallia.parquet._ instead of import gallia.avro._
- The method names are streamParquet/writeParquet instead of streamAvro/writeAvro
Notes:
- Gallia uses the Avro converter under the hood, namely AvroParquetReader and AvroParquetWriter.
- In future versions, we will offer direct processing instead, via the custom ReadSupport/WriteSupport abstractions provided by Parquet.
Reading from case classes
Let’s consider the following case class:
case class Person(name: String, age: Int, income: Int)
It can now be ingested as a single entity:
val peter = gallia.aobjFromDataClass(
Person(name = "Peter" , age = 29, income = 80000))
Meanwhile, a collection thereof can be ingested as follows:
So for instance, the following:
Will produce the expected:
<root>
name _String
age _Int
hourly_rate _Double
name | age | hourly_rate
------ | --- | ------------------
Peter | 29 | 38.31417624521073
Joanna | 29 | 14.367816091954023
Samir | 28 | 38.31417624521073
Note: While Gallia does not yet allow writing to case classes, this is scheduled for the next release (see NEXT_RELEASES), both via reflection and via macros.
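For readers curious about what ingesting a case class involves, here is a rough plain-Scala sketch, not Gallia's implementation, that turns a case class instance into (field name, value) pairs. It assumes Scala 2.13 or later, since it relies on Product.productElementNames, and it only captures the values, whereas Gallia also derives the schema (_String, _Int, etc.).

```scala
object DataClassSketch {
  final case class Person(name: String, age: Int, income: Int)

  // Pair each field name with its value, in declaration order.
  // Product.productElementNames is available from Scala 2.13 onward.
  def toEntries(p: Product): List[(String, Any)] =
    p.productElementNames.zip(p.productIterator).toList
}
```

For a collection, mapping toEntries over a Seq[Person] yields one list of entries per person.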
Union Types
Partial support for union types was also added in 0.4.0.
An example of usage is as follows:
This results in the following output:
<root> [... schema unchanged]
name | age
----- | ------------
Peter | 29
Samir | TWENTY-EIGHT
Since toUpperCase is a String-only operation, the entity with an integer value for age is left untouched.
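The same behavior can be mimicked in plain Scala by modeling the age field as an Either[Int, String]. This is an assumption for illustration only, not how Gallia represents union types internally. Mapping over the right-biased Either applies the String-only operation and lets integer values through unchanged. The lowercase "twenty-eight" input is hypothetical, since the original data is not shown.

```scala
object UnionSketch {
  // A field whose values may be either an Int or a String
  type IntOrString = Either[Int, String]

  // String-only operation: Left (Int) values pass through untouched
  def upperIfString(v: IntOrString): IntOrString = v.map(_.toUpperCase)

  // Hypothetical input mirroring the table above
  val people: List[(String, IntOrString)] =
    List(("Peter", Left(29)), ("Samir", Right("twenty-eight")))

  val processed: List[(String, IntOrString)] =
    people.map { case (name, age) => (name, upperIfString(age)) }
}
```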
The above example presents a trivial case, but there can of course be more complex ones. An example would be when multiple nested entities are part of a union:
It produces the following output:
<root> [... schema unchanged]
{ "data": "Michael (27yo)" }
{ "data": { "name": "PETER", "age": 29 }}
{ "data": "Joanna (29 years old)" }
{ "data": { "name": "Samir", "dob": "March 14, 1971" } }
Only the name "Peter" was upper-cased, because it belongs to the only nested entity with an age entry.
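A plain-Scala approximation of this second example, assuming each data value is either a String or a nested entity represented as a Map[String, Any]. This is a deliberate simplification: Gallia keeps track of the distinct nested schemas taking part in the union, which a Map does not.

```scala
object NestedUnionSketch {
  // Each "data" value is either a plain String or a nested entity
  type Data = Either[String, Map[String, Any]]

  // Upper-case "name", but only inside nested entities that have an "age" entry
  def process(d: Data): Data = d.map { entity =>
    (entity.get("age"), entity.get("name")) match {
      case (Some(_), Some(name: String)) => entity.updated("name", name.toUpperCase)
      case _                             => entity
    }
  }

  // Input values mirroring the output shown above
  val data: List[Data] = List(
    Left("Michael (27yo)"),
    Right(Map("name" -> "Peter", "age" -> 29)),
    Left("Joanna (29 years old)"),
    Right(Map("name" -> "Samir", "dob" -> "March 14, 1971")))
}
```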
Notes:
- See union_types.md
- More examples can be seen in UnionTypeTest.scala
- Union types in Gallia are still considered experimental at this point, and not all operations support them (the essential ones do however).
- One of the main reasons for supporting union types is to help data cleaning/feature engineering efforts. Indeed, fields captured with different types are sadly very common in legacy datasets (think true mixed with "Yes", and the like).
Metaschema
A result of the added support for union types is the ability for Gallia to provide its own metaschema:
This means Gallia could dogfood itself for schema transformations. For instance, here’s what a nested field renaming would look like (from f to F here):
Notes:
- Gallia does not actually use the metaschema in this manner internally.
- A schema being a special case of data, namely metadata (or "data about data"), Gallia’s metaschema is therefore also metametadata. And since Gallia’s metaschema can also be used to model itself, it is also its own metametaschema. Hence it is also metametametadata. Obviously.
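To illustrate the dogfooding idea without relying on Gallia's actual metaschema, here is a toy schema model in plain Scala. The names Fld, Cls and renameNested, as well as the parent field "a", are all hypothetical. A class is a list of fields, and each field's type is a union of either a basic type name or a nested class, modeled as an Either; renaming the nested field f to F then becomes an ordinary data transformation.

```scala
object MetaschemaSketch {
  // Toy schema model: a class is a list of fields, and a field's type is a union
  // of a basic type name (Left) or a nested class (Right)
  final case class Fld(key: String, tipe: Either[String, Cls])
  final case class Cls(fields: List[Fld])

  // Rename field `from` to `to` inside the nested class held by field `parent`:
  // since the schema is plain data, this is an ordinary transformation
  def renameNested(c: Cls, parent: String, from: String, to: String): Cls =
    Cls(c.fields.map {
      case Fld(`parent`, Right(nested)) =>
        Fld(parent, Right(Cls(nested.fields.map { f =>
          if (f.key == from) f.copy(key = to) else f
        })))
      case other => other
    })

  // Hypothetical schema: a nested entity "a" holding an integer field "f"
  val example: Cls = Cls(List(Fld("a", Right(Cls(List(Fld("f", Left("_Int"))))))))
  val renamed: Cls = renameNested(example, "a", "f", "F")
}
```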
Complex aggregations
Let’s reuse the people handle from earlier. A very simple way to aggregate data would be:
people
.group("name" ~> "names").by("age")
.printJsonl()
It produces the following:
{ "age": 29, "names": [ "Peter", "Joanna" ] }
{ "age": 28, "names": [ "Samir" ] }
It could also (unnecessarily) be achieved via:
people
.aggregateBy("age").as("names")
.using { _.strings("name") }
.printJsonl()
Which produces the same result as above, but shows a trivial use of aggregateBy.
And while Gallia readily provides more built-in aggregators (SUM BY, MEAN BY, …), the aggregateBy construct is necessary to achieve processing like this in a single action:
people
.aggregateBy("age").as("names")
.using { _.strings("name").concatenateStrings }
.printJsonl()
Which produces:
{ "age": 29, "names": "PeterJoanna" }
{ "age": 28, "names": "Samir" }
And is actually a shorthand for:
people
.aggregateBy("age").as("names")
.using { _.strings("name")
.mapV { _.reduceLeft(_ + _) } }
.printJsonl()
It could therefore be customized as required, for instance:
people
.aggregateBy("age").as("names")
.using { _.strings("name")
.mapV { _.map(_.toUpperCase).reduceLeft(_ + "|" + _) } }
.printJsonl()
To produce:
{ "age": 29, "names": "PETER|JOANNA" }
{ "age": 28, "names": "SAMIR" }
But the real power of the aggregateBy construct lies in the ability to perform more custom types of aggregation, such as the following:
people
.aggregateBy("age").using { group =>
("names" -> group.strings("name"),
"mean_income" -> group.ints ("income").mean) }
.printJsonl()
Which results in:
{"age": 29, "names": [ "Peter", "Joanna" ], "mean_income": 55000.0}
{"age": 28, "names": [ "Samir" ], "mean_income": 80000.0}
Note: The tuple-based entity creation used above is just a shorthand for the more explicit gallia.headO("names" -> ...), available for up to 5 entries.
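For comparison, here is roughly what these aggregations look like with plain Scala collections, without any schema tracking. Joanna's income of 30000 is not shown in the article; it is inferred from the mean_income of 55000.0 for age 29. The sketch also shows why the names should be upper-cased before being joined: reduceLeft returns the only element of a single-element collection as-is, without ever calling the function, so a group containing only "Samir" would otherwise stay unchanged.

```scala
object AggregationSketch {
  final case class Person(name: String, age: Int, income: Int)

  // Joanna's income is inferred from the mean_income output, not given in the text
  val people: List[Person] = List(
    Person("Peter", 29, 80000),
    Person("Joanna", 29, 30000),
    Person("Samir", 28, 80000))

  // groupBy keeps the original order of elements within each group
  val byAge: Map[Int, List[Person]] = people.groupBy(_.age)

  // Plain-Scala counterpart of .group("name" ~> "names").by("age")
  val names: Map[Int, List[String]] =
    byAge.map { case (age, group) => age -> group.map(_.name) }

  // Upper-case every name before joining, so single-element groups are covered too
  val joined: Map[Int, String] =
    names.map { case (age, ns) => age -> ns.map(_.toUpperCase).reduceLeft(_ + "|" + _) }

  // Plain-Scala counterpart of the mean_income computation
  val meanIncome: Map[Int, Double] =
    byAge.map { case (age, group) => age -> group.map(_.income).sum.toDouble / group.size }
}
```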
Complex transformations/cotransformations
Let’s switch over to a more serious domain to highlight these new features. Consider the following dataset:
Note: As a reminder, the bobj and bobjs constructs are a convenience mechanism for constructing entities whose schema can easily be inferred. Therefore bobj("f" -> 1) is equivalent to the more explicit aobj("f".int)(obj("f" -> 1)).
Transformation via data class (for nested entities)
Gallia now offers the ability to transform nested entities via case classes ("data classes"). Consider for instance:
case class Change(
chromosome: String,
position : Int ,
from : String,
to : String) {
def shorthand: String =
s"${chromosome}:${position};${from}>${to}"
}
Which models the change nested entity in the mutations dataset above, and encapsulates an operation that produces a shorthand notation for the genetic change (e.g. "3:14532127;C>GG").
The following code will transform the change entity to its shorthand counterpart:
mutations
.transformDataClass[Change]("change")
.using(_.shorthand)
.display()
Which produces:
[...]
patient_id | age | change
---------- | --- | ----------------
p1 | 23 | 3:14532127;C>GG
p2 | 22 | 4:1554138;C>T
p3 | 21 | Y:16552149;AA>GT
Notes:
- This would apply to any other combination of optional/required and single/multiple for the nested entity, via .transformDataClass[Option[Change]], .transformDataClass[Seq[Change]] and .transformDataClass[Option[Seq[Change]]].
- Subsequent versions of Gallia will leverage macros to make this mechanism more efficient (it currently relies on reflection).
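Stripped of the schema handling, the mechanism amounts to lifting Change.shorthand over the four nesting shapes listed above. A plain-Scala sketch, reusing the case class from this section and values from the output table; the helper names one, opt, many and optMany are hypothetical:

```scala
object ChangeSketch {
  final case class Change(chromosome: String, position: Int, from: String, to: String) {
    def shorthand: String = s"${chromosome}:${position};${from}>${to}"
  }

  // The same function, lifted over required/optional and single/multiple values
  def one(c: Change): String = c.shorthand
  def opt(c: Option[Change]): Option[String] = c.map(_.shorthand)
  def many(cs: Seq[Change]): Seq[String] = cs.map(_.shorthand)
  def optMany(cs: Option[Seq[Change]]): Option[Seq[String]] = cs.map(_.map(_.shorthand))
}
```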
Cotransformation via a data class (for current level)
Let’s now consider the following case class which models a subset of the fields at the current level (as opposed to a nested entity this time):
import java.time.Year
case class Demographics(
age: Int,
sex: String) {
def toNewDemographics =
NewDemographics(
year_of_birth = Year.now().getValue - age,
has_Y_chromosome = sex == "male")
}
Along with the following case class, which models the desired new structure:
case class NewDemographics(
year_of_birth : Int,
has_Y_chromosome: Boolean)
The following code co-transforms the two fields by using the encapsulated method in the origin case class:
mutations
.cotransformViaDataClass[Demographics]
.usingWithErasing(_.toNewDemographics)
.display()
Which results in the expected:
<root>
patient_id _String
year_of_birth _Int
has_Y_chromosome _Boolean
change ...
{ "patient_id" : "p1",
"change" : { ... },
"year_of_birth" : 1999,
"has_Y_chromosome": true }
...
Notes:
- As it stands, it would make more sense to transform age and sex separately, but the goal here is to highlight a co-transformation where the fields could be mixed and matched at will.
- .usingWithErasing removes the original entries, while .using would preserve them.
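Because Year.now() makes the output depend on when the code runs (the 1999 above corresponds to running it in 2022), a test-friendly variant can take the current year as a parameter. This is a plain-Scala sketch of the case-class logic only, not of Gallia's cotransformation, and the "male" value for p1 is inferred from has_Y_chromosome being true. Note also that subtracting the age from the current year is only accurate to within one year, since it ignores whether the birthday has already passed.

```scala
import java.time.Year

object DemographicsSketch {
  final case class NewDemographics(year_of_birth: Int, has_Y_chromosome: Boolean)

  final case class Demographics(age: Int, sex: String) {
    // The current year is injected rather than read from the clock,
    // so that results are reproducible
    def toNewDemographics(currentYear: Int): NewDemographics =
      NewDemographics(
        year_of_birth    = currentYear - age, // may be off by one (birthday ignored)
        has_Y_chromosome = sex == "male")

    // Counterpart of the original method, which reads the clock directly
    def toNewDemographicsNow: NewDemographics =
      toNewDemographics(Year.now().getValue)
  }
}
```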
Cotransformation via custom processing
Gallia now also offers an improved mechanism for truly custom processing, although resorting to it is generally a bad idea (we lose the guarantee that schema and data will co-evolve consistently). For instance, to reproduce the co-transformation above, we could create the following object, extending gallia.ObjToObj:
And the following code would make use of it:
mutations
.custom(CustomDemographicsTransformation)
.display()
This would yield the same output.
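The risk mentioned above can be made concrete with a hypothetical plain-Scala shape for such custom steps. This is not the actual gallia.ObjToObj interface, whose definition is not shown here: schema and data are transformed by two separate methods, so only an explicit check can catch them drifting apart. The fixed year 2022 stands in for Year.now().

```scala
object CustomSketch {
  type Schema = List[(String, String)] // field name -> type name
  type Obj    = Map[String, Any]       // field name -> value

  // Hypothetical shape of a custom step: schema and data are handled by two
  // separate methods, and nothing forces them to stay in agreement
  trait CustomObjToObj {
    def transformSchema(s: Schema): Schema
    def transformData(o: Obj): Obj
  }

  object CustomDemographics extends CustomObjToObj {
    private val CurrentYear = 2022 // stands in for Year.now().getValue

    def transformSchema(s: Schema): Schema =
      s.filterNot { case (k, _) => k == "age" || k == "sex" } ++
        List("year_of_birth" -> "_Int", "has_Y_chromosome" -> "_Boolean")

    def transformData(o: Obj): Obj =
      (o - "age" - "sex") ++ Map(
        "year_of_birth"    -> (CurrentYear - o("age").asInstanceOf[Int]),
        "has_Y_chromosome" -> (o("sex") == "male"))
  }

  // Explicit runtime check that the two sides still line up (keys only)
  def consistent(s: Schema, o: Obj): Boolean = s.map(_._1).toSet == o.keySet

  val schemaIn: Schema = List("patient_id" -> "_String", "age" -> "_Int", "sex" -> "_String")
  val dataIn: Obj      = Map("patient_id" -> "p1", "age" -> 23, "sex" -> "male")

  val schemaOut: Schema = CustomDemographics.transformSchema(schemaIn)
  val dataOut: Obj      = CustomDemographics.transformData(dataIn)
}
```

Forgetting to update either method would still compile, which is precisely the guarantee that Gallia's regular operations provide and custom processing gives up.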
Newly supported types
Gallia now supports additional types:
- Enums
- Binary data
- Temporal types
Enums
Enums can be created/used like so:
Which produces:
<root>
choice _Enm(List(rock, paper, scissors, spock))
choice
------
paper
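Conceptually, the _Enm type in the schema above carries the ordered list of allowed values. A minimal plain-Scala stand-in could look like the following; the EnumType name and its validate method are hypothetical, not Gallia's API:

```scala
object EnumSketch {
  // Hypothetical stand-in for an enum type: the ordered list of allowed values
  final case class EnumType(values: List[String]) {
    def validate(v: String): Either[String, String] =
      if (values.contains(v)) Right(v)
      else Left(s"'$v' is not one of: ${values.mkString(", ")}")
  }

  val choiceType: EnumType = EnumType(List("rock", "paper", "scissors", "spock"))
}
```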
Binary data
Binary data can be created/used like so:
Which produces:
bin
-----------
base64:Ym9v
Notes:
- "Ym9v" encodes "boo" in Base64, since we put a 'b' byte (0x62) where the 'f' byte was (0x66).
- The Base64 output is only used for serialization; the in-memory representation remains a java.nio.ByteBuffer throughout processing.
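The byte manipulation described in the first note can be reproduced with the JDK alone. This sketch assumes the original value was "foo" (as the note implies), and the "base64:" prefix merely mimics the display above; it is not a JDK feature.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Base64

object BinarySketch {
  // Bytes of "foo": 0x66 0x6f 0x6f
  def foo(): ByteBuffer = ByteBuffer.wrap("foo".getBytes(StandardCharsets.US_ASCII))

  // Absolute put: overwrite index 0 ('f', 0x66) with 'b' (0x62)
  def toBoo(buf: ByteBuffer): ByteBuffer = buf.put(0, 0x62.toByte)

  // Serialize for display only; the ByteBuffer itself is left untouched
  def render(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes) // copy via a duplicate so buf's position does not move
    "base64:" + Base64.getEncoder.encodeToString(bytes)
  }
}
```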
Temporal types
The temporal types supported in Gallia match their Java counterparts: java.time.{LocalDate, LocalTime, LocalDateTime, OffsetDateTime, ZonedDateTime, Instant}.
They can be created/used like so (for instance with LocalDateTime):
Which produces:
<root>
hungover _LocalDateTime
hungover
-------------------
2022-01-01T00:00:00
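A JDK detail worth knowing when comparing outputs: LocalDateTime.toString omits zero seconds, while DateTimeFormatter.ISO_LOCAL_DATE_TIME always prints them, which matches the rendering above. Whether Gallia uses that exact formatter internally is an assumption.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object TemporalSketch {
  val hungover: LocalDateTime = LocalDateTime.of(2022, 1, 1, 0, 0)

  // toString drops zero seconds; ISO_LOCAL_DATE_TIME always prints them
  val viaToString: String  = hungover.toString
  val viaIsoFormat: String = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(hungover)

  // Parsing the formatted text gives back the same value
  val roundTrip: LocalDateTime = LocalDateTime.parse(viaIsoFormat)
}
```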
Conclusion
This concludes our tour of the main changes brought by 0.4.0, at least the ones that will improve the client code experience.
Other noteworthy additions include:
- Experimental integration with the Python ecosystem:
  - Pandas: see ScalaPyPandasTest.scala and GalliaPandasTest.scala
  - Seaborn: see GalliaVizTest.scala
- Experimental memory optimization for dense data, captured as (size: Int, data: Array[Any]) instead of Array[(Symbol, Any)], with some corresponding operations. See Obg9.scala and example usage in Obg9Test.scala.
For a more complete list of changes and improvements that came with this release, see CHANGELOG.md.
Feedback is always welcome!