Time Series Data

This document describes how to create collections over event data and set up time-based retention to automatically delete old events from a collection.

#Event Data

Event data represents a sequence of events, each paired with a time field that records when the event occurred. Web search logs, website clicks, stock prices, weather data, and security logs are all examples of event data sets.

Sample weather data event (source):

dt represents the time of data collection in UTC

{
  "coord": {
    "lon": 139,
    "lat": 35
  },
  "sys": {
    "country": "JP",
    "sunrise": 1369769524,
    "sunset": 1369821049
  },
  "weather": [
    {
      "id": 804,
      "main": "clouds",
      "description": "overcast clouds",
      "icon": "04n"
    }
  ],
  "main": {
    "temp": 289.5,
    "humidity": 89,
    "pressure": 1013,
    "temp_min": 287.04,
    "temp_max": 292.04
  },
  "wind": {
    "speed": 7.31,
    "deg": 187.002
  },
  "rain": {
    "3h": 0
  },
  "clouds": {
    "all": 92
  },
  "dt": 1369824698,
  "id": 1851632,
  "name": "Shuzenji",
  "cod": 200
}
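The `dt` value in the sample event is a Unix epoch timestamp in seconds. As a minimal sketch using only the Python standard library, you can decode it like this:

```python
from datetime import datetime, timezone

# "dt" in the sample event is seconds since the Unix epoch, in UTC
dt = 1369824698
collected_at = datetime.fromtimestamp(dt, tz=timezone.utc)
print(collected_at.isoformat())  # 2013-05-29T10:51:38+00:00
```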

Such data sets are used extensively in statistics, monitoring, finance, forecasting, application event tracking, and any other field that involves a time component. Refer here to learn more about event data and building applications around it.

Rockset allows you to create collections over event data, run queries efficiently and join event data with other data sets using standard SQL.

#Retention

Event data sets tend to grow very quickly, so users may want to set a time-based retention (e.g., past 24 hours, past 7 days, past 30 days) to automatically purge older events. Rockset can delete old events efficiently when you set a retention period at collection creation time.
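Retention is specified in seconds (see `retention_secs` in the examples below). A quick sketch of the arithmetic for common retention windows:

```python
# Retention periods are expressed in seconds.
DAY = 24 * 60 * 60  # 86400 seconds per day

retention = {
    "past 24 hours": 1 * DAY,    # 86400
    "past 7 days": 7 * DAY,      # 604800
    "past 30 days": 30 * DAY,    # 2592000
}
for window, secs in retention.items():
    print(window, secs)
```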

#Creating Event Data Collection

#Prerequisites

  1. Set up the Rockset Python SDK or Rockset CLI.
  2. Integrate with Amazon S3 and copy the event data into an S3 bucket. You can also load streaming data into S3 using a service such as Amazon Kinesis Data Firehose.

Alternatively, you can use the REST API to write events directly to Rockset.

Example collection:

This collection represents two taxi ride events. The pickup_time field records when each ride started.

[
  {
    "ride_id": "ride1",
    "driver_id": 1,
    "pickup_time": 1534406486170,
    "drop_time": 1534406487540,
    "pickup": {
      "latitude": 37.22,
      "longitude": 122.77
    },
    "destination": {
      "latitude": 40.88,
      "longitude": 123.78
    },
    "cost": 20,
    "payment_id": 1
  },
  {
    "ride_id": "ride2",
    "driver_id": 2,
    "pickup_time": 1534406486150,
    "drop_time": 1534406488440,
    "pickup": {
      "latitude": 37.22,
      "longitude": 122.77
    },
    "destination": {
      "latitude": 39.27,
      "longitude": 121.78
    },
    "cost": 30,
    "payment_id": 2
  }
]
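The `pickup_time` and `drop_time` values above are epoch timestamps in milliseconds. A minimal standard-library sketch decoding them and computing a ride duration:

```python
from datetime import datetime, timezone

# pickup_time and drop_time in the sample rides are epoch milliseconds
pickup_ms = 1534406486170
drop_ms = 1534406487540

pickup = datetime.fromtimestamp(pickup_ms / 1000, tz=timezone.utc)
duration_ms = drop_ms - pickup_ms
print(pickup.date(), duration_ms)  # 2018-08-16 1370
```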

We will map pickup_time to the _event_time field in Rockset by defining field mappings using the CLI and the Python SDK below. We will also set retention for this collection to 10 days.

Note that queries filtering on _event_time (e.g., WHERE _event_time > ...) are typically faster because of the way data is stored in Rockset's underlying system: it is already sorted by _event_time in descending order, so such queries avoid intermediate sorting steps.

#Using CLI

To map pickup_time to the _event_time field using CLI, we will need to define the field mapping in a YAML file specified at collection creation time:

name: taxi_rides
field_mappings:
  - name: transformation1
    input_fields:
      - field_name: 'pickup_time'
        if_missing: 'PASS'
        is_drop: true
        param: 'pickup_time'
    output_field:
      field_name: '_event_time'
      value:
        sql: 'CAST(:pickup_time AS TIMESTAMP)'
      on_error: 'SKIP'
# ten days
retention_secs: 864000

Then, we will create the collection based on the YAML specification using this command:

$ rockset api:collections:createCollection commons --body mappings.yaml

#Using Python

from rockset import Client
rs = Client()

field_mappings = [
  rs.FieldMapping.mapping(
    name="transformation1",
    input_fields=[
      rs.FieldMapping.input_field(
        field_name="pickup_time",
        if_missing="PASS",
        is_drop=True,
        param="pickup_time"
      )
    ],
    output_field=rs.FieldMapping.output_field(
        field_name="_event_time",
        sql_expression="CAST(:pickup_time AS TIMESTAMP)",
        on_error="SKIP"
    )
  )
]

first_event_collection = rs.Collection.create(
                        "taxi_rides",
                        field_mappings=field_mappings,
                        retention_secs=10*24*60*60)

Refer here for supported time formats and time zones.

Add document:

$ cat data.yaml
data:
  - ride_id: ride2
$ rockset api:documents:addDocuments commons taxi_rides --body data.yaml

Query:

  1. Select all rides since 1534406486170
SELECT
    ride_id
FROM
    taxi_rides
WHERE
    _event_time >= TIMESTAMP_MICROS(1534406486170000);
+-----------+
| ride_id   |
|-----------|
| ride2     |
+-----------+
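TIMESTAMP_MICROS expects microseconds, while the sample pickup_time values are epoch milliseconds; a quick sketch of the scaling used in the query above:

```python
# TIMESTAMP_MICROS takes epoch microseconds; the sample pickup_time
# values are epoch milliseconds, so scale by 1000 before querying.
pickup_ms = 1534406486170
pickup_us = pickup_ms * 1000
print(pickup_us)  # 1534406486170000
```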
  2. Join on another collection that maps payment id to description, counting the number of rides of each payment type since 1534406486150
SELECT
    a.type,
    COUNT(b.driver_id) num
FROM
    taxi_rides b
    JOIN payment_types a ON a.id = b.payment_id
WHERE
    b._event_time >= TIMESTAMP_MICROS(1534406486150000)
GROUP BY
    a.type
ORDER BY
    num;
+-------+--------+
| num   | type   |
|-------+--------|
| 1     | cash   |
| 1     | credit |
+-------+--------+