Live feeds
LiveFeed Protocol for real-time streams. Heartbeat, reconnect, surfaced sequence gaps.
horizon.data.providers covers historical REST (Yahoo, Polygon, Alpaca REST, …). Live execution needs a different shape: a WebSocket (or similar) stream that pushes events as they happen, surfaces session health, and does not silently lose data across a reconnect. See Data providers for the historical side.
horizon.data.live defines LiveFeed, the Protocol every concrete live feed implements. L0 ships the Protocol, a NullLiveFeed for tests, and the shape every L1 concrete feed conforms to. For the first shipped feed see Alpaca.
Why the Protocol is narrow
Real-time data fails in many ways: broker rate limits, silent disconnects, sequence skips, batched catch-up. The Protocol surfaces heartbeat time, current status, and sequence-gap callbacks so consumers can read them instead of guessing.
class LiveFeed(Protocol):
    feed_name: str

    # Lifecycle
    def connect(self) -> None: ...
    def close(self) -> None: ...
    def status(self) -> LiveFeedStatus: ...
    def last_heartbeat(self) -> datetime | None: ...

    # Subscriptions
    def subscribe(self, markets: list[str], kind: SubscriptionKind) -> None: ...
    def unsubscribe(self, markets: list[str], kind: SubscriptionKind) -> None: ...
    def subscriptions(self) -> dict[str, set[SubscriptionKind]]: ...

    # Callbacks (idempotent)
    def on_tick(self, callback: TickCallback) -> None: ...
    def on_gap(self, callback: GapCallback) -> None: ...
Subscription granularity
Pick the minimum the strategy needs. Subscribing to raw trades when 1-minute bars would do is wasteful.
from horizon.data.live import SubscriptionKind
SubscriptionKind.Trades # individual prints
SubscriptionKind.Quotes # NBBO level-1
SubscriptionKind.OrderBook # level-2 depth
SubscriptionKind.Bars1s
SubscriptionKind.Bars1m
SubscriptionKind.Bars5m
SubscriptionKind.Status # exchange halts, LULD, circuit breakers
Feed status
The execution watchdog reads this every tick. Degraded or Disconnected pauses new orders. StaleQuotes widens price bands or force-widens limits.
from horizon.data.live import LiveFeedStatus
LiveFeedStatus.Disconnected
LiveFeedStatus.Connecting
LiveFeedStatus.Connected
LiveFeedStatus.Degraded # heartbeat late or rate-limited
LiveFeedStatus.StaleQuotes # no ticks for > stale_threshold_s
LiveFeedStatus.Reconnecting
LiveFeedStatus.Halted # operator halt
LiveFeedStatus.Error
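As a rough illustration of how Degraded and StaleQuotes might be derived, the sketch below classifies an otherwise connected feed from the age of its last heartbeat and last tick. The Status enum is a local stand-in for LiveFeedStatus. The classify helper, heartbeat_timeout_s, and both default thresholds are hypothetical and not part of horizon.data.live; only the name stale_threshold_s comes from the listing above.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class Status(Enum):  # local stand-in for a subset of LiveFeedStatus
    Connected = "connected"
    Degraded = "degraded"
    StaleQuotes = "stale_quotes"


def classify(now, last_heartbeat, last_tick,
             heartbeat_timeout_s=5.0, stale_threshold_s=10.0):
    """Hypothetical helper: derive a health status for a connected feed."""
    # A late or missing heartbeat wins: the session itself is suspect.
    if last_heartbeat is None or (now - last_heartbeat).total_seconds() > heartbeat_timeout_s:
        return Status.Degraded
    # Heartbeats arrive but market data does not: quotes may be stale.
    if last_tick is None or (now - last_tick).total_seconds() > stale_threshold_s:
        return Status.StaleQuotes
    return Status.Connected


now = datetime(2024, 1, 2, 15, 0, 0, tzinfo=timezone.utc)
healthy = classify(now, now - timedelta(seconds=1), now - timedelta(seconds=2))
late = classify(now, now - timedelta(seconds=30), now - timedelta(seconds=2))
stale = classify(now, now - timedelta(seconds=1), now - timedelta(seconds=60))
```

Checking the heartbeat before the tick age is a design choice in this sketch: a feed that has lost its heartbeat is reported as Degraded even if its quotes are also old.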
Unified tick
FeedTick is the one type callbacks receive. kind tells which fields are populated:
from datetime import datetime, timezone
from horizon.data.live import FeedTick, SubscriptionKind
# Trade
FeedTick(
market_id="AAPL",
kind=SubscriptionKind.Trades,
timestamp=datetime.now(timezone.utc),
price=180.02, size=100, trade_id="...", is_buy=True,
venue="alpaca", sequence=42,
)
# Quote
FeedTick(
market_id="AAPL",
kind=SubscriptionKind.Quotes,
timestamp=...,
bid=180.01, ask=180.03, bid_size=500, ask_size=300,
)
# Bar (1m)
FeedTick(
market_id="AAPL",
kind=SubscriptionKind.Bars1m,
timestamp=...,
open=180.00, high=180.15, low=179.98, close=180.08,
volume=12_345, vwap=180.05,
)
# Orderbook (L2)
FeedTick(
market_id="AAPL", kind=SubscriptionKind.OrderBook, timestamp=...,
bids=((180.00, 500), (179.99, 800), ...),
asks=((180.01, 300), (180.02, 600), ...),
)
# Status
FeedTick(
market_id="AAPL", kind=SubscriptionKind.Status, timestamp=...,
status="luld_limit_up",
)
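Because a single type carries every event, a callback usually branches on kind before touching any field. The sketch below shows one way to do that. Kind and Tick are simplified local stand-ins for SubscriptionKind and FeedTick, reference_price is a hypothetical helper, and treating unpopulated fields as None is an assumption rather than documented behaviour.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class Kind(Enum):  # stand-in for a subset of SubscriptionKind
    Trades = "trades"
    Quotes = "quotes"
    Bars1m = "bars_1m"
    Status = "status"


@dataclass(frozen=True)
class Tick:  # stand-in for FeedTick with a few of its fields
    market_id: str
    kind: Kind
    timestamp: datetime
    price: Optional[float] = None
    bid: Optional[float] = None
    ask: Optional[float] = None
    close: Optional[float] = None


def reference_price(tick: Tick) -> Optional[float]:
    """Hypothetical helper: pick one comparable price from the populated fields."""
    if tick.kind is Kind.Trades:
        return tick.price
    if tick.kind is Kind.Quotes and tick.bid is not None and tick.ask is not None:
        return (tick.bid + tick.ask) / 2  # mid-price
    if tick.kind is Kind.Bars1m:
        return tick.close
    return None  # kinds that carry no price, e.g. Status


ts = datetime(2024, 1, 2, 15, 0, tzinfo=timezone.utc)
trade = Tick("AAPL", Kind.Trades, ts, price=180.02)
quote = Tick("AAPL", Kind.Quotes, ts, bid=180.0, ask=180.5)
bar = Tick("AAPL", Kind.Bars1m, ts, close=180.08)
halt = Tick("AAPL", Kind.Status, ts)
```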
Sequence gaps
Most broker WebSockets include a monotonic sequence number. If the stream goes 41, 42, 45, events 43 and 44 are missing (the broker batched and dropped them, or there was a brief disconnect). Resuming silently leaves the local book wrong.
on_gap(callback) surfaces every gap. The handler decides: request a snapshot refresh, re-pull positions and open orders from the broker REST API, or halt trading until reconciled.
feed.on_gap(lambda market_id, last_seen, current:
reconciler.resync(market_id))
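The detection side can be sketched as a per-market cursor that fires the registered callbacks whenever a sequence number jumps by more than one. GapDetector and observe are hypothetical names for illustration, not part of horizon.data.live; only the callback signature (market_id, last_seen, current) follows the text above.

```python
from typing import Callable, Dict, List

GapCallback = Callable[[str, int, int], None]  # (market_id, last_seen, current)


class GapDetector:
    """Hypothetical helper: tracks the last sequence number seen per market."""

    def __init__(self) -> None:
        self._last: Dict[str, int] = {}
        self._callbacks: List[GapCallback] = []

    def on_gap(self, callback: GapCallback) -> None:
        if callback not in self._callbacks:  # idempotent registration
            self._callbacks.append(callback)

    def observe(self, market_id: str, sequence: int) -> None:
        last = self._last.get(market_id)
        if last is not None and sequence > last + 1:
            for callback in self._callbacks:
                callback(market_id, last, sequence)
        # Assumption: duplicates or out-of-order events never move the cursor back.
        if last is None or sequence > last:
            self._last[market_id] = sequence


gaps = []
detector = GapDetector()
detector.on_gap(lambda market_id, last_seen, current: gaps.append((market_id, last_seen, current)))
for seq in (41, 42, 45, 46):
    detector.observe("AAPL", seq)
```

With the stream 41, 42, 45, 46 the callback fires once, with last_seen=42 and current=45, which tells the handler exactly which range to reconcile.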
NullLiveFeed
For tests and an explicit no-data mode.
from datetime import datetime, timezone
from horizon.data.live import NullLiveFeed, FeedTick, SubscriptionKind
feed = NullLiveFeed()
feed.connect()
received = []
feed.on_tick(received.append)
feed.subscribe(["AAPL"], SubscriptionKind.Trades)
feed.emit(FeedTick(
market_id="AAPL", kind=SubscriptionKind.Trades,
timestamp=datetime.now(timezone.utc), price=180, size=100,
))
assert received[-1].market_id == "AAPL"
Uses:
- Unit tests without a real WebSocket.
- Deterministic paper runs that replay pre-recorded ticks.
- Degraded mode: live feed down, paper loop ticks against stale data so the bot does not hang.
Concrete feeds
| Feed | Wraps | Status |
|---|---|---|
| AlpacaLiveFeed | Alpaca v2 streams | Shipped. Equities, options, crypto. IEX or SIP. |
| IbkrTickFeed | ib_async | L1. Equities, options, futures. Subscription required for quotes. |
| PolygonLiveFeed | Polygon WebSocket | L1. US equities, paid tier. |
| HyperliquidLiveFeed | hyperliquid-python-sdk | L1. Spot and perps. |
| CcxtLiveFeed | ccxt.pro | L1. Coinbase, Binance, Kraken. |
| PolymarketLiveFeed | py_clob_client | L1. Binary events. |
| KalshiLiveFeed | Kalshi WS | L1. Regulated US prediction markets. |
Every concrete feed exposes the same subscribe / on_tick / on_gap surface, so the execution loop does not need to know which feed it is driving.
Building your own
Implement the Protocol. Contract:
- connect() is idempotent. Reconnect logic stays inside.
- subscribe and unsubscribe are idempotent per (market, kind) tuple.
- on_tick registration is idempotent (same callback added once).
- Every tick passes through every callback. Observer failures must not break the feed loop (catch and keep going).
- status() reflects current health. If heartbeat has not landed in the expected window, transition to Degraded.
- When a sequence gap is observed, call every on_gap callback with (market_id, last_seen, current) and log an AuditCategory.FeedGap event. See Audit trail.
- Timestamps are tz-aware. Prefer exchange timestamps; fall back to ingest time.
See horizon/data/live/base.py for NullLiveFeed as a reference implementation.
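Two of the contract points above, idempotent registration and observer isolation, can be sketched in a few lines. CallbackRegistry is a hypothetical helper written for this illustration. It is not how NullLiveFeed or any shipped feed is implemented.

```python
import logging
from typing import Any, Callable, List

log = logging.getLogger("feed_sketch")


class CallbackRegistry:
    """Hypothetical helper: idempotent registration plus failure-isolated dispatch."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[Any], None]] = []

    def register(self, callback: Callable[[Any], None]) -> None:
        # Registering the same callback twice must not double-deliver ticks.
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    def dispatch(self, tick: Any) -> int:
        """Deliver the tick to every callback; return how many succeeded."""
        delivered = 0
        for callback in list(self._callbacks):  # copy: callbacks may register others
            try:
                callback(tick)
                delivered += 1
            except Exception:
                # Catch and keep going: one broken observer must not stop the feed loop.
                log.exception("tick callback failed; continuing")
        return delivered


received = []


def broken(tick):
    raise RuntimeError("observer bug")


def collect(tick):
    received.append(tick)


registry = CallbackRegistry()
registry.register(broken)
registry.register(collect)
registry.register(collect)  # no effect: already registered
delivered = registry.dispatch("tick-1")
```

The broken observer runs first and raises, yet collect still receives the tick exactly once.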
Watchdog interaction
The execution watchdog reads feed.status() and feed.last_heartbeat() every tick:
- Disconnected or Reconnecting: pause new submissions until reconnect.
- Degraded: allow only Urgency.Immediate orders; widen limit prices.
- StaleQuotes: reject limits priced near stale NBBO; escalate to kill-switch if extended.
- Gap callback: emit AuditCategory.FeedGap and run reconciler.
See Watchdog.
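The status rules in this section can be read as a small lookup from feed status to an order-gating decision. The sketch below is one way to express that mapping, not the watchdog's actual code. Status is a local stand-in for LiveFeedStatus, and Gate and order_gate are hypothetical names. Pausing on Connecting, Halted, and Error is an assumption, since the text does not say how those states are handled.

```python
from enum import Enum


class Status(Enum):  # local stand-in for LiveFeedStatus
    Disconnected = "disconnected"
    Connecting = "connecting"
    Connected = "connected"
    Degraded = "degraded"
    StaleQuotes = "stale_quotes"
    Reconnecting = "reconnecting"
    Halted = "halted"
    Error = "error"


class Gate(Enum):  # hypothetical decision labels
    Allow = "allow"
    ImmediateOnly = "immediate_only"            # only Urgency.Immediate orders
    RejectNearStale = "reject_near_stale"       # reject limits near stale NBBO
    PauseNew = "pause_new"                      # no new submissions


_RULES = {
    Status.Connected: Gate.Allow,
    Status.Degraded: Gate.ImmediateOnly,
    Status.StaleQuotes: Gate.RejectNearStale,
    Status.Disconnected: Gate.PauseNew,
    Status.Reconnecting: Gate.PauseNew,
}


def order_gate(status: Status) -> Gate:
    """Map a feed status to a gating decision.

    Assumption: states the text does not cover fall back to the most
    conservative choice, PauseNew.
    """
    return _RULES.get(status, Gate.PauseNew)


decisions = {status.name: order_gate(status).name for status in Status}
```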