Apache Sedona stands out for processing geospatial data at Scale

The spatial data deluge

Mo Sarwat
Towards Data Science

--

In the past decade, the volume of available geospatial data increased tremendously. Such data includes but not limited to: weather maps, socio-economic data, and geo-tagged social media. For example, spacecrafts from NASA keep monitoring the status of the earth, including land temperature, atmosphere humidity. As of today, NASA has released over 22PB satellite data. Today, we have close 5 billion mobile devices all around the world. In consequence, Mobile Apps generate tons of gesoaptial data. For instance, Lyft, Uber, and Mobike collect terabytes of GPS data from millions of riders every day. In fact, everything we do on our mobile devices leaves digital traces on the surface of the Earth. Moreover, the unprecedented popularity of GPS-equipped mobile devices and Internet of Things (IoT) sensors has led to continuously generating large-scale location information combined with the status of surrounding environments. For example, several cities have started installing sensors across the road intersections to monitor the environment, traffic and air quality.

Making sense of the rich geospatial properties hidden in the data may greatly transform our society. This includes many subjects undergoing intense study, such as climate change analysis, study of deforestation, population migration, analyzing pandemic spread, urban planning, transportation, commerce and advertisement. These data-intensive geospatial analytics applications highly rely on the underlying data management systems (DBMSs) to efficiently retrieve, process, wrangle and manage data.

Apache Sedona (Formerly GeoSpark) Overview

