Performance Tuning Guide
Rockset enables you to run SQL on your semi-structured data and get results in milliseconds without having to worry about data modeling or indexing. To make this possible, Rockset has built most of its database components from the ground up, including the compiler, planner, optimizer, indexing engine, and execution engine.
Rockset's current optimizer uses a set of heuristics to choose an execution plan, but these heuristics are not always optimal for your data distribution. In those cases you can leverage optimizer hints to get significant performance improvements; without them, query latency can be orders of magnitude higher. Choosing the wrong order for a series of joins, the wrong index for a particular collection, or the wrong join strategy are some of the common missteps.
This document will equip you with the toolkit to avoid the common missteps and write queries that unlock the full performance potential of Rockset's schemaless design.
#Index Optimizations
All data that comes into Rockset is indexed in 3 ways:
#Row Index
In the row index, values for all fields in the same document are stored contiguously. This lets you efficiently retrieve all fields for a particular document, aka `SELECT *`. You cannot force a query to use this index since it is much slower for anything other than `SELECT *`.
#Columnar Index
In the columnar index, all values for the same field across all documents are stored contiguously. This lets you efficiently retrieve all values for a particular field, i.e. `SELECT AVG(a) FROM foo`. You can force the use of the column index by adding `HINT(access_path=column_scan)` immediately after the collection name. For very large collections that are being scanned, you may also want to set `HINT(data_scan_batch_size=30000)`. This is the batch size for data sent from the leaf to the aggregator, and the default is 4096. If you have a huge collection where each shard is serving millions of rows, then using this larger batch size will typically lead to a significant speedup.
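For example, a query that forces the column index and the larger scan batch size on a hypothetical large collection `foo` might look like the following sketch:

```sql
-- Force the column index and a larger leaf-to-aggregator batch size
-- (hypothetical collection `foo` and field `a`)
SELECT
    AVG(a)
FROM
    foo HINT(access_path = column_scan, data_scan_batch_size = 30000)
```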
#Search Index
In the search index, records are sorted by (field, value), so all values for a field that are equal are stored contiguously. This lets you efficiently retrieve documents where a field has a particular value, i.e. `SELECT COUNT(*) FROM foo WHERE a = 5`. You can force the use of the search index by adding `HINT(access_path=index_filter)` immediately after the collection name. There is also a variation of this hint that can be used for range queries in which you want the result sorted by a particular field. In that case you add `HINT(access_path=index_scan, index_scan_sort_field=XYZ)`, where you substitute `XYZ` as appropriate. This may be helpful if you want the final output sorted by the field with the filter.
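As a sketch (hypothetical collection `foo` and field `a`), a range query that forces the search index and returns results sorted by the filtered field might look like:

```sql
-- Force the search index for a range predicate, with output sorted by `a`
SELECT
    *
FROM
    foo HINT(access_path = index_scan, index_scan_sort_field = a)
WHERE
    a > 5
ORDER BY
    a
```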
The search index is always used if there is an indexable term in the `WHERE` clause of a query. If the `WHERE` clause is not very selective, you will likely achieve better performance by using the column index in your query. You can force the use of the column index by adding `HINT(access_path=column_scan)` immediately after the collection name.
In addition, the special field `_event_time` is optimized differently in our search index. Querying this field is more performant than querying regular fields. See our `_event_time` field documentation for more details.
To learn more about how our indexing works, check out our blog on converged indexing.
#SQL Join Optimizations
Rockset's SQL engine supports four kinds of joins, listed below. Rockset uses the Aggregator-Leaf-Tailer (ALT) architecture: data is stored in leaf nodes and aggregators are used to compute the join.
#Hash Join
This is the default and is usually the best join strategy. With a hash join `A INNER JOIN B ON A.x = B.x`, the aggregator will construct a hash set on the values of `B.x` (which come from the leaves) and then stream the values of `A` (from another set of leaves) and emit those records for which `A.x` is in the hash set. This means that we need to fit the entire collection `B` in memory. For equality joins like this one, the work can be distributed across many aggregators. With `N` aggregators, we send each `B.x` to aggregator number `hash(B.x) % N`. Similarly, we send the values of `A.x` to the appropriate aggregator. In this way, each hash table constructed by the aggregators is only `sizeOf(B) / N` in size. This number `N` is called the aggregator parallelism, and it is set based on the tier of your Rockset account (free, c4, c12, c36).
Note that the same applies to left outer joins, so `A LEFT JOIN B` will still build a hash set on `B` and stream the values of `A`. However, right outer joins are internally converted to left outer joins, so `A RIGHT JOIN B` is internally rewritten to `B LEFT JOIN A`, and in this case a hash set is built on top of `A` and `B` is streamed.
Hash join can be forced by adding `HINT(join_strategy=hash)` immediately after the ON clause, i.e. `A INNER JOIN B ON A.x = B.x HINT(join_strategy=hash)`. Keep in mind that hash join is the default, so this hint is a no-op.
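Written out as a full query (hypothetical collections `A` and `B`), the hint goes immediately after the ON clause:

```sql
-- Explicitly request a hash join (a no-op, since hash join is the default)
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x HINT(join_strategy = hash)
```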
#Nested Loop Join
Nested loop join is a slower, less memory-intensive join strategy that closely follows the programming paradigm of a nested loop.
```
for a in A
    for b in B
        if a.x == b.x
            emit (a, b)
```
Because of this N^2 behavior, nested loop joins are usually undesirable. However, their saving grace is that they can handle arbitrarily large collections without worrying about fitting them into memory. Like with hash join, equality nested loop joins can be distributed across many aggregators, up to your account's aggregator parallelism limit. Nested loop join can be forced by adding `HINT(join_strategy=nested_loops)` immediately after the ON clause of the join.
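As a sketch with the same hypothetical collections `A` and `B`:

```sql
-- Force a nested loop join; slower, but the right side does not need to fit in memory
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x HINT(join_strategy = nested_loops)
```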
#Broadcast Join
Broadcast join is effectively a leaf-side hash join (see the ALT description above for the difference between leaf and aggregator). With a broadcast join `A INNER JOIN B ON A.x = B.x`, we will send (or broadcast) the values of `B.x` to all the leaves that serve shards of collection `A`. Each of these leaves will then locally perform a hash join by constructing a hash set on `B.x`, stream its local portion of collection `A` to perform the join, and forward the results to the aggregator. This can be helpful if collection `A` is significantly larger than collection `B` and the join is selective, since that way you avoid a significant amount of network overhead from shipping all the records of `A` to the aggregators.
Broadcast join can be forced by adding `HINT(join_broadcast=true)` immediately after the ON clause of the join. This is a separate hint from the one above because, technically, whether to broadcast the join is independent of the join strategy. You could, for example, set the join strategy to nested loops and also set broadcast to true. However, broadcasting is virtually always helpful only when the collection being broadcast is small, in which case using the default hash join strategy is best.
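For example, broadcasting the (small) right side of the join to the leaves serving `A`:

```sql
-- Broadcast the values of B.x to the leaves that serve shards of A
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x HINT(join_broadcast = true)
```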
#Lookup Join
Lookup join is also a leaf-side join, but this time, instead of constructing a hash set on `B` and performing the join, it looks up the values of `B` directly in the search index. This means lookup joins are only relevant for equality joins and are only a good idea if there are a very small number of rows in `B`. For example, let's consider the join `A INNER JOIN B ON A.x = B.x WHERE B.y = 5`. If the predicate `B.y = 5` were highly selective and resulted in the right side of the join only containing a few (< 100) rows, then it would probably be more effective to use a lookup join. We would then ship the ~100 values of `B.x` to the leaves serving the shards of collection `A`, and for each of those values we would very efficiently look up whether a matching `A.x` exists using the search index, and if so, emit a match to the aggregators for further processing or returning of the results. You can force the use of lookup join by adding `HINT(join_strategy=lookup)` immediately after the ON clause of the join.
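Continuing that example, a sketch of forcing the lookup join:

```sql
-- The few values of B.x that satisfy B.y = 5 are looked up directly
-- in the search index of collection A
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x HINT(join_strategy = lookup)
WHERE
    B.y = 5
```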
#Data Clustering
During collection creation, you may optionally specify a clustering scheme for the columnar index to optimize specific query patterns. Clustering decreases query execution time on scans whose predicates match the specified clustering fields. If a clustering scheme is specified, documents with the same clustering field values are automatically stored together on storage during ingest. During query execution, this enables our execution engine to scan only the partitions matching some or all of the clustering keys referenced in the query's predicates, rather than having to scan the entire collection.
For instance, let's say you're creating a collection `salaries` with the fields `email`, `country`, `occupation`, `age`, and `income`, and you're looking to speed up the execution time of a query which aggregates `income` for people with a specified `occupation` from a specified `country`. During collection creation, you might specify the clustering fields as `country` and `occupation`. This will cause the values of `email`, `age`, and `income` for a given (`country`, `occupation`) pair to be stored contiguously on disk, where they can be efficiently scanned.
We then execute the example query:

```sql
SELECT AVG(income) FROM salaries WHERE country = 'US' AND occupation = 'Software Engineer'
```
- Without clustering, the execution engine might perform a scan of the entire collection using the columnar index, followed by a filter for each predicate.
- With clustering, however, the execution engine will perform a sequential scan on only the partition corresponding to (`country='US'`, `occupation='Software Engineer'`), which will be significantly faster than having to scan the entire collection. Even if the query predicates only match a subset of the clustering fields, the execution engine will still save execution time by only scanning the appropriate partitions, as in the sketch below. This makes the query run faster and use less CPU.
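For instance, a query that filters on `country` alone (a subset of the clustering keys) would still scan only the partitions for that country rather than the whole `salaries` collection:

```sql
-- Partial match on the clustering keys: only partitions with country = 'US'
-- are scanned, regardless of occupation
SELECT AVG(income) FROM salaries WHERE country = 'US'
```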
Please contact support@rockset.com to access this feature.
#General SQL Guidelines & Tips
#Understanding Execution Strategy
Our backend supports an `EXPLAIN` command, which will print out the execution strategy for a query in human-readable text. You can take any query and preface it with `EXPLAIN` in the query editor console to see this. For example,
```sql
EXPLAIN
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x
WHERE
    B.x = 5;
```

```
select "?COUNT":$3
  aggregate on (): $3=count_star_()
    hash inner join on ($1 = $2)
      column scan on commons.A: fields($1=x)
      index filter on commons.B: fields($2=x), query($2:float[5,5], int[5,5])
```
Here we can see that we are using the search index on collection `B`, looking for rows where field `x` equals 5. We are using the columnar index on collection `A`, and we are performing a hash join on them followed by an aggregation. `EXPLAIN` can be your best friend as you try to understand what is happening under the hood and add hints to get more performance out of the system. For example, if we now add a couple of hints:
```sql
EXPLAIN
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B HINT(access_path = column_scan) ON A.x = B.x HINT(join_broadcast = true)
WHERE
    B.x = 5;
```

```
select "?COUNT":$3
  aggregate on (): $3=count_star_()
    hash inner join on ($1 = $2) hints(join_broadcast=1)
      column scan on commons.A: fields($1=x)
      filter on ($2 = 5)
        column scan on commons.B: fields($2=x)
```

And we see that we are now scanning collection `B` using the column index and broadcasting it during the join.
Note: this is just an example of how to apply hints, not a good example of when you should apply them.
#Duplicating Predicates
Manually push down and duplicate predicates as much as possible. More predicates, or more selective predicates, applied to a collection means less data to be streamed and much faster overall execution. While this holds in any SQL system, due to the nature of Rockset's rule-based optimizer (RBO) you may also need to explicitly duplicate some predicates where they logically apply. For example, in the query
```sql
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x
WHERE
    B.x = 5
```
we can see that logically this imposes a filter on `A.x = 5` as well, so we can add that in:
```sql
SELECT
    COUNT(*)
FROM
    A
    INNER JOIN B ON A.x = B.x
WHERE
    B.x = 5
    AND A.x = 5
```
#Selecting Join Strategies
Determine which join strategy makes the most sense. The default is hash join, and this is usually a good choice. But if the collections are too large to store in memory, consider nested loop join; or, if one collection is significantly smaller than the other (after applying predicates from the `WHERE` clause), consider broadcast or lookup join.
#Hash Join Ordering
If using hash join, determine whether there may be a better join ordering. Your query will run out of memory and return an error if the side of the join on which the hash table is built (the right side for all joins except right outer joins, since those are rewritten) is larger than your query memory limit * aggregator parallelism (both of which are account limits that we set based on your tier). A good rule of thumb is to put smaller collections on the right side of the join. If there is a series of joins in a row, i.e. `A INNER JOIN B ON ... INNER JOIN C ON ... INNER JOIN D ON ...`, it's generally best to put the largest ones first so they can be filtered down during the (hopefully selective) joins.
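As a sketch with hypothetical collections of decreasing size (`big`, `medium`, `small`) joined on a hypothetical `id` field, this rule of thumb looks like:

```sql
-- Largest collection first; each hash table is built on the smaller right side
SELECT
    COUNT(*)
FROM
    big
    INNER JOIN medium ON big.id = medium.id
    INNER JOIN small ON medium.id = small.id
```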
#Overriding Access Paths
Determine if the access path being used is ideal. The only time you should really have to override this is if you have a non-selective predicate on a huge collection. For example, `WHERE A.x > 0` is probably not selective, and if `A` is huge, the default access path of index filter will be much slower than a column scan followed by a filter step. In that case, you should probably do:
```sql
SELECT
    COUNT(*)
FROM
    A HINT(
        access_path = column_scan,
        data_scan_batch_size = 30000
    )
WHERE
    A.x > 0
```