Custom Data Sources

CSV, Parquet, yfinance, Polygon, or anything else

A custom data source only needs to implement two methods: markets() and iter_bars().

Minimal protocol

python
from typing import Iterator
from horizon.data import Bar

class MySource:
    """Skeleton of a custom data source: the two methods every source provides."""

    def markets(self) -> list[str]:
        """Return the identifiers of the markets this source covers."""

    def iter_bars(self) -> Iterator[Bar]:
        """Yield bars in chronological order across all markets."""

Yield bars in chronological order across markets. The backtest loop buffers bars at the same timestamp and flushes them as a single tick: if bars come out of order, features will be corrupted.

CSV recipe

python
import csv
from datetime import datetime
from horizon.data import Bar

class CSVDataSource:
    """Bar source backed by a CSV file.

    Expects ``symbol``, ``date`` (ISO format) and ``close`` columns, plus an
    optional ``volume`` column. The whole file is read eagerly and sorted by
    (timestamp, market_id) so that iter_bars() yields in chronological order.
    """

    def __init__(self, path: str):
        """Load and sort every row of the CSV at ``path``.

        Raises:
            KeyError: if the ``symbol``, ``date`` or ``close`` column is absent.
            ValueError: if a date is not ISO formatted or a number is malformed.
        """
        self._rows = []
        # newline="" hands newline handling to the csv module, as its docs
        # require; otherwise newlines inside quoted fields can be misread.
        with open(path, newline="") as fh:
            for row in csv.DictReader(fh):
                self._rows.append({
                    "market_id": row["symbol"],
                    "timestamp": datetime.fromisoformat(row["date"]),
                    "price": float(row["close"]),
                    # A blank cell ("") or a short row (None) means "no volume";
                    # a plain .get() default only covers a missing column.
                    "volume": float(row.get("volume") or 0),
                })
        self._rows.sort(key=lambda r: (r["timestamp"], r["market_id"]))

    def markets(self):
        """Return the distinct symbols found in the file, sorted."""
        return sorted({r["market_id"] for r in self._rows})

    def iter_bars(self):
        """Yield one Bar per CSV row in (timestamp, market_id) order."""
        for r in self._rows:
            yield Bar(
                market_id=r["market_id"],
                timestamp=r["timestamp"],
                price=r["price"],
                volume=r["volume"],
            )

Use:

python
# Build the source once, then pass it to the backtest run as data_source.
data = CSVDataSource("historical.csv")
# NOTE: the trailing `...` is a placeholder for your remaining run() arguments;
# replace it before running (a positional argument after keywords is not valid Python).
hz.run(mode="backtest", data_source=data, ...)

Parquet recipe

python
import pandas as pd
from horizon.data import Bar

class ParquetDataSource:
    """Bar source backed by a Parquet file with timestamp, symbol and close columns.

    open, high, low and volume are optional: missing OHLC columns fall back
    to the close price and missing volume falls back to 0.
    """

    def __init__(self, path: str):
        """Read the Parquet file at ``path`` and order it by (timestamp, symbol)."""
        frame = pd.read_parquet(path)
        self._df = frame.sort_values(["timestamp", "symbol"])

    def markets(self):
        """Return the distinct symbols in the file, sorted."""
        symbols = self._df["symbol"].unique().tolist()
        symbols.sort()
        return symbols

    def iter_bars(self):
        """Yield one Bar per row, in the order established by the constructor."""
        for _, rec in self._df.iterrows():
            # The close price doubles as the bar price and as the OHLC fallback.
            last = rec["close"]
            yield Bar(
                market_id=rec["symbol"],
                timestamp=rec["timestamp"],
                price=float(last),
                open=float(rec.get("open", last)),
                high=float(rec.get("high", last)),
                low=float(rec.get("low", last)),
                close=float(last),
                volume=float(rec.get("volume", 0)),
            )

Requires pandas and pyarrow.

Yahoo Finance recipe

python
import yfinance as yf
from horizon.data import Bar

class YahooDataSource:
    """Bar source that downloads closing prices through yfinance."""

    def __init__(self, tickers, start, end, interval="1d"):
        """Download price history for ``tickers`` between ``start`` and ``end``."""
        self.tickers = tickers
        self._df = yf.download(
            tickers,
            start=start,
            end=end,
            interval=interval,
        )

    def markets(self):
        """Return the tickers this source was created with."""
        return self.tickers

    def iter_bars(self):
        """Yield one close-price Bar per (timestamp, ticker), oldest first."""
        # Flatten every ticker's non-null closes into one list so the stream
        # can be ordered across markets rather than ticker by ticker.
        stream = [
            (stamp.to_pydatetime(), symbol, float(value))
            for symbol in self.tickers
            for stamp, value in self._df["Close"][symbol].dropna().items()
        ]
        stream.sort(key=lambda item: (item[0], item[1]))
        for stamp, symbol, value in stream:
            yield Bar(market_id=symbol, timestamp=stamp, price=value)

