Real-Time Aggregations

Real-time aggregations, or Rollups, are a class of Ingest Transformations that let you aggregate data as it is ingested, combining multiple documents into one.

As new data comes in, Rockset will transform and aggregate it before storing it in your rollup Collection. For time-series data, even out-of-order arrivals are aggregated correctly and automatically.

Using rollups provides two main benefits:

  • Reduced storage size because only the aggregated data is stored and indexed.
  • Improved query performance because the metrics you care about have been precomputed at ingestion time.

To use rollups, all you need is a GROUP BY clause in your ingest transformation SQL. See the examples below for sample usage.

Rollups are currently available for the following streaming data sources:

  • Apache Kafka
  • Confluent Cloud
  • Confluent Platform (self-managed)
  • Amazon Kinesis
  • Amazon MSK
  • Azure Service Bus
  • Azure Event Hubs

Rollups are also available for the following Data Lakes and Warehouses:

  • Amazon S3
  • Google Cloud Storage
  • Azure Blob Storage

Real-time Rollups

Rollups in Rockset are unique in that they preserve the real-time nature of your data analytics.

Traditionally, rollups are implemented with ahead-of-time aggregation, which means you don’t have access to your data until the end of the aggregation interval. In this approach, aggregation groups are considered immutable, so all data for an aggregation interval must be present for the aggregation to happen.

In contrast, rollups in Rockset allow updates to aggregation groups, so your data remains real-time. You can query your Rockset rollup collection and get up-to-date results as incoming data continues to be aggregated. In addition, because updates to aggregation groups are allowed, out-of-order arrivals "just work" in Rockset.
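Rockset handles this internally, but the idea can be illustrated with a conceptual Python sketch (not Rockset's actual implementation): each aggregation group is mutable running state, so a late-arriving event simply updates the group it belongs to. The event shape and hourly grouping below mirror the camera example later in this page.

```python
from collections import defaultdict

# Conceptual sketch: aggregation groups held as mutable running state,
# keyed the way GROUP BY _event_time, location would key them.
groups = defaultdict(lambda: {"cat_count": 0})

def ingest(event):
    # Truncate the ISO timestamp to the hour, like DATE_TRUNC('HOUR', ...).
    key = (event["ts"][:13], event["location"])
    groups[key]["cat_count"] += event["cat_count"]

# Events arrive out of order; the 15:59 event lands after a 16:20 event.
ingest({"ts": "2021-04-20T16:20:42", "location": "SF", "cat_count": 1})
ingest({"ts": "2021-04-20T15:59:01", "location": "SF", "cat_count": 2})  # late arrival
ingest({"ts": "2021-04-20T16:45:10", "location": "SF", "cat_count": 3})

# Both hourly groups are correct: the late event updated its own group
# without waiting for any aggregation interval to close.
```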

Rollups Examples

Example:

SELECT
    DATE_TRUNC('HOUR', PARSE_TIMESTAMP_ISO8601(ts)) AS _event_time,
    location,
    SUM(cat_count) AS cat_count,
    MAX(cat_count + dog_count) AS peak_animals_seen,
    SQRT(COUNT(*) + AVG(cat_count)) * APPROX_DISTINCT(dog_count) AS my_crazy_metric
FROM
    _input
GROUP BY
    _event_time, location
HAVING SUM(cat_count) > 5

Imagine the data source is a network of cameras that detect cats and dogs. Every time a cat and/or dog is found, an event is recorded in the form: {"ts": "2021-04-20T16:20:42+0000", "location": "San Francisco", "cat_count": 1, "dog_count": 2}. There is a requirement to count the total number of cats per location, per hour. There is also a requirement to track the locations and times with the highest numbers of animals seen (perhaps to detect if cameras need faster processors that can count more animals). In addition, the my_crazy_metric expression is included here as an example to show just how complex a rollup query expression can get. The HAVING clause keeps the aggregated rows where the total number of cats seen is greater than five.
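To make the aggregation concrete, here is an equivalent batch computation in Python over a few hypothetical camera events, covering the SUM, MAX, and HAVING portions of the rollup above (the other metrics follow the same pattern):

```python
from collections import defaultdict

# Hypothetical camera events in the shape described above.
events = [
    {"ts": "2021-04-20T16:20:42+0000", "location": "San Francisco", "cat_count": 4, "dog_count": 2},
    {"ts": "2021-04-20T16:41:05+0000", "location": "San Francisco", "cat_count": 3, "dog_count": 1},
    {"ts": "2021-04-20T16:50:10+0000", "location": "Oakland",       "cat_count": 2, "dog_count": 0},
]

agg = defaultdict(lambda: {"cat_count": 0, "peak_animals_seen": 0})
for e in events:
    key = (e["ts"][:13], e["location"])  # DATE_TRUNC('HOUR', ...), location
    g = agg[key]
    g["cat_count"] += e["cat_count"]     # SUM(cat_count)
    g["peak_animals_seen"] = max(g["peak_animals_seen"],
                                 e["cat_count"] + e["dog_count"])  # MAX(cat_count + dog_count)

# HAVING SUM(cat_count) > 5 keeps only groups with more than five cats;
# the Oakland group is filtered out.
result = {k: v for k, v in agg.items() if v["cat_count"] > 5}
```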

Example:

SELECT
    DATE_TRUNC('DAY', PARSE_TIMESTAMP('%Y/%m/%d %H:%M', datetimeStr)) AS _event_time,
    region,
    SUM(purchasePrice) AS revenue,
    AVG(purchasePrice) AS avgPrice,
    MAX(purchasePrice) AS largestSale,
    SUM(CAST(purchasePrice > 3 AS int)) AS largeSales,
    SUM(purchasePrice * 2) + 1 AS allTheTransforms,
    COUNT(*) AS counts
FROM
    _input
GROUP BY
    _event_time, region

In this case, there is raw data for a business’s sales in different regions. Each time a purchase is made, the timestamp, the location of the sale, and the price of the item purchased are recorded. To best understand the business’s financial health, a data-driven approach is taken to aggregate the purchase price data in several different ways. The GROUP BY clause groups data by date and region.
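One idiom worth calling out is SUM(CAST(purchasePrice > 3 AS int)): casting the boolean to an integer turns each matching row into a 1, so the sum counts the rows that satisfy the condition. A small Python sketch with hypothetical prices for one (day, region) group shows the same trick:

```python
# Hypothetical prices for one (day, region) group.
prices = [2.50, 4.00, 3.25, 5.75, 1.99]

revenue     = sum(prices)                      # SUM(purchasePrice)
avg_price   = revenue / len(prices)            # AVG(purchasePrice)
# The comparison yields True/False, which int() maps to 1/0, so the
# sum counts sales above 3 -- same as SUM(CAST(purchasePrice > 3 AS int)).
large_sales = sum(int(p > 3) for p in prices)
```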

Querying Rollups

When querying rollup collections, you can leverage a special REGROUP syntax. REGROUP allows values from rollup collections with fine-grained aggregations to be reaggregated (regrouped) at a coarser granularity. For example, you can compute per-day averages from a collection that uses per-hour aggregation in the ingest transformation; or you can compute per-region averages from a collection that aggregates per-(region, day) in the ingest transformation.

For example, suppose we define a rollup collection called commons.sales_data using the rollup query from the previous example, which contains GROUP BY _event_time, region. To compute the average price of all items at a per-region granularity, we can use this query:

SELECT region, AVG(REGROUP avgPrice) AS regionalAvgPrice
FROM commons.sales_data
GROUP BY region

Note that AVG(REGROUP avgPrice) computes the average price over all items in a given region, while AVG(avgPrice) would compute the average of average prices in the region, which is semantically different.
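Rockset maintains the intermediate aggregate state needed to reaggregate correctly; the distinction between the two averages can be seen in a small Python sketch over hypothetical rollup rows for one region, where the regrouped average behaves like a count-weighted average of the stored per-day averages:

```python
# Hypothetical per-(day, region) rollup rows for one region, carrying the
# stored per-group average and row count.
rollup_rows = [
    {"avgPrice": 10.0, "counts": 1},  # 1 sale at 10
    {"avgPrice": 2.0,  "counts": 9},  # 9 sales averaging 2
]

# AVG(avgPrice): every day weighs equally, regardless of how many sales it had.
avg_of_avgs = sum(r["avgPrice"] for r in rollup_rows) / len(rollup_rows)

# Like AVG(REGROUP avgPrice): weight each day's average by its item count,
# recovering the average over the underlying items.
item_avg = (sum(r["avgPrice"] * r["counts"] for r in rollup_rows)
            / sum(r["counts"] for r in rollup_rows))

# One expensive day among many cheap sales skews the unweighted average high.
```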