Live feeds

LiveFeed Protocol for real-time streams. Heartbeat, reconnect, surfaced sequence gaps.

horizon.data.providers covers historical REST (Yahoo, Polygon, Alpaca REST, …). Live execution needs a different shape: a WebSocket (or similar) stream that pushes events as they happen, surfaces session health, and does not silently lose data across a reconnect. See Data providers for the historical side.

horizon.data.live is the Protocol every concrete live feed implements. L0 ships the Protocol, a NullLiveFeed for tests, and the shape every L1 concrete feed conforms to. For the first shipped feed see Alpaca.

Why the Protocol is narrow

Real-time data fails in many ways: broker rate limits, silent disconnects, sequence skips, batched catch-up. The Protocol surfaces heartbeat time, current status, and sequence-gap callbacks so consumers can read them instead of guessing.

python
class LiveFeed(Protocol):
    feed_name: str

    # Lifecycle
    def connect(self) -> None: ...
    def close(self) -> None: ...
    def status(self) -> LiveFeedStatus: ...
    def last_heartbeat(self) -> datetime | None: ...

    # Subscriptions
    def subscribe(self, markets: list[str], kind: SubscriptionKind) -> None: ...
    def unsubscribe(self, markets: list[str], kind: SubscriptionKind) -> None: ...
    def subscriptions(self) -> dict[str, set[SubscriptionKind]]: ...

    # Callbacks (idempotent)
    def on_tick(self, callback: TickCallback) -> None: ...
    def on_gap(self, callback: GapCallback) -> None: ...

Subscription granularity

Pick the minimum the strategy needs. Subscribing to raw trades when 1-minute bars would do is wasteful.

python
from horizon.data.live import SubscriptionKind

SubscriptionKind.Trades       # individual prints
SubscriptionKind.Quotes       # NBBO level-1
SubscriptionKind.OrderBook    # level-2 depth
SubscriptionKind.Bars1s
SubscriptionKind.Bars1m
SubscriptionKind.Bars5m
SubscriptionKind.Status       # exchange halts, LULD, circuit breakers

Feed status

The execution watchdog reads this every tick. Degraded or Disconnected pauses new orders. StaleQuotes widens price bands or force-widens limits.

python
from horizon.data.live import LiveFeedStatus

LiveFeedStatus.Disconnected
LiveFeedStatus.Connecting
LiveFeedStatus.Connected
LiveFeedStatus.Degraded       # heartbeat late or rate-limited
LiveFeedStatus.StaleQuotes    # no ticks for > stale_threshold_s
LiveFeedStatus.Reconnecting
LiveFeedStatus.Halted         # operator halt
LiveFeedStatus.Error

Unified tick

FeedTick is the one type callbacks receive. kind tells which fields are populated:

python
from horizon.data.live import FeedTick

# Trade
FeedTick(
    market_id="AAPL",
    kind=SubscriptionKind.Trades,
    timestamp=datetime.now(timezone.utc),
    price=180.02, size=100, trade_id="...", is_buy=True,
    venue="alpaca", sequence=42,
)

# Quote
FeedTick(
    market_id="AAPL",
    kind=SubscriptionKind.Quotes,
    timestamp=...,
    bid=180.01, ask=180.03, bid_size=500, ask_size=300,
)

# Bar (1m)
FeedTick(
    market_id="AAPL",
    kind=SubscriptionKind.Bars1m,
    timestamp=...,
    open=180.00, high=180.15, low=179.98, close=180.08,
    volume=12_345, vwap=180.05,
)

# Orderbook (L2)
FeedTick(
    market_id="AAPL", kind=SubscriptionKind.OrderBook, timestamp=...,
    bids=((180.00, 500), (179.99, 800), ...),
    asks=((180.01, 300), (180.02, 600), ...),
)

# Status
FeedTick(
    market_id="AAPL", kind=SubscriptionKind.Status, timestamp=...,
    status="luld_limit_up",
)

Sequence gaps

Most broker WebSockets include a monotonic sequence number. If the stream goes 41, 42, 45, events 43 and 44 are missing (broker batched and dropped, or a brief disconnect). Silent resume means the book is wrong.

on_gap(callback) surfaces every gap. The handler decides: request a snapshot refresh, re-pull positions and open orders from the broker REST API, or halt trading until reconciled.

python
feed.on_gap(lambda market_id, last_seen, current:
    reconciler.resync(market_id))

NullLiveFeed

Tests and explicit no-data mode.

python
from horizon.data.live import NullLiveFeed, FeedTick, SubscriptionKind

feed = NullLiveFeed()
feed.connect()

received = []
feed.on_tick(received.append)

feed.subscribe(["AAPL"], SubscriptionKind.Trades)
feed.emit(FeedTick(
    market_id="AAPL", kind=SubscriptionKind.Trades,
    timestamp=datetime.now(timezone.utc), price=180, size=100,
))

assert received[-1].market_id == "AAPL"

Uses:

  • Unit tests without a real WebSocket.
  • Deterministic paper runs that replay pre-recorded ticks.
  • Degraded mode: live feed down, paper loop ticks against stale data so the bot does not hang.

Concrete feeds

FeedWrapsStatus
AlpacaLiveFeedAlpaca v2 streamsShipped. Equities, options, crypto. IEX or SIP.
IbkrTickFeedib_asyncL1. Equities, options, futures. Subscription required for quotes.
PolygonLiveFeedPolygon WebSocketL1. US equities, paid tier.
HyperliquidLiveFeedhyperliquid-python-sdkL1. Spot and perps.
CcxtLiveFeedccxt.proL1. Coinbase, Binance, Kraken.
PolymarketLiveFeedpy_clob_clientL1. Binary events.
KalshiLiveFeedKalshi WSL1. Regulated US prediction markets.

Every concrete feed uses subscribe / on_tick / on_gap. The execution loop does not care which is which.

Building your own

Implement the Protocol. Contract:

  • connect() is idempotent. Reconnect logic stays inside.
  • subscribe and unsubscribe are idempotent per (market, kind) tuple.
  • on_tick registration is idempotent (same callback added once).
  • Every tick passes through every callback. Observer failures must not break the feed loop (catch and keep going).
  • status() reflects current health. If heartbeat has not landed in the expected window, transition to Degraded.
  • When a sequence gap is observed, call every on_gap callback with (market_id, last_seen, current) and log an AuditCategory.FeedGap event. See Audit trail.
  • Timestamps are tz-aware. Prefer exchange timestamps; fall back to ingest time.

See horizon/data/live/base.py for NullLiveFeed as a reference implementation.

Watchdog interaction

The execution watchdog reads feed.status() and feed.last_heartbeat() every tick:

  • Disconnected or Reconnecting: pause new submissions until reconnect.
  • Degraded: allow only Urgency.Immediate orders; widen limit prices.
  • StaleQuotes: reject limits priced near stale NBBO; escalate to kill-switch if extended.
  • Gap callback: emit AuditCategory.FeedGap and run reconciler.

See Watchdog.