Operations: running flow in production

Cron surveillance, live feeds, Slack / PagerDuty wiring, debugging why a detector didn't fire, performance tuning.

Running flow in production looks like running any other durable data pipeline: a long-running process feeding from a venue, a datastore accumulating findings, alerts going to humans on call, a retrospective job for batch analysis. Five recipes cover the common operational concerns.

Recipe 1: live production wiring (Polymarket example)

The shape of a long-running live deployment. Works for Polymarket today; swap the ingestion source for Hyperliquid or Kalshi as needed.

```python
# flow_daemon.py: runs as a long-lived process.
import logging
import signal
import sys

from horizon.audit import AuditLog, SQLiteSink
from horizon.audit.events import AuditCategory
from horizon.flow import SQLiteFlowStore, make_default_engine
from horizon.flow.ingestion.polymarket import PolymarketFlowSource
from horizon.observability.alerts import (
    SlackAlerter, AuditLogAlertBridge, DEFAULT_ALERT_CATEGORIES,
)

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s %(message)s")
log = logging.getLogger("flow_daemon")

# 1. Stores (both on local disk for a solo-advisor deployment; use S3 Object
#    Lock / Glacier sinks for regulated-desk deployments).
flow_store = SQLiteFlowStore("/var/horizon/flow.db")
audit_log = AuditLog(sink=SQLiteSink("/var/horizon/audit.db"))

# 2. Alerter: Slack + audit-log bridge so you see both operational and
#    finding-level events in one channel.
slack = SlackAlerter(min_severity="warning")
bridge = AuditLogAlertBridge(
    alerter=slack,
    categories=frozenset(DEFAULT_ALERT_CATEGORIES | {
        AuditCategory.BotDetected,
        AuditCategory.FlowAnomaly,
    }),
    min_severity="warning",
)
bridge.bind(audit_log)

# 3. Engine
engine = make_default_engine(
    venue_name="polymarket",
    store_path="/var/horizon/flow.db",
    audit_log=audit_log,
    alerter=slack,
)

# 4. Live ingestion source
source = PolymarketFlowSource(
    token_ids=[
        "0xTRUMP_2024_YES",
        "0xTRUMP_2024_NO",
        # ... your watched markets ...
    ],
    polygon_rpc_url="https://polygon-mainnet.g.alchemy.com/v2/YOUR_KEY",
    enrich_on_chain=True,
)
source.on_event(engine.ingest)

# 5. Clean shutdown on SIGTERM / SIGINT
def shutdown(*_):
    log.info("shutting down")
    source.close()
    engine.close()
    audit_log.close()
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)

log.info("connecting to Polymarket CLOB")
source.connect()  # the feed runs on its own thread

# Keep the main thread alive until a signal arrives
signal.pause()
```
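The threading model the daemon relies on can be sketched with the standard library alone: the source delivers events from its own thread, so the main thread only has to stay alive and handle signals. `FakeSource` below is a hypothetical stand-in that assumes `PolymarketFlowSource.connect()` behaves this way; a plain list replaces `engine.ingest`.

```python
import threading
import time
from typing import Callable, List, Optional


class FakeSource:
    """Hypothetical stand-in for a live feed: emits numbered events on a thread."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[int], None]] = []
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def on_event(self, handler: Callable[[int], None]) -> None:
        self._handlers.append(handler)

    def connect(self) -> None:
        # Start the feed on a background thread and return immediately,
        # leaving the main thread free to wait for signals.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        seq = 0
        while not self._stop.is_set():
            for handler in self._handlers:
                handler(seq)
            seq += 1
            time.sleep(0.001)

    def close(self) -> None:
        # Ask the loop to exit, then wait for the thread so that no
        # handler runs after close() returns.
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)


ingested: List[int] = []
source = FakeSource()
source.on_event(ingested.append)
source.connect()      # returns at once; events flow on the worker thread
time.sleep(0.05)      # the daemon blocks in signal.pause() here instead
source.close()        # what shutdown() does first
count_at_close = len(ingested)
time.sleep(0.02)      # give a stray event time to appear (none should)
```

If the real `close()` also joins its thread, closing the source before the engine and audit log (as `shutdown()` does) guarantees no event arrives after the stores it writes to are closed.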

Deploy it as a systemd service:

```bash
# systemd unit
cat > /etc/systemd/system/horizon-flow.service <<'EOF'
[Unit]
Description=Horizon Flow Surveillance Daemon
After=network.target

[Service]
Type=simple
User=horizon
WorkingDirectory=/opt/horizon
ExecStart=/opt/horizon/venv/bin/python flow_daemon.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

# Pick up the new unit file, then enable and start it
systemctl daemon-reload
systemctl enable --now horizon-flow
```

What to monitor.

  • Systemd restart count (systemctl show -p NRestarts horizon-flow). Frequent restarts suggest either a network problem with the Polygon RPC endpoint or a bug; check journalctl -u horizon-flow -n 100.
  • FeedGap and FeedStale audit events. They indicate the Polymarket WebSocket is trying to reconnect; events may be lost in the gap.
  • Row-count growth in /var/horizon/flow.db. A flatline means events aren't flowing.
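The row-count check is easy to script with Python's sqlite3 module. This is a minimal sketch: the table name is an assumption (list the real ones with `sqlite3 /var/horizon/flow.db .tables`), and the sampling interval and where you keep the previous sample are up to you.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def row_count(db_path: str, table: str) -> int:
    """Current number of rows in `table`. Pass only trusted table names:
    SQL identifiers can't be bound as query parameters."""
    with closing(sqlite3.connect(db_path)) as conn:
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return count


def is_flatlined(previous: int, current: int) -> bool:
    """True if the table did not grow between two samples."""
    return current <= previous


# Demo on a throwaway database; in production you'd sample
# /var/horizon/flow.db on a schedule and compare with the last sample.
with tempfile.TemporaryDirectory() as tmp:
    demo_db = os.path.join(tmp, "demo.db")
    with closing(sqlite3.connect(demo_db)) as conn:
        conn.execute("CREATE TABLE findings (id INTEGER)")
        conn.executemany("INSERT INTO findings VALUES (?)", [(1,), (2,)])
        conn.commit()
    first = row_count(demo_db, "findings")
    second = row_count(demo_db, "findings")  # no writes in between
```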

Recipe 2: nightly retrospective scan

Daily cron job that re-scans the previous day’s feed, in case detectors weren’t connected during the day, or to apply new detector configs retrospectively.

```bash
#!/usr/bin/env bash
# /etc/cron.daily/horizon-flow-scan
set -euo pipefail

# cron does not inherit your shell environment; fail early if the
# webhook is missing.
: "${SLACK_WEBHOOK:?SLACK_WEBHOOK must be set}"

DATE=$(date -d 'yesterday' +%Y-%m-%d)
FEED=/var/horizon/feeds/polymarket_${DATE}.jsonl
REPORT=/tmp/flow_daily_${DATE}.json

if [[ ! -f "$FEED" ]]; then
    echo "no feed file for $DATE" >&2
    exit 0
fi

# 1. Replay through the engine into the flow store
/opt/horizon/venv/bin/horizon flow scan \
    --feed-log="$FEED" \
    --db=/var/horizon/flow.db \
    --audit-db=/var/horizon/audit.db \
    --venue=polymarket

# 2. Keep only high-severity findings (system jq, 1.6+ for IN)
/opt/horizon/venv/bin/horizon flow anomalies \
    --db=/var/horizon/flow.db \
    --since-hours=24 \
    --limit=1000 \
    | jq '[.[] | select(.severity | IN("high", "critical"))]' \
    > "$REPORT"

# 3. Audit log integrity (separate from flow findings)
/opt/horizon/venv/bin/horizon audit verify --db=/var/horizon/audit.db

# 4. Notify if there are any high-severity findings
COUNT=$(jq 'length' < "$REPORT")
if [[ "$COUNT" -gt 0 ]]; then
    echo "{\"text\": \"$COUNT high-severity flow findings for $DATE\"}" \
        | curl -s -X POST -H 'Content-Type: application/json' \
            --data @- "$SLACK_WEBHOOK"
fi
```

Why a retrospective scan when you already have the live daemon? Two reasons:

  • Config changes. You tightened a threshold at 2pm; findings from before the change used the old threshold. Replay with the new config to re-score the whole day.
  • New detectors. You wrote a custom detector on Tuesday. Replay the week’s recorded feed to see what it would have caught.
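To see why a replay can change results, here is a toy model in plain Python: one size threshold, changed at 14:00. `SizeConfig` and `flagged` are hypothetical, not horizon APIs. The live run scores each event with whichever config was active at the time; the replay scores the whole day with the current one.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class SizeConfig:
    """Hypothetical one-threshold detector config."""
    min_size: float


def flagged(event: dict, cfg: SizeConfig) -> bool:
    return event["size"] >= cfg.min_size


# A recorded day as JSONL lines (normally read from the .jsonl feed file).
feed_lines = [
    '{"ts": "09:00", "size": 450}',
    '{"ts": "13:00", "size": 700}',
    '{"ts": "15:00", "size": 420}',
]
feed: List[dict] = [json.loads(line) for line in feed_lines if line.strip()]

OLD, NEW = SizeConfig(min_size=500), SizeConfig(min_size=400)
CHANGE_AT = "14:00"  # zero-padded HH:MM strings compare correctly as text

# Live daemon: old config before the change, new config after it.
live = [ev["ts"] for ev in feed
        if flagged(ev, OLD if ev["ts"] < CHANGE_AT else NEW)]
# Nightly replay: new config for the whole day.
replayed = [ev["ts"] for ev in feed if flagged(ev, NEW)]
```

The 09:00 event only shows up after the replay, which is the "re-score the whole day" case.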

Recipe 3: Slack / PagerDuty routing per category

Different findings go to different places. Spoofing during trading hours alerts the desk in Slack; wash trading becomes an email for compliance review; only critical events page on-call through PagerDuty.

```python
import os
from datetime import timezone

from horizon.audit.events import AuditCategory, AuditSeverity
from horizon.observability.alerts import (
    SlackAlerter, EmailAlerter, PagerDutyAlerter,
    AuditLogAlertBridge,
)

# Secrets come from the environment, not source code
DESK_WEBHOOK = os.environ["DESK_WEBHOOK"]
PD_ROUTING_KEY = os.environ["PD_ROUTING_KEY"]

# Three routes
slack_desk = SlackAlerter(webhook_url=DESK_WEBHOOK, min_severity="warning")
email_compliance = EmailAlerter(
    smtp_host="smtp.example.com",
    from_addr="horizon@firm.com",
    to_addrs=["compliance@firm.com"],
    min_severity="notice",
)
pagerduty = PagerDutyAlerter(routing_key=PD_ROUTING_KEY, min_severity="critical")

FLOW_CATEGORIES = (AuditCategory.FlowAnomaly, AuditCategory.BotDetected)


# Route by category + severity via filter_fn
def is_desk_alert(event) -> bool:
    if event.category not in FLOW_CATEGORIES:
        return False
    cat_value = (event.payload or {}).get("category", "")
    # Desk cares about immediate-action items during market hours
    # (13:00-21:59 UTC here; adjust to your desk).
    hour = event.timestamp.astimezone(timezone.utc).hour
    is_market_hours = 13 <= hour <= 21
    return is_market_hours and cat_value in {
        "spoofing", "layering", "momentum_ignition", "quote_stuffing",
    }


def is_compliance_email(event) -> bool:
    if event.category not in FLOW_CATEGORIES:
        return False
    cat_value = (event.payload or {}).get("category", "")
    return cat_value in {"wash_trade", "iceberg"}


def is_pagerduty(event) -> bool:
    return event.severity == AuditSeverity.Critical


# Three bridges on the same audit log (audit_log from Recipe 1)
AuditLogAlertBridge(alerter=slack_desk, filter_fn=is_desk_alert).bind(audit_log)
AuditLogAlertBridge(alerter=email_compliance, filter_fn=is_compliance_email).bind(audit_log)
AuditLogAlertBridge(alerter=pagerduty, filter_fn=is_pagerduty).bind(audit_log)
```
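Since each filter_fn is a plain predicate, routing can be unit-tested without an audit log or live alerters. This sketch copies the desk filter's logic and assumes the filter only reads `category`, `payload`, and `timestamp`. AuditCategory members are replaced by strings and events by `types.SimpleNamespace`, so it runs with the standard library only.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

# String stand-ins for the AuditCategory members (assumption for this sketch).
FLOW_ANOMALY, BOT_DETECTED, OTHER = "FlowAnomaly", "BotDetected", "Other"
DESK_CATEGORIES = {"spoofing", "layering", "momentum_ignition", "quote_stuffing"}


def is_desk_alert(event) -> bool:
    """Stdlib-only copy of the desk filter above."""
    if event.category not in (FLOW_ANOMALY, BOT_DETECTED):
        return False
    cat_value = (event.payload or {}).get("category", "")
    hour = event.timestamp.astimezone(timezone.utc).hour
    return 13 <= hour <= 21 and cat_value in DESK_CATEGORIES


def stub_event(category: str, payload_category: str, hour_utc: int):
    """Minimal object exposing only the attributes the filter reads."""
    return SimpleNamespace(
        category=category,
        payload={"category": payload_category},
        timestamp=datetime(2024, 11, 5, hour_utc, 30, tzinfo=timezone.utc),
    )
```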

Rule of thumb for choosing the channel.

  • Slack (warning+). Anything actionable in the next 15 minutes.
  • Email (notice+). Retrospective review; compliance reads it at start of day.
  • PagerDuty (critical only). Things that should wake someone up: kill-switch fires, reconciliation mismatches, feed outages, highest-confidence bot detections. Over-paging is how on-call rotations break.
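The "warning+" floors compose as a fan-out. An event goes to every channel whose floor it meets, so a critical event lands in email, Slack, and PagerDuty. Here is a sketch with a hypothetical severity ordering where only the relative order NOTICE < WARNING < CRITICAL matters; the category filters from Recipe 3 would narrow this further.

```python
from enum import IntEnum
from typing import List


class Severity(IntEnum):
    """Hypothetical ordering; only the relative order matters."""
    INFO = 10
    NOTICE = 20
    WARNING = 30
    CRITICAL = 50


# Minimum severity per channel, following the rule of thumb above.
FLOORS = {
    "email": Severity.NOTICE,
    "slack": Severity.WARNING,
    "pagerduty": Severity.CRITICAL,
}


def channels_for(severity: Severity) -> List[str]:
    """Channels an event of this severity reaches, lowest floor first."""
    ordered = sorted(FLOORS.items(), key=lambda item: item[1])
    return [channel for channel, floor in ordered if severity >= floor]
```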

Recipe 4: debugging “why didn’t my detector fire?”

This is the single most common question, and the procedure is the same for every detector.

Step 1: did the event get in?

```python
# engine.ingest_count should grow every time you call .ingest()
print(f"ingested: {engine.ingest_count}")

# If it's 0, the observer wiring didn't connect.
# Re-check: log.subscribe(FlowObserver(...)) or feed.on_tick(handler)
```
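If ingest_count stays at 0, first establish whether the source is calling your handler at all. A generic counting wrapper does that. `counted` is a hypothetical helper, not part of horizon; you would wire it as `source.on_event(counted(engine.ingest))` and read its `calls` attribute. A non-zero `calls` with a zero ingest_count suggests the problem is inside the engine. A zero `calls` suggests the problem is the source wiring.

```python
import functools
from typing import Any, Callable


def counted(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a callable so each call increments `wrapper.calls` first."""
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        wrapper.calls += 1
        return fn(*args, **kwargs)

    wrapper.calls = 0
    return wrapper


# Demo with a list standing in for engine.ingest.
received = []
handler = counted(received.append)
for event in ("a", "b", "c"):
    handler(event)
```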

Step 2: is the event-kind right?

```python
from collections import Counter

from horizon.flow.events import MarketEventKind  # enum of the kinds below

# Most detectors only fire on specific event kinds:
# - spoofing requires OrderPlaced + OrderFilled + OrderCanceled from the same actor
# - wash requires OrderFilled + TradeTape
# - iceberg requires BookSnapshot + OrderFilled
# ...

# Count what you're actually feeding the engine
# (events_sent: the events you passed to engine.ingest)
kinds = Counter(ev.event_kind.value for ev in events_sent)
print(kinds)
# Counter({'quote.update': 800, 'trade.tape': 400, 'book.snapshot': 12, ...})
```

Many venues emit mostly QuoteUpdate; without BookSnapshot, iceberg detection has nothing to look at. Without OrderPlaced / OrderCanceled, spoofing and layering can’t fire either.
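The requirements listed in the comments can be checked mechanically against the Counter. This sketch assumes MarketEventKind is a standard Enum, so counting `ev.event_kind.name` (instead of `.value`) yields keys like `"OrderPlaced"`. The `REQUIRED_KINDS` table transcribes the comments above; it is not read from horizon.

```python
from collections import Counter
from typing import Dict, List, Set

# Transcribed from the comments above (assumed enum member names).
REQUIRED_KINDS: Dict[str, Set[str]] = {
    "spoofing": {"OrderPlaced", "OrderFilled", "OrderCanceled"},
    "wash_trade": {"OrderFilled", "TradeTape"},
    "iceberg": {"BookSnapshot", "OrderFilled"},
}


def starved_detectors(kinds_seen: Counter) -> Dict[str, List[str]]:
    """Map each detector to the event kinds it needs but never received."""
    starved: Dict[str, List[str]] = {}
    for detector, needed in REQUIRED_KINDS.items():
        # Counter returns 0 for kinds that never appeared.
        missing = sorted(kind for kind in needed if kinds_seen[kind] == 0)
        if missing:
            starved[detector] = missing
    return starved


seen = Counter({"TradeTape": 400, "QuoteUpdate": 800, "BookSnapshot": 12})
report = starved_detectors(seen)
```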

Step 3: are thresholds too tight?

```python
from dataclasses import replace

from horizon.flow.config import FlowConfig

# Loosen the spoofing thresholds way down, temporarily
cfg = FlowConfig()
cfg.detectors.spoofing = replace(
    cfg.detectors.spoofing,
    min_bait_size=1.0,            # was 500
    cancel_window_ms=30_000.0,    # was 2000
    min_book_imbalance=0.0,       # was 0.3
    bait_to_aggressor_ratio=1.1,  # was 5.0
)
```

Run the same events through an engine built with this loose config. If detections fire now, the original thresholds are too tight. Tighten back gradually.
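You can automate "tighten back gradually". Replay at evenly spaced values between the loose and original settings and keep the strictest one that still fires. This is a sketch on a hypothetical one-field config (`BaitConfig` stands in for SpoofingConfig). In practice, `fires` would build an engine with the candidate config, replay your events, and report whether any finding came out. The sketch assumes findings only disappear as the threshold rises, so it stops at the first step that goes quiet.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class BaitConfig:
    """Hypothetical one-parameter stand-in for SpoofingConfig."""
    min_bait_size: float


def tightest_firing(
    fires: Callable[[BaitConfig], bool],
    loose: BaitConfig,
    original: BaitConfig,
    steps: int = 10,
) -> Optional[BaitConfig]:
    """Step min_bait_size from the loose value toward the original one and
    return the strictest config that still fires (None if none fires)."""
    best: Optional[BaitConfig] = None
    span = original.min_bait_size - loose.min_bait_size
    for i in range(steps + 1):
        cfg = replace(loose, min_bait_size=loose.min_bait_size + span * i / steps)
        if not fires(cfg):
            break
        best = cfg
    return best


# Toy replay: the largest bait order in the recorded events was 240 units.
largest_bait = 240.0
result = tightest_firing(lambda c: largest_bait >= c.min_bait_size,
                         loose=BaitConfig(1.0), original=BaitConfig(500.0))
```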

Step 4: is the detector actually attached?

```python
print([d.name for d in engine._detectors])
# ['spoofing', 'layering', 'quote_stuffing', 'wash_trade', 'momentum_ignition',
#  'iceberg', 'split_order']
```

If you built the engine via a custom constructor instead of make_default_engine, you may have omitted detectors. Add the missing one.
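Comparing against the default list spots an omitted detector at a glance. This is a sketch only: the names come from the listing above, and in a real session you'd pass `d.name for d in engine._detectors`.

```python
from typing import Iterable, List

# The seven names the default engine reports in the listing above.
DEFAULT_DETECTORS = (
    "spoofing", "layering", "quote_stuffing", "wash_trade",
    "momentum_ignition", "iceberg", "split_order",
)


def missing_detectors(attached: Iterable[str]) -> List[str]:
    """Default detectors absent from an engine, in the default order."""
    present = set(attached)
    return [name for name in DEFAULT_DETECTORS if name not in present]


missing = missing_detectors(["spoofing", "wash_trade"])
```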

Step 5: is the detector seeing errors silently?

The engine swallows detector exceptions to keep ingestion resilient. For debugging, monkey-patch it so exceptions surface:

```python
_orig = engine._run_detectors  # keep a handle so you can revert

def _debug_run(ctx, event):
    findings = []
    for d in engine._detectors:
        try:
            findings.extend(d.detect([event], ctx) or ())
        except Exception as e:
            print(f"detector {d.name} failed on {event.market_id}: {e}")
            raise
    return findings

engine._run_detectors = _debug_run

# When done, revert: engine._run_detectors = _orig
```

Revert after debugging. Leaving this in production removes the safety net.
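One way to make the revert automatic is `unittest.mock.patch.object` from the standard library. It restores the original attribute when the `with` block exits, even if the debug run raises. `StubEngine` here is a hypothetical stand-in whose method swallows errors the way the text describes the engine doing. With the real engine, you'd patch `engine._run_detectors` with the `_debug_run` above.

```python
from unittest import mock


class StubEngine:
    """Hypothetical stand-in for the flow engine."""

    def _run_detectors(self, ctx, event):
        try:
            raise ValueError("bad payload")  # a detector blowing up
        except Exception:
            return []  # production behavior: swallow and keep ingesting


def _debug_run(ctx, event):
    # Debug variant: let the detector error propagate.
    raise ValueError("bad payload")


engine = StubEngine()
with mock.patch.object(engine, "_run_detectors", _debug_run):
    try:
        engine._run_detectors(None, "event-1")
        surfaced = False
    except ValueError:
        surfaced = True

# Outside the block the original, error-swallowing method is back.
after = engine._run_detectors(None, "event-1")
```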

Recipe 5: performance and scale

Flow is designed for Polymarket-like cadence (tens of events per second). If you're feeding it the Alpaca consolidated tape (hundreds per second) or a multi-symbol Hyperliquid feed (thousands), keep the following in mind.

SQLite write throughput

The default SQLiteFlowStore with PRAGMA synchronous=FULL sustains roughly 100-500 writes/s. To go beyond that, you have three options:

```python
# Option 1: synchronous=NORMAL: 3-5x faster, slightly less crash-safe.
# In WAL journal mode, the most recent commits may roll back after an OS
# crash or power loss, but the file structure stays consistent.
store = SQLiteFlowStore("flow.db")
store._conn.execute("PRAGMA synchronous=NORMAL")
```
```python
# Option 2: separate per-category stores
spoof_store = SQLiteFlowStore("flow_spoof.db")
wash_store = SQLiteFlowStore("flow_wash.db")
# ...then route findings to the right one in an on_finding callback
```
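```python
# Option 3: batch writes via a queue
from queue import Empty, Queue
from threading import Thread

q: Queue = Queue(maxsize=10_000)

def writer_thread(db_path, q):
    # Open the store inside this thread: Python's sqlite3 connections refuse
    # cross-thread use unless opened with check_same_thread=False.
    store = SQLiteFlowStore(db_path)
    while True:
        batch = []
        while len(batch) < 100:
            try:
                batch.append(q.get(timeout=0.1))
            except Empty:
                break
        if batch:
            # One transaction per batch (assumes write_anomaly doesn't
            # commit on its own).
            with store._conn:
                for f in batch:
                    store.write_anomaly(f)

# put_nowait raises queue.Full if the writer falls 10,000 findings behind.
engine.on_finding(q.put_nowait)
Thread(target=writer_thread, args=("flow.db", q), daemon=True).start()
```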
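Here is a runnable, stdlib-only version of the same pattern for experimenting with batch sizes. The writer owns its connection and inserts each batch with `executemany` inside one transaction, so there is one commit per batch instead of one per finding. It exits on a sentinel, which lets shutdown flush the final partial batch. The `findings` table and its columns are hypothetical, not SQLiteFlowStore's schema.

```python
import os
import sqlite3
import tempfile
import threading
from contextlib import closing
from queue import Empty, Queue
from typing import List, Tuple

STOP = object()  # sentinel: flush what's pending, then exit


def writer(db_path: str, q: Queue, batch_size: int = 100) -> None:
    # The connection is created in this thread; sqlite3 rejects
    # cross-thread use by default (check_same_thread=True).
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS findings (detector TEXT, score REAL)")
        stopping = False
        while not stopping:
            batch: List[Tuple[str, float]] = []
            while len(batch) < batch_size:
                try:
                    item = q.get(timeout=0.1)
                except Empty:
                    break
                if item is STOP:
                    stopping = True
                    break
                batch.append(item)
            if batch:
                with conn:  # one transaction, one commit, per batch
                    conn.executemany("INSERT INTO findings VALUES (?, ?)", batch)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "findings.db")
    queue: Queue = Queue(maxsize=10_000)
    t = threading.Thread(target=writer, args=(path, queue))
    t.start()
    for i in range(250):
        queue.put(("spoofing", i / 250))
    queue.put(STOP)  # on shutdown: enqueue the sentinel, then join
    t.join()
    with closing(sqlite3.connect(path)) as conn:
        (written,) = conn.execute("SELECT COUNT(*) FROM findings").fetchone()
```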

CPU

The seven default detectors each process an event in tens of microseconds on a modern laptop. Bottlenecks typically sit elsewhere:

  • The MDX-parser in ingestion/polymarket.py. JSON parse is the biggest cost on live data. Use orjson if available.
  • PolicyFeatureExtractor.featurize. Called synchronously if you do runtime policy lookup from a strategy. Cache the featurized state per (actor, market, timestamp-truncated-to-second).
  • Hawkes MLE. Skip _mle (set HawkesConfig.kernel_decay_s=0 or its equivalent) and always fall back to the moments estimator.

Measure before optimizing: engine.ingest_count combined with wall-clock timestamps taken before and after a replay gives you per-event throughput.
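Below is a stand-alone version of that measurement. It counts events itself so it runs without horizon. With the real engine, you'd pass `engine.ingest` and cross-check the count against `engine.ingest_count`. The sketch uses `time.perf_counter` because it is monotonic and high-resolution, which suits elapsed-time measurement better than `time.time`.

```python
import time
from typing import Callable, Iterable, Tuple


def measure_throughput(ingest: Callable[[object], object],
                       events: Iterable[object]) -> Tuple[int, float, float]:
    """Replay `events` through `ingest`; return (count, events/s, us/event)."""
    count = 0
    start = time.perf_counter()
    for event in events:
        ingest(event)
        count += 1
    elapsed = time.perf_counter() - start
    if count == 0 or elapsed <= 0:
        return count, 0.0, 0.0
    return count, count / elapsed, elapsed / count * 1e6


sink: list = []
n, rate, us_per_event = measure_throughput(sink.append, range(10_000))
print(f"{n} events, {rate:,.0f} events/s, {us_per_event:.2f} us/event")
```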

Sharding across venues

One engine per venue. Share nothing across engines; they each have their own store, audit log, and detector state.

```python
engines = {
    "polymarket": make_default_engine(venue_name="polymarket", store_path="pm.db", ...),
    "kalshi": make_default_engine(venue_name="kalshi", store_path="kalshi.db", ...),
    "hyperliquid": make_default_engine(venue_name="hyperliquid", store_path="hl.db", ...),
}
```

Cross-venue attribution (v1.0) will join these; until then, keep them cleanly separated.
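When a single process receives a mixed stream, a thin dispatcher keyed on venue keeps that separation explicit. This is a sketch only: it assumes each event carries a `venue` field, and `StubEngine` stands in for the engines built with make_default_engine. The only shared object is the routing dict.

```python
from typing import Dict, List


class StubEngine:
    """Hypothetical engine stand-in holding only its own state."""

    def __init__(self, venue: str) -> None:
        self.venue = venue
        self.events: List[dict] = []

    def ingest(self, event: dict) -> None:
        self.events.append(event)


engines: Dict[str, StubEngine] = {
    venue: StubEngine(venue) for venue in ("polymarket", "kalshi", "hyperliquid")
}


def dispatch(event: dict) -> None:
    """Send an event to its own venue's engine; unknown venues fail loudly."""
    engine = engines.get(event.get("venue", ""))
    if engine is None:
        raise ValueError(f"no engine for venue {event.get('venue')!r}")
    engine.ingest(event)


dispatch({"venue": "kalshi", "id": 1})
dispatch({"venue": "polymarket", "id": 2})
```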

The production checklist

Before flipping a flow deployment live:

  • SQLiteFlowStore path is on a persistent volume, not /tmp.
  • AuditLog is using SQLiteSink (not InMemorySink).
  • Alerter webhook URL is resolved from Secrets, not hardcoded.
  • Ingestion source reconnects on disconnect (default behavior; verify in logs).
  • Nightly audit-chain verify is cron’d (horizon audit verify).
  • Daily flow-store integrity check (horizon flow verify).
  • At least one synthetic test pattern fires a finding through the full production path.
  • Slack / email alerter delivers a test message.
  • Blue-green deploy story: you know how to restart the daemon without losing events in flight (the audit log’s scan_from_audit lets you replay missed events).
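A few of these items are easy to automate as a preflight step before starting the daemon. This sketch makes some assumptions. The paths are passed in rather than read from your config. The secret variable names (SLACK_WEBHOOK from the cron script, PD_ROUTING_KEY from the routing example) are whatever your deployment uses. In real use, you'd pass `env=os.environ`.

```python
from pathlib import Path
from typing import List, Mapping, Sequence

# Resolve symlinks, e.g. /tmp -> /private/tmp on macOS.
TMP = Path("/tmp").resolve()


def _under_tmp(raw: str) -> bool:
    path = Path(raw).resolve()
    return path == TMP or TMP in path.parents


def preflight(store_path: str, audit_path: str,
              env: Mapping[str, str], secret_vars: Sequence[str]) -> List[str]:
    """Return a list of problems; an empty list means these checks passed."""
    problems: List[str] = []
    for label, raw in (("flow store", store_path), ("audit db", audit_path)):
        if _under_tmp(raw):
            problems.append(f"{label} {raw} is under /tmp, not a persistent volume")
    for var in secret_vars:
        if not env.get(var):
            problems.append(f"{var} is not set")
    return problems


problems = preflight("/tmp/flow.db", "/var/horizon/audit.db",
                     env={"SLACK_WEBHOOK": "https://hooks.example"},
                     secret_vars=["SLACK_WEBHOOK", "PD_ROUTING_KEY"])
```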

Related

  • CLI. The commands referenced in scripts above.
  • Risk integration. Wiring flow findings into the trade loop.
  • How it works. Internals you need when the daemon is misbehaving.