How it works

Mental model, event flow, and what actually happens when an event arrives at the engine. Read this before writing code.

The flow module is a pipeline, not a single classifier. A MarketEvent enters the engine, and seven things happen in a specific order before the next event is processed. This page walks the lifecycle end-to-end so you can reason about what the module is doing, debug it when it isn’t doing what you expected, and extend it with confidence.

The picture

```text
        ingestion source (live WS / audit log / replay)
                              │
                              ▼
                   ┌───────────────────────┐
                   │ FlowObserver          │  normalize: whatever the venue
                   │ FlowFeedHandler       │  sent into a unified MarketEvent
                   │ PolymarketFlowSource  │
                   └──────────┬────────────┘
                              │
                              ▼ MarketEvent
                   ┌───────────────────────┐
                   │ FlowEngine            │  orchestrator
                   │   .ingest(event)      │
                   └──────────┬────────────┘
                              │  (under one threading.Lock)
                              │
  ┌─────────────┬─────────────┼─────────────┬─────────────┐
  │             │             │             │             │
  ▼             ▼             ▼             ▼             ▼
  rolling       actor         taxonomy      detectors     findings
  state         profile       classify      fan-out       emit
  (trades,      update        (soft label)  (7 detectors  │
  book,         (incr.        (if refresh   in sequence)  │
  orders)       features)     due)                        │
                                                          │
                ┌─────────────┬─────────────┬─────────────┤
                ▼             ▼             ▼             ▼
                FlowStore     AuditLog      Alerter       user
                (SQLite)      (hash         (Slack /      callbacks
                              chain)        email /       (strategy
                                            page)         hook)
```

The seven steps on one event

When engine.ingest(market_event) runs, the following steps happen in order, all under one threading.Lock. Calls from different threads are serialized, so state updates never interleave.

1. Increment the ingest counter

engine._ingest_count += 1. This is the local sequence stand-in when no audit log is attached. Every profile snapshot carries it so auditors can replay deterministically.
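Here is a minimal sketch of the locking rule and step 1: a toy engine that serializes ingest under one threading.Lock and numbers each event. MiniEngine is hypothetical and stands in for FlowEngine; only the lock-plus-counter pattern comes from the text above.

```python
import threading


class MiniEngine:
    """Hypothetical toy engine: serializes ingest and numbers each event."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._ingest_count = 0  # local sequence stand-in (no audit log attached)

    def ingest(self, event: object) -> int:
        with self._lock:  # one lock: concurrent callers never interleave updates
            self._ingest_count += 1
            seq = self._ingest_count
            # ... steps 2-7 would run here, still under the lock ...
            return seq


engine = MiniEngine()
threads = [
    threading.Thread(target=lambda: [engine.ingest(None) for _ in range(1000)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(engine._ingest_count)  # 4000
```

Because the increment and everything after it happen inside the same critical section, the counter doubles as a gap-free, per-process sequence number.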

2. Update rolling state

The engine keeps three per-market deques, bounded by FlowConfig.max_trades_per_market and max_book_snapshots_per_market:

  • _trades[market_id]: the last N trades (price, size, side, timestamp).
  • _books[market_id]: the last N book snapshots (bids, asks, timestamp).
  • _orders[market_id]: the last N placements, cancels, and amends, kept per market for detectors that want raw order-event recall rather than actor-profile history.

These deques are what the detectors look back on. They are bounded so memory is predictable under long-lived observation. Detectors that need more history maintain their own state.
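A minimal sketch of bounded rolling state, assuming per-market tuples of (price, size, side, timestamp) as listed above. RollingState and MAX_TRADES are hypothetical names; collections.deque with maxlen is one standard way to get evict-oldest behavior, not necessarily what the module uses internally.

```python
from collections import defaultdict, deque
from functools import partial

MAX_TRADES = 3  # hypothetical stand-in for FlowConfig.max_trades_per_market


class RollingState:
    """Hypothetical per-market trade tape, each market bounded to max_trades entries."""

    def __init__(self, max_trades: int) -> None:
        # A new market gets its own deque; appending past maxlen evicts the oldest entry.
        self._trades = defaultdict(partial(deque, maxlen=max_trades))

    def add_trade(self, market_id: str, price: float, size: float, side: str, ts: int) -> None:
        self._trades[market_id].append((price, size, side, ts))

    def recent_trades(self, market_id: str) -> list:
        # .get avoids creating an empty deque just because someone looked.
        return list(self._trades.get(market_id, ()))


state = RollingState(MAX_TRADES)
for i in range(5):
    state.add_trade("0xTRUMP_2024", 0.50 + i / 100, 10.0, "buy", ts=i)
print([t[3] for t in state.recent_trades("0xTRUMP_2024")])  # [2, 3, 4]
```

Once a market’s deque is full, each append evicts the oldest entry, so memory per market stays constant however long the engine runs.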

3. Update the actor profile

This step runs only if the event has an actor_id AND a profile_extractor is attached (make_default_engine wires one in by default).

```python
updated = self._profile_extractor.ingest(event)
```

The extractor maintains a per-actor _ActorState: placements, cancels, fills, inter-arrival times, a market histogram, an hour-of-day histogram, gas prices, and nonces. ingest returns True when a snapshot flush is due, which happens on an actor’s first-ever event AND every profile_refresh_event_interval (default 50) events thereafter.

This keeps write amplification tolerable: we don’t persist a new profile row on every single event. Between flushes, the extractor holds the live state in memory.
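The flush rule can be sketched as a per-actor counter. FlushScheduler is hypothetical, and the exact cadence is an assumption: this version flushes on events 1, 51, 101, and so on, reading “every 50 events thereafter” as counted from the first flush.

```python
from collections import Counter

PROFILE_REFRESH_EVENT_INTERVAL = 50  # the documented default


class FlushScheduler:
    """Hypothetical sketch: decide when an actor's profile snapshot is due."""

    def __init__(self, interval: int = PROFILE_REFRESH_EVENT_INTERVAL) -> None:
        self.interval = interval
        self._events_seen = Counter()  # actor_id -> events ingested so far

    def ingest(self, actor_id: str) -> bool:
        self._events_seen[actor_id] += 1
        n = self._events_seen[actor_id]
        # True on the first-ever event, then on every `interval`-th event after it.
        return (n - 1) % self.interval == 0


sched = FlushScheduler()
due = [i for i in range(1, 121) if sched.ingest("0xabc")]
print(due)  # [1, 51, 101]
```

With the default interval, 120 events for one actor produce 3 profile writes instead of 120.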

4. Run the taxonomy classifier (on flush)

When flush is due, the extractor produces an ActorProfile and the engine runs the Kirilenko classifier on it:

```python
profile = extractor.snapshot(actor_id=..., venue_name=..., last_updated_seq=...)
probs = taxonomy.classify(profile.features)
profile = replace(profile, taxonomy_probs=probs)
```

The six probabilities sum to 1.0. When an actor has fewer than FlowConfig.actors.profile_min_events (default 30) events, the engine emits a partial profile instead: it carries only event_count and a uniform taxonomy prior. We refuse to guess from tiny samples.
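A sketch of the small-sample gate, assuming the classifier returns six non-negative scores. taxonomy_probs and the classify callable are hypothetical; the uniform fallback and the 30-event default come from the text above, and the renormalization step is a defensive addition of this sketch.

```python
from typing import Callable, Sequence

N_CLASSES = 6            # the taxonomy has six classes
PROFILE_MIN_EVENTS = 30  # the documented default of profile_min_events


def taxonomy_probs(
    event_count: int,
    features: dict,
    classify: Callable[[dict], Sequence[float]],
    min_events: int = PROFILE_MIN_EVENTS,
) -> list:
    """Hypothetical gate: uniform prior for tiny samples, classifier output otherwise."""
    if event_count < min_events:
        return [1.0 / N_CLASSES] * N_CLASSES  # refuse to guess
    probs = [float(p) for p in classify(features)]
    total = sum(probs)
    return [p / total for p in probs]  # renormalize so the six values sum to 1.0


def fake_classifier(features: dict) -> list:
    return [4, 2, 1, 1, 1, 1]  # hypothetical raw scores


print(taxonomy_probs(10, {}, fake_classifier))     # six values of 1/6
print(taxonomy_probs(45, {}, fake_classifier)[0])  # 0.4
```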

5. Persist profile + emit audit event (on flush)

  • store.upsert_profile(profile, at=event.timestamp): writes to the flow store (a mutable upsert keyed on (venue, actor_id)).
  • audit_log.record(AuditCategory.ActorProfiled, ...): appends a hash-chained audit event. Its payload carries event_count and taxonomy_probs so compliance can replay profile evolution from the log alone.

Failures in either sink are swallowed; the engine moves on.
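What “hash-chained” buys you can be shown with a toy log in which every record stores the SHA-256 of its predecessor, so editing any past record breaks verification. HashChainLog, its field names, and the string categories (mirroring AuditCategory.ActorProfiled and AuditCategory.FlowAnomaly) are illustrative assumptions, not the module’s AuditLog format.

```python
import hashlib
import json


class HashChainLog:
    """Hypothetical append-only log where each record commits to its predecessor."""

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self.records = []

    @staticmethod
    def _digest(body: dict) -> str:
        # Canonical JSON (sorted keys) so the same record always hashes the same way.
        return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()

    def record(self, category: str, payload: dict) -> dict:
        prev = self.records[-1]["hash"] if self.records else self.GENESIS
        body = {"seq": len(self.records) + 1, "category": category,
                "payload": payload, "prev_hash": prev}
        entry = {**body, "hash": self._digest(body)}
        self.records.append(entry)
        return entry

    def verify(self) -> bool:
        prev = self.GENESIS
        for entry in self.records:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if entry["prev_hash"] != prev or entry["hash"] != self._digest(body):
                return False
            prev = entry["hash"]
        return True


log = HashChainLog()
log.record("ActorProfiled", {"event_count": 50, "taxonomy_probs": [1 / 6] * 6})
log.record("FlowAnomaly", {"actor_id": "0xabc"})
print(log.verify())  # True
log.records[0]["payload"]["event_count"] = 49  # tamper with history
print(log.verify())  # False
```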

6. Fan out to detectors

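```python
ctx = self._snapshot_context(event)  # recent_book + recent_trades for THIS market
findings = []
for detector in self._detectors:
    try:
        findings.extend(detector.detect([event], ctx))
    except Exception:
        continue  # a broken detector cannot break the ingest loop
for f in findings:  # step 7: emit only after every detector has run
    emit_finding(f)
```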

Every detector sees the current event plus the rolling tail for its market. Detectors maintain their OWN state. The engine provides the fresh-event signal, not a detector-specific replay.

Detector failures are caught and swallowed; one broken detector cannot break the ingest loop.

7. Emit findings

For each AnomalyFinding a detector returns:

```python
# a. FlowStore: queryable row
store.write_anomaly(finding)

# b. AuditLog: hash-chained regulatory record
audit_event = audit_log.record(
    AuditCategory.BotDetected if (confidence >= 0.9 and severity >= High)
    else AuditCategory.FlowAnomaly,
    ...
)

# c. Alerter: only if confidence >= alert_threshold AND severity >= alert_severity
#    AND category in categories_paged
alerter.send(severity=..., title=..., message=..., extra=...)

# d. User callbacks: any engine.on_finding(cb) subscribers fire
for cb in user_callbacks:
    cb(finding)
```
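The routing rules in steps b and c reduce to two small predicates. This sketch assumes severities are ordered like an IntEnum; only High is named on this page, so Low, Medium, and Critical (and the threshold values in the example) are hypothetical.

```python
from enum import IntEnum


class Severity(IntEnum):
    # Assumed ordering: only High is named on this page.
    Low = 1
    Medium = 2
    High = 3
    Critical = 4


def audit_category(confidence: float, severity: Severity) -> str:
    """BotDetected for high-confidence, high-severity findings; FlowAnomaly otherwise."""
    if confidence >= 0.9 and severity >= Severity.High:
        return "BotDetected"
    return "FlowAnomaly"


def should_page(confidence: float, severity: Severity, category: str, *,
                alert_threshold: float, alert_severity: Severity,
                categories_paged: set) -> bool:
    """All three alerter gates must pass before alerter.send fires."""
    return (confidence >= alert_threshold
            and severity >= alert_severity
            and category in categories_paged)


print(audit_category(0.95, Severity.High))    # BotDetected
print(audit_category(0.95, Severity.Medium))  # FlowAnomaly
print(should_page(0.8, Severity.Critical, "spoofing", alert_threshold=0.7,
                  alert_severity=Severity.High, categories_paged={"spoofing"}))  # True
```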

Step 7 runs synchronously inside the ingest call, and the hot path assumes every sink is fast. Wrap expensive sinks (S3, external alerters) in a buffered or async adapter before attaching them.
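One way to build such an adapter is a bounded queue.Queue drained by a background thread, so the ingest path only pays for an enqueue. BufferedSink is a hypothetical helper; slow_send stands in for whatever expensive call (an S3 upload, an external alerter) you want off the hot path.

```python
import queue
import threading
import time
from typing import Callable


class BufferedSink:
    """Hypothetical adapter: the hot path enqueues, a background thread does the slow work."""

    def __init__(self, slow_send: Callable[[object], None], maxsize: int = 10_000) -> None:
        self._q = queue.Queue(maxsize=maxsize)
        self._send = slow_send
        self.dropped = 0
        threading.Thread(target=self._run, daemon=True).start()

    def __call__(self, finding: object) -> None:
        # Called inside ingest: never block; drop (and count) if the buffer is full.
        try:
            self._q.put_nowait(finding)
        except queue.Full:
            self.dropped += 1

    def _run(self) -> None:
        while True:
            item = self._q.get()
            try:
                self._send(item)
            except Exception:
                pass  # a failing sink must not kill the worker
            finally:
                self._q.task_done()

    def flush(self) -> None:
        """Block until everything enqueued so far has been handed to slow_send."""
        self._q.join()


sent = []
sink = BufferedSink(lambda f: (time.sleep(0.01), sent.append(f)))
for i in range(5):
    sink(i)  # returns immediately
sink.flush()
print(sent)  # [0, 1, 2, 3, 4]
```

Because it is a plain callable, an instance can be passed wherever the engine expects a callback, for example engine.on_finding(BufferedSink(upload_to_s3)), where upload_to_s3 is your own function. When the buffer is full, this sketch drops and counts findings rather than blocking ingest; choose the opposite trade-off if losing a finding is worse than stalling the feed.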

Why the order matters

The order is not cosmetic. A few subtle properties depend on it:

  • Rolling state updates BEFORE detectors run so detectors see the current event’s price / size / actor reflected in recent_trades if they look there (though most detectors operate on the events iterable passed in directly).
  • Profile update BEFORE detectors run so that detectors that want to use an updated ActorProfile (future work) can. In v0.1, no detector consumes the live profile at detection time; they all operate on raw events.
  • Profile flush + audit emission BEFORE detectors so the ActorProfiled audit event lands before any FlowAnomaly that references the same actor. This keeps the audit log in causal order when replayed.
  • Findings emitted AFTER all detectors run so a detector can’t fire a cascade of self-referential findings by mutating state mid-loop.

The three data stores

Findings go to multiple stores. Each serves a different consumer:

| Store | Mutability | Consumer | Query pattern |
|---|---|---|---|
| AuditLog | append-only, hash-chained | regulators, compliance | “what did you know and when” |
| SQLiteFlowStore | append-only (anomalies), upsert (profiles/clusters), append-only (policies) | strategies, ops dashboards | “current state of market X” |
| Alerter | fire-and-forget | humans on call | “page me on spoofing now” |

The audit log is authoritative. The flow store is a materialized view you can rebuild from the audit log via engine.scan_from_audit(sink). Keep both; they answer different questions.
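To see why the flow store counts as a materialized view, here is a toy replay that rebuilds an upsert table and an append-only table from an ordered list of audit records, using the standard-library sqlite3 module. rebuild_view, the table layouts, and the payload fields (venue, actor_id) are assumptions for illustration; the real entry point is engine.scan_from_audit(sink). The ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import json
import sqlite3


def rebuild_view(audit_records, conn: sqlite3.Connection) -> None:
    """Hypothetical replay: derive the mutable view purely from the append-only log."""
    conn.execute("CREATE TABLE IF NOT EXISTS profiles ("
                 "venue TEXT, actor_id TEXT, event_count INTEGER, "
                 "PRIMARY KEY (venue, actor_id))")
    conn.execute("CREATE TABLE IF NOT EXISTS anomalies (seq INTEGER, actor_id TEXT, detail TEXT)")
    for rec in sorted(audit_records, key=lambda r: r["seq"]):  # replay in log order
        p = rec["payload"]
        if rec["category"] == "ActorProfiled":
            # Upsert: a later profile replaces the earlier one for the same key.
            conn.execute(
                "INSERT INTO profiles VALUES (?, ?, ?) "
                "ON CONFLICT(venue, actor_id) DO UPDATE SET event_count = excluded.event_count",
                (p["venue"], p["actor_id"], p["event_count"]),
            )
        else:
            # Anomaly records are append-only.
            conn.execute("INSERT INTO anomalies VALUES (?, ?, ?)",
                         (rec["seq"], p.get("actor_id"), json.dumps(p)))
    conn.commit()


log = [
    {"seq": 1, "category": "ActorProfiled",
     "payload": {"venue": "polymarket", "actor_id": "0xabc", "event_count": 50}},
    {"seq": 2, "category": "FlowAnomaly", "payload": {"actor_id": "0xabc"}},
    {"seq": 3, "category": "ActorProfiled",
     "payload": {"venue": "polymarket", "actor_id": "0xabc", "event_count": 100}},
]
conn = sqlite3.connect(":memory:")
rebuild_view(log, conn)
print(conn.execute("SELECT event_count FROM profiles").fetchall())  # [(100,)]
print(conn.execute("SELECT COUNT(*) FROM anomalies").fetchone())   # (1,)
```

The view keeps only the latest profile per key, while the log still holds both snapshots: that asymmetry is why the log, not the store, is authoritative.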

State you can inspect

The engine exposes a few helpers for debugging and for custom detectors:

```python
# The engine's idea of "now": useful in deterministic replays
engine.ingest_count  # events processed so far
engine.venue_name

# Read-only context snapshot for a custom calculation
ctx = engine.snapshot_context()
ctx.recent_book["0xTRUMP_2024"]    # -> (bids, asks) or absent
ctx.recent_trades["0xTRUMP_2024"]  # -> list[(price, size, side, ts)]

# Wire a callback for downstream consumers
engine.on_finding(lambda f: print(f.category.value, f.confidence))
```

None of these mutate state. They’re read-only windows into the engine’s current memory.

Mental model, in one sentence

The engine is an event-driven finite state machine where the state is per-actor rolling features and per-market rolling tape, the transitions are each new MarketEvent, and the outputs are AnomalyFinding records routed through three stores with different durability and mutability properties.

Hold onto that sentence. Everything else (detectors, clustering, policy recovery, risk integration) is layered on top of it.

Extending with a custom detector

```python
from collections import deque

from horizon.flow.detectors.base import Detector
from horizon.flow.events import (
    AnomalyCategory, AnomalyFinding, AnomalySeverity,
    DetectorContext, MarketEvent, MarketEventKind,
)

WINDOW = 50  # number of recent events per actor to inspect


class MyDetector:
    """Flag any actor whose cancel rate exceeds 99% over 50 recent orders."""

    name = "my_detector"

    def __init__(self) -> None:
        # Per-actor sliding window of recent event kinds; the oldest is evicted automatically.
        self._recent: dict[str, deque] = {}

    def detect(self, events, ctx: DetectorContext):
        for ev in events:
            if not ev.actor_id:
                continue
            seq = self._recent.setdefault(ev.actor_id, deque(maxlen=WINDOW))
            seq.append(ev.event_kind)
            if len(seq) < WINDOW:
                continue  # not enough history yet
            cancels = sum(1 for k in seq if k == MarketEventKind.OrderCanceled)
            if cancels >= WINDOW * 0.99:
                yield AnomalyFinding.make(
                    detector_name=self.name,
                    category=AnomalyCategory.QuoteStuffing,
                    severity=AnomalySeverity.High,
                    market_id=ev.market_id,
                    venue_name=ev.venue_name,
                    actor_id=ev.actor_id,
                    confidence=0.85,
                    message=f"{cancels}/{WINDOW} recent events were cancels",
                    citation="custom: extreme cancel ratio",
                )
                seq.clear()  # reset so one burst yields one finding


# Register on any engine
engine.add_detector(MyDetector())
```

That’s it: no engine modifications, no protocol registration. The custom detector runs deterministically, its exceptions are swallowed like any built-in detector’s, and its findings ship through the same stores as the built-ins.

What it is NOT

  • Not async-friendly yet. The hot path is synchronous under a single lock. Putting an asyncio.Queue between the ingestion source and the engine works today; making the engine itself async is a v0.2+ question.
  • Not a streaming DB. Queries hit SQLite. For sub-millisecond retrieval against a large flow store, front it with Redis or DuckDB.
  • Not a black box. Every finding carries a citation (regulatory or academic) and an evidence dict. If you can’t explain why the detector fired, the detector is buggy. Open the source.
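A minimal sketch of the asyncio.Queue bridge mentioned in the first bullet, assuming only that the engine exposes a synchronous ingest(event). SyncEngine is a stand-in, and the integer events take the place of parsed MarketEvents.

```python
import asyncio


class SyncEngine:
    """Stand-in for a synchronous engine; only .ingest(event) is assumed."""

    def __init__(self) -> None:
        self.seen = []

    def ingest(self, event) -> None:
        self.seen.append(event)


async def source(q: asyncio.Queue, events) -> None:
    for ev in events:
        await q.put(ev)  # e.g. events parsed off a websocket
    await q.put(None)    # sentinel: no more events


async def consumer(q: asyncio.Queue, engine: SyncEngine) -> None:
    while (ev := await q.get()) is not None:
        # ingest is fast and synchronous, so call it directly on the event loop.
        # If it ever became slow, asyncio.to_thread(engine.ingest, ev) would offload it.
        engine.ingest(ev)


async def main(events) -> SyncEngine:
    q = asyncio.Queue(maxsize=1000)  # bounded: applies backpressure to the source
    engine = SyncEngine()
    await asyncio.gather(source(q, events), consumer(q, engine))
    return engine


engine = asyncio.run(main(range(5)))
print(engine.seen)  # [0, 1, 2, 3, 4]
```

Keeping the queue bounded means a slow consumer pushes back on the source instead of growing memory without limit.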

Where to go next