Tutorial: real-world incremental ingestion workflow
Goal: Understand how to integrate timeseries-table-format into a recurring ingestion job
(e.g. a daily cron task) that safely appends new files over time.
Prereqs: Completed the "Create, append, query" tutorial.
What you'll learn:
- How to open an existing table and append new data to it on each run
- How to skip files that are already loaded (using CoverageOverlapError)
- How to verify what's already in the table before and after ingestion
Scenario
You receive one Parquet file per day for a set of symbols. Each file covers one calendar day of hourly bars. You want a table that accumulates all history on disk, and you want your ingestion script to be idempotent — running it twice on the same file should not corrupt the table.
Pattern: open-or-create
On the first run, create the table. On subsequent runs, open it.
```python
import timeseries_table_format as ttf
from pathlib import Path

TABLE_ROOT = Path("./prices_table")

def open_or_create_table(table_root: Path) -> ttf.TimeSeriesTable:
    try:
        # Fast path: the table already exists from a previous run.
        return ttf.TimeSeriesTable.open(str(table_root))
    except ttf.TimeseriesTableError:
        # First run: no table at this path yet, so create it.
        return ttf.TimeSeriesTable.create(
            table_root=str(table_root),
            time_column="ts",
            bucket="1h",
            entity_columns=["symbol"],
        )
```
Pattern: skip-if-already-loaded
append_parquet(...) raises CoverageOverlapError if the incoming file overlaps existing
coverage. Use that to make your loop idempotent:
```python
import timeseries_table_format as ttf
from pathlib import Path

def ingest_files(table: ttf.TimeSeriesTable, files: list[Path]) -> None:
    for f in files:
        try:
            version = table.append_parquet(str(f))
            print(f"  appended {f.name} → table version {version}")
        except ttf.CoverageOverlapError as e:
            # The file's buckets are already covered: skip rather than double-load.
            print(f"  skipped {f.name}: {e.overlap_count} overlapping bucket(s) already loaded")
```
Running this twice on the same list of files will append everything on the first pass and skip everything on the second — the table is never double-loaded.
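The loop above needs a list of files to work through. One way to build it (the drop-directory layout and `.parquet` naming are assumptions of this sketch, not requirements of the library) is to glob the incoming directory and sort by name, so days are ingested in order:

```python
from pathlib import Path

def find_parquet_files(incoming_dir: Path) -> list[Path]:
    """Return all Parquet files in the drop directory, oldest first by name."""
    return sorted(incoming_dir.glob("*.parquet"))
```

Because the skip-if-already-loaded pattern makes ingestion idempotent, it's fine for this to return files from previous days too — they'll simply be skipped.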
Pattern: verify what's in the table
After ingestion, query the table to confirm what's loaded:
```python
import timeseries_table_format as ttf

def print_table_summary(table_root: str) -> None:
    sess = ttf.Session()
    sess.register_tstable("t", table_root)
    summary = sess.sql(
        """
        SELECT
            symbol,
            min(ts) AS first_ts,
            max(ts) AS last_ts,
            count(*) AS row_count
        FROM t
        GROUP BY symbol
        ORDER BY symbol
        """
    )
    print(summary)
```
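For comparison, the same per-symbol summary can be computed with a plain pandas groupby if you have the rows in an in-memory DataFrame with `ts` and `symbol` columns (this sketch does not use the library at all; it just mirrors the SQL aggregation above):

```python
import pandas as pd

def summarize(df: pd.DataFrame) -> pd.DataFrame:
    """Per-symbol first/last timestamp and row count, mirroring the SQL summary."""
    return (
        df.groupby("symbol")
        .agg(first_ts=("ts", "min"), last_ts=("ts", "max"), row_count=("ts", "size"))
        .reset_index()
        .sort_values("symbol", ignore_index=True)
    )
```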
Putting it together
```python
from pathlib import Path

import timeseries_table_format as ttf

TABLE_ROOT = Path("./prices_table")

def open_or_create_table(table_root: Path) -> ttf.TimeSeriesTable:
    try:
        return ttf.TimeSeriesTable.open(str(table_root))
    except ttf.TimeseriesTableError:
        return ttf.TimeSeriesTable.create(
            table_root=str(table_root),
            time_column="ts",
            bucket="1h",
            entity_columns=["symbol"],
        )

def run_daily_ingest(new_files: list[Path]) -> None:
    table = open_or_create_table(TABLE_ROOT)

    # Append each new file, skipping any that are already loaded.
    for f in new_files:
        try:
            version = table.append_parquet(str(f))
            print(f"appended {f.name} (version {version})")
        except ttf.CoverageOverlapError as e:
            print(f"skipped {f.name}: already loaded ({e.overlap_count} overlap(s))")

    # Verify the table's overall extent after ingestion.
    sess = ttf.Session()
    sess.register_tstable("prices", str(TABLE_ROOT))
    print(sess.sql("SELECT min(ts), max(ts), count(*) FROM prices"))
```
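To run this as the daily cron task the scenario describes, a crontab entry along these lines works; the schedule, interpreter path, script path, and log path below are all hypothetical placeholders to adapt to your environment:

```cron
# Hypothetical crontab entry: run the ingest script daily at 06:15,
# appending stdout and stderr to a log file.
15 6 * * * /usr/bin/python3 /opt/ingest/daily_ingest.py >> /var/log/prices_ingest.log 2>&1
```

Because ingestion is idempotent, an occasional overlapping run (e.g. after a manual re-trigger) is harmless.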
Key takeaways
- Open, don't recreate. Use TimeSeriesTable.open(...) on subsequent runs so you keep history.
- Let CoverageOverlapError be your idempotency guard. Catching it is normal; it means the data is already there.
- Always verify after ingest. A quick min/max/count query is cheap and confirms the append actually committed.
Next:
- Reference: TimeSeriesTable
- Reference: Exceptions
- Concept: Buckets + overlap