
Commit b7c3350

Merge pull request #167 from MITLibraries/epic-TIMX-515
Merge long running feature branch `epic-TIMX-515`
2 parents: 9bf9b1b + 1fba058

21 files changed (+3033, -1758 lines)

.gitignore

Lines changed: 3 additions & 1 deletion
@@ -160,4 +160,6 @@ cython_debug/
 # VSCode
 .vscode
 
-output/
+output/
+
+AGENTS.md

Makefile

Lines changed: 7 additions & 6 deletions
@@ -63,9 +63,10 @@ ruff-apply: # Resolve 'fixable errors' with 'ruff'
 ######################
 minio-start:
 	docker run \
-		-p 9000:9000 \
-		-p 9001:9001 \
-		-v $(MINIO_DATA):/data \
-		-e "MINIO_ROOT_USER=$(MINIO_USERNAME)" \
-		-e "MINIO_ROOT_PASSWORD=$(MINIO_PASSWORD)" \
-		quay.io/minio/minio server /data --console-address ":9001"
+		-d \
+		-p 9000:9000 \
+		-p 9001:9001 \
+		-v $(MINIO_DATA):/data \
+		-e "MINIO_ROOT_USER=$(MINIO_USERNAME)" \
+		-e "MINIO_ROOT_PASSWORD=$(MINIO_PASSWORD)" \
+		quay.io/minio/minio server /data --console-address ":9001"

Pipfile

Lines changed: 3 additions & 1 deletion
@@ -9,10 +9,12 @@ boto3 = "*"
 duckdb = "*"
 pandas = "*"
 pyarrow = "*"
+sqlalchemy = "*"
+duckdb-engine = "*"
 
 [dev-packages]
 black = "*"
-boto3-stubs = {version = "*", extras = ["s3"]}
+boto3-stubs = {extras = ["essential"], version = "*"}
 coveralls = "*"
 ipython = "*"
 moto = "*"
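
The new `sqlalchemy` and `duckdb-engine` dependencies suggest SQLAlchemy access to the library's DuckDB context. A minimal sketch of what that pairing enables (illustrative only; the in-memory connection string is a generic duckdb-engine example, not something this PR configures):

```python
from sqlalchemy import create_engine, text

# duckdb-engine registers the "duckdb" SQLAlchemy dialect;
# an in-memory database is used here purely for illustration
engine = create_engine("duckdb:///:memory:")

with engine.connect() as conn:
    result = conn.execute(text("SELECT 42 AS answer"))
    print(result.fetchall())  # [(42,)]
```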

Pipfile.lock

Lines changed: 727 additions & 558 deletions
Some generated files are not rendered by default.

README.md

Lines changed: 13 additions & 7 deletions
@@ -49,7 +49,17 @@ WARNING_ONLY_LOGGERS=# Comma-seperated list of logger names to set as WARNING on
 MINIO_S3_ENDPOINT_URL=# If set, informs the library to use this Minio S3 instance. Requires the http(s):// protocol.
 MINIO_USERNAME=# Username / AWS Key for Minio; required when MINIO_S3_ENDPOINT_URL is set
 MINIO_PASSWORD=# Password / AWS Secret for Minio; required when MINIO_S3_ENDPOINT_URL is set
-MINIO_DATA=# Path to persist MinIO data if started via Makefile command
+MINIO_DATA=# Path to persist MinIO data if started via Makefile command
+
+TDA_READ_BATCH_SIZE=# Row size of batches read, affecting memory consumption
+TDA_WRITE_BATCH_SIZE=# Row size of batches written, directly affecting row group size in final parquet files
+TDA_MAX_ROWS_PER_GROUP=# Max number of rows per row group in a parquet file
+TDA_MAX_ROWS_PER_FILE=# Max number of rows in a single parquet file
+TDA_BATCH_READ_AHEAD=# Number of batches to optimistically read ahead when batch reading from a dataset; pyarrow default is 16
+TDA_FRAGMENT_READ_AHEAD=# Number of fragments to optimistically read ahead when batch reading from a dataset; pyarrow default is 4
+TDA_DUCKDB_MEMORY_LIMIT=# Memory limit for DuckDB connection
+TDA_DUCKDB_THREADS=# Thread limit for DuckDB connection
+TDA_DUCKDB_JOIN_BATCH_SIZE=# Batch size for metadata + data joins, 100k default and recommended
 ```
 
 ## Local S3 via MinIO

@@ -101,12 +111,6 @@ timdex_dataset = TIMDEXDataset("s3://my-bucket/path/to/dataset")
 
 # or, local dataset (e.g. testing or development)
 timdex_dataset = TIMDEXDataset("/path/to/dataset")
-
-# load the dataset, which discovers all parquet files
-timdex_dataset.load()
-
-# or, load the dataset but ensure that only current records are ever yielded
-timdex_dataset.load(current_records=True)
 ```
 
 All read methods for `TIMDEXDataset` allow for the same group of filters which are defined in `timdex_dataset_api.dataset.DatasetFilters`. Examples are shown below.

@@ -144,6 +148,8 @@ run_df = timdex_dataset.read_dataframe(
 )
 ```
 
+See [docs/reading.md](docs/reading.md) for more information.
+
 ### Writing Data
 
 At this time, the only application that writes to the ETL parquet dataset is Transmogrifier.

docs/reading.md

Lines changed: 185 additions & 0 deletions
# Reading data from TIMDEXDataset

This guide explains how `TIMDEXDataset` read methods work and how to use them effectively.

- `TIMDEXDataset` and `TIMDEXDatasetMetadata` both maintain an in-memory DuckDB context. You can issue DuckDB SQL against the views/tables they create.
- Read methods use a two-step query flow for performance:
  1) a metadata query determines which Parquet files and row offsets are relevant
  2) a data query reads just those rows and returns the requested columns
- Prefer simple key/value `DatasetFilters` for most use cases; add a `where=` SQL predicate when you need more advanced logic (e.g., ranges, `BETWEEN`, `>`, `<`, `IN`).

## Available read methods

- `read_batches_iter(...)`: yields `pyarrow.RecordBatch`
- `read_dicts_iter(...)`: yields Python `dict` per row
- `read_dataframe(...)`: returns a pandas `DataFrame`
- `read_dataframes_iter(...)`: yields pandas `DataFrame` batches
- `read_transformed_records_iter(...)`: yields `transformed_record` dictionaries only

All accept the same `DatasetFilters` and the optional `where=` SQL predicate.

## Filters vs. where=

- `DatasetFilters` are key/value arguments on read methods. They are validated and translated into SQL and will cover most queries.
  - Examples: `source="alma"`, `run_date="2024-12-01"`, `run_type="daily"`, `action="index"`
- `where=` is an optional raw SQL WHERE predicate string, combined with `DatasetFilters` using `AND`. Use it for:
  - date/time ranges (`BETWEEN`, `>`, `<`)
  - set membership (`IN (...)`)
  - complex boolean logic (AND/OR grouping)

Important: `where=` must be only a WHERE predicate (no `SELECT`/`FROM`/`;`). The library plugs it into generated SQL.

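For orientation, here is a small sketch of a read call that combines key/value filters with a `where=` predicate (the SQL in the comment is an approximation of what the library generates, not its literal output; the bucket path is illustrative):

```python
from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset("s3://my-bucket/timdex-dataset")  # illustrative path

# Conceptually, the filters and the where= predicate are ANDed together,
# roughly: WHERE source = 'alma' AND run_type = 'daily'
#          AND run_date BETWEEN '2025-01-01' AND '2025-03-31'
df = td.read_dataframe(
    source="alma",
    run_type="daily",
    where="run_date BETWEEN '2025-01-01' AND '2025-03-31'",
)
```
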
## How reading works (two-step process)

1) Metadata query
   - Runs against `TIMDEXDatasetMetadata` views (e.g., `metadata.records`, `metadata.current_records`)
   - Produces a small result set with identifiers: `filename`, row group/offsets, and primary keys
   - Greatly reduces how much data must be scanned

2) Data query
   - Uses DuckDB to read only relevant Parquet fragments based on metadata results
   - Joins the metadata identifiers to return the exact rows requested
   - Returns batches, dicts, or a `DataFrame` depending on the method

This pattern keeps reads fast and memory-efficient even for large datasets.

The following diagram shows the flow for an example query:

```python
for record_dict in td.read_dicts_iter(
    table="records",
    source="dspace",
    run_date="2025-09-01",
    run_id="abc123",
):
    ...  # process record
```

```mermaid
sequenceDiagram
    autonumber
    participant U as User
    participant TD as TIMDEXDataset
    participant TDM as TIMDEXDatasetMetadata
    participant D as DuckDB Context
    participant P as Parquet files

    U->>TD: Perform query
    Note left of TD: read_dicts_iter(<br>table="records",<br>source="dspace",<br>run_date="2025-09-01",<br>run_id="abc123")
    TD->>TDM: build_meta_query(table, filters, where=None)
    Note right of TDM: (Metadata Query)<br><br>SELECT r.timdex_record_id, r.run_id, r.filename, r.run_record_offset<br>FROM metadata.records r<br>WHERE r.source = 'dspace'<br>AND r.run_date = '2025-09-01'<br>AND r.run_id = 'abc123'<br>ORDER BY r.filename, r.run_record_offset

    TDM->>D: Execute metadata query
    D-->>TD: lightweight result set (file + offsets)

    TD->>D: Build and run data query using metadata
    Note right of D: (Data query)<br><br>SELECT <COLUMNS><br>FROM read_parquet(P.files) d<br>JOIN meta m<br>USING (timdex_record_id, run_id, run_record_offset)

    D-->>TD: batches of rows
    TD-->>U: iterator of dicts (one dict per row)
```

## Quick start examples

```python
from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset("s3://my-bucket/timdex-dataset")  # example instance

# 1) Get a single record as a dict
first = next(td.read_dicts_iter())

# 2) Read batches with simple filters
for batch in td.read_batches_iter(source="alma", run_date="2025-06-01", run_id="abc123"):
    ...  # process pyarrow.RecordBatch

# 3) DataFrame of one run
df = td.read_dataframe(source="dspace", run_date="2025-06-01", run_id="def456")

# 4) Only transformed records (used by indexer)
for rec in td.read_transformed_records_iter(source="aspace", run_type="daily"):
    ...  # rec is a dict of the transformed_record
```

## `where=` examples

Advanced filtering that complements `DatasetFilters`.

```python
# date range with BETWEEN
where = "run_date BETWEEN '2024-12-01' AND '2024-12-31'"
df = td.read_dataframe(source="alma", where=where)

# greater-than on a timestamp (if present in columns)
where = "run_timestamp > '2024-12-01T10:00:00Z'"
df = td.read_dataframe(source="aspace", run_type="daily", where=where)

# combine set membership and action
where = "run_id IN ('run-1', 'run-3', 'run-5') AND action = 'index'"
df = td.read_dataframe(source="alma", where=where)

# combine filters (AND) with where=
where = "run_type = 'daily' AND action = 'index'"
df = td.read_dataframe(source="libguides", where=where)
```

Validation tips:
- Use only a predicate (no SELECT/FROM, no trailing semicolon).
- Column names must exist in the target table/view (e.g., `records` or `current_records`).
- `DatasetFilters` + `where=` are ANDed; if the combination yields zero rows, you'll get an empty result.

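A quick illustration of the predicate-only rule (both strings are hypothetical; the second would be rejected or fail as SQL):

```python
# OK: a bare predicate that can follow WHERE in the generated SQL
where_ok = "run_date >= '2025-01-01' AND action = 'index'"

# Not OK: a full statement with SELECT/FROM and a trailing semicolon
where_bad = "SELECT * FROM records WHERE action = 'index';"

df = td.read_dataframe(source="alma", where=where_ok)
```
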
## Choosing a table

By default, read methods query the `records` view (all versions). To get only the latest version per `timdex_record_id`, target the `current_records` view:

```python
# ALL records in the 'libguides' source
all_libguides_df = td.read_dataframe(table="records", source="libguides")

# latest unique records across the dataset
current_df = td.read_dataframe(table="current_records")

# current records for a source and specific run
current_df = td.read_dataframe(table="current_records", source="alma", run_id="run-5")
```

## DuckDB context

- `TIMDEXDataset` exposes a DuckDB connection used for data queries against Parquet.
- `TIMDEXDatasetMetadata` exposes a DuckDB connection used for metadata queries and provides views:
  - `metadata.records`: all record versions with run metadata
  - `metadata.current_records`: latest record per `timdex_record_id`
  - `metadata.append_deltas`: incremental write tracking

You can execute raw DuckDB SQL for inspection and debugging:

```python
# access metadata connection
conn = td.metadata.conn  # DuckDB connection

# peek at view schemas
print(conn.sql("DESCRIBE metadata.records").to_df())
print(conn.sql("DESCRIBE metadata.current_records").to_df())

# ad-hoc query (read-only)
debug_df = conn.sql("""
    SELECT source, action, COUNT(*) as n
    FROM metadata.records
    WHERE run_date = '2024-12-01'
    GROUP BY 1, 2
    ORDER BY n DESC
""").to_df()
```

## Performance notes

- Batch iterators (`read_batches_iter()` / `read_dataframes_iter()`) stream results to control memory.
- `read_dataframe()` loads ALL matching rows into memory; fine for small/filtered sets, but can easily overwhelm memory for large result sets.
- Tuning via env vars (advanced): `TDA_READ_BATCH_SIZE`, `TDA_DUCKDB_THREADS`, `TDA_DUCKDB_MEMORY_LIMIT`.

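A memory-conscious sketch that streams results instead of materializing them, with optional tuning via the environment variables above (the values, and the assumption that the library reads them at runtime, are illustrative rather than prescribed):

```python
import os

# optional tuning; assumed to be picked up by the library when it reads
os.environ["TDA_READ_BATCH_SIZE"] = "50000"
os.environ["TDA_DUCKDB_MEMORY_LIMIT"] = "4GB"

from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset("s3://my-bucket/timdex-dataset")  # illustrative path

# stream DataFrame chunks rather than loading everything via read_dataframe()
total_rows = 0
for chunk_df in td.read_dataframes_iter(source="alma", run_type="daily"):
    total_rows += len(chunk_df)  # process each chunk, then let it go out of scope

print(total_rows)
```
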
## Troubleshooting

- Empty results? Check that filters and `where=` don't over-constrain your query.
- Syntax errors? Ensure `where=` is a valid predicate and references existing columns.
- Large scans? Make sure to use `_iter()` read methods.

migrations/002_2025_06_25_consistent_run_timestamp_per_etl_run.py

Lines changed: 8 additions & 1 deletion
@@ -1,4 +1,7 @@
-# ruff: noqa: BLE001, D212, TRY300, TRY400
+# ruff: noqa: PGH004
+# ruff: noqa
+# type: ignore
+
 """
 Date: 2025-06-25

@@ -29,6 +32,10 @@
     pipenv run python migrations/002_2025_06_25_consistent_run_timestamp_per_etl_run.py \
         <DATASET_LOCATION> \
         --dry-run
+
+Update: 2025-08-04
+
+This migration is no longer functional given changes to TIMDEXDataset.
 """
 
 import argparse

pyproject.toml

Lines changed: 8 additions & 1 deletion
@@ -25,8 +25,10 @@ dependencies = [
     "attrs",
     "boto3",
     "duckdb",
+    "duckdb_engine",
     "pandas",
     "pyarrow",
+    "sqlalchemy"
 ]
 
 [project.optional-dependencies]

@@ -54,14 +56,17 @@ line-length = 90
 [tool.mypy]
 disallow_untyped_calls = true
 disallow_untyped_defs = true
-exclude = ["tests/", "output/"]
+exclude = ["tests/", "output/", "migrations/"]
 
 [[tool.mypy.overrides]]
 module = []
 ignore_missing_imports = true
 
 [tool.pytest.ini_options]
 log_level = "INFO"
+filterwarnings = [
+    "ignore:duckdb-engine doesn't yet support reflection on indices:duckdb_engine.DuckDBEngineWarning",
+]
 
 [tool.ruff]
 target-version = "py312"

@@ -95,6 +100,8 @@ ignore = [
     "PLR0915",
     "S321",
     "S608",
+    "TD002",
+    "TD003",
     "TRY003"
 ]
 
