Recorded captures
Capture a live venue session to JSONL, replay it through the engine deterministically, and use it as a regression fixture.
Recorded captures are how you pin the engine’s behaviour against a real market session. A capture is a JSONL file of normalized MarketEvents; a replay feeds that file through a fresh engine and must produce bit-identical findings every time. If a detector threshold or a profiler change alters the output, the diff tells you exactly what moved and on which event.
Three pieces cooperate:
- Capture CLI. scripts/flow_capture.py connects to a venue’s public feed and writes events to JSONL in the format ReplayFlowSource reads.
- Replay source. horizon.flow.ingestion.ReplayFlowSource reads any JSONL produced by the capture script (or by the synthetic fixture builder) and fires events into the engine in their original order.
- Harness test. tests/flow/test_recorded_day.py runs every fixture under tests/flow/fixtures/recorded/ through a fresh engine, asserting ingestion, chain integrity, profile emergence, and determinism.
Capturing a live session
The capture script takes a venue, a subscription list (--token-ids for Polymarket, --coins for Hyperliquid), a capture duration, and an output path. It writes one JSONL line per event, flushes after each write, and stops cleanly on Ctrl-C.
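The per-line flush is what keeps an interrupted capture usable. A minimal sketch of that write loop, assuming events arrive as plain dicts from some iterator; write_events is a hypothetical name, not the capture script's actual code:

```python
import json
from typing import Iterable, TextIO


def write_events(events: Iterable[dict], out: TextIO) -> int:
    """Write one compact JSON object per line, flushing after every line.

    Returns the number of events written. A KeyboardInterrupt (Ctrl-C)
    ends the loop; lines already written and flushed stay intact.
    """
    written = 0
    try:
        for event in events:
            out.write(json.dumps(event, separators=(",", ":")) + "\n")
            out.flush()  # hand each completed line to the OS immediately
            written += 1
    except KeyboardInterrupt:
        pass  # fall through so the caller can print a summary
    return written
```

Because every line is flushed as soon as it is written, a capture stopped with Ctrl-C loses at most the event that was in flight.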
Polymarket
```bash
python scripts/flow_capture.py polymarket \
  --token-ids 0xTOKEN_A 0xTOKEN_B \
  --duration-seconds 1800 \
  --polygon-rpc-url "$POLYGON_RPC_URL" \
  --output tests/flow/fixtures/recorded/polymarket_2026-04-21.jsonl
```
The token IDs are Polymarket CLOB asset IDs (the same ones you pass to PolymarketFlowSource). Without --polygon-rpc-url, trade events lack gas price, nonce, and block number; with it, every trade is enriched on-chain. Use a paid RPC endpoint for real captures; the free public Polygon RPC will throttle.
Hyperliquid
```bash
python scripts/flow_capture.py hyperliquid \
  --coins BTC ETH \
  --duration-seconds 1800 \
  --output tests/flow/fixtures/recorded/hyperliquid_2026-04-21.jsonl
```
Hyperliquid’s public WebSocket feed exposes trades and L2 book snapshots per coin. Per-wallet (L3) resting orders are planned for v0.2.
Heartbeat output
Every 30 seconds the capture prints a progress line so you know it is alive:
```text
[capture:polymarket] t+30s events=1420 rate=47.3/s [book.snapshot=890, order.filled=530]
[capture:polymarket] t+60s events=2905 rate=48.4/s [book.snapshot=1810, order.filled=1095]
```
On exit (SIGINT, or once --duration-seconds has elapsed), the script prints a summary line with the total event count and a breakdown by event kind.
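The sample numbers are consistent with rate being cumulative events divided by elapsed seconds (1420 / 30 ≈ 47.3, 2905 / 60 ≈ 48.4). A sketch that reproduces the heartbeat format under that assumption; heartbeat_line is hypothetical, and sorting the breakdown by count is a guess that happens to match the sample:

```python
from collections import Counter


def heartbeat_line(venue: str, elapsed_s: int, counts: Counter) -> str:
    """Format a progress line from cumulative per-kind event counts."""
    total = sum(counts.values())
    # Assumed definition: average rate since the capture started.
    rate = total / elapsed_s if elapsed_s > 0 else 0.0
    # Most frequent kinds first (an assumption about the real ordering).
    breakdown = ", ".join(f"{kind}={n}" for kind, n in counts.most_common())
    return f"[capture:{venue}] t+{elapsed_s}s events={total} rate={rate:.1f}/s [{breakdown}]"
```

Calling heartbeat_line("polymarket", 30, Counter({"book.snapshot": 890, "order.filled": 530})) yields the first sample line shown above.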
Replaying a capture
```python
from horizon.flow import make_default_engine
from horizon.flow.ingestion import ReplayFlowSource

engine = make_default_engine(
    venue_name="polymarket",
    store_path="flow.db",
)

src = ReplayFlowSource(path="tests/flow/fixtures/recorded/polymarket_2026-04-21.jsonl")
src.on_event(engine.ingest)
src.connect()  # synchronous, returns when the file is exhausted
```
Replay is synchronous: it runs to completion, then the connection closes. Findings and profiles produced during replay land in the attached store exactly as they would during live operation.
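The replay contract is small: read lines in file order, skip comments, hand each parsed event to every registered callback, and return once the file is exhausted. A pure-Python sketch of that shape, useful for reasoning about ordering without the engine; MiniReplaySource is hypothetical, its connect() return value is an addition for illustration, and it does none of ReplayFlowSource's schema handling:

```python
import json
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class MiniReplaySource:
    """Hypothetical stand-in with the same on_event/connect shape."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._handlers: List[Callable[[Event], None]] = []

    def on_event(self, handler: Callable[[Event], None]) -> None:
        self._handlers.append(handler)

    def connect(self) -> int:
        """Replay the whole file synchronously; return the number of events fired."""
        fired = 0
        with open(self.path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue  # skip blank lines and comments
                event = json.loads(line)
                for handler in self._handlers:
                    handler(event)  # delivery order == file order
                fired += 1
        return fired
```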
Using a capture as a regression fixture
Drop any JSONL file into tests/flow/fixtures/recorded/ and the harness picks it up automatically. The filename convention is <venue>_<YYYY-MM-DD>[_<suffix>].jsonl.
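A small lint for the naming convention, for example in a pre-commit hook. The regex is an assumption about what <venue> and <suffix> may contain, and parse_fixture_name is hypothetical. Since the harness picks up any JSONL file, treat a mismatch as a naming warning rather than a reason the fixture won't run; the shipped polymarket_synthetic_calibrated.jsonl, for instance, carries no date and does not match:

```python
import re
from datetime import date
from typing import Dict, Optional

# Assumed character sets: lowercase venue, alphanumeric/underscore/dash suffix.
FIXTURE_NAME = re.compile(
    r"^(?P<venue>[a-z0-9]+)_(?P<date>\d{4}-\d{2}-\d{2})(?:_(?P<suffix>[A-Za-z0-9_-]+))?\.jsonl$"
)


def parse_fixture_name(name: str) -> Optional[Dict[str, Optional[str]]]:
    """Return venue/date/suffix for a conforming name, else None."""
    m = FIXTURE_NAME.match(name)
    if m is None:
        return None
    try:
        date.fromisoformat(m["date"])  # reject impossible dates such as 2026-13-40
    except ValueError:
        return None
    return m.groupdict()
```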
The harness runs three tests per fixture:
- Format validity. Every non-comment line parses as JSON and has event_kind, market_id, and timestamp.
- End-to-end replay. Every event ingests without crashing; the audit chain over emitted findings verifies with no gaps; actor profiles emerge; per-event latency stays below 500 μs.
- Determinism. Two fresh runs over the same fixture produce the same finding count and the same audit-chain length.
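This page does not spell out how the audit chain is built. A common construction, assumed here purely for illustration, is a SHA-256 hash chain in which each link commits to the previous digest, so a dropped, reordered, or edited finding breaks verification. build_chain and verify_chain are hypothetical names:

```python
import hashlib
import json
from typing import List, Tuple

GENESIS = "0" * 64  # digest that the first link chains from


def _link(prev: str, finding: dict) -> str:
    # Canonical JSON so identical findings always hash identically.
    payload = json.dumps(finding, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.sha256(prev.encode() + payload).hexdigest()


def build_chain(findings: List[dict]) -> List[Tuple[int, str]]:
    """Return (sequence, digest) links; each digest covers all earlier findings."""
    chain, prev = [], GENESIS
    for seq, finding in enumerate(findings):
        prev = _link(prev, finding)
        chain.append((seq, prev))
    return chain


def verify_chain(findings: List[dict], chain: List[Tuple[int, str]]) -> bool:
    if len(findings) != len(chain):
        return False
    prev = GENESIS
    for expected_seq, (finding, (seq, digest)) in enumerate(zip(findings, chain)):
        if seq != expected_seq:
            return False  # gap: a link was dropped or reordered
        prev = _link(prev, finding)
        if prev != digest:
            return False  # a finding was edited after the chain was built
    return True
```

Building the chain twice over the same findings yields identical links, a stricter form of the count-and-length comparison the determinism test performs.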
To run just the harness:
```bash
pytest tests/flow/test_recorded_day.py -v
```
With zero fixtures in the directory, the tests skip. Add a fixture (either via capture or via the synthetic builder below) and they run.
The synthetic reference fixture
tests/flow/fixtures/recorded/polymarket_synthetic_calibrated.jsonl ships as a calibrated synthetic reference so the harness has something to run on a fresh checkout. It is regenerable bit-for-bit:
```bash
python scripts/flow_build_synthetic_fixture.py \
  --output tests/flow/fixtures/recorded/polymarket_synthetic_calibrated.jsonl \
  --seed 42
```
The builder uses the same SyntheticFlowGenerator the test suite uses: ten background actors trading for 30 minutes, plus injected spoofing, iceberg, wash-trade, and layering patterns at known offsets. Deterministic given the seed. Replace with real captures when available.
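Bit-for-bit regeneration comes down to two habits: draw every random value from one seeded random.Random instance, and derive timestamps from a fixed start rather than the wall clock. A toy builder showing both; build_fixture and its event shape are illustrative assumptions, not the real SyntheticFlowGenerator. Python only guarantees a stable sequence across interpreter versions for seeding and random(); helpers such as randint and choice may change between releases, so pin the interpreter when regenerating a committed fixture:

```python
import json
import random
from datetime import datetime, timedelta, timezone


def build_fixture(seed: int, n_actors: int = 10, n_events: int = 50) -> str:
    """Return JSONL text that depends only on the arguments."""
    rng = random.Random(seed)  # private generator: no global random state involved
    t = datetime(2026, 4, 21, 12, 0, tzinfo=timezone.utc)  # fixed start, never the wall clock
    lines = []
    for i in range(n_events):
        t += timedelta(milliseconds=rng.randint(10, 2000))
        event = {
            "event_kind": "order.filled",
            "market_id": "0xSYNTHETIC",
            "timestamp": t.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "actor_id": f"actor_{rng.randrange(n_actors):02d}",
            "order_id": f"ord_{i:05d}",
            "side": rng.choice(["buy", "sell"]),
            "price": round(rng.uniform(0.01, 0.99), 2),
            "quantity": rng.randint(1, 500),
        }
        # sort_keys fixes key order, so equal events serialize to equal bytes.
        lines.append(json.dumps(event, sort_keys=True, separators=(",", ":")))
    return "\n".join(lines) + "\n"
```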
Diffing replays for regression
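After a detector or threshold change, replay the same fixture on both branches and compare the findings. The sqlite3 queries assume each run persisted its findings to the store named in the query (set via store_path in make_default_engine, as in the replay example above); adjust the paths to wherever your runs actually write: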
```bash
# Run the fixture on the old code (main branch)
git checkout main
pytest tests/flow/test_recorded_day.py -v
sqlite3 /tmp/flow_main.db 'SELECT category, severity, confidence FROM anomalies ORDER BY detected_at' > /tmp/findings_main.txt

# Run on the feature branch
git checkout feature-branch
pytest tests/flow/test_recorded_day.py -v
sqlite3 /tmp/flow_feature.db 'SELECT category, severity, confidence FROM anomalies ORDER BY detected_at' > /tmp/findings_feature.txt

# Review
diff /tmp/findings_main.txt /tmp/findings_feature.txt
```
Any diff is a behaviour change, intended or unintended, and it is reviewable, auditable, and defensible.
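The same comparison can be scripted with the standard library, which is convenient in CI. A sketch assuming the anomalies table and columns used in the sqlite3 queries above; dump_findings and diff_findings are hypothetical helpers:

```python
import difflib
import sqlite3
from typing import List

# Same query as the shell commands; the schema is assumed, not verified.
QUERY = "SELECT category, severity, confidence FROM anomalies ORDER BY detected_at"


def dump_findings(conn: sqlite3.Connection) -> List[str]:
    """Render each finding row as one comparable text line."""
    return [" | ".join(str(col) for col in row) for row in conn.execute(QUERY)]


def diff_findings(old: sqlite3.Connection, new: sqlite3.Connection) -> List[str]:
    """Unified diff of findings; an empty list means no behaviour change."""
    return list(
        difflib.unified_diff(
            dump_findings(old), dump_findings(new),
            fromfile="main", tofile="feature", lineterm="",
        )
    )
```

In practice you would pass sqlite3.connect("/tmp/flow_main.db") and sqlite3.connect("/tmp/flow_feature.db") and fail the job when the result is non-empty.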
What the fixture format looks like
One line per event:
```json
{"event_kind":"order.filled","market_id":"0xTRUMP_2024","venue_name":"polymarket","timestamp":"2026-04-21T12:00:02.271Z","actor_id":"0xabc","order_id":"ord_001","side":"buy","price":0.52,"quantity":100,"tx_hash":"0xdeadbeef","gas_price":30000000000,"nonce":42,"block_number":54321000,"source":"polymarket_ws","raw":{"event_type":"trade"}}
```
Required: event_kind, market_id, timestamp. Everything else is optional. See horizon/flow/ingestion/replay.py for the full schema. Lines starting with # are comments and are ignored.
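A standalone sketch of the format-validity check, handy for linting a capture before committing it. It mirrors the rules stated here (comments skipped, three required keys); tolerating blank lines is an assumption, and validate_fixture_lines is a hypothetical name rather than the harness's own code:

```python
import json
from typing import Iterable, List, Tuple

REQUIRED = ("event_kind", "market_id", "timestamp")


def validate_fixture_lines(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (line_number, problem) pairs; an empty list means the fixture is valid."""
    problems = []
    for lineno, raw in enumerate(lines, start=1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # comments are ignored; blank lines tolerated (assumption)
        try:
            event = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((lineno, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(event, dict):
            problems.append((lineno, "not a JSON object"))
            continue
        missing = [key for key in REQUIRED if key not in event]
        if missing:
            problems.append((lineno, "missing " + ", ".join(missing)))
    return problems
```

Because it accepts any iterable of strings, an open file handle works directly: validate_fixture_lines(open(path, encoding="utf-8")).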