Flow store

SQLiteFlowStore: append-only findings, mutable actor profiles and clusters, and pickled policy models. Covers the schema, WORM triggers, and query API.

SQLiteFlowStore is the queryable operational layer on top of the audit log. It mirrors the pattern in SQLiteSink: SQLite with WAL journal mode, PRAGMA synchronous=FULL, a single threading.Lock serializing writes, and WORM-style triggers on the append-only anomalies table.
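The connection settings named above map onto Python's standard sqlite3 module roughly as follows. This is a minimal sketch, not the SQLiteSink or SQLiteFlowStore source; the helper name open_flow_db is hypothetical.

```python
import sqlite3
import threading


def open_flow_db(path: str) -> tuple[sqlite3.Connection, threading.Lock]:
    """Hypothetical helper: open SQLite with WAL + synchronous=FULL and a write lock."""
    # check_same_thread=False lets several threads share this connection;
    # the returned lock is what actually serializes writes.
    conn = sqlite3.connect(path, check_same_thread=False)
    # WAL lets readers keep reading while a single writer appends to the log.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        raise RuntimeError(f"could not enable WAL (got {mode!r})")
    # In WAL mode, FULL fsyncs the WAL on every commit, so committed rows survive power loss.
    conn.execute("PRAGMA synchronous=FULL")
    return conn, threading.Lock()
```

A writer would then wrap each transaction as `with write_lock, conn: conn.execute(...)`, so the lock is held for the whole transaction and the connection context commits or rolls back on exit.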

Findings also emit into the regular AuditLog via the FlowAnomaly, ActorProfiled, ClusterAssigned, PolicyInferred, and BotDetected audit categories. The audit log is the book of record (hash-chained); the flow store is the query layer.

Schema

Four tables:

anomalies: append-only

```sql
CREATE TABLE anomalies (
    finding_id TEXT PRIMARY KEY,
    detected_at TEXT NOT NULL,
    detector_name TEXT NOT NULL,
    category TEXT NOT NULL,
    severity TEXT NOT NULL,
    market_id TEXT NOT NULL,
    venue_name TEXT NOT NULL,
    actor_id TEXT,
    cluster_id TEXT,
    confidence REAL NOT NULL,
    score REAL NOT NULL,
    message TEXT,
    evidence_json TEXT NOT NULL,
    related_event_ids_json TEXT NOT NULL,
    citation TEXT NOT NULL,
    audit_event_id TEXT
);

CREATE TRIGGER anom_no_update BEFORE UPDATE ON anomalies
    BEGIN SELECT RAISE(ABORT, 'anomalies is append-only (UPDATE denied)'); END;
CREATE TRIGGER anom_no_delete BEFORE DELETE ON anomalies
    BEGIN SELECT RAISE(ABORT, 'anomalies is append-only (DELETE denied)'); END;
```
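To see the two triggers at work, the sketch below loads a trimmed copy of the anomalies table (most columns dropped for brevity) into an in-memory database and attempts both mutations. It uses only the standard sqlite3 module; try_mutations is a hypothetical helper.

```python
import sqlite3

# Trimmed anomalies table plus the two WORM triggers, copied from the schema above.
SCHEMA = """
CREATE TABLE anomalies (
    finding_id TEXT PRIMARY KEY,
    detected_at TEXT NOT NULL,
    category TEXT NOT NULL
);
CREATE TRIGGER anom_no_update BEFORE UPDATE ON anomalies
    BEGIN SELECT RAISE(ABORT, 'anomalies is append-only (UPDATE denied)'); END;
CREATE TRIGGER anom_no_delete BEFORE DELETE ON anomalies
    BEGIN SELECT RAISE(ABORT, 'anomalies is append-only (DELETE denied)'); END;
"""


def try_mutations(conn: sqlite3.Connection) -> list[str]:
    """Attempt an UPDATE and a DELETE on an existing row; collect the error messages."""
    errors = []
    for stmt in (
        "UPDATE anomalies SET category = 'Wash' WHERE finding_id = 'f1'",
        "DELETE FROM anomalies WHERE finding_id = 'f1'",
    ):
        try:
            conn.execute(stmt)
        except sqlite3.DatabaseError as exc:  # RAISE(ABORT) surfaces as IntegrityError
            errors.append(str(exc))
    return errors


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO anomalies VALUES ('f1', '2024-01-01T00:00:00Z', 'Spoofing')")
errors = try_mutations(conn)
```

Both statements are rejected and the row is left unchanged. Inserts still succeed, which is exactly the append-only contract.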

Indices on market_id, actor_id, category, and detected_at keep the filters the CLI and API use most often at O(log n) index lookups rather than full-table scans.
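Whether a given filter actually hits an index can be checked with SQLite's EXPLAIN QUERY PLAN. The index names below are hypothetical, since the exact index DDL is not given here, and the table is trimmed to the filtered columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE anomalies (
    finding_id TEXT PRIMARY KEY,
    detected_at TEXT NOT NULL,
    category TEXT NOT NULL,
    market_id TEXT NOT NULL,
    actor_id TEXT
);
-- Illustrative index names, not the store's actual DDL.
CREATE INDEX idx_anom_market ON anomalies(market_id);
CREATE INDEX idx_anom_actor ON anomalies(actor_id);
CREATE INDEX idx_anom_category ON anomalies(category);
CREATE INDEX idx_anom_detected ON anomalies(detected_at);
""")


def plan(sql: str, params: tuple = ()) -> list[str]:
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    # The detail text is the last column in every SQLite version's output.
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


market_plan = plan("SELECT * FROM anomalies WHERE market_id = ?", ("0xTRUMP_2024",))
```

A detail line containing `USING INDEX` means the planner seeks the B-tree; `SCAN` without an index means a full table scan.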

actor_profiles: mutable upsert

Keyed on (venue_name, actor_id). Each new snapshot replaces the prior row and records last_updated_seq, so auditors can reconstruct the state at any historical sequence number by replaying the audit log.
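A keyed upsert like this is commonly written with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24). In the sketch below, the profile_json column and the upsert_profile helper are assumptions; only the key and last_updated_seq come from the description above.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical DDL: only the key and last_updated_seq are documented.
conn.execute("""
CREATE TABLE actor_profiles (
    venue_name TEXT NOT NULL,
    actor_id TEXT NOT NULL,
    profile_json TEXT NOT NULL,
    last_updated_seq INTEGER NOT NULL,
    PRIMARY KEY (venue_name, actor_id)
)""")

UPSERT = """
INSERT INTO actor_profiles (venue_name, actor_id, profile_json, last_updated_seq)
VALUES (?, ?, ?, ?)
ON CONFLICT (venue_name, actor_id) DO UPDATE SET
    profile_json = excluded.profile_json,
    last_updated_seq = excluded.last_updated_seq
"""


def upsert_profile(venue: str, actor: str, profile: dict, seq: int) -> None:
    """Replace the current snapshot for (venue, actor) and record its sequence number."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT, (venue, actor, json.dumps(profile), seq))


upsert_profile("polymarket", "0xabc...", {"trades": 10}, seq=41)
upsert_profile("polymarket", "0xabc...", {"trades": 12}, seq=57)
rows = conn.execute("SELECT profile_json, last_updated_seq FROM actor_profiles").fetchall()
```

Only one row survives per key; the older snapshot at sequence 41 is recoverable from the audit log, not from this table.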

wallet_clusters: mutable upsert

Keyed on cluster_id. Replacing a cluster assignment writes a new row in place of the old one; the audit log keeps every ClusterAssigned event, so historical assignments can be reconstructed.

policy_models: append-only

Keyed on policy_id. Each fit writes a new row; the actor_id → latest policy lookup is ORDER BY trained_at DESC LIMIT 1. Pickled model blobs live in model_blob.
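The latest-policy lookup can be sketched with pickle and sqlite3. The column layout is assumed beyond policy_id, actor_id, trained_at, and model_blob, trained_at is assumed to be an ISO-8601 UTC string so that text ordering matches time ordering, and write_policy/latest_policy here are hypothetical stand-ins rather than the store's methods.

```python
import pickle
import sqlite3
from typing import Any, Optional

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE policy_models (
    policy_id  TEXT PRIMARY KEY,
    actor_id   TEXT NOT NULL,
    trained_at TEXT NOT NULL,  -- ISO-8601 UTC, so text order equals time order
    model_blob BLOB NOT NULL
)""")


def write_policy(policy_id: str, actor_id: str, trained_at: str, model: Any) -> None:
    """Append one fitted model; earlier rows for the same actor are never touched."""
    with conn:
        conn.execute(
            "INSERT INTO policy_models VALUES (?, ?, ?, ?)",
            (policy_id, actor_id, trained_at, pickle.dumps(model)),
        )


def latest_policy(actor_id: str) -> Optional[Any]:
    row = conn.execute(
        "SELECT model_blob FROM policy_models WHERE actor_id = ? "
        "ORDER BY trained_at DESC LIMIT 1",
        (actor_id,),
    ).fetchone()
    # pickle.loads can execute arbitrary code: only load blobs from a store you control.
    return None if row is None else pickle.loads(row[0])


write_policy("pol-1", "0xabc...", "2024-05-01T00:00:00Z", {"weights": [0.2, 0.8]})
write_policy("pol-2", "0xabc...", "2024-06-01T00:00:00Z", {"weights": [0.3, 0.7]})
```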

Usage

```python
from datetime import datetime, timedelta

from horizon.flow import SQLiteFlowStore
from horizon.flow import AnomalyCategory  # import location assumed

store = SQLiteFlowStore("flow.db")

# Write from a detector/engine (`finding` is a finding produced by a detector)
store.write_anomaly(finding)

# Query
recent = store.list_anomalies(
    market_id="0xTRUMP_2024",
    actor_id="0xabc...",
    category=AnomalyCategory.Spoofing,
    since=datetime.utcnow() - timedelta(hours=1),
    limit=100,
)

# Iterate a date range (`start` and `end` are datetimes bounding the window)
for f in store.iter_anomalies(since=start, until=end):
    ...

# Profile + cluster lookups
profile = store.get_profile("polymarket", "0xabc...")
cluster = store.cluster_of("0xabc...")
policy = store.latest_policy("0xabc...")

# Integrity scan
report = store.verify()
# {"ok": True, "counts": {"anomalies": 1428, "actor_profiles": 93, ...}}

store.close()
```
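The internals of store.verify() are not described here. As a rough sketch of what such a scan can check with plain sqlite3, the hypothetical verify function below runs PRAGMA integrity_check, counts rows per table, and confirms the expected WORM triggers still exist. Its return shape only loosely mirrors the example output above.

```python
import sqlite3


def verify(conn: sqlite3.Connection, tables: list[str], required_triggers: list[str]) -> dict:
    """Hypothetical integrity scan: page-level check, row counts, trigger presence."""
    # PRAGMA integrity_check yields the single row 'ok' when it finds no problems.
    problems = [row[0] for row in conn.execute("PRAGMA integrity_check")]
    counts = {}
    for table in tables:
        # Table names come from a fixed list in code, never from user input.
        counts[table] = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
    present = {row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'trigger'")}
    missing = sorted(set(required_triggers) - present)
    return {"ok": problems == ["ok"] and not missing,
            "counts": counts,
            "missing_triggers": missing}


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE anomalies (finding_id TEXT PRIMARY KEY);
CREATE TRIGGER anom_no_delete BEFORE DELETE ON anomalies
    BEGIN SELECT RAISE(ABORT, 'anomalies is append-only (DELETE denied)'); END;
INSERT INTO anomalies VALUES ('f1'), ('f2');
""")
report = verify(conn, ["anomalies"], ["anom_no_update", "anom_no_delete"])
```

Here the report is not ok and lists anom_no_update as missing, because the setup deliberately creates only the DELETE trigger. A dropped trigger is exactly the kind of silent WORM regression a scan like this should surface.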

Thread-safety: a single threading.Lock serializes writes, while reads and writes can interleave across threads. For solo-advisor deployments, write volume is far too low for lock contention to matter; larger deployments should front the store with a connection-pool wrapper.
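The write-serialization pattern reduces to a lock held around each write transaction. In this minimal sketch (LockedWriter and the events table are hypothetical), every thread shares one connection, so reads take the lock too; a store that wants reads to run alongside writes would give readers their own connections, which WAL mode allows.

```python
import sqlite3
import threading


class LockedWriter:
    """Hypothetical sketch: one shared connection, one lock around every transaction."""

    def __init__(self, path: str) -> None:
        # check_same_thread=False permits cross-thread use; the lock keeps it safe.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock, self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, payload TEXT)"
            )

    def write(self, payload: str) -> None:
        # Only one thread can be inside this block, so write transactions never interleave.
        with self._lock, self._conn:  # the connection context commits or rolls back
            self._conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))

    def count(self) -> int:
        # Reads share the same connection in this sketch, so they take the lock as well.
        with self._lock:
            return self._conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]


writer = LockedWriter(":memory:")


def worker(n: int) -> None:
    for i in range(50):
        writer.write(f"thread-{n}-event-{i}")


threads = [threading.Thread(target=worker, args=(n,)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```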

Public API helpers

```python
import horizon as hz
from horizon.flow import SQLiteFlowStore, set_default_store

set_default_store(SQLiteFlowStore("flow.db"))

# These now use the default store without explicit argument
profile = hz.flow.actor_profile("0xabc...", venue="polymarket")
findings = hz.flow.anomalies(market_id="0xTRUMP_2024")
cluster = hz.flow.cluster_of("0xabc...")
policy = hz.flow.shadow_policy("0xabc...")
```

Pass store=... explicitly when working with multiple stores (for example, separate per-venue databases or a test fixture).
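One plausible shape for the default-store fallback is a module-level variable plus a resolver that prefers an explicit store= argument. This is a standalone illustration, not horizon's code: it reuses the names set_default_store and cluster_of for readability, while _resolve, FlowStore, and DictStore are hypothetical.

```python
from typing import Optional, Protocol


class FlowStore(Protocol):
    def cluster_of(self, actor_id: str) -> Optional[str]: ...


_default_store: Optional[FlowStore] = None


def set_default_store(store: FlowStore) -> None:
    global _default_store
    _default_store = store


def _resolve(store: Optional[FlowStore]) -> FlowStore:
    # An explicit store= always wins; otherwise fall back to the process-wide default.
    if store is not None:
        return store
    if _default_store is None:
        raise RuntimeError("no flow store configured; call set_default_store() or pass store=")
    return _default_store


def cluster_of(actor_id: str, store: Optional[FlowStore] = None) -> Optional[str]:
    return _resolve(store).cluster_of(actor_id)


class DictStore:
    """Hypothetical stand-in store (e.g. a test fixture): actor_id -> cluster_id."""

    def __init__(self, clusters: dict[str, str]) -> None:
        self._clusters = clusters

    def cluster_of(self, actor_id: str) -> Optional[str]:
        return self._clusters.get(actor_id)


set_default_store(DictStore({"0xabc...": "c-1"}))
per_venue = DictStore({"0xabc...": "c-9"})
default_hit = cluster_of("0xabc...")                    # resolved via the default store
explicit_hit = cluster_of("0xabc...", store=per_venue)  # explicit store overrides it
```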

Retention

Flow store rows do not auto-expire. The audit log’s RetentionEnforcer handles retention for the hash-chained record; for the flow store itself, the operational pattern is:

  • Keep the full store hot for at least the current compliance year.
  • Export rolling windows to cold storage (S3 with Object Lock) via iter_anomalies(since, until).
  • Never DELETE from anomalies. The triggers prevent it. Drop-and-recreate the whole DB when you genuinely need to purge (for example, privacy-request compliance on pseudonymous data).
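The export step can be as simple as writing each window to a JSON Lines file and recording its SHA-256 digest before upload. The sketch assumes findings arrive as plain mappings (converting the objects yielded by iter_anomalies into dicts is left to the caller), and export_window is a hypothetical helper; the upload to the Object Lock bucket is not shown.

```python
import hashlib
import json
from typing import Iterable, Mapping


def export_window(findings: Iterable[Mapping], path: str) -> str:
    """Write findings as JSON Lines and return the SHA-256 of the exact bytes written."""
    digest = hashlib.sha256()
    # Binary mode, so the digest matches the file on disk on every platform.
    with open(path, "wb") as fh:
        for finding in findings:
            # sort_keys gives a stable encoding; default=str handles datetimes and the like.
            line = (json.dumps(finding, sort_keys=True, default=str) + "\n").encode("utf-8")
            fh.write(line)
            digest.update(line)
    return digest.hexdigest()
```

Storing the digest next to the archived file lets a later restore confirm the window arrived intact before it is trusted.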

Relationship to the audit log

Every finding writes to both stores:

  • Flow store. Queryable by market / actor / category. The dashboard and CLI hit this.
  • Audit log. Append-only hash chain. The regulator’s “what did you know and when” question reads from this.

The audit log is authoritative; the flow store is a materialized index. If the flow store is ever corrupted, engine.scan_from_audit(sink) rebuilds it deterministically from the hash chain.
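The rebuild idea (verify the chain, then replay) can be shown on a toy hash chain. Everything here is hypothetical: the event layout, the hashing scheme, and the helpers append_event and rebuild_anomalies illustrate the principle only and are not the audit log's actual format or engine.scan_from_audit.

```python
import hashlib
import json
import sqlite3

GENESIS = "0" * 64  # hypothetical prev_hash for the first event


def event_hash(prev_hash: str, category: str, payload: dict) -> str:
    """SHA-256 over the previous hash plus a canonical JSON encoding of the event."""
    body = json.dumps({"category": category, "payload": payload},
                      sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append_event(chain: list[dict], category: str, payload: dict) -> None:
    """Append an event linked to the current tip of the chain."""
    prev = chain[-1]["hash"] if chain else GENESIS
    chain.append({
        "id": f"evt-{len(chain)}",
        "category": category,
        "payload": payload,
        "prev_hash": prev,
        "hash": event_hash(prev, category, payload),
    })


def rebuild_anomalies(chain: list[dict]) -> sqlite3.Connection:
    """Verify every link, then replay FlowAnomaly events into a fresh table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE anomalies (finding_id TEXT PRIMARY KEY, "
                 "market_id TEXT NOT NULL, audit_event_id TEXT NOT NULL)")
    prev = GENESIS
    with conn:  # a broken link aborts the replay and rolls back rows inserted so far
        for ev in chain:
            expected = event_hash(prev, ev["category"], ev["payload"])
            if ev["prev_hash"] != prev or ev["hash"] != expected:
                raise ValueError(f"hash chain broken at {ev['id']}")
            if ev["category"] == "FlowAnomaly":
                p = ev["payload"]
                conn.execute("INSERT INTO anomalies VALUES (?, ?, ?)",
                             (p["finding_id"], p["market_id"], ev["id"]))
            prev = ev["hash"]
    return conn


chain: list[dict] = []
append_event(chain, "FlowAnomaly", {"finding_id": "f1", "market_id": "0xTRUMP_2024"})
append_event(chain, "ActorProfiled", {"actor_id": "0xabc..."})
append_event(chain, "FlowAnomaly", {"finding_id": "f2", "market_id": "0xTRUMP_2024"})
index = rebuild_anomalies(chain)
```

Because the index is a pure function of the verified chain, replaying the same chain always yields the same rows, which is what makes a corrupted flow store disposable.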

WORM semantics

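The SQLite triggers are belt-and-suspenders: they block UPDATE and DELETE issued through SQL, but anyone with write access to the database file can drop them or edit the file directly. True storage-level WORM therefore also requires support from the storage layer (S3 Object Lock, WORM-configured ZFS, immutable Linux attributes). For a solo-advisor deployment on local disk, the documented pattern combines the triggers, filesystem permissions, and a nightly backup to an immutable-storage bucket.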

The flow store is NOT regulator-grade WORM by itself. The hash-chained audit log is.