Requires yfinance (pip install yfinance).

Polygon.io recipe

python
from datetime import datetime, timezone

from polygon import RESTClient
from horizon.data import Bar

class PolygonDataSource:
    """Bar source that fetches daily aggregates from Polygon.io.

    All tickers are fetched eagerly in the constructor and merged into one
    list sorted by (timestamp, market_id). Timestamps are timezone-aware UTC
    datetimes, so results do not depend on the host machine's timezone.
    """

    def __init__(self, api_key, tickers, start, end):
        """Create the REST client and download bars for every ticker."""
        self._client = RESTClient(api_key)
        self._tickers = tickers
        self._start = start
        self._end = end
        self._bars = self._fetch_all()

    def _fetch_all(self):
        """Return every ticker's daily bars as dicts, in chronological order."""
        all_bars = []
        for ticker in self._tickers:
            aggs = self._client.get_aggs(
                ticker=ticker,
                multiplier=1,
                timespan="day",
                from_=self._start,
                to=self._end,
            )
            for agg in aggs:
                all_bars.append({
                    "market_id": ticker,
                    # agg.timestamp is divided by 1000 to turn epoch milliseconds
                    # into seconds. Passing tz explicitly avoids naive host-local
                    # time; drop tzinfo afterwards if the rest of your pipeline
                    # uses naive datetimes.
                    "timestamp": datetime.fromtimestamp(
                        agg.timestamp / 1000, tz=timezone.utc
                    ),
                    "price": agg.close,
                    "open": agg.open,
                    "high": agg.high,
                    "low": agg.low,
                    "close": agg.close,
                    "volume": agg.volume,
                })
        all_bars.sort(key=lambda b: (b["timestamp"], b["market_id"]))
        return all_bars

    def markets(self):
        """Return the tickers this source was created with."""
        return self._tickers

    def iter_bars(self):
        """Yield one Bar per fetched aggregate, oldest first."""
        for b in self._bars:
            yield Bar(**b)

Requires polygon-api-client (pip install polygon-api-client) and a Polygon API key.

Databento recipe

python
import databento as db
from horizon.data import Bar

class DatabentoDataSource:
    """Bar source that pulls daily OHLCV bars from the Databento historical API.

    The request is issued each time iter_bars() is called, not in the
    constructor.
    """

    def __init__(self, api_key, dataset, symbols, start, end):
        """Store the query parameters and create the historical client."""
        self._client = db.Historical(key=api_key)
        self._dataset = dataset
        self._symbols = symbols
        self._start = start
        self._end = end

    def markets(self):
        """Return the symbols this source was created with."""
        return self._symbols

    def _load_frame(self):
        """Fetch the bars and return them sorted by (ts_event, symbol)."""
        df = self._client.timeseries.get_range(
            dataset=self._dataset,
            symbols=self._symbols,
            start=self._start,
            end=self._end,
            schema="ohlcv-1d",
        ).to_df()
        # to_df() may return the event timestamp as the index rather than a
        # column; the per-row lookup below needs it as a column.
        if "ts_event" not in df.columns:
            df = df.reset_index()
        return df.sort_values(["ts_event", "symbol"])

    def iter_bars(self):
        """Yield one Bar per OHLCV row, oldest first."""
        for _, row in self._load_frame().iterrows():
            yield Bar(
                market_id=row["symbol"],
                timestamp=row["ts_event"].to_pydatetime(),
                price=float(row["close"]),
                open=float(row["open"]),
                high=float(row["high"]),
                low=float(row["low"]),
                close=float(row["close"]),
                volume=float(row["volume"]),
            )

Requires databento (pip install databento).

Chronological ordering: the rule

All custom sources must yield bars in chronological order across all markets. Otherwise the backtest loop’s per-tick flushing corrupts features.

If your raw data is unsorted, sort by (timestamp, market_id) before yielding:

python
# Sort in place by timestamp, breaking ties by market_id, before yielding.
all_bars.sort(key=lambda b: (b.timestamp, b.market_id))

Next