snapshot()
Creates a read-only, point-in-time view of the database, allowing you to "time-travel" 🕰️. Snapshots are essential for isolating long-running queries from concurrent writes, analyzing the application's past states, or ensuring a consistent view of data for complex, multi-step operations.
Objects retrieved from a snapshot are immutable. Attempting to modify them will raise an exception. This guarantees that your historical view remains unchanged.
db0.snapshot(state_spec=None, frozen=False)Parameters
state_spec(int|dict, optional)Specifies which database state to capture.
- If
None(the default), the snapshot captures the most recently committed state. - If an
int, it's treated as astate_numto capture a specific historical state globally. You can get state numbers usingdbzero.get_state_num(). - If a
dict, it maps prefix names to state numbers (e.g.,{{'my/prefix': 123}}), allowing you to create a snapshot from different historical points for different data prefixes.
- If
frozen(bool, optional)If
True, creates a "frozen" snapshot of the current head transaction, including uncommitted changes. This is an advanced feature primarily used to get a stable view of the very latest data within a short-lived context.⚠️A
frozensnapshot must be created within the context of another active snapshot.
Return Value
Returns a Snapshot object. This object is a context manager, and the recommended way to use it is with a with statement. This ensures that the snapshot's resources are automatically released.
# The snapshot is active only inside the 'with' block
with db0.snapshot() as snap:
# Queries here are isolated from outside changes
results = snap.find("some-tag")
# Accessing 'snap' here would raise an exceptionAlternatively, if you create a query directly from a snapshot, the snapshot's lifetime is bound to the query object itself.
# The snapshot is kept alive as long as the 'query' object exists
query = db0.snapshot(some_state_num).find("some-tag")
# You can now iterate over the query results
for item in query:
print(item)Snapshot Object API
The returned Snapshot object has its own set of methods that mirror the top-level dbzero API but operate on the frozen-in-time data:
snap.find(): Queries for objects within the snapshot.snap.fetch(): Retrieves a single object by UUID or a singleton by class from the snapshot.snap.get_state_num(): Gets the state number for a prefix within the snapshot.snap.deserialize(): Deserializes and runs a query against the snapshot's state.snap.close(): Manually closes the snapshot if not using awithblock.
Examples
Basic Isolation
Here's how to run a query on a stable dataset while new data is being added.
# Add 5 items
for i in range(5):
db0.tags(MemoTestClass(i)).add("my-tag")
db0.commit()
# Create a snapshot of the current state
snap = db0.snapshot()
# Add 3 more items to the live database
for i in range(5, 8):
db0.tags(MemoTestClass(i)).add("my-tag")
db0.commit()
# The live query sees all 8 items
assert len(list(db0.find("my-tag"))) == 8
# The snapshot query only sees the original 5 items
assert len(list(snap.find("my-tag"))) == 5Time-Traveling to Compare Object Versions
You can capture snapshots at different points to inspect how an object has changed.
# Create an object and commit it
obj = MemoTestClass(100)
db0.tags(obj).add("temp")
db0.commit()
snap_1 = db0.snapshot()
# Modify the object and commit again
obj.value = 200
db0.commit()
snap_2 = db0.snapshot()
# Fetch the two different versions using their snapshots
uuid = db0.uuid(obj)
version_1 = snap_1.fetch(uuid)
version_2 = snap_2.fetch(uuid)
print(f"Version 1 value: {version_1.value}") # Outputs: 100
print(f"Version 2 value: {version_2.value}") # Outputs: 200Finding Changes Between Two States (Delta Query)
Use two snapshots to find which objects were created or deleted between two points in time.
# Create initial state
for i in range(3):
db0.tags(MemoTestClass(i)).add("some-tag")
db0.commit()
snap1 = db0.snapshot()
# Create new state: add 3 items, remove 1
for i in range(3, 6):
db0.tags(MemoTestClass(i)).add("some-tag")
object_to_remove = next(db0.find(MemoTestClass, value=1))
db0.tags(object_to_remove).remove("some-tag")
db0.commit()
snap2 = db0.snapshot()
# Query the differences
query1 = snap1.find("some-tag")
query2 = snap2.find("some-tag")
# Use db0.no() to get the difference
created = snap2.find(query2, db0.no(query1))
deleted = snap1.find(query1, db0.no(query2))
print(f"Created values: {[x.value for x in created]}") # Outputs: [3, 4, 5]
print(f"Deleted values: {[x.value for x in deleted]}") # Outputs: [1]