Recorded captures

Capture a live venue session to JSONL, replay it through the engine deterministically, and use it as a regression fixture.

Recorded captures are how you pin the engine’s behaviour against a real market session. A capture is a JSONL file of normalized MarketEvents; a replay feeds that file through a fresh engine and must produce bit-identical findings every time. If a detector threshold or a profiler change alters the output, the diff tells you exactly what moved and on which event.
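One way to make "bit-identical" concrete is to hash a canonical serialization of each run's findings and compare the digests. The sketch below assumes findings can be represented as JSON-serializable dicts. `findings_digest` is a hypothetical helper, not part of horizon.flow.

```python
import hashlib
import json
from typing import Any, Iterable, Mapping


def findings_digest(findings: Iterable[Mapping[str, Any]]) -> str:
    """SHA-256 over findings in emission order (hypothetical helper).

    Each finding is serialized with sorted keys and fixed separators, so two
    runs that emit the same findings in the same order yield the same digest.
    """
    h = hashlib.sha256()
    for finding in findings:
        h.update(json.dumps(finding, sort_keys=True, separators=(",", ":")).encode("utf-8"))
        h.update(b"\n")  # one finding per line, as in JSONL
    return h.hexdigest()
```

Comparing digests is stricter than comparing counts: any change to a field value or to emission order changes the digest.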

Three pieces cooperate:

  1. Capture CLI. scripts/flow_capture.py connects to a venue’s public feed and writes events to JSONL in the format ReplayFlowSource reads.
  2. Replay source. horizon.flow.ingestion.ReplayFlowSource reads any JSONL produced by the capture (or by the synthetic fixture builder) and fires events into the engine in original order.
  3. Harness test. tests/flow/test_recorded_day.py runs every fixture under tests/flow/fixtures/recorded/ through a fresh engine, asserting ingestion, chain integrity, profile emergence, and determinism.

Capturing a live session

The capture script takes a venue, a subscription list, and an output path. It writes one JSONL line per event and flushes after each write. On Ctrl-C (SIGINT) it stops cleanly and prints the same exit summary as a run that reaches its duration.
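The write loop described above can be sketched as follows. This is an illustration, not the code in scripts/flow_capture.py: `write_capture` and its `events` iterable are hypothetical, and each event is assumed to already be a JSON-serializable dict.

```python
import json
import time
from collections import Counter
from typing import Any, Iterable, Mapping


def write_capture(
    events: Iterable[Mapping[str, Any]],
    output_path: str,
    duration_seconds: float,
) -> Counter:
    """Write one JSON line per event until the duration elapses or Ctrl-C.

    Returns per-kind counts so the caller can print an exit summary.
    """
    counts: Counter = Counter()
    deadline = time.monotonic() + duration_seconds
    with open(output_path, "w", encoding="utf-8") as out:
        try:
            # The deadline is only checked when an event arrives.
            for event in events:
                if time.monotonic() >= deadline:
                    break
                out.write(json.dumps(event, separators=(",", ":")) + "\n")
                out.flush()  # hand each line to the OS right away instead of buffering
                counts[event["event_kind"]] += 1
        except KeyboardInterrupt:
            pass  # SIGINT: stop capturing; the with-block still closes the file
    return counts
```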

Polymarket

```bash
python scripts/flow_capture.py polymarket \
    --token-ids 0xTOKEN_A 0xTOKEN_B \
    --duration-seconds 1800 \
    --polygon-rpc-url "$POLYGON_RPC_URL" \
    --output tests/flow/fixtures/recorded/polymarket_2026-04-21.jsonl
```

The token IDs are Polymarket CLOB asset IDs (the same ones you pass to PolymarketFlowSource). Without --polygon-rpc-url, trade events lack gas price, nonce, and block number; with it, every trade is enriched with those on-chain fields. Use a paid RPC endpoint for real captures, because the free public Polygon RPC throttles requests.
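A hedged sketch of what on-chain enrichment can look like, assuming the enricher calls the standard Ethereum JSON-RPC method eth_getTransactionByHash (which Polygon nodes serve) with the trade's tx_hash. In that method's response, gasPrice, nonce, and blockNumber are hex-encoded quantities, and blockNumber is null while the transaction is pending. The helper names are hypothetical, and the HTTP call is left out so the request and parsing steps stand on their own.

```python
import json
from typing import Any, Dict


def build_tx_request(tx_hash: str, request_id: int = 1) -> str:
    """JSON-RPC 2.0 request body for eth_getTransactionByHash."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_getTransactionByHash",
        "params": [tx_hash],
    })


def enrich_trade(trade: Dict[str, Any], rpc_response: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the trade with gas price, nonce and block number added.

    If the node returned no transaction (result is null), the copy is unchanged.
    """
    enriched = dict(trade)
    tx = rpc_response.get("result")
    if not tx:
        return enriched
    enriched["gas_price"] = int(tx["gasPrice"], 16)  # hex quantity -> wei
    enriched["nonce"] = int(tx["nonce"], 16)
    if tx.get("blockNumber"):  # null while the transaction is pending
        enriched["block_number"] = int(tx["blockNumber"], 16)
    return enriched
```

In practice the request body would be POSTed to the RPC URL and the decoded JSON reply passed to `enrich_trade`.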

Hyperliquid

```bash
python scripts/flow_capture.py hyperliquid \
    --coins BTC ETH \
    --duration-seconds 1800 \
    --output tests/flow/fixtures/recorded/hyperliquid_2026-04-21.jsonl
```

Hyperliquid’s public WebSocket feed exposes trades and L2 book snapshots per coin. L3 data (per-wallet resting orders) will land in v0.2.

Heartbeat output

Every 30 seconds the capture prints a progress line so you know it is alive:

```
[capture:polymarket] t+30s events=1420 rate=47.3/s [book.snapshot=890, order.filled=530]
[capture:polymarket] t+60s events=2905 rate=48.4/s [book.snapshot=1810, order.filled=1095]
```

On exit (SIGINT or duration elapsed), a summary line is printed with total events and breakdown by kind.
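A sketch of how such a progress line can be assembled from per-kind counters. `format_heartbeat` is hypothetical, and listing kinds most-frequent first is an assumption that happens to match the sample output.

```python
from collections import Counter


def format_heartbeat(venue: str, elapsed_s: float, counts: Counter) -> str:
    """Render a progress line in the format of the sample heartbeat output."""
    total = sum(counts.values())
    rate = total / elapsed_s if elapsed_s > 0 else 0.0
    # most_common() orders kinds by count, highest first
    kinds = ", ".join(f"{kind}={n}" for kind, n in counts.most_common())
    return f"[capture:{venue}] t+{elapsed_s:.0f}s events={total} rate={rate:.1f}/s [{kinds}]"
```

A capture loop would call this each time `time.monotonic()` crosses the next 30-second mark.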

Replaying a capture

```python
from horizon.flow import make_default_engine
from horizon.flow.ingestion import ReplayFlowSource

# Engine for the Polymarket venue; findings and profiles persist to flow.db.
engine = make_default_engine(
    venue_name="polymarket",
    store_path="flow.db",
)

src = ReplayFlowSource(path="tests/flow/fixtures/recorded/polymarket_2026-04-21.jsonl")
src.on_event(engine.ingest)  # every replayed event goes straight to the engine
src.connect()                # synchronous, returns when the file is exhausted
```

Replay is synchronous: it runs to completion, then the connection closes. Findings and profiles produced during replay land in the attached store exactly as they would during live operation.
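To make the replay contract concrete, here is a minimal stand-in for what a JSONL replay source does: skip comment lines, decode each remaining line, and invoke every registered callback in file order. `MiniReplaySource` is hypothetical and is not the implementation of ReplayFlowSource; it only mirrors the `on_event`/`connect` shape used above, and it passes plain dicts rather than MarketEvent objects.

```python
import json
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class MiniReplaySource:
    """Hypothetical, simplified JSONL replayer (not horizon's ReplayFlowSource)."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._callbacks: List[Callable[[Event], None]] = []

    def on_event(self, callback: Callable[[Event], None]) -> None:
        self._callbacks.append(callback)

    def connect(self) -> int:
        """Replay the whole file synchronously; return the number of events fired."""
        fired = 0
        with open(self.path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue  # blank lines and comments carry no event
                event = json.loads(line)
                for cb in self._callbacks:
                    cb(event)  # file order is preserved: no sorting, no buffering
                fired += 1
        return fired
```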

Using a capture as a regression fixture

Drop any JSONL file into tests/flow/fixtures/recorded/ and the harness picks it up automatically. The filename convention is <venue>_<YYYY-MM-DD>[_<suffix>].jsonl.
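A sketch of fixture discovery plus a check for the naming convention. `discover_fixtures`, `parse_fixture_name`, and the regex are hypothetical; the real harness may discover files differently. If such a list feeds `pytest.mark.parametrize`, an empty list makes pytest skip the test by default (the `empty_parameter_set_mark` ini option defaults to `skip`), which is one way to get skip-on-empty behaviour.

```python
import re
from datetime import date
from pathlib import Path
from typing import List, Optional

# <venue>_<YYYY-MM-DD>[_<suffix>].jsonl
FIXTURE_NAME = re.compile(
    r"^(?P<venue>[a-z0-9]+)_(?P<date>\d{4}-\d{2}-\d{2})(?:_(?P<suffix>[A-Za-z0-9_-]+))?\.jsonl$"
)


def parse_fixture_name(name: str) -> Optional[dict]:
    """Return venue/date/suffix for a conforming filename, else None."""
    m = FIXTURE_NAME.match(name)
    if not m:
        return None
    try:
        day = date.fromisoformat(m["date"])  # rejects impossible dates such as 2026-13-40
    except ValueError:
        return None
    return {"venue": m["venue"], "date": day, "suffix": m["suffix"]}


def discover_fixtures(directory: str) -> List[Path]:
    """All *.jsonl files in the directory, sorted for a stable test order."""
    return sorted(Path(directory).glob("*.jsonl"))
```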

The harness runs three tests per fixture:

  1. Format validity. Every non-comment line parses as JSON and has the event_kind, market_id, and timestamp fields.
  2. End-to-end replay. Every event ingests without crashing; the audit chain over emitted findings verifies with no gaps; actor profiles emerge; per-event latency stays below 500 μs.
  3. Determinism. Two fresh runs over the same fixture produce the same finding count and the same audit-chain length.
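The format-validity check can be sketched as a function that collects one error per bad line. `format_errors` is hypothetical; skipping blank lines (in addition to comments) is an assumption about what the real harness tolerates.

```python
import json
from typing import List

REQUIRED_FIELDS = ("event_kind", "market_id", "timestamp")


def format_errors(path: str) -> List[str]:
    """Collect format problems in a fixture; an empty list means it passes."""
    errors: List[str] = []
    with open(path, encoding="utf-8") as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line or line.startswith("#"):
                continue  # comments (and, by assumption, blank lines) are skipped
            try:
                event = json.loads(line)
            except json.JSONDecodeError as exc:
                errors.append(f"line {lineno}: invalid JSON ({exc.msg})")
                continue
            if not isinstance(event, dict):
                errors.append(f"line {lineno}: expected a JSON object")
                continue
            missing = [k for k in REQUIRED_FIELDS if k not in event]
            if missing:
                errors.append(f"line {lineno}: missing {', '.join(missing)}")
    return errors
```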

To run just the harness:

```bash
pytest tests/flow/test_recorded_day.py -v
```

With zero fixtures in the directory, the tests skip. Add a fixture (either via capture or via the synthetic builder below) and they run.

The synthetic reference fixture

tests/flow/fixtures/recorded/polymarket_synthetic_calibrated.jsonl ships as a calibrated synthetic reference so the harness has something to run on a fresh checkout. It is regenerable bit-for-bit:

```bash
python scripts/flow_build_synthetic_fixture.py \
    --output tests/flow/fixtures/recorded/polymarket_synthetic_calibrated.jsonl \
    --seed 42
```

The builder uses the same SyntheticFlowGenerator the test suite uses: ten background actors trading for 30 minutes, plus injected spoofing, iceberg, wash-trade, and layering patterns at known offsets. The output is deterministic for a given seed. Replace this fixture with real captures when they are available.
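What makes a fixture regenerable bit-for-bit is that every random choice flows from one seeded generator instance, and no timestamp depends on the wall clock. The toy below illustrates only that property. `toy_fixture_lines` is hypothetical and far simpler than SyntheticFlowGenerator: background fills plus one marker event at a fixed offset standing in for an injected pattern.

```python
import json
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

# Fixed start time: using "now" would make every build different.
BASE = datetime(2026, 4, 21, 12, 0, tzinfo=timezone.utc)


def _ts(offset_s: float) -> str:
    """ISO-8601 UTC timestamp with millisecond precision."""
    return (BASE + timedelta(seconds=offset_s)).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


def toy_fixture_lines(seed: int, n_actors: int = 10, minutes: int = 30) -> List[str]:
    """Hypothetical toy builder: seeded background fills plus one marked event."""
    rng = random.Random(seed)  # private generator, independent of global random state
    events: List[Tuple[float, Dict[str, Any]]] = []
    t = 0.0
    while True:
        t += rng.expovariate(1.0)  # about one background fill per second
        if t >= minutes * 60:
            break
        events.append((t, {
            "event_kind": "order.filled",
            "market_id": "0xSYNTH",
            "timestamp": _ts(t),
            "actor_id": f"actor_{rng.randrange(n_actors)}",
            "side": rng.choice(["buy", "sell"]),
            "price": round(rng.uniform(0.40, 0.60), 2),
            "quantity": rng.randint(1, 500),
        }))
    # One event at a known offset, standing in for an injected pattern.
    events.append((600.0, {
        "event_kind": "order.filled",
        "market_id": "0xSYNTH",
        "timestamp": _ts(600.0),
        "actor_id": "injected_actor",
        "raw": {"injected": "spoofing"},
    }))
    events.sort(key=lambda pair: pair[0])  # order events by time offset
    return [json.dumps(e, separators=(",", ":")) for _, e in events]
```

Because the same seed always yields the same lines, writing them out twice produces byte-identical files.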

Diffing replays for regression

After a detector or threshold change, compare replays:

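```bash
# Replay the fixture into a fresh SQLite store ($1), then dump its findings ($2).
dump_findings() {
  rm -f "$1"
  python - "$1" <<'EOF'
import sys
from horizon.flow import make_default_engine
from horizon.flow.ingestion import ReplayFlowSource
engine = make_default_engine(venue_name="polymarket", store_path=sys.argv[1])
src = ReplayFlowSource(path="tests/flow/fixtures/recorded/polymarket_2026-04-21.jsonl")
src.on_event(engine.ingest)
src.connect()
EOF
  sqlite3 "$1" 'SELECT category, severity, confidence FROM anomalies ORDER BY detected_at' > "$2"
}

# Run the fixture on the old code (main branch), then on the feature branch
git checkout main
pytest tests/flow/test_recorded_day.py -v
dump_findings /tmp/flow_main.db /tmp/findings_main.txt
git checkout feature-branch
pytest tests/flow/test_recorded_day.py -v
dump_findings /tmp/flow_feature.db /tmp/findings_feature.txt
# Review
diff /tmp/findings_main.txt /tmp/findings_feature.txt
```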

Any difference in the output is a behaviour change, intended or unintended; either way, it is now reviewable, auditable, and defensible.
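If you prefer to stay in Python, the same comparison can be done with the standard library's sqlite3 and difflib modules. This sketch assumes the store is a SQLite file with an anomalies table holding the columns used in the shell query; `diff_findings` is a hypothetical helper. Rows are joined with "|" to mirror the sqlite3 shell's default list output.

```python
import difflib
import sqlite3
from typing import List

QUERY = "SELECT category, severity, confidence FROM anomalies ORDER BY detected_at"


def _rows(db_path: str) -> List[str]:
    conn = sqlite3.connect(db_path)
    try:
        return ["|".join(str(v) for v in row) for row in conn.execute(QUERY)]
    finally:
        conn.close()


def diff_findings(old_db: str, new_db: str) -> List[str]:
    """Unified diff of findings between two stores; an empty list means no change."""
    return list(difflib.unified_diff(
        _rows(old_db), _rows(new_db),
        fromfile=old_db, tofile=new_db, lineterm="",
    ))
```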

What the fixture format looks like

One line per event:

```json
{"event_kind":"order.filled","market_id":"0xTRUMP_2024","venue_name":"polymarket","timestamp":"2026-04-21T12:00:02.271Z","actor_id":"0xabc","order_id":"ord_001","side":"buy","price":0.52,"quantity":100,"tx_hash":"0xdeadbeef","gas_price":30000000000,"nonce":42,"block_number":54321000,"source":"polymarket_ws","raw":{"event_type":"trade"}}
```

Only event_kind, market_id, and timestamp are required; every other field is optional. See horizon/flow/ingestion/replay.py for the full schema. Lines starting with # are comments and are ignored.
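For tooling that reads fixtures outside the engine, one line can be parsed into a typed record that keeps the three required fields and carries every optional field through untouched. `FixtureEvent` and `parse_fixture_line` are hypothetical and are not horizon's MarketEvent. The "Z" replacement is there because `datetime.fromisoformat` accepts a trailing "Z" only from Python 3.11 on.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict

REQUIRED = ("event_kind", "market_id", "timestamp")


@dataclass
class FixtureEvent:
    """Hypothetical typed view of one fixture line (not horizon's MarketEvent)."""
    event_kind: str
    market_id: str
    timestamp: datetime
    extra: Dict[str, Any] = field(default_factory=dict)  # every optional field


def parse_fixture_line(line: str) -> FixtureEvent:
    data = json.loads(line)
    missing = [k for k in REQUIRED if k not in data]
    if missing:
        raise ValueError(f"missing required field(s): {', '.join(missing)}")
    # Normalize "Z" to an explicit UTC offset for older Pythons.
    ts = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
    extra = {k: v for k, v in data.items() if k not in REQUIRED}
    return FixtureEvent(data["event_kind"], data["market_id"], ts, extra)
```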