Custom Data Sources
CSV, Parquet, yfinance, Polygon, or anything else
Implementing a custom data source takes just two methods: markets() and iter_bars().
Minimal protocol
python
from typing import Iterator
from horizon.data import Bar
class MySource:
    """Skeleton of a custom data source; fill in both methods."""

    def markets(self) -> list[str]:
        """Return the identifiers of the markets this source provides."""

    def iter_bars(self) -> Iterator[Bar]:
        """Yield the source's bars one at a time."""
Yield bars in chronological order across markets. The backtest loop buffers bars that share a timestamp and flushes them as a single tick, so if bars arrive out of order, features will be corrupted.
CSV recipe
python
import csv
from datetime import datetime
from horizon.data import Bar
class CSVDataSource:
    """Data source that reads bars from a CSV file.

    The file must have a header row with ``symbol``, ``date`` (ISO 8601) and
    ``close`` columns. A ``volume`` column is optional; a missing or empty
    volume is treated as 0. The whole file is loaded in ``__init__`` and
    sorted by ``(timestamp, market_id)`` so bars come out in chronological
    order across markets.
    """

    def __init__(self, path: str):
        """Load and sort every row of the CSV file at *path*.

        Raises:
            KeyError: if a required column is missing.
            ValueError: if a date or number cannot be parsed.
        """
        self._rows = []
        # newline="" is what the csv module expects, so that newlines embedded
        # in quoted fields are handled by the reader rather than by open().
        with open(path, newline="") as fh:
            for row in csv.DictReader(fh):
                self._rows.append({
                    "market_id": row["symbol"],
                    "timestamp": datetime.fromisoformat(row["date"]),
                    "price": float(row["close"]),
                    # `or 0` covers an absent column, a short row (None) and
                    # an empty cell (""), all of which would otherwise make
                    # float() raise.
                    "volume": float(row.get("volume") or 0),
                })
        self._rows.sort(key=lambda r: (r["timestamp"], r["market_id"]))

    def markets(self):
        """Return the distinct symbols found in the file, sorted."""
        return sorted({r["market_id"] for r in self._rows})

    def iter_bars(self):
        """Yield one ``Bar`` per CSV row, in (timestamp, market_id) order."""
        for r in self._rows:
            yield Bar(
                market_id=r["market_id"],
                timestamp=r["timestamp"],
                price=r["price"],
                volume=r["volume"],
            )
Use:
python
# Build the source once, then pass it to the backtest run as `data_source`.
# NOTE(review): assumes `hz` is the imported horizon package and that the
# trailing `...` is a placeholder for the remaining run() arguments — confirm.
data = CSVDataSource("historical.csv")
hz.run(mode="backtest", data_source=data, ...)
Parquet recipe
python
import pandas as pd
from horizon.data import Bar
class ParquetDataSource:
    """Data source that serves bars from a Parquet file.

    The file needs ``timestamp``, ``symbol`` and ``close`` columns. The
    ``open``, ``high`` and ``low`` columns fall back to ``close`` when absent,
    and ``volume`` falls back to 0.
    """

    def __init__(self, path: str):
        """Read the Parquet file at *path* and order it by time, then symbol."""
        frame = pd.read_parquet(path)
        self._df = frame.sort_values(["timestamp", "symbol"])

    def markets(self):
        """Return the distinct symbols in the file, sorted."""
        symbols = self._df["symbol"].unique().tolist()
        symbols.sort()
        return symbols

    def iter_bars(self):
        """Yield one ``Bar`` per row, in (timestamp, symbol) order."""
        for _, rec in self._df.iterrows():
            # The close is both the bar's price and the fallback for any
            # missing OHLC field.
            last = rec["close"]
            yield Bar(
                market_id=rec["symbol"],
                timestamp=rec["timestamp"],
                price=float(last),
                open=float(rec.get("open", last)),
                high=float(rec.get("high", last)),
                low=float(rec.get("low", last)),
                close=float(rec.get("close", last)),
                volume=float(rec.get("volume", 0)),
            )
Requires pandas and pyarrow.
Yahoo Finance recipe
python
import yfinance as yf
from horizon.data import Bar
class YahooDataSource:
    """Data source that serves closing prices downloaded via yfinance.

    Prices are downloaded once in ``__init__``. ``iter_bars`` merges the
    per-ticker close series into one stream sorted by ``(timestamp, ticker)``.
    """

    def __init__(self, tickers, start, end, interval="1d"):
        """Download close prices for *tickers* between *start* and *end*.

        *tickers* may be a list of symbols or a single string (one symbol, or
        several separated by spaces or commas).
        """
        # Iterating a bare string would yield single characters, so normalise
        # to a list of symbols before anything loops over it.
        if isinstance(tickers, str):
            tickers = tickers.replace(",", " ").split()
        self.tickers = list(tickers)
        self._df = yf.download(self.tickers, start=start, end=end, interval=interval)

    def markets(self):
        """Return the list of ticker symbols."""
        return self.tickers

    def iter_bars(self):
        """Yield one ``Bar`` per non-missing close, in chronological order."""
        for ts, ticker, price in self._entries():
            yield Bar(market_id=ticker, timestamp=ts, price=price)

    def _entries(self):
        """Return ``(timestamp, ticker, close)`` tuples sorted by time, then ticker."""
        close = self._df["Close"]
        entries = []
        for ticker in self.tickers:
            # With per-ticker (MultiIndex) columns, "Close" is a frame keyed by
            # ticker. Depending on the yfinance version and options, a
            # single-ticker download can have flat columns, in which case
            # "Close" is already that ticker's series.
            series = close[ticker] if close.ndim > 1 else close
            for ts, price in series.dropna().items():
                entries.append((ts.to_pydatetime(), ticker, float(price)))
        entries.sort(key=lambda e: (e[0], e[1]))
        return entries
