Data Sources

Where historical bars come from: synthetic, dict, custom

The backtest engine iterates historical bars from a DataSource. Horizon ships three built-in sources (SyntheticGBM, SyntheticRegimes, and DictSource) and an extension protocol for plugging in your own data.

Built-in sources

Protocol

python
from typing import Iterator, Protocol, runtime_checkable
from horizon.data.base import Bar

@runtime_checkable
class DataSource(Protocol):
    """Structural interface the backtest engine reads historical bars from."""

    def markets(self) -> list[str]:
        """Return the ids of every market this source holds data for."""
        ...

    def iter_bars(self) -> Iterator[Bar]:
        """Iterate over the bars of all markets, oldest first."""
        ...

A Bar is a frozen dataclass:

python
@dataclass(frozen=True)
class Bar:
    """One historical observation for a single market.

    Instances are immutable (``frozen=True``). Only ``market_id``,
    ``timestamp`` and ``price`` are required; every other field defaults to
    ``None`` when the source does not supply it.
    """

    # Id of the market this bar belongs to.
    market_id: str
    # Time of the bar; sources must yield bars in chronological order.
    timestamp: datetime
    # The one mandatory price. The Parquet example on this page fills it
    # with the close.
    price: float
    # Optional OHLC prices and traded volume.
    open: float | None = None
    high: float | None = None
    low: float | None = None
    close: float | None = None
    volume: float | None = None
    # Optional quote fields.
    # NOTE(review): presumably best bid / best ask at bar time — confirm.
    bid: float | None = None
    ask: float | None = None

Using a synthetic source

python
from horizon.data import SyntheticGBM
import horizon as hz

# Synthetic price source covering three markets.
# NOTE(review): mu / sigma are presumably annualised drift and volatility,
# like the SyntheticRegimes tuples documented on this page — confirm.
data = SyntheticGBM(
    market_ids=["AAPL", "MSFT", "NVDA"],
    n_bars=252,
    mu=0.10,
    sigma=0.20,
    seed=42,
)

# Hand the source to the backtest. The `...` entries stand in for your own
# strategies and remaining arguments; they are placeholders, not literal code.
result = hz.run(
    mode="backtest",
    data_source=data,
    strategies=[...],
    ...
)

Using a regime-switching source

python
from horizon.data import SyntheticRegimes

# One market whose 300 bars are split across three regimes.
# Each tuple is (fraction_of_bars, mu_annual, sigma_annual).
data = SyntheticRegimes(
    market_ids=["A"],
    n_bars=300,
    regimes=[
        (0.40, 0.20, 0.15),    # 40% uptrend
        (0.30, 0.00, 0.30),    # 30% chop
        (0.30, -0.30, 0.40),   # 30% crash
    ],
    seed=7,
)

Each regime tuple is (fraction_of_bars, mu_annual, sigma_annual). Fractions should sum to ~1.0. This source is useful for testing whether your strategy survives adverse conditions.

Using DictSource for tests

python
from datetime import datetime
from horizon.data import DictSource

# Maps each market id to a list of (datetime, value) pairs.
# NOTE(review): the value is presumably used as the bar's price — confirm.
data = DictSource({
    "AAPL": [
        (datetime(2024, 1, 1), 180.0),
        (datetime(2024, 1, 2), 182.5),
        (datetime(2024, 1, 3), 179.0),
    ],
    "MSFT": [
        (datetime(2024, 1, 1), 400.0),
        (datetime(2024, 1, 2), 402.0),
        (datetime(2024, 1, 3), 405.0),
    ],
})

Chronological ordering across markets is handled automatically.

Writing a custom data source

Implement the two-method protocol:

python
from typing import Iterator
from horizon.data import Bar
import pandas as pd

class ParquetDataSource:
    """Reads historical bars from a parquet file.

    The file must provide ``market_id``, ``timestamp``, ``open``, ``high``,
    ``low`` and ``close`` columns. ``volume`` is optional; a missing column
    is reported as ``0.0``.
    """

    def __init__(self, path: str):
        """Load *path* and order its rows for chronological replay."""
        self._df = self._chronological(pd.read_parquet(path))

    @staticmethod
    def _chronological(frame):
        """Return *frame* sorted by timestamp, ties broken by market id."""
        # Sorting on timestamp alone leaves bars that share a timestamp in
        # an arbitrary order; the second key makes replay deterministic.
        return frame.sort_values(["timestamp", "market_id"])

    def markets(self) -> list[str]:
        """Return the distinct market ids in the file, sorted."""
        return sorted(self._df["market_id"].unique().tolist())

    def iter_bars(self) -> Iterator[Bar]:
        """Yield one Bar per row, in (timestamp, market_id) order."""
        for _, row in self._df.iterrows():
            close = float(row["close"])
            yield Bar(
                market_id=row["market_id"],
                timestamp=row["timestamp"],
                # The close doubles as the bar's mandatory price.
                price=close,
                open=float(row["open"]),
                high=float(row["high"]),
                low=float(row["low"]),
                close=close,
                volume=float(row.get("volume", 0)),
            )

