Live mode
hz.run(mode='live', feed=...). The same pipeline, driven by a real-time feed instead of historical bars.
The professional layers (accounts, audit trail, IPS gates, bucket limits, block allocation, tax lots, PDT, locates) run identically in backtest and live. hz.run(mode="live", feed=...) swaps the tick source: instead of iterating data_source.iter_bars(), the loop consumes a LiveFeed stream and processes each arriving tick.
The same _process_tick runs. The same gates fire. The same audit events are emitted.
Minimal call
import horizon as hz
from horizon.data.live import NullLiveFeed

# AssetClass, MyStrategy, registry, audit_log, and my_stop_event are assumed
# to be imported or defined elsewhere.
feed = NullLiveFeed()
result = hz.run(
    mode="live",
    feed=feed,
    strategies=[MyStrategy],
    asset_classes=[AssetClass.Equity],
    universe=["AAPL", "MSFT"],
    accounts=registry,
    audit_log=audit_log,
    # Stop conditions (pick at least one)
    max_ticks=10_000,
    max_duration_s=3600,
    stop_event=my_stop_event,  # threading.Event; call set() to stop cleanly
    idle_timeout_s=30,         # exit if no tick for 30s after the first one
)
mode="live" requires feed=. Without it, hz.run raises ValueError.
What the loop does
Register callbacks
feed.on_tick(...) and feed.on_gap(...) register internal callbacks that enqueue ticks into a thread-safe buffer.
Subscribe to the universe
Each universe symbol is subscribed with SubscriptionKind.Trades. Additional subscriptions (quotes, bars, orderbook) can be added by the caller before the run starts.
Connect
feed.connect() opens the stream. NullLiveFeed flushes queued ticks; real feeds start their WebSocket session. Emits FeedConnected.
Poll and batch
Process each tick batch
FeedData snapshot, and call the same _process_tick the backtest uses. All risk gates, block allocation, lot books, PDT counter, and audit events work unchanged.
Honor stop conditions
max_ticks, max_duration_s, stop_event, idle_timeout_s. Whichever fires first exits the loop.
Shutdown
feed.close(). Emits FeedDisconnected. Runs end-of-run wash-sale detection. Returns a BacktestResult.
Stop conditions
The loop runs until one of these fires:
| Condition | Behavior |
|---|---|
| max_ticks | Stop after N ticks processed. Used in tests. |
| max_duration_s | Stop after N wall-clock seconds. Used for scheduled runs. |
| stop_event (threading.Event) | Set from a signal handler for graceful shutdown. |
| idle_timeout_s | Stop when no tick arrives for N seconds after the first tick. |
| feed.status() is Disconnected, Halted, or Error | Loop exits once the buffer is drained. |
Configure at least one. In tests, pair max_ticks with a max_duration_s ceiling so a stuck feed cannot hang CI.
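The stop conditions above can be pictured with a small stand-alone loop. This is a minimal sketch, not the actual _run_live implementation: run_until_stopped, its poll_interval_s parameter, and the use of queue.Queue as the tick buffer are all assumptions made for illustration. Only the four condition names come from this page.

```python
import queue
import threading
import time
from typing import Any, Callable, Optional


def run_until_stopped(
    ticks: "queue.Queue[Any]",
    handle_tick: Callable[[Any], None],
    *,
    max_ticks: Optional[int] = None,
    max_duration_s: Optional[float] = None,
    stop_event: Optional[threading.Event] = None,
    idle_timeout_s: Optional[float] = None,
    poll_interval_s: float = 0.01,  # hypothetical knob, not a documented parameter
) -> str:
    """Drain `ticks` until a stop condition fires; return the name of the one that fired."""
    if (max_ticks is None and max_duration_s is None
            and stop_event is None and idle_timeout_s is None):
        raise ValueError("configure at least one stop condition")

    started = time.monotonic()
    last_tick_at: Optional[float] = None  # stays None until the first tick arrives
    processed = 0

    while True:
        now = time.monotonic()
        if stop_event is not None and stop_event.is_set():
            return "stop_event"
        if max_ticks is not None and processed >= max_ticks:
            return "max_ticks"
        if max_duration_s is not None and now - started >= max_duration_s:
            return "max_duration_s"
        # The idle timer only starts counting after the first tick.
        if (idle_timeout_s is not None and last_tick_at is not None
                and now - last_tick_at >= idle_timeout_s):
            return "idle_timeout_s"
        try:
            tick = ticks.get(timeout=poll_interval_s)
        except queue.Empty:
            continue
        handle_tick(tick)
        processed += 1
        last_tick_at = time.monotonic()
```

In this sketch a signal handler would only need to call stop_event.set(); the loop notices on its next iteration and returns without interrupting a tick that is already being handled.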
Feed health events
Three feed-health categories land in the audit log automatically:
FeedConnected: emitted on feed.connect()
FeedDisconnected: emitted on feed.close() (always fires, in a finally block)
FeedGap: emitted from feed.on_gap; carries (market_id, last_seen, current)
Audit-log subscribers (alerting, metrics, surveillance) see these in real time.
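As an illustration of what an alerting subscriber might do with a FeedGap, here is a minimal sketch. FeedGapEvent and GapAlerter are hypothetical names, and treating last_seen and current as timestamps in seconds is an assumption; this page only states that the event carries (market_id, last_seen, current). How a subscriber attaches to the audit log is not shown here.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FeedGapEvent:
    # Hypothetical stand-in for the FeedGap payload (market_id, last_seen, current).
    market_id: str
    last_seen: float  # assumed: seconds timestamp of the last tick before the gap
    current: float    # assumed: seconds timestamp of the tick that revealed the gap


class GapAlerter:
    """Collects an alert message for every gap at or above a threshold."""

    def __init__(self, threshold_s: float) -> None:
        self.threshold_s = threshold_s
        self.alerts: List[str] = []

    def __call__(self, event: FeedGapEvent) -> None:
        gap_s = event.current - event.last_seen
        if gap_s >= self.threshold_s:
            self.alerts.append(f"{event.market_id}: feed gap of {gap_s:.1f}s")


alerter = GapAlerter(threshold_s=5.0)
alerter(FeedGapEvent("AAPL", last_seen=100.0, current=102.0))  # below threshold, ignored
alerter(FeedGapEvent("MSFT", last_seen=100.0, current=112.5))  # above threshold, recorded
```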
Threading model
_run_live is single-threaded on the Python side: the main loop polls the tick buffer, while callbacks fire from whichever thread the feed uses. NullLiveFeed fires callbacks on the calling thread. Real feeds (Alpaca, IBKR) fire from their own WebSocket reader thread.
Tick buffer access is guarded by a threading.Lock, so a callback from another thread can safely append while the main thread drains.
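The lock-guarded buffer pattern can be sketched as follows. TickBuffer is a hypothetical class written for this example, not the internal buffer itself; it only shows how a threading.Lock lets feed threads append while the main thread drains.

```python
import threading
from typing import Any, List


class TickBuffer:
    """Append from any thread; drain from the main loop."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: List[Any] = []

    def append(self, tick: Any) -> None:
        with self._lock:
            self._items.append(tick)

    def drain(self) -> List[Any]:
        # Swap the list out under the lock so the batch can be processed
        # afterwards without blocking producers.
        with self._lock:
            batch, self._items = self._items, []
        return batch


buffer = TickBuffer()


def producer(start: int) -> None:
    # Stands in for a feed's reader thread invoking the tick callback.
    for i in range(start, start + 1000):
        buffer.append(i)


threads = [threading.Thread(target=producer, args=(n * 1000,)) for n in range(4)]
for t in threads:
    t.start()

drained: List[Any] = []
while any(t.is_alive() for t in threads):
    drained.extend(buffer.drain())  # main thread drains while producers run
for t in threads:
    t.join()
drained.extend(buffer.drain())  # pick up anything appended after the last poll
```

Swapping the list inside the lock keeps the critical section short: producers are blocked only for the swap, not for the time it takes to process the batch.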
Writing a custom LiveFeed
Implement the LiveFeed Protocol:
import threading

# LiveFeedStatus and MyBrokerClient are assumed to be imported elsewhere.
class MyBrokerFeed:
    feed_name = "my_broker"

    def __init__(self, api_key: str, account: str):
        self._client = MyBrokerClient(api_key)
        self._account = account
        self._tick_cbs = []
        self._gap_cbs = []
        self._status = LiveFeedStatus.Disconnected

    def connect(self):
        self._client.connect()
        self._status = LiveFeedStatus.Connected
        # _reader_thread is not shown; it would read raw messages and pass each to _dispatch.
        threading.Thread(target=self._reader_thread, daemon=True).start()

    def _dispatch(self, raw_msg):
        tick = self._parse(raw_msg)  # _parse is not shown; it converts a raw message to a tick
        for cb in self._tick_cbs:
            cb(tick)

    # Remaining members omitted: on_tick, on_gap, subscribe, status, last_heartbeat, close
Plug it into hz.run(mode="live", feed=MyBrokerFeed(...), ...).
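For orientation, the members named above can be written out as a typing.Protocol. This is a sketch under assumptions: LiveFeedSketch and StubFeed are hypothetical names, and every signature (argument names, return types, whether status and last_heartbeat are methods) is a guess based on how the members are used on this page, not the library's actual LiveFeed definition.

```python
from typing import Any, Callable, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class LiveFeedSketch(Protocol):
    """Assumed shape of a live feed; signatures are illustrative only."""

    feed_name: str

    def connect(self) -> None: ...
    def close(self) -> None: ...
    def subscribe(self, market_id: str, kind: Any) -> None: ...
    def on_tick(self, callback: Callable[[Any], None]) -> None: ...
    def on_gap(self, callback: Callable[[Any], None]) -> None: ...
    def status(self) -> Any: ...
    def last_heartbeat(self) -> Optional[float]: ...


class StubFeed:
    """Smallest class that structurally satisfies LiveFeedSketch."""

    feed_name = "stub"

    def __init__(self) -> None:
        self._tick_cbs: List[Callable[[Any], None]] = []
        self._gap_cbs: List[Callable[[Any], None]] = []
        self._connected = False

    def connect(self) -> None:
        self._connected = True

    def close(self) -> None:
        self._connected = False

    def subscribe(self, market_id: str, kind: Any) -> None:
        pass  # a real feed would send a subscription request here

    def on_tick(self, callback: Callable[[Any], None]) -> None:
        self._tick_cbs.append(callback)

    def on_gap(self, callback: Callable[[Any], None]) -> None:
        self._gap_cbs.append(callback)

    def status(self) -> str:
        return "connected" if self._connected else "disconnected"

    def last_heartbeat(self) -> Optional[float]:
        return None  # no heartbeat tracking in this stub
```

Because the protocol is structural, StubFeed does not inherit from anything; an isinstance check against the runtime-checkable protocol only verifies that the members exist, not that their signatures match.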
NullLiveFeed for tests
Ticks emitted before a callback registers are queued and flushed through the first callback. Pre-stage a whole run:
import horizon as hz
from horizon.data.live import FeedTick, NullLiveFeed, SubscriptionKind

# timestamps is assumed to be a pre-built sequence of tick timestamps.
ticks = [
    FeedTick(market_id="AAPL", kind=SubscriptionKind.Trades,
             timestamp=t, price=180.0, size=100)
    for t in timestamps
]
feed = NullLiveFeed(queued_ticks=ticks)
result = hz.run(
    mode="live", feed=feed, max_ticks=len(ticks),
    max_duration_s=1.0,  # CI safety net
    # ... (remaining arguments elided)
)
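The queue-then-flush behavior described above can be pictured with a toy feed. QueueingFeed and its emit method are hypothetical and written only for this example; the real NullLiveFeed may differ in details, for instance in whether the flush happens at callback registration or at connect().

```python
from typing import Any, Callable, Iterable, List


class QueueingFeed:
    """Toy feed: ticks that arrive before any callback registers are held back."""

    def __init__(self, queued_ticks: Iterable[Any] = ()) -> None:
        self._pending: List[Any] = list(queued_ticks)
        self._tick_cbs: List[Callable[[Any], None]] = []

    def on_tick(self, callback: Callable[[Any], None]) -> None:
        is_first = not self._tick_cbs
        self._tick_cbs.append(callback)
        if is_first:
            # Flush everything queued so far through the first callback, in order.
            pending, self._pending = self._pending, []
            for tick in pending:
                callback(tick)

    def emit(self, tick: Any) -> None:
        if not self._tick_cbs:
            self._pending.append(tick)  # nobody is listening yet, so queue it
            return
        for cb in self._tick_cbs:
            cb(tick)


toy_feed = QueueingFeed(queued_ticks=["t0", "t1"])
toy_feed.emit("t2")            # still no callback, so this is queued too
received: List[Any] = []
toy_feed.on_tick(received.append)  # the first registration triggers the flush
toy_feed.emit("t3")            # delivered immediately from now on
```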
Status
| Item | State |
|---|---|
| Real broker LiveFeed adapters | Alpaca shipped. IBKR, Hyperliquid, Polymarket pending. |
| Real broker Venue.submit() | Alpaca shipped. Others pending. |
| Asyncio loop | L2 |
| Multi-feed aggregation (SIP + direct + internalizer) | L2 |
| Automatic flatten-on-stop | L2 |
| Feed staleness auto-halt | LiveWatchdog |
| Prometheus metrics exporter | L2 |
Same pipeline, different source
Backtest and live call the same _process_tick. Live reveals real-world costs (slippage, fees, latency) without introducing a separate code path. Replay a captured audit log against a DictSource to reproduce a live issue in backtest.