Risk & alert integration
FlowAnomalyCheck rejects orders in markets with active flow findings; AuditLogAlertBridge routes findings to Slack / email / PagerDuty.
Findings are only useful if the trading loop can act on them. Two integration points:
- Pre-trade rejection. FlowAnomalyCheck plugs into RiskConfig.extra_checks and rejects orders in markets with active anomalies above a severity + confidence threshold.
- Operational alerts. Findings emit into the audit log via new AuditCategory values, which the existing AuditLogAlertBridge routes to Slack, email, PagerDuty, or Twilio.
Neither integration requires modifying the core SDK. Both seams already exist, and this module just wires into them.
FlowAnomalyCheck
from horizon.flow import SQLiteFlowStore, FlowAnomalyCheck
from horizon.flow.events import AnomalyCategory
import horizon as hz
store = SQLiteFlowStore("flow.db")
check = FlowAnomalyCheck(
    store=store,
    categories={
        AnomalyCategory.Spoofing,
        AnomalyCategory.Layering,
        AnomalyCategory.WashTrade,
        AnomalyCategory.MomentumIgnition,
    },
    lookback_s=60.0,
    min_severity="medium",  # "low" | "medium" | "high" | "critical"
    min_confidence=0.7,
)
risk = hz.risk.RiskConfig(
    # ...existing risk config...
    extra_checks=[check],
)
hz.run(mode="live", risk=risk, ...)
Decision logic
For each pre-trade OrderAction:
- Query the flow store for findings on action.market_id within the last lookback_s seconds.
- Filter by categories (the set passed in) and by min_severity + min_confidence.
- Any match → Decision.reject(reason="flow anomaly active: ...").
- No match → Decision.pass_().
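The steps listed above can be sketched in plain Python. This is a minimal illustration, not the SDK's implementation: the Finding dataclass, the decide function, the SEVERITY_RANK table, and the choice to treat both thresholds as inclusive are all assumptions made for the example.

```python
from dataclasses import dataclass

# Assumed ordering of the severity labels accepted by min_severity.
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


@dataclass(frozen=True)
class Finding:
    """Hypothetical stand-in for a finding held in the flow store."""
    market_id: str
    category: str
    severity: str
    confidence: float
    ts: float  # detection time, in seconds


def decide(findings, market_id, now, *, categories, lookback_s=60.0,
           min_severity="medium", min_confidence=0.7):
    """Return (allowed, reason) for an order on market_id at time `now`."""
    floor = SEVERITY_RANK[min_severity]
    for f in findings:
        if f.market_id != market_id:
            continue  # finding belongs to another market
        if now - f.ts > lookback_s:
            continue  # finding is older than the lookback window
        if f.category not in categories:
            continue  # category is not one the check watches
        if SEVERITY_RANK[f.severity] < floor or f.confidence < min_confidence:
            continue  # below the severity or confidence threshold
        return False, f"flow anomaly active: {f.category}"
    return True, None
```

With a single high-severity spoofing finding recorded at t=100 on market "m1", an order at t=130 is rejected, while the same order at t=200 passes because the finding has aged out of the 60-second window.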
Why conservative by design
The check only blocks trades in markets that are actively flagged right now. A finding from yesterday does not permanently blacklist a market; that would be too brittle. The lookback_s=60 default matches the detectors’ short-window focus: if no fresh finding has appeared within the last minute, the check assumes the situation has resolved.
Raise it to lookback_s=300 for more cautious strategies, or lower it to lookback_s=10 for latency-sensitive ones.
Behavior on store failure
If the store query raises an exception, the check returns Decision.pass_() rather than Decision.reject(...). A store outage must NOT block all trading. That would amplify one problem into another. Operationally, monitor the flow store’s health separately; the trade loop degrades gracefully.
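The fail-open behavior can be illustrated with a small sketch. Everything here is hypothetical: the recent_findings method, the two store classes, and the string return values stand in for the real store API and Decision objects, which the text above does not spell out.

```python
import logging

logger = logging.getLogger("flow_check_sketch")


class BrokenStore:
    """Hypothetical store whose query always fails, simulating an outage."""

    def recent_findings(self, market_id, lookback_s):
        raise ConnectionError("flow store unavailable")


class ListStore:
    """Hypothetical store that returns a fixed list of findings."""

    def __init__(self, findings):
        self._findings = list(findings)

    def recent_findings(self, market_id, lookback_s):
        return self._findings


def check_order(store, market_id, lookback_s=60.0):
    """Return "pass" or "reject"; a store error is treated as "pass"."""
    try:
        findings = store.recent_findings(market_id, lookback_s)
    except Exception:
        # Fail open: log the outage for operators, but let the order through.
        logger.exception("flow store query failed; passing order on %s", market_id)
        return "pass"
    return "reject" if findings else "pass"
```

Logging the exception keeps the outage visible to whoever monitors the store, even though the trade loop itself carries on.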
What the rejection looks like in audit
Because FlowAnomalyCheck sits inside the risk pipeline, a rejection emits the usual AuditCategory.RiskDecision event with the extra_check’s reason string. The rejection trail is exactly what a compliance auditor expects: pre-trade control fires, reason recorded, no order sent.
AuditLogAlertBridge: routing findings to humans
Findings emit to the audit log under the AuditCategory values FlowAnomaly, BotDetected, ActorProfiled, ClusterAssigned, and PolicyInferred. The existing alerter subsystem routes audit events to Slack / email / PagerDuty / Twilio based on a categories filter.
from horizon.audit import AuditCategory, AuditLog, SQLiteSink
from horizon.observability.alerts import (
    SlackAlerter,
    AuditLogAlertBridge,
    DEFAULT_ALERT_CATEGORIES,
)
log = AuditLog(sink=SQLiteSink("audit.db"))
# Expand the default page-worthy set to include high-severity flow events
flow_page = frozenset(DEFAULT_ALERT_CATEGORIES | {
    AuditCategory.BotDetected,
    AuditCategory.FlowAnomaly,
})
bridge = AuditLogAlertBridge(
    alerter=SlackAlerter(min_severity="warning"),
    categories=flow_page,
    min_severity="warning",
)
bridge.bind(log)
The flow engine records FlowAnomaly events at Info / Notice / Warning / Critical severity mapped from the finding’s AnomalySeverity. Pair that with the bridge’s min_severity="warning" to keep pages high-signal (only High + Critical findings page; Medium and Low stay in the audit log for retrospective review).
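To make the paging rule concrete, here is a small sketch of the severity mapping and threshold. The one-to-one, in-order mapping (low to info, medium to notice, high to warning, critical to critical) is inferred from the paragraph above rather than taken from the SDK, and the names ANOMALY_TO_AUDIT, AUDIT_RANK, and pages are made up for the example.

```python
# Assumed in-order mapping from AnomalySeverity labels to audit severities.
ANOMALY_TO_AUDIT = {
    "low": "info",
    "medium": "notice",
    "high": "warning",
    "critical": "critical",
}

# Assumed ordering of audit severities, lowest first.
AUDIT_RANK = {"info": 0, "notice": 1, "warning": 2, "critical": 3}


def pages(anomaly_severity, min_severity="warning"):
    """Return True if a finding of this severity clears the bridge threshold."""
    audit_severity = ANOMALY_TO_AUDIT[anomaly_severity]
    return AUDIT_RANK[audit_severity] >= AUDIT_RANK[min_severity]


paged = [s for s in ("low", "medium", "high", "critical") if pages(s)]
```

Under these assumptions paged holds only "high" and "critical", matching the behavior described above.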
Engine-level filter
FlowEngine also enforces its own paging filter via FlowConfig.alerts:
from horizon.flow.config import AlertConfig, FlowConfig
cfg = FlowConfig(alerts=AlertConfig(
    min_confidence_for_alert=0.8,
    min_severity_for_alert="high",
    categories_paged=frozenset({"spoofing", "wash_trade"}),
))
This is applied in the engine before the alerter gets the event, so the alerter never sees low-signal noise. Tune it to taste. The audit log still receives every finding regardless of paging.
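A rough sketch of how such a filter might behave follows. It is an illustration under stated assumptions, not the engine's code: AlertFilter mirrors only the three AlertConfig fields shown above, the thresholds are treated as inclusive, and should_page, handle, and the list-based audit and pager sinks are hypothetical.

```python
from dataclasses import dataclass

# Assumed ordering of finding severities, lowest first.
RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


@dataclass(frozen=True)
class AlertFilter:
    """Hypothetical mirror of the three AlertConfig fields."""
    min_confidence_for_alert: float = 0.8
    min_severity_for_alert: str = "high"
    categories_paged: frozenset = frozenset({"spoofing", "wash_trade"})


def should_page(cfg, category, severity, confidence):
    """Return True only if the finding clears all three paging conditions."""
    return (
        category in cfg.categories_paged
        and RANK[severity] >= RANK[cfg.min_severity_for_alert]
        and confidence >= cfg.min_confidence_for_alert
    )


def handle(cfg, finding, audit, pager):
    """Record every finding in the audit sink; page only those that pass."""
    audit.append(finding)
    category, severity, confidence = finding
    if should_page(cfg, category, severity, confidence):
        pager.append(finding)
```

Feeding a ("spoofing", "high", 0.9) finding and a ("layering", "critical", 0.99) finding through handle leaves both in the audit sink, but only the first reaches the pager, since layering is not in categories_paged.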
Composite example
import horizon as hz
from horizon.audit import AuditCategory, AuditLog, SQLiteSink
from horizon.flow import (
    SQLiteFlowStore, make_default_engine, FlowAnomalyCheck,
)
from horizon.observability.alerts import (
    SlackAlerter, AuditLogAlertBridge, DEFAULT_ALERT_CATEGORIES,
)
# Stores
audit_log = AuditLog(sink=SQLiteSink("audit.db"))
flow_store = SQLiteFlowStore("flow.db")
# Flow engine wired to audit + alerter
alerter = SlackAlerter(min_severity="warning")
engine = make_default_engine(
    venue_name="polymarket",
    store_path="flow.db",
    audit_log=audit_log,
    alerter=alerter,
)
# Alert bridge (routes selected audit events to Slack).
# The bridge filters on AuditCategory, not AnomalyCategory; to restrict paging
# to specific anomaly types, use FlowConfig.alerts.categories_paged instead.
bridge = AuditLogAlertBridge(
    alerter=alerter,
    categories=frozenset(DEFAULT_ALERT_CATEGORIES | {
        AuditCategory.FlowAnomaly,
    }),
)
bridge.bind(audit_log)
# Risk integration: reject orders in markets with active findings
flow_check = FlowAnomalyCheck(store=flow_store, min_severity="high")
hz.run(
    mode="live",
    audit_log=audit_log,
    risk=hz.risk.RiskConfig(extra_checks=[flow_check], ...),
    # ...rest of pipeline...
)
Two alert paths (the engine’s own alerter and the audit bridge) and one risk integration, all converging on one hash-chained audit log and one queryable flow store.