Aggregating data for graphs, analysis, portfolios, or even machine learning can be an arduous task that is difficult to scale. In this article, I will go over MongoDB’s new(ish) $merge pipeline stage, which I feel resolves a lot of these scaling issues and automates design practices that previously took a lot of custom development to accomplish. MongoDB’s documentation, however, fails to provide extended examples or multiple use cases. This article dives heavily into MongoDB’s aggregation framework. It assumes you already know how to aggregate data and focuses primarily on the $merge stage, covering scalability, caching, and data growth.
Table of Contents
Basic Usage
Incrementing New Subsets of Data
Incrementing or Replacing a Field Based on a Conditional
Aggregating Data from Multiple Collections
Creating a Basic Graph or Machine Learning Data Set
Let’s create a simple example with some mock data. In this example we will aggregate generic posts and determine how many posts each profile has; later we will aggregate comments as well. If you are using the code snippets to follow along, you will want to create a few data points in the style below, but the same approach scales easily to a database with a very large number of documents.
db.posts.insert({profileId: "1", title: "title", body: "body", createdAt: new Date()})
db.comments.insert({profileId: "1", body: "body", createdAt: new Date()})
Next we will aggregate the data with a simple grouping
db.posts.aggregate([{
  "$group": {
    "_id": "$profileId",
    "totalPostCount": {"$sum": 1}
  }
}])
This will give us an array of documents looking something like this
[
{_id: "1", totalPostCount: 5},
{_id: "2", totalPostCount: 4},
{_id: "3", totalPostCount: 9}
]
To avoid regularly running a collection scan – an operation that reads the entire collection rather than a subset of it – we might store this count on the profile and update it occasionally with a cron job, or cache the result somewhere and periodically rerun the entire aggregation to resync the counts. The problem becomes more evident once there are millions of profiles and millions of posts: suddenly an aggregation takes a considerable amount of computing resources and time, driving up costs and server load. It gets even worse if we are showing some sort of portfolio view, or any scenario where the end user is actively waiting for these counts and they need to be 100% up to date – and worse still when many users make the request at the same time, overloading our servers and database and crashing our app.
On Demand Materialized Views
This introduces the need for MongoDB’s On Demand Materialized Views.
In computer science, a materialized view is the result of a previously run query, stored separately from the original dataset. In MongoDB’s case, the term describes the $merge stage and how it writes the results of an aggregation directly into another collection rather than into a cursor that is immediately returned to the application. MongoDB’s documentation explains that the content is refreshed each time the aggregation is run – i.e. on demand. However, it fails to properly explain how to build incremental representations of the data by scanning only smaller, more recent subsets of it. In the rest of this article, I will show several examples and use cases of exactly how to do that.
The approach adds a $merge stage to the end of an aggregation pipeline. It writes the results of the aggregation into a collection created for this purpose, and either replaces or merges with any document it matches. The results are stored as one document per element of the returned array, allowing further aggregation and calculations to be run against the new collection. This is a huge upgrade over the older $out stage, which replaces the entire output collection on every run; by default, $merge inserts results that didn’t exist before and merges results into documents that already do. MongoDB’s $merge documentation shows a very clear example of that behavior.
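For contrast, here is what the same basic count would look like with $out – each run would wipe and rewrite the whole metricAggregates collection instead of merging into it
db.posts.aggregate([
  {"$group": {"_id": "$profileId", "totalPostCount": {"$sum": 1}}},
  {"$out": "metricAggregates"} // replaces the entire collection every time
])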
Basic Usage
db.posts.aggregate([
  {
    "$group": {
      "_id": "$profileId",
      "totalPostCount": {"$sum": 1}
    }
  },
  {
    "$merge": {
      "into": "metricAggregates"
    }
  }])
Now we can collect the data with a normal query of the metricAggregates collection
db.metricAggregates.find()
->
[{_id: "1", totalPostCount: 5},
{_id: "2", totalPostCount: 4},
{_id: "3", totalPostCount: 9}]
That example fails to cover more complicated use cases. It handles only the addition of new profiles, but what about new posts for existing profiles? How can we avoid re-scanning previously aggregated data? With millions of profiles and millions of posts, we cannot afford such a heavy operation.
Incrementing New Subsets of Data
When dealing with millions of documents, we need a way to aggregate only the most recent data – only the data we have not already aggregated. We don’t want to replace the fields that already exist; we want to increment them. The solution hides in the documentation of $merge, at the bottom of the optional "whenMatched" field
An aggregation pipeline to update the document in the collection: [ <stage1>, <stage2>, ... ]
The pipeline can only consist of the following stages: $addFields and its alias $set, $project and its alias $unset, and $replaceRoot and its alias $replaceWith.
By passing a pipeline to the whenMatched option, we can use a $project stage that increments fields instead of replacing them
db.posts.aggregate([
  {"$match": {"createdAt": {"$gt": aggregationLastUpdatedAt}}},
  {
    "$group": {
      "_id": "$profileId",
      "totalPostCount": {"$sum": 1}
    }
  },
  {
    "$merge": {
      "into": "metricAggregates",
      "whenMatched": [{
        "$project": {
          "_id": "$_id",
          "updatedAt": new Date(),
          "totalPostCount": {
            "$sum": ["$totalPostCount", "$$new.totalPostCount"]
          }
        }
      }]
    }
  }])
There are two things we added to this operation. The first is the $match: we now need to look up the most recent updatedAt value in the output collection and use it in the $match, so that we only pull the posts created since the last time we ran the operation. The second is the $project pipeline inside $merge: every time there is a match on the _id field, updatedAt is refreshed and totalPostCount is incremented instead of replaced. The $$new variable refers to the document produced by the aggregation we just performed.
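One way to obtain aggregationLastUpdatedAt is to read the newest updatedAt back out of the output collection. A minimal sketch, assuming updatedAt is only ever written by the merge pipeline above, falling back to a very old date on the first run
// Fetch the watermark for the incremental aggregation
const lastRun = db.metricAggregates.find({}, {updatedAt: 1})
  .sort({updatedAt: -1})
  .limit(1)
  .toArray()[0];
const aggregationLastUpdatedAt = lastRun ? lastRun.updatedAt : new Date(0);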
The data only ever needs to be scanned once, and only in bite-sized increments.
Incrementing or Replacing a Field Based on a Conditional
But what if it is more complicated? What if we also need to show counts for the posts made this week – where we need to conditionally increment or replace a field depending on a timestamp or some other information?
let aggregationLastUpdatedAt = ... // the most recent updatedAt timestamp in the metricAggregates collection
let startOfWeekDate = ... // a Date representing the beginning of the current week
db.posts.aggregate([
  {
    "$match": {"createdAt": {"$gt": aggregationLastUpdatedAt}}
  },
  {
    "$group": {
      "_id": "$profileId",
      "totalPostCount": {"$sum": 1},
      "postsThisWeek": {"$sum": {
        "$cond": {
          "if": {"$gte": ["$createdAt", startOfWeekDate]},
          "then": 1,
          "else": 0
        }
      }}
    }
  },
  {
    "$merge": {
      "into": "metricAggregates",
      "whenMatched": [{
        "$project": {
          "_id": "$_id",
          "updatedAt": new Date(),
          "weekStartedAt": startOfWeekDate,
          "totalPostCount": {
            "$sum": ["$totalPostCount", "$$new.totalPostCount"]
          },
          "postsThisWeek": {
            "$cond": {
              "if": {"$eq": ["$weekStartedAt", startOfWeekDate]},
              "then": {
                "$sum": ["$postsThisWeek", "$$new.postsThisWeek"]
              },
              "else": "$$new.postsThisWeek"
            }
          }
        }
      }]
    }
  }
])
Now postsThisWeek is conditionally incremented when the stored weekStartedAt matches the current week, and replaced when it does not.
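After a run partway through a week, a matched document in metricAggregates would look something like this (the values are purely illustrative)
{
  _id: "1",
  updatedAt: ISODate("2021-03-04T10:12:00Z"),
  weekStartedAt: ISODate("2021-03-01T00:00:00Z"),
  totalPostCount: 14,
  postsThisWeek: 3
}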
Aggregating Data from Multiple Collections
What if we have other collections we need to aggregate data from? Previously we might have used a $lookup stage, but $lookup only ever matches against the base collection. For example, what if we need to gather metrics from our comments collection? A $lookup from posts would skip every profile that has never made a post, so profiles that have only made comments would be completely missing from the aggregated results. $merge resolves this by letting us aggregate different collections at different times, places, or services, all outputting to the same collection and even the same documents
db.comments.aggregate([
  {
    "$match": {"createdAt": {"$gt": commentsAggregationLastUpdatedAt}}
  },
  {
    "$group": {
      "_id": "$profileId",
      "totalComments": {"$sum": 1},
      "commentsThisWeek": {
        "$sum": {"$cond": {
          "if": {"$gte": ["$createdAt", startOfWeekDate]},
          "then": 1,
          "else": 0
        }}
      }
    }
  },
  {
    "$project": {
      "_id": "$_id",
      "totalComments": 1,
      "commentsThisWeek": 1,
      "weekStartedAt": startOfWeekDate,
      "postsThisWeek": {"$literal": 0} // explained below
    }
  },
  {
    "$merge": {
      "into": "metricAggregates",
      "whenMatched": [{
        "$project": {
          "_id": "$_id",
          "commentsUpdatedAt": new Date(),
          "weekStartedAt": startOfWeekDate,
          // pass the post fields through so this $project does not drop them
          "updatedAt": 1,
          "totalPostCount": 1,
          "totalComments": {
            "$sum": ["$totalComments", "$$new.totalComments"]
          },
          "commentsThisWeek": {"$cond": {
            "if": {"$eq": ["$weekStartedAt", startOfWeekDate]},
            "then": {
              "$sum": ["$commentsThisWeek", "$$new.commentsThisWeek"]
            },
            "else": "$$new.commentsThisWeek"
          }},
          // explained below
          "postsThisWeek": {"$cond": {
            "if": {"$eq": ["$weekStartedAt", startOfWeekDate]},
            "then": {"$sum": ["$postsThisWeek", "$$new.postsThisWeek"]},
            "else": "$$new.postsThisWeek"
          }}
        }
      }]
    }
  }])
Now the comments collection quickly follows the same aggregation principle, and the results are automatically merged in exactly the way we want. You may have noticed an extra $project stage before the $merge, as well as the postsThisWeek field appearing again inside the $merge pipeline. The reason is that if the comments aggregation runs first in a new week, commentsThisWeek is correctly reset and weekStartedAt correctly updated; but when the post aggregation runs later, its start-of-week replacement will not fire, because weekStartedAt already matches, and the post fields would be erroneously incremented when they should have been reset. By including postsThisWeek and setting it to {$literal: 0} – $literal stores the literal integer value 0 rather than being interpreted as an exclusion – the code translates to "if it is a new week, reset the field to 0, otherwise increment it by 0". Note also that a $project inside whenMatched keeps only the fields it lists, which is why the merge stage above explicitly passes updatedAt and totalPostCount through – otherwise they would be dropped from the merged document.
Notice we also set a separate date field (commentsUpdatedAt) in the $merge. We need to track when the comments were last aggregated independently of the posts, otherwise there is the potential for missing data.
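Following the same pattern as the earlier watermark sketch, the comments watermark can be read from its own field (again an assumption about how the watermark is stored, not part of the original example)
// Comments watermark, kept separate from the posts watermark (updatedAt)
const lastCommentRun = db.metricAggregates.find({commentsUpdatedAt: {"$exists": true}}, {commentsUpdatedAt: 1})
  .sort({commentsUpdatedAt: -1})
  .limit(1)
  .toArray()[0];
const commentsAggregationLastUpdatedAt = lastCommentRun ? lastCommentRun.commentsUpdatedAt : new Date(0);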
When the end user requests the data, they simply pull it from the output collection like any normal MongoDB query. It can easily be sorted, paginated, filtered, and indexed, even though the data is the product of multiple aggregation queries across multiple collections.
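For example, a hypothetical read path for a "most active profiles" view (the index and query shape here are illustrative, not prescribed)
// Index the aggregated field so sorted, paginated reads stay cheap
db.metricAggregates.createIndex({totalPostCount: -1})

// First page of profiles that have posted this week, most active first
db.metricAggregates.find({postsThisWeek: {"$gt": 0}})
  .sort({totalPostCount: -1})
  .limit(20)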
This approach guarantees that even for complicated calculations, we only ever need to scan the data a single time, and only in bite-sized pieces. The aggregation can be run every time a page is viewed, or it can be managed by a cron job. It can span any number of collections without the need for $lookup, and the complexity can grow with the use case.
Finally, the output collection can itself be aggregated to produce further interesting metrics, which could greatly aid machine learning applications or portfolio views.
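As a quick sketch of what that second-level aggregation might look like (the metric names here are assumptions for illustration)
// Aggregate the aggregate: collection-wide activity metrics
db.metricAggregates.aggregate([
  {"$group": {
    "_id": null,
    "avgPostsPerProfile": {"$avg": "$totalPostCount"},
    "maxPostsPerProfile": {"$max": "$totalPostCount"},
    "profilesActiveThisWeek": {"$sum": {"$cond": [{"$gt": ["$postsThisWeek", 0]}, 1, 0]}}
  }}
])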
Creating a Basic Graph or Machine Learning Data Set
As a final example, here is an aggregation operation that buckets the total counts by week, which would be useful for building a visual graph or a machine learning data set
// aggregationLastUpdatedAt is the same watermark pattern as before;
// startDate is a fixed reference Date (e.g. launch day) that week numbers are counted from
db.posts.aggregate([
  {"$match": {"createdAt": {"$gt": aggregationLastUpdatedAt}}},
  {
    "$project": {
      "createdAt": 1,
      // milliseconds since startDate -> days -> weeks, truncated to a whole week number
      "week": {"$trunc":
        {"$divide": [
          {"$divide": [{"$subtract": ["$createdAt", startDate]}, 86400000]},
          7
        ]}
      }
    }
  },
  {
    "$group": {
      "_id": "$week",
      "week": {"$first": "$week"},
      "totalPostCount": {"$sum": 1}
    }
  },
  {
    "$merge": {
      "into": "metricsByWeek",
      "on": ["week"], // this requires a unique index on the metricsByWeek collection
      "whenMatched": [{
        "$project": {
          "week": 1,
          "updatedAt": new Date(),
          "totalPostCount": {
            "$sum": ["$totalPostCount", "$$new.totalPostCount"]
          }
        }
      }]
    }
  }])
If you are following the code examples live, you will need to copy and paste the following code snippet before running the above code
db.metricsByWeek.createIndex({week:1}, {unique:true})
This is because when you customize which field (or combination of fields) the $merge stage matches on, that field combination must be covered by a unique index, guaranteeing that MongoDB can only ever find a single matching document
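For example, if we merged on a combination of fields, a matching compound unique index would be required first (this collection and field combination are hypothetical)
// A compound unique index covering every field listed in "on"
db.metricsByProfileWeek.createIndex({profileId: 1, week: 1}, {unique: true})
// ...which would allow a merge stage such as:
// {"$merge": {"into": "metricsByProfileWeek", "on": ["profileId", "week"], ...}}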
This now creates a collection of documents like the following, which can be plugged into a graphing library or any other application
{
  week: 0,
  totalPostCount: 3
}
{
  week: 1,
  totalPostCount: 9
}
{
  week: 2,
  totalPostCount: 25
}