Writing Custom Features
Extend the feature library with your own indicators
Horizon’s feature system is fully extensible. A custom feature is just a class that subclasses Feature and implements compute(). No registration, no boilerplate: if it implements the protocol, the FeatureStore will compute and cache it.
The Feature protocol
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData
class Feature(ABC):
def __init__(self, *, market: str | None = None) -> None:
self.market = market
@abstractmethod
def compute(
self,
market_id: str,
history: PriceHistory,
feeds: dict[str, FeedData],
) -> float:
...
@property
def key(self) -> str:
"""Auto-generated from class name + instance attributes."""
Every custom feature must:
- Subclass
Feature - Call
super().__init__(market=market)in its own__init__ - Implement
compute(market_id, history, feeds) -> float - Return a scalar.
NaNwhen insufficient history or computation fails
Recipe: Max Drawdown over N bars
import math
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData
class MaxDrawdown(Feature):
"""Largest peak-to-trough drop over the last `window` bars."""
def __init__(self, window: int = 20, market: str | None = None):
super().__init__(market=market)
if window < 2:
raise ValueError(f"window must be >= 2, got {window}")
self.window = window
def compute(
self,
market_id: str,
history: PriceHistory,
feeds: dict[str, FeedData],
) -> float:
prices = history.last_n_prices(self.window)
if len(prices) < self.window:
return float("nan")
peak = prices[0]
max_dd = 0.0
for p in prices:
peak = max(peak, p)
if peak > 0:
dd = (peak - p) / peak
max_dd = max(max_dd, dd)
return max_dd
Use it:
from horizon import Strategy, Signal
from horizon.features import Zscore
class DrawdownAware(Strategy):
features = {
"dd20": MaxDrawdown(window=20),
"z": Zscore(window=10),
}
def evaluate(self, f, universe):
return [
Signal.increase(m, edge_bps=40)
for m in universe
if f.dd20[m.id] > 0.10 # recent drawdown > 10%
and f.z[m.id] < -1.5 # recently pulled back
]
Recipe: Rolling Correlation to Benchmark
import math
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData
class BetaToBenchmark(Feature):
"""Rolling beta of this market's returns against a benchmark's returns."""
def __init__(self, benchmark: str, window: int = 60, market: str | None = None):
super().__init__(market=market)
self.benchmark = benchmark
self.window = window
def compute(
self,
market_id: str,
history: PriceHistory,
feeds: dict[str, FeedData],
) -> float:
# This market's returns
my_rets = history.last_n_returns(self.window + 1)
if len(my_rets) < self.window + 1:
return float("nan")
# Benchmark's returns. need cross-market access
# Note: this is awkward because the feature doesn't have direct
# access to the FeatureStore. In practice, you'd pin the feature
# to the benchmark with `market="SPY"` and read the result from
# a different strategy feature.
bench_feed = feeds.get(self.benchmark)
if bench_feed is None:
return float("nan")
# Simplified. in a real implementation you'd need access to
# the benchmark's history, not just the current feed. See below.
return 1.0 # placeholder
Recipe: RSI with Wilder smoothing
The built-in RSI uses simple averaging. Wilder’s original uses exponential smoothing. Here’s a proper Wilder version:
class WilderRSI(Feature):
"""Wilder's original exponentially-smoothed RSI."""
def __init__(self, window: int = 14, market: str | None = None):
super().__init__(market=market)
self.window = window
def compute(
self,
market_id: str,
history: PriceHistory,
feeds: dict[str, FeedData],
) -> float:
prices = history.last_n_prices(self.window * 3) # plenty of burn-in
if len(prices) < self.window + 1:
return float("nan")
diffs = [prices[i] - prices[i-1] for i in range(1, len(prices))]
alpha = 1.0 / self.window # Wilder's smoothing factor
# Initialize with simple average over the first `window` bars
init_gains = [max(0, d) for d in diffs[:self.window]]
init_losses = [max(0, -d) for d in diffs[:self.window]]
avg_gain = sum(init_gains) / self.window
avg_loss = sum(init_losses) / self.window
# Exponentially smooth the rest
for d in diffs[self.window:]:
gain = max(0, d)
loss = max(0, -d)
avg_gain = alpha * gain + (1 - alpha) * avg_gain
avg_loss = alpha * loss + (1 - alpha) * avg_loss
if avg_loss == 0:
return 100.0 if avg_gain > 0 else 50.0
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1.0 + rs))
Recipe: Fractional Differentiation
Implements de Prado’s fractional differentiation. preserves memory while achieving stationarity.
class FracDiff(Feature):
"""Fractional differentiation of order `d`."""
def __init__(
self,
d: float = 0.4,
window: int = 100,
threshold: float = 1e-4,
market: str | None = None,
):
super().__init__(market=market)
if not 0 < d < 1:
raise ValueError(f"d must be in (0, 1), got {d}")
self.d = d
self.window = window
self.threshold = threshold
self._weights = self._compute_weights()
def _compute_weights(self) -> list[float]:
"""Precompute weights. depends only on d and threshold."""
weights = [1.0]
k = 1
while True:
w = -weights[-1] * (self.d - k + 1) / k
if abs(w) < self.threshold:
break
weights.append(w)
k += 1
if k > self.window:
break
return weights
def compute(
self,
market_id: str,
history: PriceHistory,
feeds: dict[str, FeedData],
) -> float:
prices = history.last_n_prices(len(self._weights))
if len(prices) < len(self._weights):
return float("nan")
value = 0.0
for i, w in enumerate(self._weights):
value += w * prices[-(i + 1)]
return value
See de Prado features for the math and motivation.
Feature keys
The .key property is auto-generated from your instance attributes:
class MaxDrawdown(Feature):
def __init__(self, window=20, market=None):
super().__init__(market=market)
self.window = window
>>> MaxDrawdown(window=20).key
'MaxDrawdown:market=None:window=20'
>>> MaxDrawdown(window=30).key
'MaxDrawdown:market=None:window=30'
>>> MaxDrawdown(window=20, market='AAPL').key
'MaxDrawdown:market=AAPL:window=20'
Two MaxDrawdown(window=20) instances produce the same key and share cache entries. Two different window values produce different keys and don’t collide.
If you have attributes that shouldn’t affect the cache (e.g., a logger reference), prefix them with an underscore:
class MyFeature(Feature):
def __init__(self, window=20, market=None):
super().__init__(market=market)
self.window = window
self._logger = logging.getLogger(__name__) # underscore = excluded from key
Validation
Validate parameters in __init__:
def __init__(self, window=20, market=None):
super().__init__(market=market)
if window < 2:
raise ValueError(f"window must be >= 2, got {window}")
if window > 10_000:
raise ValueError(f"window too large: {window}")
self.window = window
Failing at construction catches bugs at the top of the backtest rather than deep in a loop.
NaN handling
Return NaN when:
- History is shorter than your lookback window
- A computation would divide by zero
- Any mathematical error occurs
The FeatureStore catches exceptions from compute() and returns NaN automatically, so you don’t strictly need to wrap in try/except, but explicit returns are clearer.
Performance notes
- Use
history.last_n_prices(n)/last_n_returns(n): they’re O(1) slices, not O(history length) operations - Don’t walk the full history unless necessary: only request what your window needs
- Avoid dict lookups in hot loops: cache dict values in local variables before looping
- Numpy is available but optional: for heavy matrix operations, import numpy and use it; the store doesn’t care
Cross-referencing other features
Sometimes a feature depends on another feature’s output. The cleanest pattern is to declare both features in your strategy and compose them in evaluate():
class MyStrategy(Strategy):
features = {
"z": Zscore(window=20),
"vol": RealizedVol(window=60),
}
def evaluate(self, f, universe):
# Combine the two features here, not in a single feature
return [
Signal.increase(m, edge_bps=40)
for m in universe
if f.z[m.id] < -2 and f.vol[m.id] < 0.25
]
This is cleaner than building a ZscoreWhileVolLow feature.
Tests
Every custom feature should have a unit test with hand-calculated expected values:
# tests/test_my_feature.py
import pytest
from horizon.features.base import FeatureStore
from horizon.context import FeedData
from my_features import MaxDrawdown
def test_drawdown_captures_peak_to_trough():
store = FeatureStore()
# 100 → 120 → 90
for p in [100, 110, 120, 110, 100, 90]:
store.update_feeds({"T": FeedData(market_id="T", price=p)})
dd = MaxDrawdown(window=6)
ns = store.compute({"dd": dd}, ["T"], {})
assert ns.dd["T"] == pytest.approx(0.25, abs=1e-6) # (120-90)/120
def test_returns_nan_when_insufficient_history():
store = FeatureStore()
for p in [100, 110]:
store.update_feeds({"T": FeedData(market_id="T", price=p)})
dd = MaxDrawdown(window=20)
ns = store.compute({"dd": dd}, ["T"], {})
import math
assert math.isnan(ns.dd["T"])
Run with pytest tests/test_my_feature.py.