Apache Sedona (Formerly GeoSpark) (http://sedona.apache.org) is a cluster computing framework that can process geospatial data at scale. GeoSpark extends the Resilient Distributed Dataset (RDD), the core data structure in Apache Spark, to accommodate big geospatial data in a cluster. A SpatialRDD consists of data partitions that are distributed across the Spark cluster. A Spatial RDD can be created by RDD transformation or be loaded from a file that is stored on permanent storage. This layer provides a number of APIs which allow users to read heterogeneous spatial object from various data formats.

GeoSpark allows users to issue queries using the out-of-box Spatial SQL API and RDD API. The RDD API provides a set of interfaces written in operational programming languages including Scala, Java, Python and R. The Spatial SQL interfaces offers a declarative language interface to the users so they can enjoy more flexibility when creating their own applications. These SQL API implements the SQL/MM Part 3 standard which is widely used in many existing spatial databases such as PostGIS (on top of PostgreSQL). Next, we show how to use GeoSpark.

Supported spatial data sources in Apache Sedona

In the past, researchers and practitioners have developed a number of geospatial data formats for different purposes. However, the heterogeneous sources make it extremely difficult to integrate geospatial data together. For example, WKT format is a widely used spatial data format that stores data in a human readable tab-separated-value file. Shapefile is a spatial database file which includes several sub-files such as index file, and non-spatial attribute file. In addition, geospatial data usually possess different shapes such as points, polygons and trajectories.

Currently, Sedona (GeoSpark) can read WKT, WKB, GeoJSON, Shapefile, and NetCDF / HDF format data from different external storage systems such as local disk, Amazon S3 and Hadoop Distributed File System (HDFS) to Spatial RDDs. Spatial RDDs now can accommodate seven types of spatial data including Point, Multi-Point, Polygon, Multi-Polygon, Line String, Multi-Line String, GeometryCollection, and Circle. Moreover, spatial objects that have different shapes can co-exist in the same Spatial RDD because Sedona adopts a flexible design which generalizes the geometrical computation interfaces of different spatial objects.

Spatial RDD built-in geometrical library: It is quite common that spatial data scientists need to exploit some geometrical attributes of spatial objects in Apache Sedona, such as perimeter, area and intersection. Spatial RDD equips a built-in geometrical library to perform geometrical operations at scale so the users will not be involved into sophisticated computational geometry problems. Currently, the system provides over 20 different functions in this library and put them in two separate categories

Regular geometry functions are applied to every single spatial object in a Spatial RDD. For every object, it generates a corresponding result such as perimeter or area. The output must be either a regular RDD or Spatial RDD.

Geometry aggregation functions are applied to a Spatial RDD for producing an aggregate value. It only generates a single value or spatial object for the entire Spatial RDD. For example, the system can compute the bounding box or polygonal union of the entire Spatial RDD.

Run queries using RDD API

Here, we outline the steps to create Spatial RDDs and run spatial queries using GeoSpark RDD APIs. The example code is written in Scala but also works for Java.

Setup Dependencies: Before starting to use Apache Sedona (i.e., GeoSpark), users must add the corresponding package to their projects as a dependency. For the ease of managing dependencies, the binary packages of GeoSpark are hosted on the Maven Central Repository which includes all JVM based packages from the entire world. As long as the projects are managed by popular project management tools such as Apache Maven and sbt, users can easily add Apache Sedona by adding the artifact id in the project specification file such as POM.xml and build.sbt.

Initialize Spark Context: Any RDD in Spark or Apache Sedona must be created by SparkContext. Therefore, the first task in a GeoSpark application is to initiate a SparkContext. The code snippet below gives an example. In order to use custom spatial object and index serializer, users must enable them in the SparkContext.

val conf = new SparkConf()conf.setAppName(“GeoSparkExample”)// Enable GeoSpark custom Kryo serializer
conf.set(“spark.serializer”, classOf[KryoSerializer].getName)
conf.set(“spark.kryo.registrator”, classOf[GeoSparkKryoRegistrator].getName)val sc = new SparkContext(conf)

Create a Spatial RDD: Spatial objects in a SpatialRDD is not typed to a certain geometry type and open to more scenarios. It allows an input data file which contains mixed types of geometries. For instance, a WKT file might include three types of spatial objects, such as LineString, Polygon and MultiPolygon. Currently, the system can load data in many different data formats. This is done by a set of file readers such as WktReader and GeoJsonReader. For example, users can call ShapefileReader to read ESRI Shapefiles.

val spatialRDD = ShapefileReader.readToGeometryRDD(sc, filePath)

Transform the coordinate reference system: Apache Sedona doesn’t control the coordinate unit (i.e., degree-based or meter-based) of objects in a Spatial RDD. When calculating the distance between two coordinates, GeoSpark simply computes the euclidean distance. In practice, if users want to obtain the accurate geospatial distance, they need to transform coordinates from the degree-based coordinate reference system (CRS), i.e., WGS84, to a planar coordinate reference system (i.e., EPSG: 3857). GeoSpark provides this function to the users such that they can perform this transformation to every object in a Spatial RDD and scale out the workload using a cluster.

// epsg:4326: is WGS84, the most common degree-based CRSval sourceCrsCode = “epsg:4326" // epsg:3857: The most common meter-based CRSval targetCrsCode = “epsg:3857"objectRDD.CRSTransform(sourceCrsCode, targetCrsCode)

Build a spatial index: Users can call APIs to build a distributed spatial index on the Spatial RDD. Currently, the system provides two types of spatial indexes, Quad-Tree and R-Tree, as the local index on each partition. The code of this step is as follows:

spatialRDD.buildIndex(IndexType.QUADTREE, false) // Set to true only if the index will be used join query

Write a spatial range query: A spatial range query returns all spatial objects that lie within a geographical region. For example, a range query may find all parks in the Phoenix metropolitan area or return all restaurants within one mile of the user’s current location. In terms of the format, a spatial range query takes a set of spatial objects and a polygonal query window as input and returns all the spatial objects which lie in the query area. A spatial range query takes as input a range query window and a Spatial RDD and returns all geometries that intersect/are fully covered by the query window. Assume the user has a Spatial RDD. He or she can use the following code to issue a spatial range query on this Spatial RDD. The output format of the spatial range query is another Spatial RDD.

val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01) /*If true, return gemeotries intersect or are fully covered by the window; If false, only return the latter. */val considerIntersect = false// If true, it will leverage the distributed spatial index to speed up the query executionval usingIndex = falsevar queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerIntersect, usingIndex)

Write a spatial K Nearnest Neighbor query: takes as input a K, a query point and a Spatial RDD and finds the K geometries in the RDD which are the closest to the query point. If the user has a Spatial RDD, he or she then can perform the query as follows. The output format of the spatial KNN query is a list which contains K spatial objects.

val geometryFactory = new GeometryFactory()val pointObject = geometryFactory.createPoint(new Coordinate(-84.01, 34.01)) // query pointval K = 1000 // K Nearest Neighborsval usingIndex = falseval result = KNNQuery.SpatialKnnQuery(objectRDD, pointObject, K, usingIndex)

Write a spatial join query: Spatial join queries are queries that combine two datasets or more with a spatial predicate, such as distance and containment relations. There are also some real scenarios in life: tell me all the parks which have lakes and tell me all of the gas stations which have grocery stores within 500 feet. Spatial join query needs two sets of spatial objects as inputs. It finds a subset from the cross product of these two datasets such that every record satisfies the given spatial predicate. In Sedona, a spatial join query takes as input two Spatial RDDs A and B. For each object in A, finds the objects (from B) covered/intersected by it. A and B can be any geometry type and are not necessary to have the same geometry type. Spatial RDD spatial partitioning can significantly speed up the join query. Three spatial partitioning methods are available: KDB-Tree, Quad-Tree and R-Tree. Two Spatial RDDs must be partitioned by the same spatial partitioning grid file. In other words, If the user first partitions Spatial RDD A, then he or she must use the data partitioner of A to partition B. The example code is as follows:

