Data Sources
Where historical bars come from: synthetic, dict, custom
The backtest engine iterates historical bars from a DataSource. Horizon ships three built-in sources and an extension protocol for your own data.
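Built-in sources
The three built-in sources, each demonstrated below, are SyntheticGBM, SyntheticRegimes, and DictSource.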
Protocol
from typing import Iterator, Protocol, runtime_checkable
from horizon.data.base import Bar
@runtime_checkable
class DataSource(Protocol):
    """Structural interface for anything that supplies historical bars.

    Any object that provides these two methods satisfies the protocol; no
    inheritance is needed. The ``runtime_checkable`` decorator additionally
    allows ``isinstance(obj, DataSource)`` checks.
    """

    def markets(self) -> list[str]:
        """List market ids this source has data for."""

    def iter_bars(self) -> Iterator[Bar]:
        """Yield bars in chronological order across all markets."""
A Bar is a frozen dataclass:
@dataclass(frozen=True)
class Bar:
    """One bar for a single market at a single timestamp.

    Instances are immutable (``frozen=True``). Only ``market_id``,
    ``timestamp`` and ``price`` are required; every other field defaults
    to ``None``.
    """

    # Required fields.
    market_id: str
    timestamp: datetime
    price: float
    # Optional fields, all defaulting to None.
    # NOTE(review): judging by the names these are OHLC prices, traded volume
    # and the two quote sides -- confirm against horizon.data.base.
    open: float | None = None
    high: float | None = None
    low: float | None = None
    close: float | None = None
    volume: float | None = None
    bid: float | None = None
    ask: float | None = None
Using a synthetic source
from horizon.data import SyntheticGBM
import horizon as hz
# Synthetic source covering three market ids, built with a fixed seed.
# NOTE(review): "GBM" presumably stands for geometric Brownian motion, with
# mu/sigma as drift and volatility -- confirm against the SyntheticGBM docs.
data = SyntheticGBM(
    market_ids=["AAPL", "MSFT", "NVDA"],
    n_bars=252,
    mu=0.10,
    sigma=0.20,
    seed=42,
)
# Hand the source to the engine in backtest mode; the `...` entries are
# placeholders for your own strategies and remaining settings.
result = hz.run(
    mode="backtest",
    data_source=data,
    strategies=[...],
    ...
)
Using a regime-switching source
from horizon.data import SyntheticRegimes
# One market whose 300 bars are split across three regimes.
# Each regime tuple is (fraction_of_bars, mu_annual, sigma_annual); the three
# fractions below add up to 1.0.
data = SyntheticRegimes(
    market_ids=["A"],
    n_bars=300,
    regimes=[
        (0.40, 0.20, 0.15), # 40% uptrend
        (0.30, 0.00, 0.30), # 30% chop
        (0.30, -0.30, 0.40), # 30% crash
    ],
    seed=7,
)
Each regime tuple is (fraction_of_bars, mu_annual, sigma_annual). Fractions should sum to ~1.0. Great for testing whether your strategy survives adverse conditions.
Using DictSource for tests
from datetime import datetime
from horizon.data import DictSource
# Keys are market ids; each value is a list of (datetime, number) pairs.
# NOTE(review): the number is presumably the bar price -- confirm against the
# DictSource docs.
data = DictSource({
    "AAPL": [
        (datetime(2024, 1, 1), 180.0),
        (datetime(2024, 1, 2), 182.5),
        (datetime(2024, 1, 3), 179.0),
    ],
    "MSFT": [
        (datetime(2024, 1, 1), 400.0),
        (datetime(2024, 1, 2), 402.0),
        (datetime(2024, 1, 3), 405.0),
    ],
})
Chronological ordering across markets is handled automatically.
Writing a custom data source
Implement the two-method protocol:
from typing import Iterator
from horizon.data import Bar
import pandas as pd
class ParquetDataSource:
    """Reads historical bars from a parquet file.

    The file must provide ``market_id``, ``timestamp``, ``open``, ``high``,
    ``low`` and ``close`` columns. A ``volume`` column is optional; when it
    is absent every bar gets a volume of ``0.0``.
    """

    def __init__(self, path: str):
        """Load the file and order its rows for replay.

        Args:
            path: Location of the parquet file.
        """
        frame = pd.read_parquet(path)
        # Sort on (timestamp, market_id) rather than timestamp alone so that
        # bars sharing a timestamp always come out in the same order.
        self._df = frame.sort_values(["timestamp", "market_id"])

    def markets(self) -> list[str]:
        """Return the distinct market ids in the file, sorted."""
        return sorted(self._df["market_id"].unique().tolist())

    def iter_bars(self) -> Iterator[Bar]:
        """Yield one ``Bar`` per row in (timestamp, market_id) order.

        ``price`` is set to the row's close.

        Raises:
            KeyError: If a required column is missing from the file.
        """
        columns = ["market_id", "timestamp", "open", "high", "low", "close"]
        has_volume = "volume" in self._df.columns
        if has_volume:
            columns.append("volume")
        # Plain tuples over only the needed columns: much cheaper than
        # iterrows(), which builds a Series for every row.
        for record in self._df[columns].itertuples(index=False, name=None):
            market_id, timestamp, open_, high, low, close = record[:6]
            yield Bar(
                market_id=market_id,
                timestamp=timestamp,
                price=float(close),
                open=float(open_),
                high=float(high),
                low=float(low),
                close=float(close),
                volume=float(record[6]) if has_volume else 0.0,
            )
Use it:
# Pass an instance of the custom class as data_source; the `...` entry is a
# placeholder for the remaining arguments.
result = hz.run(
    mode="backtest",
    data_source=ParquetDataSource("historical.parquet"),
    ...
)
CSV example
import csv
from datetime import datetime
from horizon.data import Bar
class CSVDataSource:
    """Reads bars from a CSV file with ``symbol``, ``date`` and ``close`` columns.

    A ``volume`` column is optional. The file is re-read on every call, so
    nothing is cached between calls.
    """

    def __init__(self, path: str, date_fmt: str = "%Y-%m-%d"):
        """Remember where the file lives and how its dates are written.

        Args:
            path: Location of the CSV file.
            date_fmt: ``datetime.strptime`` format of the ``date`` column.
        """
        self.path = path
        self.date_fmt = date_fmt

    def markets(self):
        """Return the distinct ``symbol`` values in the file, sorted."""
        # newline="" is what the csv module asks for when reading files.
        with open(self.path, newline="") as fh:
            return sorted({row["symbol"] for row in csv.DictReader(fh)})

    def iter_bars(self):
        """Yield one ``Bar`` per row, ordered by (timestamp, symbol).

        A missing or empty ``volume`` value becomes ``0.0``.

        Raises:
            KeyError: If a required column is missing.
            ValueError: If a date does not match ``date_fmt`` or a number
                cannot be parsed.
        """
        for timestamp, symbol, row in self._sorted_rows():
            yield Bar(
                market_id=symbol,
                timestamp=timestamp,
                price=float(row["close"]),
                volume=float(row.get("volume") or 0),
            )

    def _sorted_rows(self):
        """Return ``(timestamp, symbol, row)`` triples in bar order."""
        with open(self.path, newline="") as fh:
            parsed = [
                (datetime.strptime(row["date"], self.date_fmt), row["symbol"], row)
                for row in csv.DictReader(fh)
            ]
        # Order by the parsed datetime, not the raw string: text ordering is
        # only chronological for ISO-like formats such as the default.
        parsed.sort(key=lambda item: item[:2])
        return parsed
Yahoo Finance example
import yfinance as yf
from horizon.data import Bar
class YahooDataSource:
    """Serves closing prices downloaded from Yahoo Finance via ``yfinance``.

    All data is fetched once, in the constructor, and kept in memory.
    """

    def __init__(self, tickers: list[str], start: str, end: str, interval: str = "1d"):
        """Download the price history for ``tickers``.

        Args:
            tickers: Symbols to download; they double as the market ids.
            start: Start of the date range, passed through to ``yf.download``.
            end: End of the date range, passed through to ``yf.download``.
            interval: Bar interval, passed through to ``yf.download``.
        """
        self.tickers = tickers
        self._df = yf.download(tickers, start=start, end=end, interval=interval)

    def markets(self):
        """Return the ticker symbols as a new list."""
        # A copy, so callers cannot alter this source by mutating the result.
        return list(self.tickers)

    def iter_bars(self):
        """Yield one ``Bar`` per non-missing close, in chronological order.

        Bars are ordered by (timestamp, ticker) across all tickers, not
        ticker by ticker, so bars sharing a timestamp arrive together.
        """
        for timestamp, ticker, price in self._close_points():
            yield Bar(
                market_id=ticker,
                timestamp=timestamp,
                price=price,
            )

    def _close_points(self):
        """Return ``(timestamp, ticker, close)`` tuples sorted by time, then ticker."""
        close = self._df["Close"]
        points = []
        for ticker in self.tickers:
            # Depending on the yfinance version, a single-ticker download may
            # come back with flat columns, making "Close" a Series already.
            series = close[ticker] if getattr(close, "ndim", 1) == 2 else close
            for ts, price in series.dropna().items():
                points.append((ts.to_pydatetime(), ticker, float(price)))
        points.sort(key=lambda point: point[:2])
        return points
Requires the yfinance package (pip install yfinance).
Chronological ordering requirement
DataSources must yield bars in chronological order across all markets. The backtest loop buffers bars at the same timestamp and flushes them as a single tick. If bars come out of order, features will be corrupted.
For custom sources with multiple markets, always sort by timestamp first:
# The timestamp is the primary sort key; market_id only breaks ties between
# bars that share a timestamp.
all_bars.sort(key=lambda b: (b.timestamp, b.market_id))