Operations: running flow in production
Cron surveillance, live feeds, Slack / PagerDuty wiring, debugging why a detector didn't fire, performance tuning.
Running flow in production looks like running any other durable data pipeline: a long-running process feeding from a venue, a datastore accumulating findings, alerts going to humans on call, a retrospective job for batch analysis. Five recipes cover the common operational concerns.
Recipe 1: live production wiring (Polymarket example)
The shape of a long-running live deployment. Works for Polymarket today; swap the ingestion source for Hyperliquid or Kalshi as needed.
# flow_daemon.py: runs as a long-lived process.
import logging
import signal
import sys
from horizon.audit import AuditLog, SQLiteSink
from horizon.flow import SQLiteFlowStore, make_default_engine
from horizon.flow.ingestion.polymarket import PolymarketFlowSource
from horizon.observability.alerts import (
SlackAlerter, AuditLogAlertBridge, DEFAULT_ALERT_CATEGORIES,
)
from horizon.audit.events import AuditCategory
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s")
log = logging.getLogger("flow_daemon")
# 1. Stores (both on local disk for a solo-advisor deployment; S3 Object
# Lock / Glacier sinks for regulated-desk deployments).
flow_store = SQLiteFlowStore("/var/horizon/flow.db")
audit_log = AuditLog(sink=SQLiteSink("/var/horizon/audit.db"))
# 2. Alerter: Slack + audit-log bridge so you see both operational and
# finding-level events in one channel.
slack = SlackAlerter(min_severity="warning")
bridge = AuditLogAlertBridge(
alerter=slack,
categories=frozenset(DEFAULT_ALERT_CATEGORIES | {
AuditCategory.BotDetected,
AuditCategory.FlowAnomaly,
}),
min_severity="warning",
)
bridge.bind(audit_log)
# 3. Engine
engine = make_default_engine(
venue_name="polymarket",
store_path="/var/horizon/flow.db",
audit_log=audit_log,
alerter=slack,
)
# 4. Live ingestion source
source = PolymarketFlowSource(
token_ids=[
"0xTRUMP_2024_YES",
"0xTRUMP_2024_NO",
# ... your watched markets ...
],
polygon_rpc_url="https://polygon-mainnet.g.alchemy.com/v2/YOUR_KEY",
enrich_on_chain=True,
)
source.on_event(engine.ingest)
# 5. Clean shutdown on SIGTERM / SIGINT
def shutdown(*_):
    log.info("shutting down")
    source.close()
    engine.close()
    audit_log.close()
    sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
log.info("connecting to Polymarket CLOB")
source.connect() # does not block the main thread (runs its own thread)
# Keep main thread alive
signal.pause()
Deploy.
# systemd unit
cat > /etc/systemd/system/horizon-flow.service <<EOF
[Unit]
Description=Horizon Flow Surveillance Daemon
After=network.target
[Service]
Type=simple
User=horizon
WorkingDirectory=/opt/horizon
ExecStart=/opt/horizon/venv/bin/python flow_daemon.py
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
systemctl enable --now horizon-flow
What to monitor.
- Systemd restart count. Frequent restarts suggest either a network issue with Polygon RPC or a bug. Check journalctl -u horizon-flow -n 100.
- FeedGap and FeedStale audit events. The Polymarket WS is trying to reconnect; events may be lost.
- Row count growth in /var/horizon/flow.db. Flatlines mean events aren’t flowing.
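The row-count check can be scripted with the standard library alone. The sketch below is a minimal illustration, not part of the horizon API: the table name is a caller-supplied assumption (the flow store's schema is not described here, so inspect the database first), and `row_count` / `is_flatlined` are hypothetical helper names.

```python
import sqlite3


def row_count(db_path: str, table: str) -> int:
    """Return the number of rows in `table`.

    `table` is interpolated into the SQL, so pass only trusted names.
    """
    # Open read-only so the check never takes a write lock on the daemon's database.
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    try:
        (count,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
        return count
    finally:
        conn.close()


def is_flatlined(previous: int, current: int) -> bool:
    """True when no rows were added between two samples."""
    return current <= previous
```

Sample the count on a timer (for example every five minutes from cron), keep the previous value, and alert when `is_flatlined` holds during hours when the venue is normally active.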
Recipe 2: nightly retrospective scan
Daily cron job that re-scans the previous day’s feed, in case detectors weren’t connected during the day, or to apply new detector configs retrospectively.
#!/usr/bin/env bash
# /etc/cron.daily/horizon-flow-scan
set -euo pipefail
DATE=$(date -d 'yesterday' +%Y-%m-%d)
FEED=/var/horizon/feeds/polymarket_${DATE}.jsonl
if [[ ! -f "$FEED" ]]; then
echo "no feed file for $DATE" >&2
exit 0
fi
# 1. Replay through the engine into the flow store
/opt/horizon/venv/bin/horizon flow scan \
--feed-log="$FEED" \
--db=/var/horizon/flow.db \
--audit-db=/var/horizon/audit.db \
--venue=polymarket
# 2. Report high-severity findings
/opt/horizon/venv/bin/horizon flow anomalies \
--db=/var/horizon/flow.db \
--since-hours=24 \
--limit=1000 \
| /opt/horizon/venv/bin/jq '[.[] | select(.severity | IN("high", "critical"))]' \
> /tmp/flow_daily_$(date +%Y%m%d).json
# 3. Audit log integrity (separate from flow findings)
/opt/horizon/venv/bin/horizon audit verify --db=/var/horizon/audit.db
# 4. Notify if there are any high-severity findings
COUNT=$(/opt/horizon/venv/bin/jq 'length' < /tmp/flow_daily_$(date +%Y%m%d).json)
if [[ "$COUNT" -gt 0 ]]; then
echo "{\"text\": \"$COUNT high-severity flow findings for $DATE\"}" \
| curl -s -X POST -H 'Content-Type: application/json' \
--data @- "$SLACK_WEBHOOK"
fi
Why a retrospective scan when you already have the live daemon? Two reasons:
- Config changes. You tightened a threshold at 2pm; findings from before the change used the old threshold. Replay with the new config to re-score the whole day.
- New detectors. You wrote a custom detector on Tuesday. Replay the week’s recorded feed to see what it would have caught.
Recipe 3: Slack / PagerDuty routing per category
Different findings go to different places. Spoofing during trading hours pages the desk; wash trading is an email for compliance review.
from datetime import datetime, timezone
from horizon.audit.events import AuditCategory, AuditSeverity
from horizon.observability.alerts import (
SlackAlerter, EmailAlerter, PagerDutyAlerter,
AuditLogAlertBridge, CompositeAlerter,
)
# Three routes
slack_desk = SlackAlerter(webhook_url=DESK_WEBHOOK, min_severity="warning")
email_compliance = EmailAlerter(
smtp_host="smtp.example.com",
from_addr="horizon@firm.com",
to_addrs=["compliance@firm.com"],
min_severity="notice",
)
pagerduty = PagerDutyAlerter(routing_key=PD_ROUTING_KEY, min_severity="critical")
# Route by category + severity via filter_fn
def is_desk_alert(event) -> bool:
    if event.category not in (AuditCategory.FlowAnomaly, AuditCategory.BotDetected):
        return False
    cat_value = (event.payload or {}).get("category", "")
    # Desk cares about immediate-action items during market hours
    hour = event.timestamp.astimezone(timezone.utc).hour
    is_market_hours = 13 <= hour <= 21
    return is_market_hours and cat_value in {
        "spoofing", "layering", "momentum_ignition", "quote_stuffing",
    }
def is_compliance_email(event) -> bool:
    if event.category not in (AuditCategory.FlowAnomaly, AuditCategory.BotDetected):
        return False
    cat_value = (event.payload or {}).get("category", "")
    return cat_value in {"wash_trade", "iceberg"}
def is_pagerduty(event) -> bool:
    # Page on critical severity regardless of category
    return event.severity == AuditSeverity.Critical
# Three bridges on the same audit log
AuditLogAlertBridge(alerter=slack_desk, filter_fn=is_desk_alert).bind(audit_log)
AuditLogAlertBridge(alerter=email_compliance, filter_fn=is_compliance_email).bind(audit_log)
AuditLogAlertBridge(alerter=pagerduty, filter_fn=is_pagerduty).bind(audit_log)
Rule of thumb for choosing the channel.
- Slack (warning+). Anything actionable in the next 15 minutes.
- Email (notice+). Retrospective review; compliance reads it at start of day.
- PagerDuty (critical only). Things that wake someone up. Kill-switch fires, reconciliation mismatches, feed outages, highest-confidence bot detections. Over-paging is how on-call rotations break.
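The severity floors in this rule of thumb can be written down as a small lookup, which is handy for unit-testing a routing policy before wiring real alerters. This is a sketch under assumptions: the three-level ordering notice < warning < critical is inferred from the `min_severity` values used above, the category filters from the recipe are deliberately left out, and `SEVERITY_RANK` / `channels_for` are hypothetical names rather than horizon API.

```python
# Assumed ordering of the severity strings used as min_severity above.
SEVERITY_RANK = {"notice": 0, "warning": 1, "critical": 2}


def channels_for(severity: str) -> list[str]:
    """Return the channels a finding of this severity reaches, by floor only."""
    rank = SEVERITY_RANK[severity]
    channels = []
    if rank >= SEVERITY_RANK["notice"]:
        channels.append("email")      # notice and above: retrospective review
    if rank >= SEVERITY_RANK["warning"]:
        channels.append("slack")      # warning and above: actionable soon
    if rank == SEVERITY_RANK["critical"]:
        channels.append("pagerduty")  # critical only: wakes someone up
    return channels
```

For example, `channels_for("warning")` returns `["email", "slack"]`: a warning reaches the compliance inbox and the desk channel but never pages.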
Recipe 4: debugging “why didn’t my detector fire?”
The single most common question. Works the same for every detector.
Step 1: did the event get in?
import horizon as hz
# engine.ingest_count should grow every time you call .ingest()
print(f"ingested: {engine.ingest_count}")
# If it's 0, the observer wiring didn't connect.
# Re-check: log.subscribe(FlowObserver(...)) or feed.on_tick(handler)
Step 2: is the event-kind right?
from horizon.flow.events import MarketEventKind
# Most detectors only fire on specific event_kinds.
# - spoofing requires OrderPlaced + OrderFilled + OrderCanceled from same actor
# - wash requires OrderFilled + TradeTape
# - iceberg requires BookSnapshot + OrderFilled
# ...
# Count what you're actually feeding the engine
from collections import Counter
kinds = Counter(ev.event_kind.value for ev in events_sent)
print(kinds)
# Counter({'trade.tape': 400, 'quote.update': 800, 'book.snapshot': 12, ...})
Many venues emit mostly QuoteUpdate; without BookSnapshot, iceberg detection has nothing to look at. Without OrderPlaced / OrderCanceled, spoofing and layering can’t fire either.
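The event-kind check can be automated by comparing the observed counter against each detector's requirements. The sketch below is illustrative only: the requirement table is copied from the comments above (so it covers just the three detectors listed there), the kinds are keyed by their enum member names rather than their `.value` strings, and `REQUIRED_KINDS` / `starved_detectors` are hypothetical names.

```python
from collections import Counter

# Requirements as listed in the comments above; extend for other detectors.
REQUIRED_KINDS = {
    "spoofing": {"OrderPlaced", "OrderFilled", "OrderCanceled"},
    "wash_trade": {"OrderFilled", "TradeTape"},
    "iceberg": {"BookSnapshot", "OrderFilled"},
}


def starved_detectors(observed: Counter) -> dict[str, set[str]]:
    """Map each detector to the event kinds it needs but never received."""
    seen = {kind for kind, n in observed.items() if n > 0}
    return {
        name: needed - seen
        for name, needed in REQUIRED_KINDS.items()
        if needed - seen
    }
```

Build the counter with `Counter(ev.event_kind.name for ev in events_sent)` (assuming `event_kind` is a standard `Enum`), and any detector that appears in the result cannot fire on that feed, whatever its thresholds.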
Step 3: are thresholds too tight?
from horizon.flow.config import FlowConfig, SpoofingConfig
from dataclasses import replace
# Loosen everything way down temporarily
cfg = FlowConfig()
cfg.detectors.spoofing = replace(
cfg.detectors.spoofing,
min_bait_size=1.0, # was 500
cancel_window_ms=30_000.0, # was 2000
min_book_imbalance=0.0, # was 0.3
bait_to_aggressor_ratio=1.1, # was 5.0
)
Run the same events through an engine built with this loose config. If detections fire now, the original thresholds are too tight. Tighten back gradually.
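One way to make the tightening systematic is to bisect a single threshold between its loose and original values. The sketch below assumes a hypothetical `fires(value)` callable that you write yourself: it should build an engine with that threshold, replay the same events, and return whether any finding appeared. It also assumes detection is monotonic in the threshold (fires at every value between the loose end and some cut-off, and nowhere beyond it).

```python
from typing import Callable


def tightest_firing_threshold(
    fires: Callable[[float], bool],
    loose: float,
    strict: float,
    tol: float = 1.0,
) -> float:
    """Bisect between `loose` and `strict` for the strictest value that still fires.

    Works in either direction, e.g. min_bait_size (loose=1, strict=500)
    or cancel_window_ms (loose=30_000, strict=2_000).
    """
    if not fires(loose):
        raise ValueError("detector does not fire even at the loose value")
    if fires(strict):
        return strict  # the original threshold was never the problem
    fired_at, silent_at = loose, strict  # invariant: fires(fired_at), not fires(silent_at)
    while abs(silent_at - fired_at) > tol:
        mid = (fired_at + silent_at) / 2.0
        if fires(mid):
            fired_at = mid
        else:
            silent_at = mid
    return fired_at
```

The returned value tells you how far the observed pattern sits from the production threshold, which is usually more informative than the binary "fired / did not fire".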
Step 4: is the detector actually attached?
print([d.name for d in engine._detectors])
# ['spoofing', 'layering', 'quote_stuffing', 'wash_trade', 'momentum_ignition',
# 'iceberg', 'split_order']
If you built the engine via a custom constructor instead of make_default_engine, you may have omitted detectors. Add the missing one.
Step 5: is the detector seeing errors silently?
The engine swallows detector exceptions to keep ingest resilient. For debugging, monkey-patch it so exceptions are printed and re-raised:
_orig = engine._run_detectors
def _debug_run(ctx, event):
    findings = []
    for d in engine._detectors:
        try:
            for f in d.detect([event], ctx) or ():
                findings.append(f)
        except Exception as e:
            print(f"detector {d.name} failed on {event.market_id}: {e}")
            raise
    return findings
engine._run_detectors = _debug_run
Revert after debugging (engine._run_detectors = _orig). Leaving this in production removes the safety net.
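To make the revert hard to forget, the patch can be scoped with a context manager that restores the original attribute even if the debugging session raises. This is a generic sketch using only the standard library; `patched` is a hypothetical helper, not something horizon provides.

```python
from contextlib import contextmanager


@contextmanager
def patched(obj, name, replacement):
    """Temporarily replace `obj.<name>`, restoring the original on exit."""
    original = getattr(obj, name)
    setattr(obj, name, replacement)
    try:
        yield original
    finally:
        # Runs on normal exit and on exceptions alike.
        setattr(obj, name, original)
```

Used as `with patched(engine, "_run_detectors", _debug_run): ...`, the replay inside the block sees the loud version, and the swallowing behavior is back as soon as the block exits.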
Recipe 5: performance and scale
Flow is designed for Polymarket-ish cadence (tens of events per second). If you’re feeding it Alpaca consolidated tape (hundreds per second) or a multi-symbol Hyperliquid feed (thousands), a few notes.
SQLite write throughput
The default SQLiteFlowStore with PRAGMA synchronous=FULL does roughly 100-500 writes/s. Options above that:
# Option 1: synchronous=NORMAL: 3-5x faster, slightly less crash-safe
# (recently committed writes may be lost on an OS crash or power loss;
# in WAL mode the file structure stays consistent)
store = SQLiteFlowStore("flow.db")
store._conn.execute("PRAGMA synchronous=NORMAL")
# Option 2: separate per-category stores
spoof_store = SQLiteFlowStore("flow_spoof.db")
wash_store = SQLiteFlowStore("flow_wash.db")
# ...then route findings to the right one in an on_finding callback
# Option 3: batch writes via a queue
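from queue import Empty, Queue
from threading import Thread
q: Queue = Queue(maxsize=10_000)
def writer_thread(store, q):
    while True:
        # Collect up to 100 findings, or whatever arrives before the queue goes quiet
        batch = []
        while len(batch) < 100:
            try:
                batch.append(q.get(timeout=0.1))
            except Empty:
                break
        if batch:
            # The connection context manager commits once on exit
            # (a true batch only if write_anomaly does not commit itself)
            with store._conn:
                for f in batch:
                    store.write_anomaly(f)
# put_nowait raises queue.Full once 10,000 findings are pending
engine.on_finding(q.put_nowait)
Thread(target=writer_thread, args=(store, q), daemon=True).start()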
CPU
The default 7 detectors process an event in tens of microseconds each on a modern laptop. Bottlenecks are typically:
- The MDX-parser in ingestion/polymarket.py. JSON parse is the biggest cost on live data. Use orjson if available.
- PolicyFeatureExtractor.featurize. Called synchronously if you do runtime policy lookup from a strategy. Cache the featurized state per (actor, market, timestamp-truncated-to-second).
- Hawkes MLE. Skip _mle via HawkesConfig.kernel_decay_s=0 equivalent; fall back to moments estimator always.
Measure before optimizing. engine.ingest_count + wall-clock timestamps bookending a replay gives you per-event throughput.
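A minimal timing harness for that measurement might look like the following. It is a sketch that counts events locally instead of reading `engine.ingest_count`, so it works with any callable; `measure_throughput` is a hypothetical helper name.

```python
import time
from typing import Callable, Iterable


def measure_throughput(
    ingest: Callable[[object], object],
    events: Iterable[object],
) -> tuple[int, float]:
    """Feed every event to `ingest`; return (event count, events per second)."""
    count = 0
    start = time.perf_counter()  # monotonic, high-resolution wall clock
    for event in events:
        ingest(event)
        count += 1
    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else float("inf")
    return count, rate
```

Call it as `measure_throughput(engine.ingest, recorded_events)` on a replayed day, and compare the rate with the venue's peak event cadence before deciding whether any of the optimizations above are worth the complexity.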
Sharding across venues
One engine per venue. Share nothing across engines; they each have their own store, audit log, and detector state.
engines = {
"polymarket": make_default_engine(venue_name="polymarket", store_path="pm.db", ...),
"kalshi": make_default_engine(venue_name="kalshi", store_path="kalshi.db", ...),
"hyperliquid": make_default_engine(venue_name="hyperliquid", store_path="hl.db", ...),
}
Cross-venue attribution (v1.0) will join these; until then, keep them cleanly separated.
The production checklist
Before flipping a flow deployment live:
- SQLiteFlowStore path is on a persistent volume, not /tmp.
- AuditLog is using SQLiteSink (not InMemorySink).
- Alerter webhook URL is resolved from Secrets, not hardcoded.
- Ingestion source reconnects on disconnect (default behavior; verify in logs).
- Nightly audit-chain verify is cron’d (horizon audit verify).
- Daily flow-store integrity check (horizon flow verify).
- At least one test synthetic pattern fires a finding through the full production path.
- Slack / email alerter delivers a test message.
- Blue-green deploy story: you know how to restart the daemon without losing events in flight (the audit log’s scan_from_audit lets you replay missed events).
Related
- CLI. The commands referenced in scripts above.
- Risk integration. Wiring flow findings into the trade loop.
- How it works. Internals you need when the daemon is misbehaving.