// Perform the spatial partitioningobjectRDD.spatialPartitioning(joinQueryPartitioningType)queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)// Build the spatial indexval usingIndex = truequeryWindowRDD.buildIndex(IndexType.QUADTREE, true) // Set to true only if the index will be used join queryval result = JoinQuery.SpatialJoinQueryFlat(objectRDD, queryWindowRDD, usingIndex, considerBoundaryIntersection)

Run spatial queries using SQL APIs

Here, we outline the steps to manage spatial data using the Spatial SQL interface of GeoSpark. The SQL interface follows SQL/MM Part3 Spatial SQL Standard. In particular, GeoSpark put the available Spatial SQL functions into three categories: (1) Constructors: create a geometry type column (2) Predicates: evaluate whether a spatial condition is true or false. Predicates are usually used in WHERE clauses, HAVING clauses and so on (3) Geometrical functions: perform a specific geometrical operation on the given inputs. These functions can produce geometries or numerical values such as area or perimeter.

In order to use the system, users need to add GeoSpark as the dependency of their projects, as mentioned in the previous section.

Initiate SparkSession: Any SQL query in Spark or Sedona must be issued by SparkSession, which is the central scheduler of a cluster. To initiate a SparkSession, the user should use the code as follows:

var sparkSession = SparkSession.builder().appName(“GeoSparkExample”)// Enable GeoSpark custom Kryo serializer.config(“spark.serializer”, classOf[KryoSerializer].getName).config(“spark.kryo.registrator”, classOf[GeoSparkKryoRegistrator].getName).getOrCreate()

Register SQL functions: GeoSpark adds new SQL API functions and optimization strategies to the catalyst optimizer of Spark. In order to enable these functionalities, the users need to explicitly register GeoSpark to the Spark Session using the code as follows.

GeoSparkSQLRegistrator.registerAll(sparkSession)

Create a geometry type column: Apache Spark offers a couple of format parsers to load data from disk to a Spark DataFrame (a structured RDD). After obtaining a DataFrame, users who want to run Spatial SQL queries will have to first create a geometry type column on this DataFrame because every attribute must have a type in a relational data system. This can be done via some constructors functions such as ST\_GeomFromWKT. After this step, the users will obtain a Spatial DataFrame. The following example shows the usage of this function.

SELECT ST_GeomFromWKT(wkt_text) AS geom_col, name, address
FROM input

Transform the coordinate reference system: Similar to the RDD APIs, the Spatial SQL APIs also provide a function, namely ST_Transform, to transform the coordinate reference system of spatial objects. It works as follows:

SELECT ST_Transform(geom_col, “epsg:4326", “epsg:3857") AS geom_col
FROM spatial_data_frame

Write a spatial range query: GeoSpark Spatial SQL APIs have a set of predicates which evaluate whether a spatial condition is true or false. ST\_Contains is a classical function that takes as input two objects A and returns true if A contains B. In a given SQL query, if A is a single spatial object and B is a column, this becomes a spatial range query in GeoSpark (see the code below).

SELECT *
FROM spatial_data_frame
WHERE ST_Contains (ST_Envelope(1.0,10.0,100.0,110.0), geom_col)

Write a spatial KNN query: To perform a spatial KNN query using the SQL APIs, the user needs to first compute the distance between the query point and other spatial objects, rank the distances in an ascending order and take the top K objects. The following code finds the 5 nearest neighbors of Point(1, 1).

SELECT name, ST_Distance(ST_Point(1.0, 1.0), geom_col) AS distance
FROM spatial_data_frame
ORDER BY distance ASC
LIMIT 5

Write a spatial join query: A spatial join query in Spatial SQL also uses the aforementioned spatial predicates which evaluate spatial conditions. However, to trigger a join query, the inputs of a spatial predicate must involve at least two geometry type columns which can be from two different DataFrames or the same DataFrame. The following query involves two Spatial DataFrames, one polygon column and one point column. It finds every possible pair of $<$polygon, point$>$ such that the polygon contains the point.

SELECT *
FROM spatial_data_frame1 df1, spatial_data_frame2 df2
WHERE ST_Contains(df1.polygon_col, df2.point_col)

