Spring Data aggregation operations

MongoDB has a powerful system for aggregating data, called the aggregation pipeline. One particularly useful feature is the bucket operation.

A bucket operation takes a collection of documents and, based on a property of those documents and a list of boundaries, groups them into buckets. Each bucket is then transformed into an output document, one output document per bucket.

Buckets are great for time series analysis. Create buckets that break up the time series by day, by hour, or by whatever unit of time fits, then perform aggregation operations on the documents within each bucket: for example, find the average temperature by day, the total number of messages sent by hour, or similar time series queries.

We are using MongoDB to store images and associated data in a collection called imageCapture. Documents in this collection have many fields, but in this post we’ll focus only on aggregation and consider only two fields on the imageCapture documents:

db.imageCapture.insertMany([
    {
        date: ISODate("2019-11-25T01:13:00.000Z"),
        lbls: ["one", "two", "three"]
    },
    {
        date: ISODate("2019-11-25T04:23:00.000Z"),
        lbls: ["four"]
    },
    {
        date: ISODate("2019-11-26T03:21:00.000Z"),
        lbls: ["five", "six"]
    },
]);

To understand the performance of our system, we want to know how many labels are being detected every day. That is, for every day, we want a sum of the size of the lbls array. This is perfect for MongoDB’s aggregation and bucket capabilities.

Bucket operations in MongoDB

To use a bucket operation, we specify the following:

  • groupBy: the field that the boundaries will apply to. This field must be numeric or a date field.
  • boundaries: an array of boundary points. Documents whose groupBy field falls between two adjacent elements in the array go into that bucket. The between test is half-open: the lower bound is inclusive and the upper bound exclusive, which is the behavior most developers would expect.
  • default: any documents in the pipeline which don’t fall into one of the buckets go into the default bucket. The default is optional, but if it is omitted and a document falls outside all the boundaries, the aggregation fails with an error. Using a match operation in the pipeline before the bucket operation will remove documents which shouldn’t be processed.
  • output: a document defining the fields of the output document for each bucket, each computed with an accumulator expression.

Putting it all together, we create a query as follows:

db.imageCapture.aggregate([
    {
        $bucket: {
            groupBy: "$date",
            boundaries: [
                ISODate("2019-11-25T00:00:00.000Z"),
                ISODate("2019-11-26T00:00:00.000Z"),
                ISODate("2019-11-27T00:00:00.000Z"),
                ISODate("2019-11-28T00:00:00.000Z")
            ],
            default: "defaultBucket",
            output: {
                count: {$sum: {$size: "$lbls"}}
            }
        }
    }
]);
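With the three sample documents above, the first two fall into the 2019-11-25 bucket (3 + 1 labels) and the third into the 2019-11-26 bucket (2 labels). $bucket only produces output documents for buckets that contain at least one input document, so the empty 2019-11-27 bucket produces nothing:

{ "_id" : ISODate("2019-11-25T00:00:00Z"), "count" : 4 }
{ "_id" : ISODate("2019-11-26T00:00:00Z"), "count" : 2 }

The _id of each output document is the lower boundary of its bucket.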

We have arbitrarily selected some bucket boundaries. We skipped the match operation here, but it is a good idea to put one before the bucket stage in the pipeline to reduce the number of documents going into the default bucket; we show a match stage in the Spring Data section below.

The output could have more fields, too. We might want another field for the most frequently used label in a bucket, or a list of all the labels used in a bucket, for example; we sketch a multi-field output in the Spring Data section below.

This query works exactly as expected and gives us a count of the total number of labels in each of these buckets. Nothing requires the buckets to be evenly spaced time periods, either, so buckets are quite flexible. They are also often used for numeric data: a query might show average income among people in different ranges of credit ratings, for example.

Buckets in Spring Data

Spring Data’s reference guide doesn’t give detailed examples of using buckets. Buckets are such an important feature of MongoDB that it’s worth showing how to use them from Java. Of course, we could translate the query into a BasicDBObject, but that’s equivalent to writing SQL query strings and passing them directly to a driver.

Every field in the output is created from an AggregationExpression. In this case, we want an AggregationExpression that sums the sizes of the lbls arrays, the equivalent of the {$sum: {$size: "$lbls"}} expression above:

final AggregationExpression countingExpression = AccumulatorOperators.Sum.sumOf(ArrayOperators.Size.lengthOfArray("lbls"));

Next, make a starting point for the boundaries:

final Instant now = LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant();

This gives us the start of the current day in the UTC timezone. Now create the BucketOperation with the necessary boundaries, the AggregationExpression, and the desired field:

// DAYS is java.time.temporal.ChronoUnit.DAYS, statically imported:
final BucketOperation bucketOperation = Aggregation.bucket("date").
                withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
                        now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
                        now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
                        now.minus(2, DAYS), now.minus(1, DAYS), now).
                withDefaultBucket("defaultBucket").
                andOutput(countingExpression).as("count");

Now this AggregationOperation can be used in an aggregation pipeline:

final Aggregation aggregation = Aggregation.newAggregation(bucketOperation);
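As an aside, the output can carry several fields by chaining andOutput calls, which is how we would add the extra output fields mentioned earlier. A minimal sketch, with just two boundaries for brevity (the field names images and labelArrays are our own):

// Sketch only: the same bucket with two extra output fields.
// andOutputCount() renders {$sum: 1}; push() on "lbls" collects the
// per-document label arrays, so "labelArrays" is an array of arrays.
final BucketOperation richerBucket = Aggregation.bucket("date").
        withBoundaries(now.minus(10, DAYS), now).
        withDefaultBucket("defaultBucket").
        andOutput(countingExpression).as("count").
        andOutputCount().as("images").
        andOutput("lbls").push().as("labelArrays");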

In real use, we would normally want a match operation before the bucket operation in the pipeline; it would be another stage passed to the newAggregation call.
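A minimal sketch of such a stage, assuming we only want documents from the last ten days (matching our boundaries):

// Sketch: a $match stage on the same "date" field, so documents
// outside the boundaries never reach the default bucket.
// Criteria is org.springframework.data.mongodb.core.query.Criteria.
final MatchOperation matchOperation = Aggregation.match(
        Criteria.where("date").gte(now.minus(10, DAYS)).lt(now));

// Stages run in the order they are passed to newAggregation:
final Aggregation matchedAggregation =
        Aggregation.newAggregation(matchOperation, bucketOperation);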

Using the aggregation and getting results is straightforward. If we had a class that matched the result format we could use that; otherwise an implementation of Map works:

final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);

The result gives access to a List of HashMaps via ar.getMappedResults().
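If we prefer typed results, a small class matching the output shape works too. A minimal sketch (the class name LabelCountBucket is our own; Spring Data maps the document’s _id to the id property):

// Sketch: a result class matching the $bucket output documents.
public class LabelCountBucket {
    private Object id;  // bucket's lower boundary Date, or "defaultBucket" (hence Object)
    private int count;  // total number of labels in the bucket

    public Object getId() { return id; }
    public void setId(Object id) { this.id = id; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}

It would be used the same way:

final AggregationResults<LabelCountBucket> typed =
        mongoOperations.aggregate(aggregation, "imageCapture", LabelCountBucket.class);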

Viewing it all together:

        final AggregationExpression countingExpression = AccumulatorOperators.Sum.sumOf(ArrayOperators.Size.lengthOfArray("lbls"));

        final Instant now = LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant();

        final BucketOperation bucketOperation = Aggregation.bucket("date").
                withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
                        now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
                        now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
                        now.minus(2, DAYS), now.minus(1, DAYS), now).
                withDefaultBucket("defaultBucket").
                andOutput(countingExpression).as("count");
        final Aggregation aggregation = Aggregation.newAggregation(bucketOperation);
        final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);
        LOG.info("And the list: " + ar.getMappedResults());

All these classes use a fluent coding style, so with static imports we can express it all in a more naturally readable style:

        // Static imports assumed here: LocalDate.now, ZoneOffset.UTC,
        // ChronoUnit.DAYS, Aggregation.bucket, Aggregation.newAggregation,
        // AccumulatorOperators.Sum.sumOf, and ArrayOperators.Size.lengthOfArray.
        final Instant now = now().atStartOfDay(UTC).toInstant();

        final Aggregation aggregation = newAggregation(bucket("date").
                withBoundaries(now.minus(10, DAYS), now.minus(9, DAYS),
                        now.minus(8, DAYS), now.minus(7, DAYS), now.minus(6, DAYS),
                        now.minus(5, DAYS), now.minus(4, DAYS), now.minus(3, DAYS),
                        now.minus(2, DAYS), now.minus(1, DAYS), now).
                withDefaultBucket("defaultBucket").
                andOutput(sumOf(lengthOfArray("lbls"))).as("count"));

        final AggregationResults<HashMap> ar = mongoOperations.aggregate(aggregation, "imageCapture", HashMap.class);

That’s all that’s needed to perform what would be a very complex query in SQL.

Running the example

The code for this project is available on GitHub. The project runs as a command-line Spring Boot application, which makes it easy to test and experiment with. Download it from GitHub, enter the project directory, and run:

mvn spring-boot:run