Requires pip install yfinance.
Polygon.io recipe
python
from datetime import datetime

from polygon import RESTClient
from horizon.data import Bar
class PolygonDataSource:
    """Data source that serves daily aggregates fetched from Polygon.io.

    All bars for every ticker are fetched in ``__init__`` and held in memory,
    sorted by ``(timestamp, market_id)``.
    """

    def __init__(self, api_key, tickers, start, end):
        """Create the REST client and fetch bars for *tickers* in [start, end]."""
        self._client = RESTClient(api_key)
        self._tickers = tickers
        self._start = start
        self._end = end
        self._bars = self._fetch_all()

    def _fetch_all(self):
        """Fetch daily aggregates for every ticker as sorted ``Bar`` keyword dicts."""
        all_bars = []
        for ticker in self._tickers:
            aggs = self._client.get_aggs(
                ticker=ticker,
                multiplier=1,
                timespan="day",
                from_=self._start,
                to=self._end,
            )
            all_bars.extend(
                {
                    "market_id": ticker,
                    # The aggregate timestamp is divided by 1000 to convert
                    # milliseconds to seconds.
                    # NOTE: fromtimestamp() without a tz returns naive local
                    # time, so results depend on the machine's timezone; pass
                    # a tz if you need UTC.
                    "timestamp": datetime.fromtimestamp(agg.timestamp / 1000),
                    "price": agg.close,
                    "open": agg.open,
                    "high": agg.high,
                    "low": agg.low,
                    "close": agg.close,
                    "volume": agg.volume,
                }
                for agg in aggs
            )
        # One global sort gives chronological order across all tickers.
        all_bars.sort(key=lambda b: (b["timestamp"], b["market_id"]))
        return all_bars

    def markets(self):
        """Return the tickers this source was created with."""
        return self._tickers

    def iter_bars(self):
        """Yield one ``Bar`` per fetched aggregate, in (timestamp, market_id) order."""
        for b in self._bars:
            yield Bar(**b)
Requires pip install polygon-api-client and a Polygon API key.
Databento recipe
python
import databento as db
from horizon.data import Bar
class DatabentoDataSource:
    """Data source that serves daily OHLCV bars from Databento.

    Nothing is fetched in ``__init__``; each call to ``iter_bars`` requests
    the ``ohlcv-1d`` schema for the configured dataset, symbols and range.
    """

    def __init__(self, api_key, dataset, symbols, start, end):
        """Create the historical client and remember the query parameters."""
        self._client = db.Historical(key=api_key)
        self._dataset = dataset
        self._symbols = symbols
        self._start = start
        self._end = end

    def markets(self):
        """Return the symbols this source was created with."""
        return self._symbols

    def iter_bars(self):
        """Yield one ``Bar`` per OHLCV record, in (ts_event, symbol) order."""
        for _, row in self._load_frame().iterrows():
            yield Bar(
                market_id=row["symbol"],
                timestamp=row["ts_event"].to_pydatetime(),
                price=float(row["close"]),
                open=float(row["open"]),
                high=float(row["high"]),
                low=float(row["low"]),
                close=float(row["close"]),
                volume=float(row["volume"]),
            )

    def _load_frame(self):
        """Fetch the range and return it sorted, with ``ts_event`` as a column."""
        df = self._client.timeseries.get_range(
            dataset=self._dataset,
            symbols=self._symbols,
            start=self._start,
            end=self._end,
            schema="ohlcv-1d",
        ).to_df()
        # to_df() may deliver ts_event as the frame's index rather than a
        # column (Databento does this for OHLCV schemas). Row lookups by
        # "ts_event" need it as a column, so move it there when necessary.
        if "ts_event" not in df.columns:
            df = df.reset_index()
        return df.sort_values(["ts_event", "symbol"])
Requires pip install databento.
Chronological ordering: the rule
All custom sources must yield bars in chronological order across all markets. Otherwise the backtest loop’s per-tick flushing corrupts features.
If your raw data is unsorted, sort by (timestamp, market_id) before yielding:
python
# Sort in place by time first, then by market, so bars sharing a timestamp
# end up adjacent.
all_bars.sort(key=lambda b: (b.timestamp, b.market_id))