Risk & alert integration

FlowAnomalyCheck rejects orders in markets with active flow findings; AuditLogAlertBridge routes findings to Slack / email / PagerDuty.

Findings are only useful if the trading loop can act on them. Two integration points:

  • Pre-trade rejection. FlowAnomalyCheck plugs into RiskConfig.extra_checks and rejects orders in markets with active anomalies above a severity + confidence threshold.
  • Operational alerts. Findings emit into the audit log via new AuditCategory values, which the existing AuditLogAlertBridge routes to Slack, email, PagerDuty, or Twilio.

Neither integration requires modifying the core SDK. Both seams already exist, and this module just wires into them.

FlowAnomalyCheck

python
from horizon.flow import SQLiteFlowStore, FlowAnomalyCheck
from horizon.flow.events import AnomalyCategory
import horizon as hz

store = SQLiteFlowStore("flow.db")

check = FlowAnomalyCheck(
    store=store,
    categories={
        AnomalyCategory.Spoofing,
        AnomalyCategory.Layering,
        AnomalyCategory.WashTrade,
        AnomalyCategory.MomentumIgnition,
    },
    lookback_s=60.0,
    min_severity="medium",  # "low" | "medium" | "high" | "critical"
    min_confidence=0.7,
)

risk = hz.risk.RiskConfig(
    # ...existing risk config...
    extra_checks=[check],
)

hz.run(
    mode="live",
    risk=risk,
    # ...rest of pipeline...
)

Decision logic

For each pre-trade OrderAction:

  1. Query the flow store for findings on action.market_id, within the last lookback_s seconds.
  2. Filter by categories (the set passed in) and by min_severity + min_confidence.
  3. Any match → Decision.reject(reason="flow anomaly active: ...").
  4. No match → Decision.pass_().
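The four steps above can be sketched in plain Python. This is a minimal illustration, not the SDK's implementation: the Finding dataclass, the first_blocking_finding helper, and the string category names are hypothetical stand-ins, and treating both thresholds as inclusive is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

# Ordering assumed from the documented "low" | "medium" | "high" | "critical" values.
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


@dataclass(frozen=True)
class Finding:
    """Hypothetical stand-in for a stored flow finding."""
    market_id: str
    category: str
    severity: str
    confidence: float
    ts: float  # detection time, in seconds


def first_blocking_finding(
    findings: Iterable[Finding],
    market_id: str,
    now: float,
    categories: frozenset,
    lookback_s: float = 60.0,
    min_severity: str = "medium",
    min_confidence: float = 0.7,
) -> Optional[Finding]:
    """Return the first finding that would trigger a rejection, or None to pass."""
    floor = SEVERITY_RANK[min_severity]
    for f in findings:
        # Step 1: same market, inside the lookback window.
        if f.market_id != market_id or now - f.ts > lookback_s:
            continue
        # Step 2: watched category, at or above both thresholds.
        if f.category not in categories:
            continue
        if SEVERITY_RANK[f.severity] < floor or f.confidence < min_confidence:
            continue
        return f  # Step 3: any match means reject.
    return None  # Step 4: no match means pass.


findings = [
    Finding("mkt-1", "spoofing", "high", 0.9, ts=1_000.0),
    Finding("mkt-2", "layering", "low", 0.95, ts=1_030.0),
]
watched = frozenset({"spoofing", "layering"})

hit_fresh = first_blocking_finding(findings, "mkt-1", now=1_040.0, categories=watched)
hit_stale = first_blocking_finding(findings, "mkt-1", now=1_100.0, categories=watched)
hit_low = first_blocking_finding(findings, "mkt-2", now=1_040.0, categories=watched)
```

Here hit_fresh is the spoofing finding (40 seconds old), hit_stale is None because the same finding is 100 seconds old, and hit_low is None because a low-severity finding sits below the medium floor.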

Why conservative by design

The check only blocks trades in markets that are actively flagged right now. A finding from yesterday does not permanently blacklist a market, because that would be too brittle. The lookback_s=60 default matches the detectors’ short-window focus: if no fresh finding has appeared within the last minute, the check assumes the situation has resolved.

Raise it to lookback_s=300 for more cautious strategies (findings keep blocking for five minutes), or lower it to lookback_s=10 for latency-sensitive ones.

Behavior on store failure

If the store query raises an exception, the check fails open: it returns Decision.pass_() rather than Decision.reject(...). A store outage must not block all trading, since that would amplify one problem into another. Operationally, monitor the flow store’s health separately; the trade loop degrades gracefully.
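A minimal sketch of that pass-on-error behavior, assuming the store lookup is an ordinary callable that may raise. The check_fail_open function, the tuple return shape, and the two fake stores are hypothetical and only illustrate the control flow; they are not the SDK's Decision API.

```python
import logging
from typing import Callable, Optional, Tuple

logger = logging.getLogger("flow_check_sketch")


def check_fail_open(
    query_active_finding: Callable[[str], Optional[str]],
    market_id: str,
) -> Tuple[str, Optional[str]]:
    """Return ("reject", reason) or ("pass", None); a store error yields a pass."""
    try:
        category = query_active_finding(market_id)
    except Exception:
        # Log the outage so it is visible, but do not block the order.
        logger.exception("flow store query failed; passing order for %s", market_id)
        return ("pass", None)
    if category is not None:
        return ("reject", f"flow anomaly active: {category}")
    return ("pass", None)


def broken_store(market_id: str) -> Optional[str]:
    """Simulates a store outage."""
    raise ConnectionError("flow store unavailable")


def healthy_store(market_id: str) -> Optional[str]:
    """Simulates a store with one active finding on mkt-1."""
    return "spoofing" if market_id == "mkt-1" else None


outage = check_fail_open(broken_store, "mkt-1")
flagged = check_fail_open(healthy_store, "mkt-1")
clean = check_fail_open(healthy_store, "mkt-2")
```

The outage case passes the order even though mkt-1 would have been rejected by a healthy store, which is why separate monitoring of the store matters.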

What the rejection looks like in audit

Because FlowAnomalyCheck sits inside the risk pipeline, a rejection emits the usual AuditCategory.RiskDecision event carrying the check’s reason string. The rejection trail is exactly what a compliance auditor expects: pre-trade control fires, reason recorded, no order sent.

AuditLogAlertBridge: routing findings to humans

Findings emit to the audit log under five AuditCategory values: FlowAnomaly, BotDetected, ActorProfiled, ClusterAssigned, and PolicyInferred. The existing alerter subsystem routes audit events to Slack / email / PagerDuty / Twilio based on a categories filter.

python
from horizon.audit import AuditCategory, AuditLog, SQLiteSink
from horizon.observability.alerts import (
    SlackAlerter,
    AuditLogAlertBridge,
    DEFAULT_ALERT_CATEGORIES,
)

log = AuditLog(sink=SQLiteSink("audit.db"))

# Expand the default page-worthy set to include high-severity flow events
flow_page = frozenset(DEFAULT_ALERT_CATEGORIES | {
    AuditCategory.BotDetected,
    AuditCategory.FlowAnomaly,
})

bridge = AuditLogAlertBridge(
    alerter=SlackAlerter(min_severity="warning"),
    categories=flow_page,
    min_severity="warning",
)
bridge.bind(log)

The flow engine records FlowAnomaly events at Info, Notice, Warning, or Critical severity, mapped from the finding’s AnomalySeverity. Pair that with the bridge’s min_severity="warning" to keep pages high-signal: only High and Critical findings page, while Medium and Low stay in the audit log for retrospective review.
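To make the threshold concrete, here is a small sketch of the severity gate. The one-to-one, order-preserving mapping (Low to Info, Medium to Notice, High to Warning, Critical to Critical) is an assumption inferred from the paging behavior described above, and the lowercase strings and the pages helper are hypothetical.

```python
# Assumed mapping from finding severity to audit severity (order-preserving).
FINDING_TO_AUDIT = {
    "low": "info",
    "medium": "notice",
    "high": "warning",
    "critical": "critical",
}
AUDIT_RANK = {"info": 0, "notice": 1, "warning": 2, "critical": 3}


def pages(finding_severity: str, min_severity: str = "warning") -> bool:
    """True if a finding of this severity clears the bridge's severity floor."""
    audit_severity = FINDING_TO_AUDIT[finding_severity]
    return AUDIT_RANK[audit_severity] >= AUDIT_RANK[min_severity]


# Severities that page with the default floor of "warning".
paged = [s for s in FINDING_TO_AUDIT if pages(s)]
# Lowering the floor to "notice" would also page medium findings.
paged_at_notice = [s for s in FINDING_TO_AUDIT if pages(s, min_severity="notice")]
```

Under this assumed mapping, paged contains only "high" and "critical", matching the behavior described above.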

Engine-level filter

FlowEngine also enforces its own paging filter via FlowConfig.alerts:

python
from horizon.flow.config import AlertConfig, FlowConfig

cfg = FlowConfig(alerts=AlertConfig(
    min_confidence_for_alert=0.8,
    min_severity_for_alert="high",
    categories_paged=frozenset({"spoofing", "wash_trade"}),
))

This is applied in the engine before the alerter gets the event, so the alerter never sees low-signal noise. Tune it to taste. The audit log still receives every finding regardless of paging.
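One way to picture the engine-level filter is as a predicate over three conditions, all of which must hold. The AlertPolicy dataclass below only mirrors the AlertConfig field names shown above; should_page, the severity ordering, and the inclusive comparisons are assumptions for illustration, not the engine's code.

```python
from dataclasses import dataclass

SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


@dataclass(frozen=True)
class AlertPolicy:
    """Hypothetical mirror of the AlertConfig fields used in the example."""
    min_confidence_for_alert: float = 0.8
    min_severity_for_alert: str = "high"
    categories_paged: frozenset = frozenset({"spoofing", "wash_trade"})


def should_page(policy: AlertPolicy, category: str, severity: str, confidence: float) -> bool:
    """A finding pages only if category, severity, and confidence all qualify."""
    return (
        category in policy.categories_paged
        and SEVERITY_RANK[severity] >= SEVERITY_RANK[policy.min_severity_for_alert]
        and confidence >= policy.min_confidence_for_alert
    )


policy = AlertPolicy()
page_ok = should_page(policy, "spoofing", "high", 0.85)
page_low_conf = should_page(policy, "spoofing", "high", 0.75)
page_other_cat = should_page(policy, "layering", "critical", 0.99)
page_low_sev = should_page(policy, "wash_trade", "medium", 0.9)
```

Only the first finding pages. The other three fail on confidence, category, and severity respectively, yet all four would still be written to the audit log.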

Composite example

python
import horizon as hz
from horizon.audit import AuditCategory, AuditLog, SQLiteSink
from horizon.flow import (
    SQLiteFlowStore, make_default_engine, FlowAnomalyCheck,
)
from horizon.observability.alerts import (
    SlackAlerter, AuditLogAlertBridge, DEFAULT_ALERT_CATEGORIES,
)

# Stores
audit_log = AuditLog(sink=SQLiteSink("audit.db"))
flow_store = SQLiteFlowStore("flow.db")

# Flow engine wired to audit + alerter
alerter = SlackAlerter(min_severity="warning")
engine = make_default_engine(
    venue_name="polymarket",
    store_path="flow.db",
    audit_log=audit_log,
    alerter=alerter,
)

# Alert bridge (routes selected audit events to Slack).
# The bridge filters on AuditCategory values, so flow findings are selected
# with AuditCategory.FlowAnomaly rather than AnomalyCategory members.
# Narrowing to specific anomaly kinds (e.g. spoofing, wash trades) is done
# at the engine level via FlowConfig.alerts.categories_paged.
bridge = AuditLogAlertBridge(
    alerter=alerter,
    categories=frozenset(DEFAULT_ALERT_CATEGORIES | {
        AuditCategory.FlowAnomaly,
    }),
)
bridge.bind(audit_log)

# Risk integration: reject orders in markets with active findings
flow_check = FlowAnomalyCheck(store=flow_store, min_severity="high")

hz.run(
    mode="live",
    audit_log=audit_log,
    risk=hz.risk.RiskConfig(
        # ...existing risk config...
        extra_checks=[flow_check],
    ),
    # ...rest of pipeline...
)

Two alerting paths (the engine’s own alerter and the audit bridge) and one risk integration, all converging into one hash-chained audit log and one queryable flow store.