Fast Aggregate Queries with group_by
Overview
The dbzero.group_by function performs fast, incremental aggregate queries. It caches results locally, which pays off most in applications where data changes incrementally.
The core idea is simple: the first time you run a query, it is calculated in full. Subsequent runs of the same query process only the changes (additions or removals), so their cost depends on the size of the change rather than the size of the dataset.
Experimental Feature
Please note that the fast_query module is still an experimental feature. It works best for use cases built on operations that can be calculated incrementally, such as count and sum.
However, many aggregation types, such as calculating the median, cannot benefit from delta updates as they require access to the entire dataset for each calculation. Use this feature with these limitations in mind.
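The difference between delta-friendly and delta-hostile aggregations can be shown in plain Python. This sketch does not use dbzero; `apply_delta` is a hypothetical helper written only for illustration.

```python
import statistics

def apply_delta(state, added=(), removed=()):
    """Update a (count, total) pair using only the changed values."""
    count, total = state
    for v in added:
        count += 1
        total += v
    for v in removed:
        count -= 1
        total -= v
    return count, total

values = [3, 1, 4, 1, 5]
state = (len(values), sum(values))   # full computation, done once

# A new value arrives: count and sum need only the delta.
state = apply_delta(state, added=[9])
values.append(9)

# The median has no such shortcut: it needs the whole dataset again.
median = statistics.median(values)
```

Count and sum carry all the state they need in a single number, while the median depends on the ordering of every value seen so far.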
Basic Grouping
The simplest way to use group_by is to provide a grouping function (like a lambda) and a data source (from dbzero.find). The function returns a dict where keys are the group identifiers and values are the count of items in that group.
# Group objects by their 'key' attribute
groups = db0.group_by(lambda row: row.key, db0.find("tag1"))
# Result:
# {'one': 4, 'two': 3, 'three': 3}
Incremental Updates (Delta Queries)
This is where fast_query shines. After an initial query runs, its results are durably cached on a designated internal prefix, so the cache persists across application restarts. When you modify your data and later run the exact same query, dbzero uses this cache to perform a delta query, processing only the changes (additions or removals) since the last run. This is significantly faster than re-scanning the entire dataset.
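The mechanism can be pictured with a small plain-Python model. It is a sketch under stated assumptions, not dbzero's implementation: the change log, the JSON file, and the names `run_query` and `CACHE_PATH` are hypothetical stand-ins for dbzero's internal cache prefix.

```python
import json
import os
import tempfile
from collections import Counter

# Hypothetical stand-in for the durable cache prefix: a JSON file on disk.
CACHE_PATH = os.path.join(tempfile.mkdtemp(), "fq_cache.json")

def run_query(change_log):
    """Count items per group, processing only log entries not yet seen.

    change_log is a list of ("add" | "remove", key) tuples.
    Returns (groups, number_of_entries_processed_in_this_run).
    """
    if os.path.exists(CACHE_PATH):
        with open(CACHE_PATH) as f:
            cached = json.load(f)
    else:
        cached = {"position": 0, "groups": {}}

    groups = Counter(cached["groups"])
    delta = change_log[cached["position"]:]
    for action, key in delta:
        groups[key] += 1 if action == "add" else -1

    # Drop empty groups, then persist so the next run resumes from here.
    result = {k: n for k, n in groups.items() if n > 0}
    with open(CACHE_PATH, "w") as f:
        json.dump({"position": len(change_log), "groups": result}, f)
    return result, len(delta)

log = [("add", "one"), ("add", "two"), ("add", "one")]
first, processed_first = run_query(log)      # full scan of 3 entries

log += [("add", "one"), ("remove", "two")]
second, processed_second = run_query(log)    # delta only: 2 entries
```

Because the cached result lives on disk together with the position it was computed at, a restarted process would pick up from the same point instead of rescanning.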
Additions
If you add new objects that match the query's criteria, the cached groups are updated incrementally without re-evaluating the original items.
# 1. Run the initial query to populate the durable cache.
groups = db0.group_by(lambda row: row.key, db0.find("tag1"))
# 2. Add more objects
db0.tags(KVTestClass("one", 11)).add("tag1")
# 3. Rerunning the query is fast, as it only processes the new object.
groups = db0.group_by(lambda row: row.key, db0.find("tag1"))
# assert groups["one"] == 5
Removals
The same principle applies when data is removed. dbzero efficiently updates the cached groups to reflect that an object no longer matches the query.
# ... after initial query has run and cache is populated ...
# Remove tags from two objects so they no longer match the query.
db0.tags(objects[1], objects[6]).remove("tag1")
# Rerun the delta query to update the cache.
groups = db0.group_by(lambda row: row.key, db0.find("tag1"))
# assert groups["two"] == 2
Advanced Grouping Criteria
You're not limited to a single lambda function for grouping.
Grouping by Multiple Criteria
You can group by multiple criteria by passing a tuple of grouping functions. The resulting dictionary keys will be tuples.
# Group by color AND by whether the value is even or odd
groups = db0.group_by(
    (Colors.values(), lambda x: x.value % 2),
    db0.find(MemoTestClass)
)
# Result keys look like: ('RED', 0), ('RED', 1), ('GREEN', 0), ...
You can even include constants in your grouping criteria to help structure the output.
groups = db0.group_by(
    (Colors.values(), lambda x: "2024", lambda x: x.value % 2),
    db0.find(MemoTestClass)
)
# Result keys look like: ('RED', '2024', 0), ...
Grouping by Enum Values
dbzero has first-class support for Python's Enum. You can pass an enum's .values() directly as a grouping criterion. This is particularly useful for state management or categorizing objects.
groups = db0.group_by(Colors.values(), db0.find(MemoTestClass))
# Result:
# {'RED': 4, 'GREEN': 3, 'BLUE': 3}
How does this work? dbzero identifies identical queries by inspecting the source code of lambda functions (get_lambda_source) and the representation of other objects used in the query definition. This allows it to reliably reuse cached results.
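The idea of recognising "the same query" can be sketched in plain Python. This is an illustration only: `query_fingerprint` is a hypothetical helper, and it substitutes the lambda's compiled bytecode for its source text so that the example runs in any environment, which is not how get_lambda_source is described above.

```python
import hashlib
import types

def query_fingerprint(*parts):
    """Build a stable cache key from the pieces of a query definition."""
    described = []
    for part in parts:
        if isinstance(part, types.FunctionType):
            code = part.__code__
            # Assumption: bytecode, constants and names stand in for source text.
            described.append(repr((code.co_code, code.co_consts,
                                   code.co_names, code.co_varnames)))
        else:
            # Other objects are identified by their representation.
            described.append(repr(part))
    return hashlib.sha256("|".join(described).encode()).hexdigest()

a = query_fingerprint(lambda row: row.key, "tag1")
b = query_fingerprint(lambda row: row.key, "tag1")    # same definition, new lambda object
c = query_fingerprint(lambda row: row.value, "tag1")  # different grouping function
```

Here `a` and `b` match even though the two lambdas are distinct objects, while `c` differs. A practical consequence of this kind of matching is that any change to a grouping function produces a new cache entry.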
Custom Aggregation Operations
By default, group_by counts the items in each group. You can customize this behavior using the ops parameter to perform other aggregations, like summing values.
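A plain-Python model may help picture how pluggable operations shape the result. It is a sketch, not dbzero's implementation; `Item`, `toy_count`, `toy_make_sum` and `toy_group_by` are hypothetical names chosen to avoid confusion with the real API.

```python
from collections import namedtuple

Item = namedtuple("Item", ["color", "value"])  # hypothetical sample record

def toy_count(acc, item):
    """Operation that counts items."""
    return acc + 1

def toy_make_sum(getter):
    """Return an operation that adds getter(item) to the running total."""
    def op(acc, item):
        return acc + getter(item)
    return op

def toy_group_by(key_fn, items, ops=(toy_count,)):
    """Fold every item into its group's accumulators, one accumulator per op."""
    groups = {}
    for item in items:
        accs = groups.setdefault(key_fn(item), [0] * len(ops))
        for i, op in enumerate(ops):
            accs[i] = op(accs[i], item)
    # A single op yields a bare value; several ops yield a tuple, in order.
    return {k: v[0] if len(ops) == 1 else tuple(v) for k, v in groups.items()}

items = [Item("RED", 2), Item("RED", 4), Item("BLUE", 3)]
counts = toy_group_by(lambda x: x.color, items)
both = toy_group_by(lambda x: x.color, items,
                    ops=(toy_count, toy_make_sum(lambda x: x.value)))
```

Each operation only ever sees its own accumulator and one item at a time, which is what makes it possible to apply the same operations to a delta instead of the full dataset.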
Single Custom Operation
Use dbzero.make_sum() to create a sum operation. The value for each group will now be the sum instead of the count.
# Sum the 'value' attribute for each group
sum_op = db0.make_sum(lambda x: x.value)
groups = db0.group_by(
    lambda x: "A" if (x.value % 2 == 0) else "B",
    db0.find(MemoTestClass),
    ops=(sum_op,)
)
# assert groups["A"] + groups["B"] == 45
Multiple Operations
You can provide a tuple of operations to ops. The value for each group will be a tuple containing the result of each operation in order. dbzero.count_op is the default count operation.
# Get both the count and the sum for each group
query_ops = (db0.count_op, db0.make_sum(lambda x: x.value))
groups = db0.group_by(
    (Colors.values(), lambda x: x.value % 2),
    db0.find(MemoTestClass),
    ops=query_ops
)
# Result values look like: (count, sum)
# e.g., groups[('RED', 0)] would be (2, 6)
Configuration and Concurrency
Custom Cache Location
By default, fast_query uses a predefined prefix name for its cache. While this works for a single application, it can cause conflicts in advanced scenarios. For example, if multiple independent applications are querying the same data prefixes, their fast_query caches would interfere with one another.
To solve this, you can initialize fast_query with a custom cache location—a separate prefix unique to each application. This ensures that each application's query cache is fully isolated, preventing conflicts and giving you explicit control over the cache's lifecycle and storage.
# Each application should use its own dedicated prefix for its fast_query cache.
DATA_PX = "px-shared-data"
FQ_CACHE_PX_APP1 = "px-fq-cache-for-app1"
# Initialize dbzero with a separate prefix for the cache
db0.open(DATA_PX, "r")
db0.open(FQ_CACHE_PX_APP1, "rw")
db0.init_fast_query(FQ_CACHE_PX_APP1)
# Queries from this application will now use the dedicated prefix for caching,
# avoiding conflicts with other applications.
groups = db0.group_by((Colors.values(), lambda x: "test"), db0.find(MemoDataPxClass))
Real-time Refreshing
dbzero is designed for concurrent environments. A reader process can get live updates from a writer process by calling dbzero.refresh() before executing a query. This ensures the query runs on the latest snapshot of the data.
import time

# In a reader process/thread:
db0.init_fast_query("__fq_cache/data")
db0.open("data_prefix", "r")
while True:
    # refresh() returns True if new data is available
    if db0.refresh():
        # Rerunning the query is fast and reflects the latest data
        result = db0.group_by(lambda x: x.value, db0.find(MemoTestClass, "tag1"))
    # Pause briefly between polls to avoid a busy loop
    time.sleep(0.05)