Use it:

python
# `hz` is `import horizon as hz`, as in the synthetic-source example; the
# trailing `...` is a placeholder for your remaining arguments.
result = hz.run(
    mode="backtest",
    data_source=ParquetDataSource("historical.parquet"),
    ...
)

CSV example

python
import csv
from datetime import datetime
from horizon.data import Bar

class CSVDataSource:
    """Reads bars from a CSV file with ``symbol``, ``date`` and ``close`` columns.

    ``volume`` is optional: a missing column or a blank cell is reported as
    ``0.0``. Dates are parsed with ``date_fmt``.
    """

    def __init__(self, path: str, date_fmt: str = "%Y-%m-%d"):
        """Remember the file location; the file is re-read on every call."""
        self.path = path
        self.date_fmt = date_fmt

    def markets(self) -> list[str]:
        """Return the distinct symbols in the file, sorted."""
        # newline="" is what the csv module expects, so quoted fields that
        # contain line breaks are parsed correctly.
        with open(self.path, newline="") as fh:
            return sorted({row["symbol"] for row in csv.DictReader(fh)})

    def _timed_rows(self):
        """Return ``(timestamp, row)`` pairs in chronological order."""
        with open(self.path, newline="") as fh:
            timed = [
                (datetime.strptime(row["date"], self.date_fmt), row)
                for row in csv.DictReader(fh)
            ]
        # Sort on the parsed datetime, not the raw string: text order is only
        # chronological for ISO-style formats, and date_fmt is configurable.
        timed.sort(key=lambda pair: (pair[0], pair[1]["symbol"]))
        return timed

    def iter_bars(self):
        """Yield one Bar per row, ordered by timestamp and then symbol."""
        for timestamp, row in self._timed_rows():
            yield Bar(
                market_id=row["symbol"],
                timestamp=timestamp,
                price=float(row["close"]),
                # `or 0` also covers a blank cell, which float() rejects.
                volume=float(row.get("volume") or 0),
            )

Yahoo Finance example

python
import yfinance as yf
from horizon.data import Bar

class YahooDataSource:
    """Close prices downloaded from Yahoo Finance through ``yfinance``."""

    def __init__(self, tickers: list[str], start: str, end: str, interval: str = "1d"):
        """Download the price history for *tickers* once, up front."""
        self.tickers = tickers
        self._df = yf.download(tickers, start=start, end=end, interval=interval)

    def markets(self):
        """Return the requested tickers."""
        # Hand out a copy so callers cannot mutate the source's own list.
        return list(self.tickers)

    def _close_points(self):
        """Return ``(timestamp, ticker, price)`` tuples in chronological order."""
        close = self._df["Close"]
        points = []
        for ticker in self.tickers:
            # NOTE(review): some yfinance versions return flat columns for a
            # single ticker, making "Close" a Series rather than a frame.
            series = close if close.ndim == 1 else close[ticker]
            points.extend(
                (ts, ticker, float(price)) for ts, price in series.dropna().items()
            )
        # Walking ticker by ticker emits each market's full history in turn;
        # the backtest needs bars interleaved by time across all markets.
        points.sort(key=lambda point: (point[0], point[1]))
        return points

    def iter_bars(self):
        """Yield one Bar per close, ordered by timestamp and then ticker."""
        for ts, ticker, price in self._close_points():
            yield Bar(
                market_id=ticker,
                timestamp=ts.to_pydatetime(),
                price=price,
            )

Requires yfinance (`pip install yfinance`).

Chronological ordering requirement

DataSources must yield bars in chronological order across all markets. The backtest loop buffers bars at the same timestamp and flushes them as a single tick. If bars come out of order, features will be corrupted.

For custom sources with multiple markets, always sort by timestamp first:

python
# Order in place by timestamp, breaking ties on market id.
all_bars.sort(key=lambda b: (b.timestamp, b.market_id))

Next