INSERT INTO s3


INSERT INTO s3 is currently in Beta.

Query Structure

INSERT INTO s3_uri
  credentials
  [FORMAT = (format_options, ...) ]
  select_query

INSERT INTO s3_uri exports query results from Rockset and writes them directly to Amazon S3.

  • s3_uri specifies which bucket the data is uploaded to (e.g. 's3://bucket/prefix').
  • credentials define the credentials Rockset uses to write to the S3 bucket. The credentials must include one of the following references:
    • INTEGRATION = 'integration_name' to use the credentials of an existing S3 integration.
    • CREDENTIALS = (AWS_ROLE = 'aws role arn', AWS_EXTERNAL_ID = 'externalid')
  • FORMAT is an optional field that can customize the output:
    • TYPE specifies the desired output format: supported values are 'JSON' and 'PARQUET'
    • INCLUDE_QUERY_ID=true specifies that the query_id is appended to the S3 URI. This option helps ensure that output files for multiple query invocations are kept separate.
  • select_query must represent a valid SELECT command.
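The clauses above can be assembled programmatically. The following is a minimal sketch in Python, not part of Rockset's tooling: the helper name `build_s3_export_sql` and its parameters are hypothetical, it assumes AWS_EXTERNAL_ID always accompanies AWS_ROLE, and it does not escape quotes in its inputs.

```python
def build_s3_export_sql(s3_uri, select_query, integration=None,
                        aws_role=None, aws_external_id=None,
                        output_type=None, include_query_id=False):
    """Assemble an INSERT INTO s3 statement (hypothetical helper, no escaping)."""
    # Exactly one credentials reference is expected.
    if (integration is None) == (aws_role is None):
        raise ValueError("pass either integration or aws_role, not both or neither")
    if integration is not None:
        credentials = f"INTEGRATION = '{integration}'"
    else:
        # Assumption: the external ID is always supplied together with the role.
        if aws_external_id is None:
            raise ValueError("aws_external_id is required with aws_role")
        credentials = (f"CREDENTIALS = (AWS_ROLE = '{aws_role}', "
                       f"AWS_EXTERNAL_ID = '{aws_external_id}')")

    # FORMAT is optional; only emit it when an option is set.
    format_options = []
    if output_type is not None:
        if output_type not in ("JSON", "PARQUET"):
            raise ValueError("output_type must be 'JSON' or 'PARQUET'")
        format_options.append(f"TYPE='{output_type}'")
    if include_query_id:
        format_options.append("INCLUDE_QUERY_ID=true")

    lines = [f"INSERT INTO '{s3_uri}'", f"  {credentials}"]
    if format_options:
        lines.append(f"  FORMAT = ({', '.join(format_options)})")
    lines.append(select_query)
    return "\n".join(lines)


print(build_s3_export_sql(
    "s3://analyticsdata/query1",
    "SELECT * FROM commons.analytics",
    integration="s3export",
    output_type="PARQUET",
    include_query_id=True,
))
```

Because the values are interpolated as-is, a helper like this is only suitable for trusted inputs.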

Query Results

Rockset exports the query results to the S3 destination path and encodes the data in JSON format as specified in Data Types. Date, time, and geography data types are encoded in a special format with the __rockset_type wrapping.

Rockset chunks the results and generates multiple files to optimize parallelized consumption downstream. By default, Rockset targets approximately 1,000 documents per file. This is configurable using the s3_sync_op_output_chunk_size HINT (see Examples below for more info).
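As a rough planning aid, the expected number of output files can be estimated from the document count and the chunk size. This is a back-of-the-envelope sketch only: the function name `estimate_file_count` is hypothetical, and because the per-file target is approximate, the actual num_files_written may differ.

```python
DEFAULT_CHUNK_SIZE = 1000  # default per-file target described above


def estimate_file_count(num_docs, chunk_size=DEFAULT_CHUNK_SIZE):
    """Estimate how many files an export produces (approximation only)."""
    if num_docs < 0 or chunk_size <= 0:
        raise ValueError("num_docs must be >= 0 and chunk_size must be > 0")
    # Integer ceiling division: a partially filled last file still counts.
    return -(-num_docs // chunk_size)


print(estimate_file_count(10_500))        # default target of 1,000 per file
print(estimate_file_count(10_500, 2000))  # with a chunk size of 2,000
```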

You can also utilize PARTITION BY to split the results such that records with the same field value will be emitted together in files prefixed with the value. The format for this looks like the following:

INSERT INTO s3_uri
  credentials
SELECT bar FROM foo
PARTITION BY bar

There are a couple of limitations when using PARTITION BY:

  • Including the ORDER BY clause in the SQL statement in combination with PARTITION BY does not guarantee that the specified order is preserved in the emitted files, as sorting will be done based on the partitioned field.
  • You can only partition by a single field in the query, not an arbitrary number of expressions. The expression for the field may be as complex as you would like, but it must be a single field that is projected by the SELECT statement.

You can monitor the query status to track the status of the export. Once the query executes successfully, the query results will contain:

  • num_docs_inserted: the result count of the SELECT command (equivalent to result_set_document_count in the Rockset API).
  • num_files_written: the number of files written to S3.

Associated Permissions and Credentials

Users must have the EXPORT_DATA_GLOBAL and LIST_INTEGRATIONS_GLOBAL privileges to run the command. The built-in admin and member roles have these privileges by default. The read-only role does not have the EXPORT_DATA_GLOBAL privilege. If you are creating a custom role, you must add the EXPORT_DATA_GLOBAL and LIST_INTEGRATIONS_GLOBAL privileges through the Create a Role or Update a Role endpoints or through the Console (note that Console support for adding the EXPORT_DATA_GLOBAL privilege will be available after the Public Preview release).

The associated S3 integration and credentials must have the s3:PutObject permission, and you must configure the role to have your Rockset account as a trusted entity. Follow the S3 integration step-by-step guide for creating the role and add the s3:PutObject privilege during the process:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:List*", "s3:GetObject", "s3:PutObject"],
      "Resource": [
        "arn:aws:s3:::<your-bucket>",
        "arn:aws:s3:::<your-bucket>/*"]
    }
  ]
}
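If the same policy is needed for several buckets, it can be generated rather than edited by hand. A minimal sketch, assuming a hypothetical helper name `build_export_policy`; the actions and resource patterns are copied from the policy above.

```python
import json


def build_export_policy(bucket):
    """Return the export policy shown above for a given bucket name."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:List*", "s3:GetObject", "s3:PutObject"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",    # the bucket itself
                    f"arn:aws:s3:::{bucket}/*",  # every object in the bucket
                ],
            }
        ],
    }


print(json.dumps(build_export_policy("analyticsdata"), indent=2))
```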

Other Considerations

  • TIMESTAMP values are encoded as a special type to match the behavior of other date-time types in the JSON format. A timestamp value will be encoded like {"__rockset_type": "timestamp", "value":"1513108026000000"} where value is microseconds since the Unix epoch.
  • The query must execute within 30 minutes, which represents the maximum query execution time limit within Rockset. We recommend using async mode to export large amounts of data. You can also batch your export into multiple smaller exports using the s3_sync_op_output_chunk_size HINT to avoid hitting this time limit (see Examples below for more info).
  • Rockset only supports exporting data to S3 buckets residing in the same region as the Rockset collection.
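Downstream consumers of JSON exports need to unwrap the __rockset_type encoding. The sketch below shows one way to turn a wrapped timestamp into a Python datetime using the standard json module. The document fields `id` and `created` are made up for illustration, and other wrapped types (date, time, geography) are left untouched.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_rockset_timestamp(obj):
    """object_hook for json.loads: convert wrapped timestamps to datetime."""
    if obj.get("__rockset_type") == "timestamp":
        # value is a string holding microseconds since the Unix epoch.
        return EPOCH + timedelta(microseconds=int(obj["value"]))
    return obj  # leave every other object as-is


# Hypothetical exported document; only the timestamp wrapper follows the text above.
raw = '{"id": 1, "created": {"__rockset_type": "timestamp", "value": "1513108026000000"}}'
doc = json.loads(raw, object_hook=decode_rockset_timestamp)
print(doc["created"].isoformat())  # 2017-12-12T19:47:06+00:00
```

Adding a timedelta to the epoch keeps the conversion in integer arithmetic, which avoids the rounding that can occur when microsecond values pass through a float.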

Parquet format

Rockset exports the data in Parquet format when FORMAT = (TYPE = 'PARQUET') is specified, and Rockset types are mapped to the corresponding Parquet types:

Rockset Type          Parquet Type
int                   int64
float                 double
bool                  boolean
string                string
bytes                 BYTE_ARRAY
null                  UNKNOWN (or null)
date                  DATE
time                  TIME (MICROS precision)
timestamp             TIMESTAMP (MICROS precision)
datetime              TIMESTAMP (MICROS precision) assuming UTC
month_interval        (not supported)
microsecond_interval  duration (MICRO precision)
Geography             GeoJSON string
u256                  Fixed 32-byte binary in little endian
object                struct
array                 list

Object and array types are exported into Parquet’s nested types (Maps and Lists). The field schema is inferred from the data found in the collection. Records are shredded according to Parquet/Dremel object shredding logic.

Parquet field metadata will contain a rockset_type field indicating the actual Rockset type. It can be used to disambiguate geography from normal strings.

Parquet format doesn’t support mixed-type fields, and Parquet exports will fail if a field has incompatible values (e.g. int and string). However, fields that mix int and float will be exported as float.
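The mixed-type rule can be checked on a sample of values before running a Parquet export. The following is an illustrative sketch only, not Rockset's actual inference logic: the function name `export_column_kind` is hypothetical, Python types stand in for Rockset types, and skipping nulls is an assumption.

```python
def export_column_kind(values):
    """Return the single kind a field would export as, or raise on a bad mix."""
    # Assumption: null values do not conflict with any other type.
    kinds = {type(v).__name__ for v in values if v is not None}
    if kinds == {"int", "float"}:
        return "float"  # int and float together are exported as float
    if len(kinds) > 1:
        raise ValueError(f"incompatible value types: {sorted(kinds)}")
    return kinds.pop() if kinds else "null"


print(export_column_kind([1, 2.5, None]))  # int/float mix is accepted

try:
    export_column_kind([1, "a"])  # int/string mix is rejected
except ValueError as err:
    print(err)
```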

INSERT INTO s3 Examples

Export an entire collection using the credentials of an S3 Integration with s3:PutObject permissions:

INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
SELECT * FROM commons.analytics

To export the data in Parquet format and append the query id to output:

INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
  FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)
SELECT * FROM commons.analytics

To export the data, targeting 2,000 documents per file:

INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
SELECT * FROM commons.analytics
HINT(s3_sync_op_output_chunk_size=2000)

To export the data, partitioning by date extracted from the _event_time field and converting to a string:

INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
SELECT FORMAT_ISO8601(DATE(_event_time)) as event_date, * FROM commons.analytics
PARTITION BY event_date

To export the data, partitioning by date extracted from the _event_time field prefixed with 'date_' using a UDF:

script {{{
export function func1(p) {
  return 'date_' + p
}
}}}  
INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
SELECT _script.func1(FORMAT_ISO8601(DATE(_event_time))) as prefixed_date, * FROM commons.analytics
PARTITION BY prefixed_date;

Export the results of an aggregation query to S3 using a hardcoded AWS Role:

INSERT INTO 's3://companyanalysis/cities'
  CREDENTIALS = (
    AWS_ROLE = 'arn:aws:iam::012345678901:role/rocksetexport',
    AWS_EXTERNAL_ID = '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be'
  )
SELECT
    offices.value.state,
    ARRAY_AGG(offices.value.city) AS cities
FROM
    companies,
    UNNEST(companies.offices AS value) AS offices
GROUP BY
    offices.value.state