ATOMiK gives you four operations. What you build with them depends on how you compose them. Here are five patterns we've seen work well in production systems.
Accumulator Fan-In
Multiple producers write deltas to the same context from different threads, processes, or machines. Because XOR is commutative and associative, the deltas compose to the same result in any order, so no coordination is needed.
from atomik_core import AtomikContext
import threading
ctx = AtomikContext()
ctx.load(0)
def worker(deltas):
    for d in deltas:
        ctx.accum(d)  # Lock-free, order-independent

# 4 threads writing concurrently
threads = [threading.Thread(target=worker, args=([0xFF << (i*8)],))
           for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()

# All deltas composed; result is deterministic
print(f"0x{ctx.read():08x}")  # 0xffffffff

Use when: Aggregating metrics, counters, or flags from multiple sources. Sensor fusion, multi-player game state, distributed log aggregation.
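The reason fan-in needs no locks is the algebra itself. Here is a minimal pure-Python sketch (no ATOMiK SDK; the comment equating `^` with `accum` reflects the XOR semantics described above) showing that XOR accumulation gives the same result in every order:

```python
import random

deltas = [0xFF << (i * 8) for i in range(4)]  # same deltas as above

def accumulate(ds):
    state = 0
    for d in ds:
        state ^= d  # the XOR accumulation the pattern relies on
    return state

baseline = accumulate(deltas)
for _ in range(10):
    shuffled = deltas[:]
    random.shuffle(shuffled)
    # Any interleaving of producers yields the identical state
    assert accumulate(shuffled) == baseline

print(f"0x{baseline:08x}")  # 0xffffffff
```

Because every permutation is equivalent, a thread scheduler (or a network) reordering the deltas cannot change the outcome.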
Epoch Checkpointing
Use SWAP to create periodic checkpoints. Each SWAP captures the current state as the new reference and resets the accumulator — starting a fresh epoch.
ctx = AtomikContext()
ctx.load(initial_state)
while running:
    # Accumulate deltas for this epoch
    for delta in incoming_deltas():
        ctx.accum(delta)

    # Checkpoint: capture state, start new epoch
    epoch_state = ctx.swap()
    save_checkpoint(epoch_state)
    # Accumulator is now 0, reference is epoch_state

Use when: Periodic snapshots (every N seconds, every N deltas), database write-ahead log compaction, or game save points.
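To make the SWAP semantics concrete, here is a toy model of a context with an explicit reference/accumulator split. This is a sketch of the behavior described above, not the SDK's internals; the `EpochContext` class and its field layout are assumptions for illustration:

```python
class EpochContext:
    """Toy model: state = reference XOR accumulator."""
    def __init__(self):
        self.reference = 0
        self.accumulator = 0

    def load(self, value):
        self.reference = value
        self.accumulator = 0

    def accum(self, delta):
        self.accumulator ^= delta

    def read(self):
        return self.reference ^ self.accumulator

    def swap(self):
        state = self.read()     # capture current state
        self.reference = state  # it becomes the new reference
        self.accumulator = 0    # fresh epoch
        return state

ctx = EpochContext()
ctx.load(0x1000)
ctx.accum(0x0011)
checkpoint = ctx.swap()
assert checkpoint == 0x1011      # epoch state captured
assert ctx.read() == 0x1011      # observable state unchanged by checkpoint
assert ctx.accumulator == 0      # fresh epoch started
```

Note that SWAP does not change the observable state, only where the reference/accumulator boundary sits, which is why checkpointing is safe to do at any time.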
Fingerprint Gate
Use Fingerprint as a fast gate before expensive operations. Only proceed if the data actually changed — O(1) check instead of byte-by-byte comparison.
from atomik_core import Fingerprint
fp = Fingerprint()
fp.load(current_data)
while True:
    new_data = read_sensor()
    fp.update(new_data)
    if fp.changed:
        # Data actually changed: do the expensive thing
        transmit(new_data)         # Network send
        update_database(new_data)  # DB write
        fp.load(new_data)          # Update reference
    # else: skip. Data unchanged, save bandwidth + CPU

Use when: Polling loops, cache invalidation, sensor sampling, file sync. Anywhere you check "did anything change?" before acting.
Rollback Chain
Build an undo stack using self-inverse. Store each delta; to undo, re-apply it. No snapshots, no deep copies — just 8 bytes per undo level.
ctx = AtomikContext()
ctx.load(document_state)
undo_stack = []
def apply_edit(delta):
    ctx.accum(delta)
    undo_stack.append(delta)

def undo():
    if undo_stack:
        delta = undo_stack.pop()
        ctx.accum(delta)  # Self-inverse: re-apply = undo

# Apply 3 edits
apply_edit(0x0001)
apply_edit(0x0010)
apply_edit(0x0100)

# Undo last 2
undo()  # Removes 0x0100
undo()  # Removes 0x0010
assert ctx.read() == document_state ^ 0x0001  # Only first edit remains

Use when: Text editors, drawing apps, configuration management, transaction rollback, A/B testing (try a change, measure, undo if worse).
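The identity doing the work here is `(state ^ d) ^ d == state`: every XOR delta is its own inverse. A pure-Python version of the same rollback chain, with an assumed starting state of `0xBEEF` for illustration:

```python
document_state = 0xBEEF  # assumed example value
state = document_state
undo_stack = []

# Apply 3 edits
for delta in (0x0001, 0x0010, 0x0100):
    state ^= delta
    undo_stack.append(delta)

# Undo the last 2: re-applying a delta removes it
for _ in range(2):
    state ^= undo_stack.pop()

assert state == document_state ^ 0x0001  # only the first edit remains
```

This is why each undo level costs only the size of one delta: the stack never stores snapshots, only the deltas themselves.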
Multi-Stream Convergence
Run multiple DeltaStreams on different nodes. Exchange deltas over any transport (TCP, UDP, message queue). All streams converge to the same state.
from atomik_core import DeltaStream, DeltaMessage
# Node A
stream_a = DeltaStream()
stream_a.load(addr=0, initial_state=0xCAFE)
# Node B (different machine)
stream_b = DeltaStream()
stream_b.load(addr=0, initial_state=0xCAFE)
# A makes a change, sends delta to B
stream_a.accum(addr=0, delta=0x00FF)
msg = DeltaMessage(addr=0, delta=0x00FF, seq=1)
wire = msg.to_bytes() # 16 bytes over the network
# B receives and applies (could arrive out of order)
received = DeltaMessage.from_bytes(wire)
stream_b.accum(received.addr, received.delta)
# Both converge
assert stream_a.read(0) == stream_b.read(0)  # ✓

Use when: Multi-region sync, edge-cloud convergence, peer-to-peer state sharing, real-time collaboration, multi-player games.
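Convergence does not depend on delivery order, which is what makes any transport usable. A self-contained sketch of the idea (this is not the SDK's wire format; the `addr:u32, seq:u32, delta:u64` layout below is an assumption chosen to total 16 bytes):

```python
import struct

FMT = "<IIQ"  # addr:u32, seq:u32, delta:u64 -> 16 bytes (assumed layout)

def encode(addr, seq, delta):
    return struct.pack(FMT, addr, seq, delta)

def decode(wire):
    return struct.unpack(FMT, wire)  # (addr, seq, delta)

node_a = {0: 0xCAFE}  # both nodes start from the same state
node_b = {0: 0xCAFE}

# Two changes are broadcast to both nodes
msgs = [encode(0, 1, 0x00FF), encode(0, 2, 0xF000)]

for m in msgs:                 # node A receives them in order
    addr, _, delta = decode(m)
    node_a[addr] ^= delta

for m in reversed(msgs):       # node B receives them out of order
    addr, _, delta = decode(m)
    node_b[addr] ^= delta

assert node_a[0] == node_b[0]          # both converge to the same state
assert struct.calcsize(FMT) == 16      # fixed-size wire message
```

Because application order is irrelevant, the transport only needs to deliver each delta at least conceptually once per node; it never needs to deliver them in order.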
Try ATOMiK today
Get the SDK
Join 247+ developers building with delta-state algebra