Investigate: profile a wallet, export findings, build reports
How to use the flow store as a forensic tool. Profile a suspicious wallet, trace a bot's policy, export evidence for compliance or a regulator.
Investigation is the batch-mode, retrospective use of flow. The five recipes below cover the common patterns: profile one actor deeply, slice findings for a market over time, fit a policy from scratch, build a cluster report, and export for an outside reviewer.
All of these read from the flow store (and sometimes the audit log). None of them run the live engine. They work on the data it has already written.
Recipe 1: deep-dive one actor
You’ve noticed a wallet in findings and want to understand it fully.
from horizon.flow import SQLiteFlowStore


def actor_report(actor_id: str, *, venue: str = "polymarket", store_path: str = "flow.db"):
    store = SQLiteFlowStore(store_path)

    # 1. Profile
    profile = store.get_profile(venue, actor_id)
    if profile is None:
        print(f"no profile for {actor_id}")
        store.close()  # release the store on the early exit too
        return
    print(f"=== {actor_id} on {venue} ===\n")
    print(f"events observed: {profile.features.event_count}")
    print(f"first seen: {profile.features.first_seen}")
    print(f"last seen: {profile.features.last_seen}")
    print("\ntop 3 taxonomy categories:")
    for cat, p in sorted(profile.taxonomy_probs.items(), key=lambda kv: -kv[1])[:3]:
        print(f" {cat:25s} {p:.2%}")

    f = profile.features
    print("\nbehavioral fingerprint:")
    print(f" order-to-trade ratio: {f.order_to_trade_ratio}")
    print(f" cancel-before-fill rate: {f.cancel_before_fill_rate}")
    print(f" median time-to-cancel: {f.median_time_to_cancel_ms} ms")
    print(f" maker ratio: {f.maker_ratio}")
    print(f" median inter-arrival: {f.inter_arrival_median_s} s")
    print(f" Hawkes branching proxy: {f.hawkes_branching_ratio}")
    print(f" markets traded (entropy): {f.market_entropy_bits} bits")
    if f.gas_price_mode_wei is not None:
        print("\non-chain (Polymarket):")
        print(f" modal gas price: {f.gas_price_mode_wei} wei")
        print(f" gas price std: {f.gas_price_std_wei}")
        print(f" nonce cadence CV: {f.nonce_cadence_cv}")

    # 2. Findings
    findings = store.list_anomalies(actor_id=actor_id, limit=100)
    print(f"\n{len(findings)} findings on this actor:")
    for fd in findings:
        print(f" {fd.detected_at.isoformat()[:19]} "
              f"{fd.category.value:20s} conf={fd.confidence:.2f} "
              f"{fd.market_id}")

    # 3. Cluster
    cluster = store.cluster_of(actor_id)
    if cluster:
        print(f"\ncluster: {cluster.cluster_id} ({cluster.method.value}, "
              f"confidence {cluster.confidence:.2f})")
        print(f" {len(cluster.actor_ids)} actors in group")
        for a in cluster.actor_ids[:10]:
            print(f" {a}")

    # 4. Policy
    policy = store.latest_policy(actor_id)
    if policy:
        print(f"\nlatest policy ({policy.method.value}):")
        print(f" trajectories used: {policy.trajectories_used}")
        print(f" holdout accuracy: {policy.holdout_accuracy}")
        print(f" top rule: {policy.top_rule}")
        if "rules" in policy.summary:
            print(" more rules:")
            for r in policy.summary["rules"][:5]:
                print(f" - {r['description']} (support={r['support']})")

    store.close()


actor_report("0xabc123...")
Output:
=== 0xabc123... on polymarket ===

events observed: 1420
first seen: 2026-04-15 09:00:12+00:00
last seen: 2026-04-20 08:45:03+00:00

top 3 taxonomy categories:
 hft                       67.30%
 opportunistic             21.10%
 intermediary              9.40%

behavioral fingerprint:
 order-to-trade ratio: 18.3
 cancel-before-fill rate: 0.92
 median time-to-cancel: 85.0 ms
 maker ratio: 0.55
 median inter-arrival: 0.8 s
 Hawkes branching proxy: 0.68
 markets traded (entropy): 2.7 bits

on-chain (Polymarket):
 modal gas price: 30000000000 wei
 gas price std: 1250000.0
 nonce cadence CV: 0.15

12 findings on this actor:
 2026-04-19T14:22:11 spoofing             conf=0.87 0xTRUMP_2024
 2026-04-19T11:05:33 layering             conf=0.71 0xBIDEN_...
 ...

cluster: bcl_a1b2c3 (behavioral, confidence 0.82)
 3 actors in group
 0xabc123...
 0xdef456...
 0xghi789...

latest policy (shadow_gbdt):
 trajectories used: 520
 holdout accuracy: 0.88
 top rule: buy when ofi_5s > 0.3 AND spread_bps ≤ 10
 more rules:
 - buy when ofi_5s > 0.3 AND spread_bps ≤ 10 (support=210)
 - sell when ofi_5s ≤ -0.3 AND depth_imbalance ≤ -0.2 (support=185)
 ...
This is the single most useful investigation tool: one wallet, all the evidence in one view. Pipe the output to a file and attach it to any compliance memo.
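If you would rather save the report from inside Python than redirect it in the shell, the standard library can capture everything a print-based function writes. The sketch below is a minimal illustration: `capture_report` and `demo_report` are hypothetical names, and `demo_report` is only a stand-in for `actor_report` so the example runs without a flow store.

```python
import contextlib
import io
import tempfile
from pathlib import Path


def capture_report(report_fn, *args, **kwargs) -> str:
    """Run a print-based report function and return everything it printed."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        report_fn(*args, **kwargs)
    return buf.getvalue()


def demo_report(actor_id: str) -> None:
    # Hypothetical stand-in for actor_report(); it only prints.
    print(f"=== {actor_id} ===")
    print("events observed: 3")


text = capture_report(demo_report, "0xabc123...")

# Write the captured text where a memo can pick it up (temp dir for the demo).
out_file = Path(tempfile.gettempdir()) / "actor_report_demo.txt"
out_file.write_text(text, encoding="utf-8")
```

Swapping `demo_report` for `actor_report` should work the same way, since the recipe communicates only through `print`.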
Recipe 2: all findings in a market over time
The market-level view. Answers “what happened on this market last week.”
from datetime import datetime, timedelta, timezone
from collections import Counter

import horizon as hz


def market_report(market_id: str, *, days: int = 7):
    since = datetime.now(timezone.utc) - timedelta(days=days)
    findings = hz.flow.anomalies(market_id=market_id, since=since, limit=10_000)
    print(f"=== {market_id} last {days} days ===\n")
    print(f"{len(findings)} findings total")

    by_category = Counter(f.category.value for f in findings)
    print(f"\nby category:")
    for cat, n in by_category.most_common():
        print(f" {cat:25s} {n:>5d}")

    by_severity = Counter(f.severity.value for f in findings)
    print(f"\nby severity:")
    for sev, n in by_severity.most_common():
        print(f" {sev:10s} {n:>5d}")

    by_actor = Counter(f.actor_id for f in findings if f.actor_id)
    print(f"\ntop 10 actors:")
    for actor, n in by_actor.most_common(10):
        print(f" {actor:45s} {n:>4d}")

    # Highest-severity items worth reading in full
    critical = [f for f in findings if f.severity.value in ("high", "critical")]
    print(f"\n{len(critical)} high+critical findings:")
    for f in critical[:20]:
        print(f" {f.detected_at.isoformat()[:19]} "
              f"{f.category.value:20s} conf={f.confidence:.2f}")
        print(f" actor={f.actor_id} msg={f.message[:80]}")


market_report("0xTRUMP_2024", days=7)
Use cases:
- Pre-trade diligence. Before entering a new market, see what surveillance has flagged in it recently.
- Post-mortem. After a bad trade, see what the surrounding flow looked like.
- Market comparison. Run this against two markets you consider interchangeable. The one with less toxic flow is the better venue.
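For the market-comparison use case, one way to turn two reports into a single number each is a severity-weighted count. This is only a sketch: the weights, the "low" and "medium" labels, and the `toxicity_score` helper are assumptions made for illustration (this page names only "high" and "critical"), and the input lists are hypothetical stand-ins for `[f.severity.value for f in findings]`.

```python
from collections import Counter

# Assumed severity weights; only "high" and "critical" are named on this page,
# the other labels and all of the numbers are illustrative.
SEVERITY_WEIGHTS = {"low": 1, "medium": 2, "high": 5, "critical": 10}


def toxicity_score(severities: list[str]) -> int:
    """Sum of severity weights; unknown labels count as 1."""
    counts = Counter(severities)
    return sum(SEVERITY_WEIGHTS.get(sev, 1) * n for sev, n in counts.items())


# Hypothetical inputs standing in for two markets' findings.
market_a = ["low", "low", "high", "critical"]
market_b = ["medium", "medium", "medium"]

score_a = toxicity_score(market_a)  # 1 + 1 + 5 + 10
score_b = toxicity_score(market_b)  # 2 + 2 + 2
cleaner = "A" if score_a < score_b else "B"
print(score_a, score_b, cleaner)
```

A weighted score keeps one critical finding from being drowned out by many low-severity ones, which a raw count would not do. Pick weights that match your own risk tolerance.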
Recipe 3: fit a shadow policy for one actor from scratch
You’ve observed an actor long enough and want to reverse-engineer their policy. The engine doesn’t fit policies automatically (expensive, and most actors don’t warrant it); you kick it off explicitly when you want it.
from horizon.flow import SQLiteFlowStore
from horizon.flow.config import FlowConfig
from horizon.flow.ingestion.replay import ReplayFlowSource
from horizon.flow.policy.features import PolicyFeatureExtractor, FEATURE_NAMES
from horizon.flow.policy.shadow import ShadowPolicyFitter


def fit_policy_for_actor(
    actor_id: str,
    *,
    venue: str = "polymarket",
    feed_log_path: str,  # JSONL recorded feed
    store_path: str = "flow.db",
):
    cfg = FlowConfig()

    # 1. Build the feature extractor; observe the whole feed
    feat = PolicyFeatureExtractor()
    trajectories: list[tuple[dict, str]] = []

    def collect(ev):
        # Every event updates market state; only the target actor's
        # buys and sells become (state, action) training pairs.
        feat.observe(ev)
        if ev.actor_id == actor_id and ev.side in ("buy", "sell"):
            state = feat.featurize(actor_id=actor_id, market_id=ev.market_id, now=ev.timestamp)
            trajectories.append((state, ev.side))

    source = ReplayFlowSource(path=feed_log_path)
    source.on_event(collect)
    source.connect()  # replay the recorded feed through collect()
    print(f"collected {len(trajectories)} (state, action) pairs for {actor_id}")

    min_events = cfg.policy.policy_min_events
    if len(trajectories) < min_events:
        print(f"below floor of {min_events}. Refusing to fit")
        return

    # 2. Fit
    fitter = ShadowPolicyFitter(cfg)
    model = fitter.fit(
        actor_id=actor_id,
        trajectories=trajectories,
        feature_names=FEATURE_NAMES,
    )
    if model is None:
        print("fit returned None. Check logs")
        return

    # 3. Persist
    store = SQLiteFlowStore(store_path)
    store.write_policy(model)
    store.close()

    print(f"\nfitted policy {model.policy_id}")
    print(f"holdout accuracy: {model.holdout_accuracy:.2%}")
    print(f"top rule: {model.top_rule}")
    print("top 5 features:")
    for entry in (model.summary.get("top_features") or [])[:5]:
        key = "mean_abs_shap" if "mean_abs_shap" in entry else "importance"
        print(f" {entry['feature']:25s} {entry.get(key, 0):.3f}")


fit_policy_for_actor("0xabc...", feed_log_path="polymarket_2026_04.jsonl")
What to do with the output.
- holdout_accuracy >= 0.85 and top_rule involves a single obvious feature. You have a well-characterized bot. Reference the rule in your strategy (Alpha recipe 4).
- holdout_accuracy in [0.65, 0.85). The bot’s behavior is partially predictable. Useful as a regime signal, not a directional one.
- holdout_accuracy < 0.65. You don’t have a good model of this actor. Re-examine the feature set or accept that they trade on signals you can’t observe.
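The three readings above can be encoded as a small triage helper, so that a batch of fitted policies is sorted the same way every time. This is a sketch under stated assumptions: `triage_policy` is a hypothetical name, counting conditions by splitting on " AND " assumes rules are formatted like the sample output in recipe 1, and the branch for an accurate but compound rule is not covered by the guidance above and is added here only so that every input gets a label.

```python
def triage_policy(holdout_accuracy: float, top_rule: str) -> str:
    """Map a fitted policy to one of the readings described above."""
    # Assumption: rule conditions are joined by " AND ", as in the sample output.
    n_conditions = len(top_rule.split(" AND "))
    if holdout_accuracy >= 0.85:
        if n_conditions == 1:
            return "well-characterized"
        # Not covered by the guidance above; flagged for manual review.
        return "accurate, compound rule"
    if holdout_accuracy >= 0.65:
        return "regime signal only"
    return "no usable model"


for acc, rule in [
    (0.90, "buy when ofi_5s > 0.3"),
    (0.88, "buy when ofi_5s > 0.3 AND spread_bps <= 10"),
    (0.70, "sell when depth_imbalance <= -0.2"),
    (0.50, "buy when ofi_5s > 0.3"),
]:
    print(f"{acc:.2f}  {triage_policy(acc, rule)}")
```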
Recipe 4: build a wallet-cluster report
Cluster assignments are one of the most useful outputs for compliance: “this wallet group appears to act as one entity.” Export the full picture.
from collections import Counter

from horizon.flow import SQLiteFlowStore


def cluster_report(cluster_id: str, *, store_path: str = "flow.db"):
    store = SQLiteFlowStore(store_path)
    cluster = store.get_cluster(cluster_id)
    if cluster is None:
        print(f"no cluster {cluster_id}")
        store.close()  # release the store on the early exit too
        return
    print(f"=== {cluster_id} ({cluster.method.value}) ===")
    print(f"formed: {cluster.formed_at.isoformat()}")
    print(f"confidence: {cluster.confidence:.2f}")
    print(f"venue: {cluster.venue_name or '(cross-venue)'}")
    print(f"evidence: {cluster.evidence}")

    print(f"\n{len(cluster.actor_ids)} member wallets:")
    all_findings = []
    for actor in cluster.actor_ids:
        # NOTE: venue_name can be empty for cross-venue clusters (see the
        # header above); in that case this lookup may not find a profile.
        profile = store.get_profile(cluster.venue_name, actor)
        # Query once per actor and reuse the result for the aggregate below.
        actor_findings = store.list_anomalies(actor_id=actor, limit=100_000)
        all_findings.extend(actor_findings)
        if profile:
            top = max(profile.taxonomy_probs.items(), key=lambda kv: kv[1], default=(None, 0))
            print(f" {actor}")
            print(f" events={profile.features.event_count} "
                  f"top_category={top[0]} ({top[1]:.0%}) "
                  f"findings={len(actor_findings)}")
        else:
            print(f" {actor} (no profile yet)")

    # Aggregate finding categories across the cluster
    by_cat = Counter(f.category.value for f in all_findings)
    print("\naggregate findings across cluster:")
    for cat, n in by_cat.most_common():
        print(f" {cat:25s} {n:>5d}")

    store.close()


cluster_report("bcl_a1b2c3...")
Hand this report to compliance as the artifact for why you believe a group of wallets represents a single entity. The evidence dict on the cluster carries the method-specific justification (HDBSCAN cluster label, Louvain community ID, common-input transaction hash), so each assignment is auditable back to its source.
Recipe 5: export findings for a regulator request
Regulators sometimes ask: “show us what you observed in market X between dates Y and Z.” Here’s the direct response.
import csv
import json
from datetime import datetime, timezone

from horizon.flow import SQLiteFlowStore


def regulator_export(
    *,
    market_id: str | None = None,
    since: datetime,
    until: datetime,
    out_path: str,
    store_path: str = "flow.db",
):
    store = SQLiteFlowStore(store_path)
    if market_id:
        findings = store.list_anomalies(market_id=market_id, since=since, until=until, limit=1_000_000)
    else:
        findings = list(store.iter_anomalies(since=since, until=until))

    # newline="" lets the csv module control line endings itself
    with open(out_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow([
            "finding_id", "detected_at", "detector_name", "category",
            "severity", "market_id", "venue_name", "actor_id", "cluster_id",
            "confidence", "score", "message", "citation", "evidence",
            "audit_event_id",
        ])
        for fd in findings:
            writer.writerow([
                fd.finding_id,
                fd.detected_at.isoformat(),
                fd.detector_name,
                fd.category.value,
                fd.severity.value,
                fd.market_id,
                fd.venue_name,
                fd.actor_id or "",
                fd.cluster_id or "",
                f"{fd.confidence:.4f}",
                f"{fd.score:.4f}",
                fd.message,
                fd.citation,
                json.dumps(fd.evidence, default=str),  # evidence dict as one JSON cell
                fd.audit_event_id or "",
            ])
    store.close()
    print(f"wrote {len(findings)} rows to {out_path}")


regulator_export(
    market_id="0xTRUMP_2024",
    since=datetime(2026, 4, 1, tzinfo=timezone.utc),
    # Midnight UTC at the start of Apr 30; move to May 1 if the last day must be covered.
    until=datetime(2026, 4, 30, tzinfo=timezone.utc),
    out_path="/tmp/findings_april.csv",
)
Pair with the audit-log export. horizon audit export --db=audit.db --since=2026-04-01 --until=2026-04-30 --format=json gets you the hash-chained side; the CSV above gets you the flow-specific findings. Together they’re the complete picture. Include the hash chain’s last anchor so a reviewer can verify the audit log hasn’t been altered since the events were recorded.
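To make the role of the last anchor concrete, here is a generic hash-chain check. It is not Horizon's audit-log format: the record layout, the all-zero genesis value, the use of SHA-256 over canonical JSON, and every function name below are assumptions chosen to illustrate the idea that changing any earlier record changes the final hash.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed starting value for the chain


def link_hash(prev_hash: str, record: dict) -> str:
    """Hash of the previous link plus a canonical JSON encoding of the record."""
    payload = prev_hash + json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def build_chain(records: list[dict]) -> list[str]:
    """Return the running hash after each record."""
    hashes, prev = [], GENESIS
    for rec in records:
        prev = link_hash(prev, rec)
        hashes.append(prev)
    return hashes


def verify_chain(records: list[dict], anchor: str) -> bool:
    """Recompute the chain and compare its final hash to the published anchor."""
    hashes = build_chain(records)
    return bool(hashes) and hashes[-1] == anchor


# Hypothetical records standing in for exported audit events.
records = [
    {"event_id": 1, "kind": "finding", "market": "m1"},
    {"event_id": 2, "kind": "finding", "market": "m2"},
]
anchor = build_chain(records)[-1]
ok_before = verify_chain(records, anchor)

# Altering the first record breaks the match, even though the last is untouched.
tampered = [dict(records[0], market="m9"), records[1]]
ok_after = verify_chain(tampered, anchor)
print(ok_before, ok_after)
```

A reviewer who holds only the anchor and the exported records can rerun this kind of check without trusting the party that produced the export.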
Using the CLI for ad-hoc queries
Most of these recipes can be done one-off from the CLI when you don’t need to script. See CLI reference for the full surface.
# Quick finding listing
horizon flow anomalies --db=flow.db --market=0xTRUMP_2024 --since-hours=72 --limit=200 \
| jq '.[] | {detected_at, actor_id, category, confidence, message}'
# Wallet profile dump
horizon flow profile --db=flow.db --actor=0xabc... --venue=polymarket | jq
# Cluster lookup
horizon flow cluster --db=flow.db --actor=0xabc...
# Latest fitted policy
horizon flow reverse --db=flow.db --actor=0xabc... --method=shadow | jq '.top_rule, .holdout_accuracy'
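If jq is not available, the same projection as the first command can be done in Python on the CLI output. The sketch assumes, as the jq filter above implies, that the command prints a JSON array of objects. The `project` helper and the sample record are hypothetical stand-ins for real output.

```python
import json

FIELDS = ("detected_at", "actor_id", "category", "confidence", "message")


def project(raw_json: str, fields=FIELDS) -> list[dict]:
    """Keep only the selected keys from each object in a JSON array."""
    # Missing keys become None, mirroring jq's null for absent fields.
    return [{k: row.get(k) for k in fields} for row in json.loads(raw_json)]


# Hypothetical sample standing in for the CLI's stdout.
sample = json.dumps([
    {"finding_id": "f1", "detected_at": "2026-04-19T14:22:11", "actor_id": "0xabc...",
     "category": "spoofing", "confidence": 0.87, "message": "example", "score": 1.0},
])
rows = project(sample)
print(rows[0])
```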
The principle
Investigation is about tracing: following a single question from its source to a defensible conclusion. Each recipe here is a query that answers one kind of question. Compose them: start with recipe 2 (market overview), drill into a suspect with recipe 1, confirm or refute a group hypothesis with recipe 4, fit a policy for the worst actor with recipe 3, and export the evidence pack with recipe 5.
Every output carries a citation and an audit_event_id. The reader can follow the breadcrumb all the way to the hash-chained audit record. That’s what makes these outputs defensible, not just informative.
Related
- Flow store reference. Schema and all query methods.
- CLI. The same operations from the command line.
- Operations recipes. Automating the daily / weekly cadence of these investigations.