FeatureStore
Rolling histories + per-tick caching + DAG resolution
The FeatureStore is the stateful backbone of the feature layer. It holds one PriceHistory per market, updates them on every tick from the feed snapshots, and caches feature computations within a tick.
API
from horizon.features.base import FeatureStore, PriceHistory, FeatureNamespace
class FeatureStore:
    def __init__(self, max_history: int = 1000) -> None:
        ...

    def update_feeds(
        self,
        feeds: dict[str, FeedData],
        now: datetime | None = None,
    ) -> None:
        """Append new prices to every market's history. Clears the per-tick cache."""

    def history(self, market_id: str) -> PriceHistory:
        """Get the rolling history for a market (creates an empty one if new)."""

    def has_market(self, market_id: str) -> bool:
        """True if the store has seen at least one price for this market."""

    def compute(
        self,
        feature_spec: dict[str, Feature],
        universe_market_ids: list[str],
        feeds: dict[str, FeedData],
    ) -> FeatureNamespace:
        """Compute every feature in the spec for every market in the universe."""
Per-tick lifecycle
Tick starts: update_feeds
update_feeds(feeds, now) iterates the feed snapshots and appends each price to that market's PriceHistory. The per-tick cache is cleared.
Strategy compute (reads cache)
When compute(strat.features, universe_ids, feeds) is called, the store iterates each feature and each market. For each (feature.key, market_id) pair, it checks the cache.
Cache hit: return cached value
The cached value is returned as-is; nothing is recomputed.
Cache miss: compute and store
The store calls feature.compute(market_id, history, feeds), stores the result in the cache, and returns it.
Exception handling
If feature.compute raises, the store catches the exception and returns NaN. The strategy sees NaN, not a traceback.
Shared memoization across strategies
class StrategyA(Strategy):
    features = {"z": Zscore(window=20)}

class StrategyB(Strategy):
    features = {"zscore": Zscore(window=20)}  # same params, different name
Both strategies reference the same Zscore(window=20) feature: its key is 'Zscore:market=None:window=20', regardless of the name each strategy gives it. When Strategy A runs, the store computes the z-score for every market in its universe and caches the results. When Strategy B runs, every market it shares with Strategy A is a cache hit, so nothing is recomputed.
This means running 10 strategies that all use Zscore(20) costs the same as running one strategy that uses it.
Cache invalidation
The cache is cleared exactly once per tick, when update_feeds() is called. This means:
- Within one tick, feature values are stable: all strategies see the same answer.
- Between ticks, feature values are fresh: every update_feeds wipes the cache.
This is important for correctness: you don’t want Strategy A to see a stale z-score because it ran before Strategy B updated the feed. The feed update happens first; strategies all run against the fresh state.
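The invalidation rule can be illustrated with a toy model. This is a sketch under simplifying assumptions: feeds are plain floats rather than FeedData objects, the "feature" just doubles the latest price, and TickCache is a hypothetical name.

```python
class TickCache:
    """Toy model of the invalidation rule: one cache, cleared by update_feeds."""

    def __init__(self) -> None:
        self.last_price: dict[str, float] = {}
        self.cache: dict[tuple[str, str], float] = {}
        self.misses = 0

    def update_feeds(self, feeds: dict[str, float]) -> None:
        self.last_price.update(feeds)
        self.cache.clear()  # the only place the cache is invalidated

    def get(self, feature_key: str, market_id: str) -> float:
        slot = (feature_key, market_id)
        if slot not in self.cache:
            self.misses += 1
            self.cache[slot] = self.last_price[market_id] * 2  # stand-in computation
        return self.cache[slot]


store = TickCache()
store.update_feeds({"AAPL": 100.0})        # tick 1
first = store.get("double", "AAPL")        # miss: computed
second = store.get("double", "AAPL")       # hit: same value, no recomputation
store.update_feeds({"AAPL": 105.0})        # tick 2 wipes the cache
third = store.get("double", "AAPL")        # miss again, fresh value

print(first, second, third, store.misses)  # 200.0 200.0 210.0 2
```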
PriceHistory internals
import math
from dataclasses import dataclass, field
from datetime import datetime

# _now() is a helper defined elsewhere in the module (not shown).

@dataclass
class PriceHistory:
    max_len: int = 1000
    prices: list[float] = field(default_factory=list)
    log_returns: list[float] = field(default_factory=list)
    timestamps: list[datetime] = field(default_factory=list)

    def update(self, price: float, ts: datetime | None = None) -> None:
        if price is None or price <= 0 or math.isnan(price) or math.isinf(price):
            return  # silently reject bad data
        if self.prices:
            prev = self.prices[-1]
            if prev > 0:
                self.log_returns.append(math.log(price / prev))
            else:
                self.log_returns.append(0.0)
        else:
            self.log_returns.append(0.0)  # first price has no predecessor
        self.prices.append(price)
        self.timestamps.append(ts or _now())
        # Trim the oldest entries so all three lists stay at max_len.
        while len(self.prices) > self.max_len:
            self.prices.pop(0)
            self.log_returns.pop(0)
            self.timestamps.pop(0)
Invalid data rejection
Bad prices (NaN, Inf, ≤0) are dropped before being appended. This protects the rolling window from corruption. The next valid price becomes the next entry. Log returns get computed from the previous valid price, skipping the bad one.
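As a worked example of the rejection rule, the sketch below applies the same validation to plain lists. The append_price helper is a hypothetical stand-in written for this illustration, not part of Horizon.

```python
import math


def append_price(prices: list[float], log_returns: list[float], price: float) -> bool:
    """Apply the same validation as PriceHistory.update, on plain lists.

    Returns True if the price was accepted, False if it was rejected.
    """
    if price is None or price <= 0 or math.isnan(price) or math.isinf(price):
        return False
    # The first accepted price has no predecessor, so its log return is 0.0.
    log_returns.append(math.log(price / prices[-1]) if prices else 0.0)
    prices.append(price)
    return True


prices: list[float] = []
log_returns: list[float] = []
accepted = [
    append_price(prices, log_returns, p)
    for p in [100.0, float("nan"), -5.0, float("inf"), 110.0]
]

print(accepted)  # [True, False, False, False, True]
print(prices)    # [100.0, 110.0]
print(log_returns[1] == math.log(110.0 / 100.0))  # True
```

The three bad ticks leave no trace: the return for 110.0 is measured against 100.0, the last valid price.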
Trimming
When len(prices) > max_len, the oldest entries are trimmed. This gives constant memory usage regardless of how long the backtest runs. max_len=1000 is enough for most features (a 252-bar lookback has plenty of room).
Set max_history to something larger if you use features that need windows longer than the default 1000 entries (e.g., SMA(window=1500)).
store = FeatureStore(max_history=2000)
Performance
The feature store is deliberately simple:
- No numpy for the core operations: pure Python list operations are fast enough at typical backtest scales (252 bars × 100 markets × 10 features ≈ 252k feature values over the whole run, or 1,000 cached values per tick).
- No vectorization: each feature computes per-market. This is slower per-market but parallelizable and cache-friendly.
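- Cheap history updates: an append plus a trim. The append is amortized O(1); each list.pop(0) in the trim shifts the remaining entries, so it costs O(max_len), which is bounded and small at the default of 1000.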
- O(1) cache lookups: dict-backed.
For a 252-bar, 5-market, 10-feature backtest, the feature layer adds ~15ms total, which is dwarfed by the actual strategy logic.
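The trim in PriceHistory.update uses list.pop(0), which shifts every remaining element. If that ever shows up in a profile with a very large max_history, one possible alternative is collections.deque with maxlen, which discards the oldest entry automatically on append. The sketch below shows that swapped-in technique; it is not how Horizon implements PriceHistory, and BoundedHistory is a hypothetical name.

```python
from collections import deque
from itertools import islice


class BoundedHistory:
    """Hypothetical rolling window backed by deque(maxlen=...)."""

    def __init__(self, max_len: int = 1000) -> None:
        self.prices: deque[float] = deque(maxlen=max_len)

    def update(self, price: float) -> None:
        self.prices.append(price)  # drops the oldest entry once full

    def last_n_prices(self, n: int) -> list[float]:
        # deque does not support slicing, so skip ahead to the last n items.
        start = max(len(self.prices) - n, 0)
        return list(islice(self.prices, start, None))


hist = BoundedHistory(max_len=3)
for p in [1.0, 2.0, 3.0, 4.0, 5.0]:
    hist.update(p)

print(list(hist.prices))      # [3.0, 4.0, 5.0]
print(hist.last_n_prices(2))  # [4.0, 5.0]
```

The trade-off is that a deque has no slicing and slower indexing into the middle, so features that read windows would copy through islice as shown.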
Direct access
Sometimes you want to read a history directly:
store = FeatureStore()
# ... ticks happen ...
hist = store.history("AAPL")
print(f"Latest price: {hist.last_price()}")
print(f"Last 10 prices: {hist.last_n_prices(10)}")
print(f"Last 10 log returns: {hist.last_n_returns(10)}")
Useful inside a custom Feature.compute that needs to cross-reference another market:
def compute(self, market_id, history, feeds):
    # This market's history is passed in
    my_prices = history.last_n_prices(20)
    # Another market's history: fetch from the store
    from horizon.features.base import FeatureStore  # circular-safe hint
    # In practice, cross-asset features use `self.market="SPY"` and let the
    # store pass in SPY's history automatically. Direct access is rare.
    ...
Thread safety
The FeatureStore is not thread-safe. Horizon’s run loop is single-threaded by design: all strategies for a tick run sequentially in the same thread. If you’re integrating into a multi-threaded system, wrap access in a lock.
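One way to wrap access in a lock is a thin delegating wrapper. This is a sketch, not part of Horizon: LockedStore is a hypothetical name, and CountingStore stands in for FeatureStore so the example runs on its own.

```python
import threading


class LockedStore:
    """Hypothetical wrapper that serializes every call into a wrapped store."""

    def __init__(self, store) -> None:
        self._store = store
        self._lock = threading.RLock()

    def update_feeds(self, *args, **kwargs):
        with self._lock:
            return self._store.update_feeds(*args, **kwargs)

    def compute(self, *args, **kwargs):
        with self._lock:
            return self._store.compute(*args, **kwargs)


class CountingStore:
    """Stand-in for FeatureStore so the sketch runs without Horizon installed."""

    def __init__(self) -> None:
        self.updates = 0

    def update_feeds(self, feeds, now=None) -> None:
        current = self.updates
        self.updates = current + 1  # read-modify-write: unsafe without a lock

    def compute(self, feature_spec, universe_market_ids, feeds):
        return {}


def worker(store: LockedStore) -> None:
    for _ in range(1000):
        store.update_feeds({})


inner = CountingStore()
locked = LockedStore(inner)
threads = [threading.Thread(target=worker, args=(locked,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(inner.updates)  # 4000
```

Locking each call separately does not make update_feeds followed by compute atomic as a pair. A caller that needs a whole tick to be consistent would have to hold one lock across both calls.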
Tests
tests/test_features.py::TestFeatureStore has 3 tests:
- test_compute_caches_within_tick: same feature called twice returns cached value
- test_cache_invalidated_on_update: new update_feeds produces fresh values
- test_namespace_unknown_attribute_raises: accessing undeclared feature raises AttributeError
Run PYTHONPATH=. python3 -m pytest tests/test_features.py::TestFeatureStore -v.