Investigate: profile a wallet, export findings, build reports

How to use the flow store as a forensic tool. Profile a suspicious wallet, trace a bot's policy, export evidence for compliance or a regulator.

Investigation is the batch-mode, retrospective use of flow. Five recipes cover the common patterns: profile one actor deeply, slice findings for a market over time, fit a policy from scratch, build a cluster report, and export for an outside reviewer.

All of these read from the flow store (and sometimes the audit log). None of them run the live engine. They work on the data it has already written.

Recipe 1: deep-dive one actor

You’ve noticed a wallet in findings and want to understand it fully.

python
import horizon as hz
from horizon.flow import SQLiteFlowStore


def actor_report(actor_id: str, *, venue: str = "polymarket", store_path: str = "flow.db"):
    store = SQLiteFlowStore(store_path)

    # 1. Profile
    profile = store.get_profile(venue, actor_id)
    if profile is None:
        print(f"no profile for {actor_id}")
        store.close()  # release the store on the early exit as well
        return

    print(f"=== {actor_id} on {venue} ===\n")
    print(f"events observed: {profile.features.event_count}")
    print(f"first seen: {profile.features.first_seen}")
    print(f"last seen: {profile.features.last_seen}")

    print("\ntop 3 taxonomy categories:")
    for cat, p in sorted(profile.taxonomy_probs.items(), key=lambda kv: -kv[1])[:3]:
        print(f" {cat:25s} {p:.2%}")

    f = profile.features
    print("\nbehavioral fingerprint:")
    print(f" order-to-trade ratio: {f.order_to_trade_ratio}")
    print(f" cancel-before-fill rate: {f.cancel_before_fill_rate}")
    print(f" median time-to-cancel: {f.median_time_to_cancel_ms} ms")
    print(f" maker ratio: {f.maker_ratio}")
    print(f" median inter-arrival: {f.inter_arrival_median_s} s")
    print(f" Hawkes branching proxy: {f.hawkes_branching_ratio}")
    print(f" markets traded (entropy): {f.market_entropy_bits} bits")

    # On-chain features are only populated for venues that expose them
    if f.gas_price_mode_wei is not None:
        print("\non-chain (Polymarket):")
        print(f" modal gas price: {f.gas_price_mode_wei} wei")
        print(f" gas price std: {f.gas_price_std_wei}")
        print(f" nonce cadence CV: {f.nonce_cadence_cv}")

    # 2. Findings
    findings = store.list_anomalies(actor_id=actor_id, limit=100)
    print(f"\n{len(findings)} findings on this actor:")
    for fd in findings:
        print(f" {fd.detected_at.isoformat()[:19]} "
              f"{fd.category.value:20s} conf={fd.confidence:.2f} "
              f"{fd.market_id}")

    # 3. Cluster
    cluster = store.cluster_of(actor_id)
    if cluster:
        print(f"\ncluster: {cluster.cluster_id} ({cluster.method.value}, "
              f"confidence {cluster.confidence:.2f})")
        print(f" {len(cluster.actor_ids)} actors in group")
        for a in cluster.actor_ids[:10]:
            print(f" {a}")

    # 4. Policy
    policy = store.latest_policy(actor_id)
    if policy:
        print(f"\nlatest policy ({policy.method.value}):")
        print(f" trajectories used: {policy.trajectories_used}")
        print(f" holdout accuracy: {policy.holdout_accuracy}")
        print(f" top rule: {policy.top_rule}")
        if "rules" in policy.summary:
            print(" more rules:")
            for r in policy.summary["rules"][:5]:
                print(f" - {r['description']} (support={r['support']})")

    store.close()


actor_report("0xabc123...")

Output:

text
=== 0xabc123... on polymarket ===

events observed: 1420
first seen: 2026-04-15 09:00:12+00:00
last seen: 2026-04-20 08:45:03+00:00

top 3 taxonomy categories:
 hft 67.30%
 opportunistic 21.10%
 intermediary 9.40%

behavioral fingerprint:
 order-to-trade ratio: 18.3
 cancel-before-fill rate: 0.92
 median time-to-cancel: 85.0 ms
 maker ratio: 0.55
 median inter-arrival: 0.8 s
 Hawkes branching proxy: 0.68
 markets traded (entropy): 2.7 bits

on-chain (Polymarket):
 modal gas price: 30000000000 wei
 gas price std: 1250000.0
 nonce cadence CV: 0.15

12 findings on this actor:
 2026-04-19 14:22:11 spoofing conf=0.87 0xTRUMP_2024
 2026-04-19 11:05:33 layering conf=0.71 0xBIDEN_...
 ...

cluster: bcl_a1b2c3 (behavioral, confidence 0.82)
 3 actors in group
 0xabc123...
 0xdef456...
 0xghi789...

latest policy (shadow_gbdt):
 trajectories used: 520
 holdout accuracy: 0.88
 top rule: buy when ofi_5s > 0.3 AND spread_bps ≤ 10
 more rules:
 - buy when ofi_5s > 0.3 AND spread_bps ≤ 10 (support=210)
 - sell when ofi_5s ≤ -0.3 AND depth_imbalance ≤ -0.2 (support=185)
 ...

This is the single most useful investigation tool: one wallet, all the evidence in one view. Pipe it to a file and attach it to any compliance memo.

Recipe 2: all findings in a market over time

The market-level view. Answers “what happened on this market last week.”

python
from datetime import datetime, timedelta, timezone
from collections import Counter

import horizon as hz


def market_report(market_id: str, *, days: int = 7):
    since = datetime.now(timezone.utc) - timedelta(days=days)
    findings = hz.flow.anomalies(market_id=market_id, since=since, limit=10_000)

    print(f"=== {market_id} last {days} days ===\n")
    print(f"{len(findings)} findings total")

    by_category = Counter(f.category.value for f in findings)
    print("\nby category:")
    for cat, n in by_category.most_common():
        print(f" {cat:25s} {n:>5d}")

    by_severity = Counter(f.severity.value for f in findings)
    print("\nby severity:")
    for sev, n in by_severity.most_common():
        print(f" {sev:10s} {n:>5d}")

    # Findings without an actor_id are skipped in the per-actor ranking
    by_actor = Counter(f.actor_id for f in findings if f.actor_id)
    print("\ntop 10 actors:")
    for actor, n in by_actor.most_common(10):
        print(f" {actor:45s} {n:>4d}")

    # Highest-severity items worth reading in full
    critical = [f for f in findings if f.severity.value in ("high", "critical")]
    print(f"\n{len(critical)} high+critical findings:")
    for f in critical[:20]:
        print(f" {f.detected_at.isoformat()[:19]} "
              f"{f.category.value:20s} conf={f.confidence:.2f}")
        print(f" actor={f.actor_id} msg={f.message[:80]}")


market_report("0xTRUMP_2024", days=7)

Use cases:

  • Pre-trade diligence. Before entering a new market, see what surveillance has flagged in it recently.
  • Post-mortem. After a bad trade, see what the surrounding flow looked like.
  • Market comparison. Run this against two markets you consider interchangeable. The one with less toxic flow is the better place to trade.
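
For the market comparison, it can help to reduce each market's findings to a single number. The sketch below is one hypothetical way to do that: weight each finding by severity and confidence, then sum. The weights, the `toxicity_score` name, and the `Finding` stand-in are assumptions for illustration. Only the `severity` and `confidence` fields come from the recipe above, and the severity labels other than "high" and "critical" are guesses.

```python
from dataclasses import dataclass

# Illustrative weights only; tune them to your own risk tolerance.
SEVERITY_WEIGHTS = {"low": 1.0, "medium": 2.0, "high": 4.0, "critical": 8.0}


@dataclass
class Finding:
    """Minimal stand-in carrying the two fields the score needs."""
    severity: str
    confidence: float


def toxicity_score(findings: list[Finding]) -> float:
    """Sum of severity weight times confidence; unknown severities count as 1.0."""
    return sum(SEVERITY_WEIGHTS.get(f.severity, 1.0) * f.confidence for f in findings)


# Made-up inputs for two markets
market_a = [Finding("high", 0.9), Finding("low", 0.5)]
market_b = [Finding("critical", 0.8), Finding("high", 0.7)]

cleaner = "A" if toxicity_score(market_a) < toxicity_score(market_b) else "B"
print(f"A={toxicity_score(market_a):.2f} B={toxicity_score(market_b):.2f} cleaner={cleaner}")
```

With real findings, the inputs would be built from `f.severity.value` and `f.confidence`. A raw sum favors quiet markets, so dividing by traded volume or event count over the same window is probably a fairer comparison.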

Recipe 3: fit a shadow policy for one actor from scratch

You’ve observed an actor long enough and want to reverse-engineer their policy. The engine doesn’t fit policies automatically (expensive, and most actors don’t warrant it); you kick it off explicitly when you want it.

python
import horizon as hz
from horizon.flow import SQLiteFlowStore
from horizon.flow.config import FlowConfig
from horizon.flow.policy.features import PolicyFeatureExtractor, FEATURE_NAMES
from horizon.flow.policy.shadow import ShadowPolicyFitter


def fit_policy_for_actor(
    actor_id: str,
    *,
    venue: str = "polymarket",
    feed_log_path: str,  # JSONL recorded feed
    store_path: str = "flow.db",
):
    from horizon.flow.ingestion.replay import ReplayFlowSource

    config = FlowConfig()  # one config shared by the floor check and the fitter

    # 1. Build the feature extractor; observe the whole feed
    feat = PolicyFeatureExtractor()
    trajectories: list[tuple[dict, str]] = []

    def collect(ev):
        # Every event updates market state; only the target actor's
        # buys and sells become (state, action) training pairs.
        feat.observe(ev)
        if ev.actor_id == actor_id and ev.side in ("buy", "sell"):
            state = feat.featurize(actor_id=actor_id, market_id=ev.market_id, now=ev.timestamp)
            trajectories.append((state, ev.side))

    source = ReplayFlowSource(path=feed_log_path)
    source.on_event(collect)
    source.connect()

    print(f"collected {len(trajectories)} (state, action) pairs for {actor_id}")
    min_events = config.policy.policy_min_events
    if len(trajectories) < min_events:
        print(f"below floor of {min_events}. Refusing to fit")
        return

    # 2. Fit
    fitter = ShadowPolicyFitter(config)
    model = fitter.fit(
        actor_id=actor_id,
        trajectories=trajectories,
        feature_names=FEATURE_NAMES,
    )
    if model is None:
        print("fit returned None. Check logs")
        return

    # 3. Persist
    store = SQLiteFlowStore(store_path)
    store.write_policy(model)
    store.close()

    print(f"\nfitted policy {model.policy_id}")
    print(f"holdout accuracy: {model.holdout_accuracy:.2%}")
    print(f"top rule: {model.top_rule}")
    print("top 5 features:")
    for entry in (model.summary.get("top_features") or [])[:5]:
        # Entries carry either a SHAP value or a plain importance score
        key = "mean_abs_shap" if "mean_abs_shap" in entry else "importance"
        print(f" {entry['feature']:25s} {entry.get(key, 0):.3f}")


fit_policy_for_actor("0xabc...", feed_log_path="polymarket_2026_04.jsonl")

What to do with the output:

  • holdout_accuracy >= 0.85 and top_rule involves a single obvious feature. You have a well-characterized bot. Reference the rule in your strategy (Alpha recipe 4).
  • holdout_accuracy in [0.65, 0.85). The bot’s behavior is partially predictable. Useful as a regime signal, not a directional one.
  • holdout_accuracy < 0.65. You don’t have a good model of this actor. Re-examine the feature set, or accept that they trade on signals you can’t observe.
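
The three bands can be encoded as a small triage function so that a batch of fitted policies is sorted consistently. This is a sketch only: `interpret_holdout` and the tier labels are hypothetical names, and it checks the accuracy thresholds alone, not whether `top_rule` involves a single obvious feature.

```python
def interpret_holdout(accuracy: float) -> str:
    """Map a holdout accuracy in [0, 1] to one of three triage tiers.

    Tier names are illustrative labels, not values horizon emits.
    """
    if not 0.0 <= accuracy <= 1.0:
        raise ValueError(f"accuracy must be in [0, 1], got {accuracy}")
    if accuracy >= 0.85:
        return "well_characterized"  # candidate for referencing the rule directly
    if accuracy >= 0.65:
        return "regime_signal"       # partially predictable, not directional
    return "unmodeled"               # revisit features or accept unobservable signals


for acc in (0.88, 0.70, 0.50):
    print(f"{acc:.2f} -> {interpret_holdout(acc)}")
```

The boundaries are half-open, so 0.85 falls in the top tier and 0.65 falls in the middle one.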

Recipe 4: build a wallet-cluster report

Cluster assignments are one of the most useful outputs for compliance: “this wallet group appears to act as one entity.” Export the full picture.

python
from collections import Counter

import horizon as hz
from horizon.flow import SQLiteFlowStore


def cluster_report(cluster_id: str, *, store_path: str = "flow.db"):
    store = SQLiteFlowStore(store_path)
    cluster = store.get_cluster(cluster_id)
    if cluster is None:
        print(f"no cluster {cluster_id}")
        store.close()  # release the store on the early exit as well
        return

    print(f"=== {cluster_id} ({cluster.method.value}) ===")
    print(f"formed: {cluster.formed_at.isoformat()}")
    print(f"confidence: {cluster.confidence:.2f}")
    print(f"venue: {cluster.venue_name or '(cross-venue)'}")
    print(f"evidence: {cluster.evidence}")

    all_findings = []
    print(f"\n{len(cluster.actor_ids)} member wallets:")
    for actor in cluster.actor_ids:
        # venue_name is empty for cross-venue clusters; check how
        # get_profile handles that case in your version.
        profile = store.get_profile(cluster.venue_name, actor)
        # Fetch each actor's findings once and reuse them for the aggregate below
        actor_findings = store.list_anomalies(actor_id=actor, limit=100_000)
        all_findings.extend(actor_findings)
        if profile:
            top = max(profile.taxonomy_probs.items(), key=lambda kv: kv[1], default=(None, 0))
            print(f" {actor}")
            print(f" events={profile.features.event_count} "
                  f"top_category={top[0]} ({top[1]:.0%}) "
                  f"findings={len(actor_findings)}")
        else:
            print(f" {actor} (no profile yet)")

    # Aggregate finding categories across the cluster
    by_cat = Counter(f.category.value for f in all_findings)
    print("\naggregate findings across cluster:")
    for cat, n in by_cat.most_common():
        print(f" {cat:25s} {n:>5d}")

    store.close()


cluster_report("bcl_a1b2c3...")

Hand this report to compliance as the artifact for why you believe a group of wallets represents a single entity. The evidence dict on the cluster carries the method-specific justification (HDBSCAN cluster label, Louvain community ID, common-input transaction hash), so each assignment is auditable back to its source.

Recipe 5: export findings for a regulator request

Regulators sometimes ask: “show us what you observed in market X between dates Y and Z.” Here’s the direct response.

python
import csv
import json
from datetime import datetime, timezone

import horizon as hz
from horizon.flow import SQLiteFlowStore


def regulator_export(
    *,
    market_id: str | None = None,
    since: datetime,
    until: datetime,
    out_path: str,
    store_path: str = "flow.db",
):
    store = SQLiteFlowStore(store_path)
    if market_id:
        findings = store.list_anomalies(market_id=market_id, since=since, until=until, limit=1_000_000)
    else:
        # No market filter: stream everything in the window
        findings = list(store.iter_anomalies(since=since, until=until))

    with open(out_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow([
            "finding_id", "detected_at", "detector_name", "category",
            "severity", "market_id", "venue_name", "actor_id", "cluster_id",
            "confidence", "score", "message", "citation", "evidence",
            "audit_event_id",
        ])
        for f in findings:
            writer.writerow([
                f.finding_id,
                f.detected_at.isoformat(),
                f.detector_name,
                f.category.value,
                f.severity.value,
                f.market_id,
                f.venue_name,
                f.actor_id or "",
                f.cluster_id or "",
                f"{f.confidence:.4f}",
                f"{f.score:.4f}",
                f.message,
                f.citation,
                json.dumps(f.evidence, default=str),  # evidence dict as one JSON cell
                f.audit_event_id or "",
            ])

    store.close()
    print(f"wrote {len(findings)} rows to {out_path}")


regulator_export(
    market_id="0xTRUMP_2024",
    since=datetime(2026, 4, 1, tzinfo=timezone.utc),
    # This cutoff is midnight at the start of April 30; move it to May 1
    # if findings from the last day of the month should be included.
    until=datetime(2026, 4, 30, tzinfo=timezone.utc),
    out_path="/tmp/findings_april.csv",
)

Pair with the audit-log export. horizon audit export --db=audit.db --since=2026-04-01 --until=2026-04-30 --format=json gets you the hash-chained side; the CSV above gets you the flow-specific findings. Together they’re the complete picture. Include the hash chain’s last anchor so a reviewer can verify the audit log hasn’t been altered since the events were recorded.
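
To make the tamper-evidence idea concrete, here is a generic hash-chain check of the kind a reviewer could run against an anchor. It sketches the general technique only: the record layout (`payload`, `prev_hash`, `hash`), the use of SHA-256, and the all-zero genesis value are assumptions, not horizon's actual audit-log format.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed starting value for the first record


def record_hash(prev_hash: str, payload: dict) -> str:
    """Hash the previous link together with a canonical JSON form of the payload."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def build_chain(payloads: list[dict]) -> list[dict]:
    """Link payloads so each record commits to everything before it."""
    chain, prev = [], GENESIS
    for payload in payloads:
        digest = record_hash(prev, payload)
        chain.append({"payload": payload, "prev_hash": prev, "hash": digest})
        prev = digest
    return chain


def verify_chain(records: list[dict], anchor: str) -> bool:
    """True if every link recomputes correctly and the final hash equals the anchor."""
    prev = GENESIS
    for rec in records:
        if rec["prev_hash"] != prev or record_hash(prev, rec["payload"]) != rec["hash"]:
            return False
        prev = rec["hash"]
    return prev == anchor


chain = build_chain([{"event": "order", "id": 1}, {"event": "cancel", "id": 2}])
anchor = chain[-1]["hash"]         # the value you would hand to the reviewer
print(verify_chain(chain, anchor))  # True

chain[0]["payload"]["id"] = 99      # simulate an after-the-fact edit
print(verify_chain(chain, anchor))  # False
```

Changing any earlier record changes every hash after it, so a reviewer holding only the last anchor can detect an edit anywhere in the exported range.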

Using the CLI for ad-hoc queries

Most of these recipes can be done one-off from the CLI when you don’t need to script. See CLI reference for the full surface.

bash
# Quick finding listing
horizon flow anomalies --db=flow.db --market=0xTRUMP_2024 --since-hours=72 --limit=200 \
 | jq '.[] | {detected_at, actor_id, category, confidence, message}'

# Wallet profile dump
horizon flow profile --db=flow.db --actor=0xabc... --venue=polymarket | jq

# Cluster lookup
horizon flow cluster --db=flow.db --actor=0xabc...

# Latest fitted policy
horizon flow reverse --db=flow.db --actor=0xabc... --method=shadow | jq '.top_rule, .holdout_accuracy'
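
Where `jq` is not available, the same kind of filtering can be done in Python. The sketch assumes, as the `jq '.[]'` filter above implies, that `horizon flow anomalies` emits a JSON array of objects with `category` and `confidence` keys. `summarize_findings` is a hypothetical helper and the sample payload is made up.

```python
import json
from collections import Counter


def summarize_findings(raw_json: str, *, min_confidence: float = 0.0) -> Counter:
    """Count findings per category, keeping only those at or above min_confidence."""
    findings = json.loads(raw_json)
    return Counter(
        f["category"] for f in findings if f.get("confidence", 0.0) >= min_confidence
    )


# Made-up payload in the assumed shape of the CLI output
sample = json.dumps([
    {"category": "spoofing", "confidence": 0.87},
    {"category": "layering", "confidence": 0.71},
    {"category": "spoofing", "confidence": 0.40},
])

for category, n in summarize_findings(sample, min_confidence=0.7).most_common():
    print(f"{category:25s} {n:>5d}")
```

To use it on live output, read the text from `sys.stdin.read()` and pipe the CLI command into the script.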

The principle

Investigation is about tracing: following a single question from its source to a defensible conclusion. Each recipe here is a query that answers one kind of question. Compose them: start with recipe 2 (market overview), drill into a suspect with recipe 1, confirm or refute a group hypothesis with recipe 4, fit a policy for the worst actor with recipe 3, and export the evidence pack with recipe 5.

Every output carries a citation and an audit_event_id. The reader can follow the breadcrumb all the way to the hash-chained audit record. That’s what makes these outputs defensible, not just informative.
