Commit 6d13113

Use metadata in 'bulk-update-embeddings'

Why these changes are being introduced:
* With TDA 3.8.0, we can now retrieve record metadata columns in embeddings read methods. Filtering embeddings by `action="index"` prevents attempts to update documents that do not exist in OpenSearch (`action="delete"`), which would otherwise result in an API error. This matters especially given the current state of tim.opensearch.bulk_update, which raises a BulkOperationError and causes the 'bulk_update_embeddings' CLI command to exit early. This commit also indexes embeddings when performing a reindex.

How this addresses that need:
* Filter embeddings by action="index"
* Install latest version of timdex-dataset-api (latest commit)
* Update embeddings in fixtures/test/dataset to use 'embeddings_timestamp'

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/USE-273

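The filtering rationale above can be illustrated with a small, self-contained sketch. It does not use the real timdex-dataset-api or tim.opensearch code: `filter_by_action`, `fake_bulk_update`, `MissingDocumentError`, and the sample record IDs are all hypothetical stand-ins, and the assumption is only that each embedding row carries an `action` metadata value and that updating a document absent from the index raises an error.

```python
from collections.abc import Iterable, Iterator


class MissingDocumentError(Exception):
    """Hypothetical stand-in for the API error raised on a missing document."""


def filter_by_action(rows: Iterable[dict], action: str) -> Iterator[dict]:
    """Yield only the rows whose 'action' metadata matches the given value."""
    for row in rows:
        if row.get("action") == action:
            yield row


def fake_bulk_update(indexed_ids: set[str], rows: Iterable[dict]) -> dict:
    """Toy update that raises as soon as a row targets an unindexed document."""
    results = {"updated": 0, "errors": 0, "total": 0}
    for row in rows:
        results["total"] += 1
        if row["timdex_record_id"] not in indexed_ids:
            raise MissingDocumentError(row["timdex_record_id"])
        results["updated"] += 1
    return results


# Documents currently present in the (simulated) index.
indexed_ids = {"rec-1", "rec-2"}

# "rec-3" was deleted, so it has an embedding row but no indexed document.
embeddings = [
    {"timdex_record_id": "rec-1", "action": "index"},
    {"timdex_record_id": "rec-3", "action": "delete"},
    {"timdex_record_id": "rec-2", "action": "index"},
]

# Filtering first means the update never touches the missing document.
filtered_results = fake_bulk_update(
    indexed_ids, filter_by_action(embeddings, "index")
)
```

Passing the unfiltered list to `fake_bulk_update` raises on "rec-3" before "rec-2" is processed, which mirrors the early exit described in the commit message.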
1 parent fc4a958 commit 6d13113

4 files changed: +25 -7 lines changed

Pipfile.lock

Lines changed: 2 additions & 2 deletions

tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=10/629d15f4-84e4-4b32-92c1-1b1debd377fb-0.parquet renamed to tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/4a40cef5-5629-4bc7-b743-7804a34f9593-0.parquet

22.8 KB
Binary file not shown.

tim/cli.py

Lines changed: 23 additions & 5 deletions
@@ -385,6 +385,7 @@ def bulk_update_embeddings(
             "embedding_object",
         ],
         run_id=run_id,
+        action="index",
     )
     embeddings_to_index = helpers.format_embeddings(embeddings)
 
@@ -454,12 +455,11 @@ def reindex_source(
         tim_os.get_index_aliases(client, index),
     )
 
-    # perform bulk indexing of current records from source
-    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
-
+    # reindex current records from source
     td = TIMDEXDataset(location=dataset_path)
 
     # bulk index records
+    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
     records_to_index = td.read_transformed_records_iter(
         table="current_records",
         source=source,
@@ -468,7 +468,25 @@ def reindex_source(
     try:
         index_results.update(tim_os.bulk_index(client, index, records_to_index))
     except BulkIndexingError as exception:
-        logger.info(f"Bulk indexing failed: {exception}")
+        logger.error(f"Bulk indexing failed: {exception}")  # noqa: TRY400
+
+    # bulk index embeddings
+    update_results = {"updated": 0, "errors": 0, "total": 0}
+    embeddings = td.embeddings.read_dicts_iter(
+        table="current_embeddings",
+        columns=[
+            "timdex_record_id",
+            "embedding_strategy",
+            "embedding_object",
+        ],
+        source=source,
+        action="index",
+    )
+    embeddings_to_index = helpers.format_embeddings(embeddings)
+    try:
+        update_results.update(tim_os.bulk_update(client, index, embeddings_to_index))
+    except BulkOperationError as exception:
+        logger.error(f"Bulk update with embeddings failed: {exception}")  # noqa: TRY400
 
-    summary_results = {"index": index_results}
+    summary_results = {"index": index_results, "update": update_results}
     logger.info(f"Reindex source complete: {json.dumps(summary_results)}")
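
The control flow in the `reindex_source` hunks (zeroed result counters, one try/except per phase, a combined summary at the end) can be sketched in isolation as follows. This is not the tim implementation: `run_step`, `StepFailedError`, `index_records`, and `update_embeddings` are hypothetical names standing in for the real bulk operations and their exceptions, and the counts are made up for illustration.

```python
import json
import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


class StepFailedError(Exception):
    """Hypothetical stand-in for BulkIndexingError / BulkOperationError."""


def run_step(name: str, step: Callable[[], dict], defaults: dict) -> dict:
    """Run one phase, keeping the zeroed defaults if the phase raises."""
    results = dict(defaults)
    try:
        results.update(step())
    except StepFailedError as exception:
        logger.error("%s failed: %s", name, exception)
    return results


def index_records() -> dict:
    # Hypothetical phase that succeeds.
    return {"created": 3, "updated": 1, "errors": 0, "total": 4}


def update_embeddings() -> dict:
    # Hypothetical phase that fails outright.
    raise StepFailedError("document missing from index")


summary_results = {
    "index": run_step(
        "Bulk indexing",
        index_records,
        {"created": 0, "updated": 0, "errors": 0, "total": 0},
    ),
    "update": run_step(
        "Bulk update with embeddings",
        update_embeddings,
        {"updated": 0, "errors": 0, "total": 0},
    ),
}
summary_json = json.dumps(summary_results)
```

Because each phase has its own handler and its own default counters, a failure in the embeddings phase is logged but still leaves a complete, serializable summary for both phases.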
