CLI

horizon flow scan / profile / anomalies / cluster / reverse / verify / replay. Operational commands for the flow module.

Every flow capability is accessible from the command line. The commands sit under horizon flow SUBCOMMAND and match the pattern of the existing audit CLI. All output is JSON so the commands pipe cleanly into jq or duckdb.

Run horizon flow --help for the full tree. The subcommands:

scan

Replay a recorded feed log (JSONL) through the default flow engine and write findings to the flow store. Pass --audit-db to also record the findings into an existing audit log.

bash
horizon flow scan \
 --feed-log=polymarket_2026_04_19.jsonl \
 --db=flow.db \
 --venue=polymarket \
 --audit-db=audit.db

Output (JSON):

json
{
 "events_ingested": 14821,
 "db": "flow.db",
 "venue": "polymarket"
}

Useful for:

  • Regression testing. The same fixture should always produce the same findings (the module is deterministic with a seeded config).
  • Retrospective scans. A regulator requests “what did you observe between dates X and Y”; record the fixture covering that span, run scan, export.
  • End-of-day batch scoring. Schedule a nightly run against a rolling fixture.

profile

Dump one actor’s ActorProfile as JSON.

bash
horizon flow profile --db=flow.db --actor=0xabc... --venue=polymarket

Emits the actor's features, taxonomy probabilities, cluster assignment, and last-updated sequence.

Returns exit code 1 and {"error": "not_found", "actor_id": "..."} if no profile exists. A missing profile is distinct from an empty one.
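The missing-versus-empty distinction matters to callers that script this command. A minimal sketch of how a wrapper might keep the two cases apart, assuming the error object arrives on stdout like the rest of the JSON output (the section does not say which stream carries it); parse_profile_result is a hypothetical helper, not part of horizon:

```python
import json
from typing import Optional


def parse_profile_result(returncode: int, stdout: str) -> Optional[dict]:
    """Map a `horizon flow profile` result to a dict, or None if not found.

    Assumption: the error JSON is written to stdout, like normal output.
    """
    payload = json.loads(stdout)
    if returncode == 0:
        # A profile exists; it may legitimately be empty.
        return payload
    if returncode == 1 and payload.get("error") == "not_found":
        # No profile stored for this actor.
        return None
    raise RuntimeError(f"unexpected result: exit={returncode}, payload={payload!r}")
```

Returning None for the missing case and a dict (possibly empty) for the found case keeps the two outcomes separate at the type level.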

anomalies

List findings matching a filter. The filter flags --market, --actor, --category, --since-hours, and --limit can be combined freely.

bash
horizon flow anomalies \
 --db=flow.db \
 --market=0xTRUMP_2024 \
 --category=spoofing \
 --since-hours=24 \
 --limit=50 \
 | jq '.[] | {detected_at, actor_id, confidence, message}'

Categories: spoofing, layering, quote_stuffing, wash_trade, momentum_ignition, iceberg, split_order, execution_algo_fingerprint, hft_cluster.
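When the command is driven from a script, it can help to validate the category before spawning the process. The following is a rough sketch under that assumption; anomalies_argv is a hypothetical helper, and only the flags and category names listed above are taken from this page:

```python
from typing import List, Optional

# Category names as listed in this section.
CATEGORIES = {
    "spoofing", "layering", "quote_stuffing", "wash_trade",
    "momentum_ignition", "iceberg", "split_order",
    "execution_algo_fingerprint", "hft_cluster",
}


def anomalies_argv(
    db: str,
    market: Optional[str] = None,
    actor: Optional[str] = None,
    category: Optional[str] = None,
    since_hours: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[str]:
    """Build the argv for `horizon flow anomalies`, skipping unset filters."""
    if category is not None and category not in CATEGORIES:
        raise ValueError(f"unknown category: {category!r}")
    argv = ["horizon", "flow", "anomalies", f"--db={db}"]
    optional = [
        ("--market", market),
        ("--actor", actor),
        ("--category", category),
        ("--since-hours", since_hours),
        ("--limit", limit),
    ]
    argv += [f"{flag}={value}" for flag, value in optional if value is not None]
    return argv
```

The resulting list can be passed to subprocess.run without a shell, which avoids quoting problems with market identifiers.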

cluster

Show the most recent WalletCluster that contains a given actor.

bash
horizon flow cluster --db=flow.db --actor=0xabc...

Output includes the method that produced the cluster (behavioral, temporal, network, wallet_heuristic, or composite), the confidence, and the full actor list.

reverse

Fit or inspect a policy model. In v0.1, only the shadow path ships; --method=irl emits a deferral message pointing to v0.3.

bash
horizon flow reverse --db=flow.db --actor=0xabc... --method=shadow

Output includes top_rule, holdout_accuracy, feature_names, and the full summary (SHAP / importances + decision-tree rules).

Returns exit code 1 if no policy has been fit for that actor. CLI-driven fitting lands in v0.2; for now, fit from a script using ShadowPolicyFitter directly and write to the store.

verify

Integrity report: row counts across the four tables.

bash
horizon flow verify --db=flow.db

Output:

json
{
 "ok": true,
 "counts": {
 "anomalies": 1428,
 "actor_profiles": 93,
 "wallet_clusters": 12,
 "policy_models": 4
 }
}

More thorough integrity (hash-chain verify) is the audit log’s job. Run horizon audit verify --db=audit.db to check the chain that contains flow findings.
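A monitoring script might consume the flow verify report along these lines. This is a sketch based only on the example output shown above; check_verify_report is a hypothetical helper, and treating a missing table count as an error is an assumption rather than documented behavior:

```python
import json

# Table names taken from the example `counts` object above.
EXPECTED_TABLES = ("anomalies", "actor_profiles", "wallet_clusters", "policy_models")


def check_verify_report(stdout: str) -> dict:
    """Parse `horizon flow verify` output and return the per-table counts."""
    report = json.loads(stdout)
    if not report.get("ok"):
        raise RuntimeError("flow verify reported ok=false")
    counts = report.get("counts", {})
    missing = [table for table in EXPECTED_TABLES if table not in counts]
    if missing:
        # Assumption: all four tables should always be reported.
        raise RuntimeError(f"missing table counts: {missing}")
    return counts
```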

replay

Alias for scan. Kept for clarity in test scripts and documentation.

bash
horizon flow replay --feed-log=fixture.jsonl --db=flow.db

Given the same fixture and config, the output is byte-identical across runs. This is the regression-test pattern.
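One way to turn the byte-identical property into a test is to hash the command's stdout on repeated runs and compare digests. The sketch below assumes that stdout is the output being compared; stdout_digest and is_deterministic are hypothetical helpers, and the demo at the bottom uses a trivial Python command as a stand-in so the block runs without horizon installed:

```python
import hashlib
import subprocess
import sys
from typing import List


def stdout_digest(argv: List[str]) -> str:
    """Run a command and return the SHA-256 hex digest of its raw stdout."""
    result = subprocess.run(argv, capture_output=True, check=True)
    return hashlib.sha256(result.stdout).hexdigest()


def is_deterministic(argv: List[str], runs: int = 2) -> bool:
    """True if every run of the command produced byte-identical stdout."""
    digests = {stdout_digest(argv) for _ in range(runs)}
    return len(digests) == 1


if __name__ == "__main__":
    # Stand-in command; in a real test this would be something like
    # ["horizon", "flow", "replay", "--feed-log=fixture.jsonl", "--db=flow.db"].
    print(is_deterministic([sys.executable, "-c", "print('ok')"]))
```

Whether each run needs a fresh --db file to stay comparable is not stated on this page, so a real test would want to confirm that before reusing one store across runs.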

Wiring

The flow subcommand group is registered by horizon.flow.cli.register(parent_subparsers) from horizon.cli.build_parser(). If horizon.flow imports fail for any reason, the registration is skipped silently and the rest of the CLI still works. The flow module is opt-in at the CLI surface too.
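The opt-in registration described above can be sketched with argparse as follows. This is an illustration of the pattern, not the actual horizon.cli source: the module path is passed as a parameter so the example runs without horizon installed, and the audit subparser is a placeholder for the existing CLI.

```python
import argparse
import importlib


def build_parser(flow_cli_module: str = "horizon.flow.cli") -> argparse.ArgumentParser:
    """Build the top-level parser, registering flow commands if importable."""
    parser = argparse.ArgumentParser(prog="horizon")
    subparsers = parser.add_subparsers(dest="command")
    subparsers.add_parser("audit")  # placeholder for the existing audit CLI

    try:
        flow_cli = importlib.import_module(flow_cli_module)
    except Exception:
        # Import failed for any reason: skip silently so the rest of the
        # CLI keeps working without the flow module.
        pass
    else:
        # Assumed contract from the text: register(parent_subparsers).
        flow_cli.register(subparsers)
    return parser
```

Catching a broad Exception mirrors the "for any reason" wording; a stricter implementation might catch only ImportError and log the failure.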

Example: daily surveillance cadence

bash
# 1. Scan yesterday's recorded feed (date -d is GNU date syntax)
horizon flow scan \
 --feed-log=/var/horizon/feeds/polymarket_$(date -d yesterday +%Y-%m-%d).jsonl \
 --db=/var/horizon/flow.db \
 --audit-db=/var/horizon/audit.db \
 --venue=polymarket

# 2. Any high-severity findings?
horizon flow anomalies \
 --db=/var/horizon/flow.db \
 --since-hours=24 \
 --limit=500 \
 | jq '[.[] | select(.severity=="high" or .severity=="critical")] | length'

# 3. Verify both stores
horizon flow verify --db=/var/horizon/flow.db
horizon audit verify --db=/var/horizon/audit.db

Wire the output to Slack or email via cron or a GitHub Actions scheduled workflow, and the surveillance loop runs unattended.
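For schedulers where jq is not available, step 2 can be reproduced in a few lines of Python. The sketch assumes the anomalies output is a JSON array of objects with a severity field, as the jq filter above implies; count_severe is a hypothetical helper:

```python
import json

# Severity levels selected by the jq filter in step 2.
SEVERE = {"high", "critical"}


def count_severe(anomalies_json: str) -> int:
    """Count findings whose severity is high or critical."""
    findings = json.loads(anomalies_json)
    return sum(1 for finding in findings if finding.get("severity") in SEVERE)
```

A cron wrapper could send the alert only when the returned count is greater than zero.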