MongoDB has a powerful system for aggregating data, called the aggregation pipeline. One particularly useful feature is called bucket operations.
Bucket operations take a collection of documents and, based on a field of those documents and a list of boundaries, group the documents into buckets. Each bucket is then transformed into a single output document: one output document per bucket.
Buckets are great for time series analysis. Create buckets that break up the time series by day, by hour, or by any other unit of time, then perform aggregation operations on the documents within each bucket: for example, find the average temperature by day, the total number of messages sent per hour, or similar time series queries.
We are using MongoDB to store images and associated data in a collection called imageCapture. Documents in this collection have many fields, but in this post we’ll focus only on aggregation and consider only two fields on the imageCapture documents:
db.imageCapture.insertMany([
  {
    date: ISODate("2019-11-25T01:13:00.000Z"),
    lbls: ["one", "two", "three"]
  },
  {
    date: ISODate("2019-11-25T04:23:00.000Z"),
    lbls: ["four"]
  },
  {
    date: ISODate("2019-11-26T03:21:00.000Z"),
    lbls: ["five", "six"]
  }
]);
To understand the performance of our system, we want to know how many labels are being detected every day. That is, for every day, we want the sum of the sizes of the lbls arrays. This is a perfect fit for MongoDB’s aggregation and bucket capabilities.
Bucket operations in Mongo
To use a bucket operation, we specify the following:
- groupBy: the field that the boundaries will apply to. This field must hold numeric or date values.
- boundaries: an array of boundary points, in ascending order. A document whose groupBy value falls between two adjacent elements of the array goes into that bucket. The test is half-open: the lower boundary is inclusive and the upper boundary is exclusive, which is the behavior most developers would expect.
- default: the bucket that receives documents whose groupBy value falls outside the boundaries. It is optional in the syntax, but if any document falls outside the boundaries and no default is specified, the aggregation fails. A match operation earlier in the pipeline can remove documents which shouldn’t be processed.
- output: a document of aggregation expressions that generate the fields of each bucket’s output document.
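To make the half-open boundary rule concrete, here is a small plain-Java sketch. It simulates only the assignment logic of $bucket (it is not the driver API): a value lands in the bucket whose lower boundary is inclusive and upper boundary is exclusive, and anything out of range goes to the default bucket.

```java
import java.util.List;

public class BucketSketch {
    /**
     * Returns the index of the bucket whose half-open range
     * [boundaries[i], boundaries[i + 1]) contains the value,
     * or -1 to signal the default bucket.
     */
    static int bucketIndex(int value, List<Integer> boundaries) {
        for (int i = 0; i < boundaries.size() - 1; i++) {
            if (value >= boundaries.get(i) && value < boundaries.get(i + 1)) {
                return i;
            }
        }
        return -1; // would land in the "default" bucket
    }

    public static void main(String[] args) {
        List<Integer> boundaries = List.of(0, 10, 20);
        System.out.println(bucketIndex(5, boundaries));  // 0: 0 <= 5 < 10
        System.out.println(bucketIndex(10, boundaries)); // 1: lower bound is inclusive
        System.out.println(bucketIndex(20, boundaries)); // -1: upper bound is exclusive
    }
}
```

Note that a value equal to a boundary point goes into the bucket that starts at that point, never the one that ends there.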
Putting it all together, we create a query as follows:
db.imageCapture.aggregate([
  {
    $bucket: {
      groupBy: "$date",
      boundaries: [
        ISODate("2019-11-25T00:00:00.000Z"),
        ISODate("2019-11-26T00:00:00.000Z"),
        ISODate("2019-11-27T00:00:00.000Z"),
        ISODate("2019-11-28T00:00:00.000Z")
      ],
      default: "defaultBucket",
      output: {
        count: { $sum: { $size: "$lbls" } }
      }
    }
  }
]);
We have arbitrarily selected some bucket boundaries. We skip the match operation here, but in practice we recommend putting a match stage before the bucket stage in the pipeline, to reduce the number of documents going into the default bucket.
The output could have more fields, too. We might want another field for the most frequently used label in a bucket, or a list of all the labels used in a bucket, for example.
This query works exactly as expected and gives us a count of the total number of labels in each bucket. The buckets need not be evenly spaced time periods, which shows how flexible they are. Buckets are also often used for numeric data: a query might show average income among people in different ranges of credit ratings, for example.
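For the three sample documents above, we can check the counts by hand: the two documents dated 2019-11-25 contribute 3 + 1 = 4 labels, and the single 2019-11-26 document contributes 2. (The 2019-11-27 bucket is empty, and $bucket emits no output document for empty buckets.) A plain-Java sketch of that per-day tally, mimicking what the stage computes:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedCounts {
    /** Tallies label counts per day, mimicking the $bucket output above. */
    static Map<String, Integer> countsPerDay(Map<String, List<String>> docs) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // key the tally by the date part of the timestamp (the bucket's day)
        docs.forEach((timestamp, lbls) ->
                counts.merge(timestamp.substring(0, 10), lbls.size(), Integer::sum));
        return counts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> docs = Map.of(
                "2019-11-25T01:13:00Z", List.of("one", "two", "three"),
                "2019-11-25T04:23:00Z", List.of("four"),
                "2019-11-26T03:21:00Z", List.of("five", "six"));
        Map<String, Integer> counts = countsPerDay(docs);
        System.out.println(counts.get("2019-11-25")); // 4
        System.out.println(counts.get("2019-11-26")); // 2
    }
}
```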
Buckets in Spring Data
Spring Data’s reference guide doesn’t give detailed examples of using buckets. Buckets are such an important feature of MongoDB that it’s worth showing how to use them in Java. Of course, we could translate the query into a BasicDBObject, but that’s equivalent to writing SQL query strings and passing them directly to a driver.
Every field in the output is created from an AggregationExpression. In this case, we want an AggregationExpression that sums the sizes of the lbls arrays:
final AggregationExpression countingExpression = AccumulatorOperators.Sum.sumOf(ArrayOperators.Size.lengthOfArray("lbls"));
Make a starting point:
final Instant now = LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant();
This is the start of the current day in the UTC timezone. Create the BucketOperation with the necessary boundaries and the AggregationExpression, on the desired field:
// assumes: import static java.time.temporal.ChronoUnit.DAYS;
final BucketOperation bucketOperation = Aggregation.bucket("date").
        withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
                now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
                now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
                now.minus(2, DAYS), now.minus(1, DAYS), now.minus(0, DAYS)).
        withDefaultBucket("defaultBucket").
        andOutput(countingExpression).as("count");
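The boundary math itself is plain java.time and can be checked on its own: eleven instants, one day apart, define ten one-day buckets ending at the start of today (UTC). A standalone sketch of that calculation, independent of Spring Data:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

public class Boundaries {
    /** Builds days + 1 boundary instants, one day apart, ending at the given instant. */
    static List<Instant> dailyBoundaries(Instant end, int days) {
        List<Instant> boundaries = new ArrayList<>();
        for (int i = days; i >= 0; i--) {
            boundaries.add(end.minus(i, ChronoUnit.DAYS));
        }
        return boundaries;
    }

    public static void main(String[] args) {
        Instant now = LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant();
        List<Instant> boundaries = dailyBoundaries(now, 10);
        System.out.println(boundaries.size()); // 11 boundaries -> 10 buckets
        System.out.println(boundaries.get(boundaries.size() - 1)); // start of today, UTC
    }
}
```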
Now this AggregationOperation can be used in an aggregation pipeline:
final Aggregation aggregation = Aggregation.newAggregation(bucketOperation);
In real use, we would normally want a match operation before the bucket operation in the pipeline; it would be another argument to the newAggregation call.
Using the aggregation and getting results is very simple. If we had a class that matched the result format we could map to it; otherwise an implementation of Map works:
final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);
The result gives access to a List of HashMaps via ar.getMappedResults().
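Each mapped result is a document with an _id field holding the bucket’s lower boundary (or the default bucket’s id) and our computed count field. A plain-Java sketch of reading such maps; the data here is hypothetical, standing in for what ar.getMappedResults() would return:

```java
import java.util.List;
import java.util.Map;

public class ReadResults {
    /** Sums the count field across all bucket documents. */
    static int totalCount(List<Map<String, Object>> results) {
        return results.stream().mapToInt(b -> (Integer) b.get("count")).sum();
    }

    public static void main(String[] args) {
        // shaped like ar.getMappedResults(): _id is the bucket's lower boundary
        List<Map<String, Object>> mappedResults = List.of(
                Map.of("_id", "2019-11-25T00:00:00Z", "count", 4),
                Map.of("_id", "2019-11-26T00:00:00Z", "count", 2));
        for (Map<String, Object> bucket : mappedResults) {
            System.out.println(bucket.get("_id") + " -> " + bucket.get("count"));
        }
        System.out.println(totalCount(mappedResults)); // 6
    }
}
```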
Viewing it all together:
final AggregationExpression countingExpression = AccumulatorOperators.Sum.sumOf(ArrayOperators.Size.lengthOfArray("lbls"));
final Instant now = LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant();
final BucketOperation bucketOperation = Aggregation.bucket("date").
        withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
now.minus(2, DAYS), now.minus(1, DAYS), now.minus(0, DAYS)).
withDefaultBucket("defaultBucket").
andOutput(countingExpression).as("count");
final Aggregation aggregation = Aggregation.newAggregation(bucketOperation);
final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);
LOG.info("And the list: " + ar.getMappedResults());
All these classes use a fluent coding style, and with static imports we can express the whole pipeline in a more naturally readable form:
final Instant now = now().atStartOfDay(UTC).toInstant();
final Aggregation aggregation = newAggregation(bucket("date").
withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
now.minus(2, DAYS), now.minus(1, DAYS), now.minus(0, DAYS)).
withDefaultBucket("defaultBucket").
andOutput(sumOf(lengthOfArray("lbls"))).as("count"));
final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);
That’s all that’s needed to perform what would be a very complex query in SQL.
Spring Boot CLI
The code for this project is available on GitHub. The project itself runs as a Spring Boot CLI application, which makes it easy to test and experiment with. Clone it from GitHub, enter the project directory, and run:
mvn spring-boot:run