Writing Custom Features

Extend the feature library with your own indicators

Horizon’s feature system is fully extensible. A custom feature is just a class that subclasses Feature and implements compute(). No registration, no boilerplate: if it implements the protocol, the FeatureStore will compute and cache it.

The Feature protocol

python
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData

class Feature(ABC):
    def __init__(self, *, market: str | None = None) -> None:
        self.market = market

    @abstractmethod
    def compute(
        self,
        market_id: str,
        history: PriceHistory,
        feeds: dict[str, FeedData],
    ) -> float:
        ...

    @property
    def key(self) -> str:
        """Auto-generated from class name + instance attributes."""

Every custom feature must:

  1. Subclass Feature
  2. Call super().__init__(market=market) in its own __init__
  3. Implement compute(market_id, history, feeds) -> float
  4. Return a scalar. NaN when insufficient history or computation fails

Recipe: Max Drawdown over N bars

python
import math
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData

class MaxDrawdown(Feature):
    """Largest peak-to-trough drop over the last `window` bars."""

    def __init__(self, window: int = 20, market: str | None = None):
        super().__init__(market=market)
        if window < 2:
            raise ValueError(f"window must be >= 2, got {window}")
        self.window = window

    def compute(
        self,
        market_id: str,
        history: PriceHistory,
        feeds: dict[str, FeedData],
    ) -> float:
        prices = history.last_n_prices(self.window)
        if len(prices) < self.window:
            return float("nan")

        peak = prices[0]
        max_dd = 0.0
        for p in prices:
            peak = max(peak, p)
            if peak > 0:
                dd = (peak - p) / peak
                max_dd = max(max_dd, dd)
        return max_dd

Use it:

python
from horizon import Strategy, Signal
from horizon.features import Zscore

class DrawdownAware(Strategy):
    features = {
        "dd20": MaxDrawdown(window=20),
        "z": Zscore(window=10),
    }

    def evaluate(self, f, universe):
        return [
            Signal.increase(m, edge_bps=40)
            for m in universe
            if f.dd20[m.id] > 0.10      # recent drawdown > 10%
            and f.z[m.id] < -1.5         # recently pulled back
        ]

Recipe: Rolling Correlation to Benchmark

python
import math
from horizon.features.base import Feature, PriceHistory
from horizon.context import FeedData

class BetaToBenchmark(Feature):
    """Rolling beta of this market's returns against a benchmark's returns."""

    def __init__(self, benchmark: str, window: int = 60, market: str | None = None):
        super().__init__(market=market)
        self.benchmark = benchmark
        self.window = window

    def compute(
        self,
        market_id: str,
        history: PriceHistory,
        feeds: dict[str, FeedData],
    ) -> float:
        # This market's returns
        my_rets = history.last_n_returns(self.window + 1)
        if len(my_rets) < self.window + 1:
            return float("nan")

        # Benchmark's returns. need cross-market access
        # Note: this is awkward because the feature doesn't have direct
        # access to the FeatureStore. In practice, you'd pin the feature
        # to the benchmark with `market="SPY"` and read the result from
        # a different strategy feature.
        bench_feed = feeds.get(self.benchmark)
        if bench_feed is None:
            return float("nan")

        # Simplified. in a real implementation you'd need access to
        # the benchmark's history, not just the current feed. See below.
        return 1.0   # placeholder

Recipe: RSI with Wilder smoothing

The built-in RSI uses simple averaging. Wilder’s original uses exponential smoothing. Here’s a proper Wilder version:

python
class WilderRSI(Feature):
    """Wilder's original exponentially-smoothed RSI."""

    def __init__(self, window: int = 14, market: str | None = None):
        super().__init__(market=market)
        self.window = window

    def compute(
        self,
        market_id: str,
        history: PriceHistory,
        feeds: dict[str, FeedData],
    ) -> float:
        prices = history.last_n_prices(self.window * 3)   # plenty of burn-in
        if len(prices) < self.window + 1:
            return float("nan")

        diffs = [prices[i] - prices[i-1] for i in range(1, len(prices))]
        alpha = 1.0 / self.window   # Wilder's smoothing factor

        # Initialize with simple average over the first `window` bars
        init_gains = [max(0, d) for d in diffs[:self.window]]
        init_losses = [max(0, -d) for d in diffs[:self.window]]
        avg_gain = sum(init_gains) / self.window
        avg_loss = sum(init_losses) / self.window

        # Exponentially smooth the rest
        for d in diffs[self.window:]:
            gain = max(0, d)
            loss = max(0, -d)
            avg_gain = alpha * gain + (1 - alpha) * avg_gain
            avg_loss = alpha * loss + (1 - alpha) * avg_loss

        if avg_loss == 0:
            return 100.0 if avg_gain > 0 else 50.0
        rs = avg_gain / avg_loss
        return 100.0 - (100.0 / (1.0 + rs))

Recipe: Fractional Differentiation