Perform geometrical operations: GeoSpark provides over 15 SQL functions. for geometrical computation. Users can easily call these functions in their Spatial SQL query and GeoSpark will run the query in parallel. For instance, a very simple query to get the area of every spatial object is as follows:

SELECT ST_Area(geom_col)
FROM spatial_data_frame

Aggregate functions for spatial objects are also available in the system. They usually take as input all spatial objects in the DataFrame and yield a single value. For example, the code below computes the union of all polygons in the Data Frame.

SELECT ST_Union_Aggr(geom_col)
FROM spatial_data_frame

Interact with GeoSpark via Zeppelin notebook

Although Spark bundles interactive Scala and SQL shells in every release, these shells are not user-friendly and not possible to do complex analysis and charts. Data scientists tend to run programs and draw charts interactively using a graphic interface. Starting from 1.2.0, GeoSpark (Apache Sedona) provides a Helium plugin tailored for Apache Zeppelin web-based notebook. Users can perform spatial analytics on Zeppelin web notebook and Zeppelin will send the tasks to the underlying Spark cluster.

Users can create a new paragraph on a Zeppelin notebook and write code in Scala, Python or SQL to interact with GeoSpark. Moreover, users can click different options available on the interface and ask GeoSpark to render different charts such as bar, line and pie over the query results. For example, Zeppelin can visualize the result of the following query as a bar chart and show that the number of landmarks in every US county.

SELECT C.name, count(*)
FROM US_county C, US_landmark L
WHERE ST_Contains(C.geom_col, L.geom_col)
GROUPBY C.name

Another example is to find the area of each US county and visualize it on a bar chart. The corresponding query is as follows. This actually leverages the geometrical functions offered in GeoSpark.

SELECT C.name, ST_Area(C.geom_col) AS area
FROM US_county C

How Apache Sedona can handle the Spatial Data Deluge

Spatial Data Partitioning

Moreover, Spatial RDDs equip distributed spatial indices and distributed spatial partitioning to speed up spatial queries. The adopted data partitioning method is tailored to spatial data processing in a cluster. Data in Spatial RDDs are partitioned according to the spatial data distribution and nearby spatial objects are very likely to be put into the same partition. The effect of spatial partitioning is two-fold: (1) when running spatial queries that target at particular spatial regions, GeoSpark can speed up queries by avoiding the unnecessary computation on partitions that are not spatially close. (2) it can chop a Spatial RDD to a number of data partitions which have similar number of records per partition. This way, the system can ensure the load balance and avoid stragglers when performing computation in the cluster.

Spatial Indexes

Sedona employs a distributed spatial index to index Spatial RDDs in the cluster. This distributed index consists of two parts (1) global index: is stored on the master machine and generated during the spatial partitioning phase. It indexes the bounding box of partitions in Spatial RDDs. The purpose of having such a global index is to prune partitions that are guaranteed to have no qualified spatial objects. (2) local index: is built on each partition of a Spatial RDD. Since each local index only works on the data in its own partition, it can have a small index size. Given a spatial query, the local indices in the Spatial RDD can speed up queries in parallel.

Spatial RDD customized serializer

Sedona provides a customized serializer for spatial objects and spatial indexes. The proposed serializer can serialize spatial objects and indices into compressed byte arrays. This serializer is faster than the widely used kryo serializer and has a smaller memory footprint when running complex spatial operations, e.g., spatial join query. When converting spatial objects to a byte array, the serializer follows the encoding and decoding specification of Shapefile.

The serializer can also serialize and deserialize local spatial indices, such as Quad-Tree and R-Tree. For serialization, it uses the Depth-First Search (DFS) to traverse each tree node following the pre-order strategy (first write current node information then write its children nodes). For de-serialization, it will follow the same strategy used in the serialization phase. The de-serialization is also a recursive procedure. When serialize or de-serialize every tree node, the index serializer will call the spatial object serializer to deal with individual spatial objects.

Conclusion

In Conclusion, Apache Sedona provides an easy to use interface for data scientists to process geospatial data at scale. Currently, the system supports SQL, Python, R, and Scala as well as so many spatial data formats, e.g., ShapeFiles, ESRI, GeoJSON, NASA formats. Here is a link to the GitHub repository:

GeoSpark has a small active community of developers from both industry and academia. You can also try more coding examples here:

If you have more questions please feel free to message me on Twitter

--

--

Mo is the founder of Wherobots.ai, CS Prof at Arizona State University, & the creator of Apache Sedona (a scalable system for processing big geospatial data)