Implements de Prado’s fractional differentiation. preserves memory while achieving stationarity.

python
class FracDiff(Feature):
    """Fractional differentiation of order `d`."""

    def __init__(
        self,
        d: float = 0.4,
        window: int = 100,
        threshold: float = 1e-4,
        market: str | None = None,
    ):
        super().__init__(market=market)
        if not 0 < d < 1:
            raise ValueError(f"d must be in (0, 1), got {d}")
        self.d = d
        self.window = window
        self.threshold = threshold
        self._weights = self._compute_weights()

    def _compute_weights(self) -> list[float]:
        """Precompute weights. depends only on d and threshold."""
        weights = [1.0]
        k = 1
        while True:
            w = -weights[-1] * (self.d - k + 1) / k
            if abs(w) < self.threshold:
                break
            weights.append(w)
            k += 1
            if k > self.window:
                break
        return weights

    def compute(
        self,
        market_id: str,
        history: PriceHistory,
        feeds: dict[str, FeedData],
    ) -> float:
        prices = history.last_n_prices(len(self._weights))
        if len(prices) < len(self._weights):
            return float("nan")

        value = 0.0
        for i, w in enumerate(self._weights):
            value += w * prices[-(i + 1)]
        return value

See de Prado features for the math and motivation.

Feature keys

The .key property is auto-generated from your instance attributes:

python
class MaxDrawdown(Feature):
    def __init__(self, window=20, market=None):
        super().__init__(market=market)
        self.window = window
python
>>> MaxDrawdown(window=20).key
'MaxDrawdown:market=None:window=20'
>>> MaxDrawdown(window=30).key
'MaxDrawdown:market=None:window=30'
>>> MaxDrawdown(window=20, market='AAPL').key
'MaxDrawdown:market=AAPL:window=20'

Two MaxDrawdown(window=20) instances produce the same key and share cache entries. Two different window values produce different keys and don’t collide.

If you have attributes that shouldn’t affect the cache (e.g., a logger reference), prefix them with an underscore:

python
class MyFeature(Feature):
    def __init__(self, window=20, market=None):
        super().__init__(market=market)
        self.window = window
        self._logger = logging.getLogger(__name__)   # underscore = excluded from key

Validation

Validate parameters in __init__:

python
def __init__(self, window=20, market=None):
    super().__init__(market=market)
    if window < 2:
        raise ValueError(f"window must be >= 2, got {window}")
    if window > 10_000:
        raise ValueError(f"window too large: {window}")
    self.window = window

Failing at construction catches bugs at the top of the backtest rather than deep in a loop.

NaN handling

Return NaN when:

  • History is shorter than your lookback window
  • A computation would divide by zero
  • Any mathematical error occurs

The FeatureStore catches exceptions from compute() and returns NaN automatically, so you don’t strictly need to wrap in try/except, but explicit returns are clearer.

Performance notes

  • Use history.last_n_prices(n) / last_n_returns(n): they’re O(1) slices, not O(history length) operations
  • Don’t walk the full history unless necessary: only request what your window needs
  • Avoid dict lookups in hot loops: cache dict values in local variables before looping
  • Numpy is available but optional: for heavy matrix operations, import numpy and use it; the store doesn’t care

Cross-referencing other features

Sometimes a feature depends on another feature’s output. The cleanest pattern is to declare both features in your strategy and compose them in evaluate():

python
class MyStrategy(Strategy):
    features = {
        "z": Zscore(window=20),
        "vol": RealizedVol(window=60),
    }

    def evaluate(self, f, universe):
        # Combine the two features here, not in a single feature
        return [
            Signal.increase(m, edge_bps=40)
            for m in universe
            if f.z[m.id] < -2 and f.vol[m.id] < 0.25
        ]

This is cleaner than building a ZscoreWhileVolLow feature.

Tests

Every custom feature should have a unit test with hand-calculated expected values:

python
# tests/test_my_feature.py
import pytest
from horizon.features.base import FeatureStore
from horizon.context import FeedData

from my_features import MaxDrawdown


def test_drawdown_captures_peak_to_trough():
    store = FeatureStore()
    # 100 → 120 → 90
    for p in [100, 110, 120, 110, 100, 90]:
        store.update_feeds({"T": FeedData(market_id="T", price=p)})
    dd = MaxDrawdown(window=6)
    ns = store.compute({"dd": dd}, ["T"], {})
    assert ns.dd["T"] == pytest.approx(0.25, abs=1e-6)   # (120-90)/120

def test_returns_nan_when_insufficient_history():
    store = FeatureStore()
    for p in [100, 110]:
        store.update_feeds({"T": FeedData(market_id="T", price=p)})
    dd = MaxDrawdown(window=20)
    ns = store.compute({"dd": dd}, ["T"], {})
    import math
    assert math.isnan(ns.dd["T"])

Run with pytest tests/test_my_feature.py.

Next