From 6119ff0450046b02f21f2b446daf9e3d4ce40794 Mon Sep 17 00:00:00 2001
From: Eric O
Date: Sat, 24 May 2025 11:31:42 -0400
Subject: [PATCH 1/2] Add atc:backfill:source_and_stored_objects task; Add
 is_backfilled_entry to stored_objects table

---
 Gemfile.lock                                  |   1 +
 ...d_is_backfilled_entry_to_stored_objects.rb |   5 +
 db/schema.rb                                  |   3 +-
 lib/tasks/atc/backfill.rake                   | 121 ++++++++++++++++++
 lib/tasks/setup.rake                          |   5 +-
 5 files changed, 133 insertions(+), 2 deletions(-)
 create mode 100644 db/migrate/20250524120351_add_is_backfilled_entry_to_stored_objects.rb
 create mode 100644 lib/tasks/atc/backfill.rake

diff --git a/Gemfile.lock b/Gemfile.lock
index 308efd2..1c30ef7 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -499,6 +499,7 @@ GEM
 PLATFORMS
   arm64-darwin-21
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux
 
 DEPENDENCIES
diff --git a/db/migrate/20250524120351_add_is_backfilled_entry_to_stored_objects.rb b/db/migrate/20250524120351_add_is_backfilled_entry_to_stored_objects.rb
new file mode 100644
index 0000000..a66e662
--- /dev/null
+++ b/db/migrate/20250524120351_add_is_backfilled_entry_to_stored_objects.rb
@@ -0,0 +1,5 @@
+class AddIsBackfilledEntryToStoredObjects < ActiveRecord::Migration[7.1]
+  def change
+    add_column :stored_objects, :is_backfilled_entry, :boolean, default: false
+  end
+end
diff --git a/db/schema.rb b/db/schema.rb
index 713316e..23b038b 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
 
-ActiveRecord::Schema[7.1].define(version: 2024_06_03_194835) do
+ActiveRecord::Schema[7.1].define(version: 2025_05_24_120351) do
   create_table "checksum_algorithms", force: :cascade do |t|
     t.string "name", null: false
     t.datetime "created_at", null: false
@@ -95,6 +95,7 @@
     t.binary "transfer_checksum_value", limit: 4
     t.integer "transfer_checksum_part_size"
     t.integer "transfer_checksum_part_count"
+    t.boolean "is_backfilled_entry", default: false
     t.index ["source_object_id"], name: "index_stored_objects_on_source_object_id"
     t.index ["storage_provider_id", "path_hash"], name: "index_stored_objects_on_storage_provider_id_and_path_hash", unique: true
     t.index ["storage_provider_id", "source_object_id"], name: "idx_on_storage_provider_id_source_object_id_25088e9be4", unique: true
diff --git a/lib/tasks/atc/backfill.rake b/lib/tasks/atc/backfill.rake
new file mode 100644
index 0000000..dfaceea
--- /dev/null
+++ b/lib/tasks/atc/backfill.rake
@@ -0,0 +1,121 @@
+namespace :atc do
+  namespace :backfill do
+    desc 'Backfill SourceObject and StoredObject records for already-transferred AIP files, using checksums from the AIP manifest CSV, and queue fixity verification for the new AWS StoredObjects.'
+    task source_and_stored_objects: :environment do
+      required_headers = [
+        # SourceObject required headers
+        'source_object_path',
+        'object_size',
+        'fixity_checksum_algorithm_name',
+        'fixity_checksum_hex_value',
+        # StoredObject required headers
+        'transfer_checksum_algorithm_name',
+        'transfer_checksum_hex_value',
+        'aws_path',
+        'aws_storage_provider_id', # Note: In production, preservation aws is 0
+        'gcp_path',
+        'gcp_storage_provider_id' # Note: In production, preservation gcp is 1
+      ]
+
+      csv_path = ENV['csv_path']
+      dry_run = ENV['dry_run'] == 'true'
+
+      if csv_path.blank?
+        puts Rainbow("Missing required argument: csv_path").red.bright
+        next
+      end
+
+      # Validate headers
+      missing_headers = []
+      CSV.foreach(csv_path) do |row|
+        # Read first row only, validate headers, then break
+        missing_headers = (required_headers - row)
+        break
+      end
+
+      if missing_headers.present?
+        puts "Missing required CSV headers: #{missing_headers.join(', ')}"
+        next
+      end
+
+      available_checksum_algorithms = ChecksumAlgorithm.all.map do |checksum_algorithm|
+        [checksum_algorithm.name, checksum_algorithm]
+      end.to_h
+
+      storage_providers = StorageProvider.all.map do |storage_provider|
+        [storage_provider.id, storage_provider]
+      end.to_h
+
+      CSV.foreach(csv_path, headers: true).with_index do |row, i|
+        # Validate the data in this row
+        # Ensure that none of the required values are blank:
+        required_headers.each do |required_header|
+          if row[required_header].nil? || row[required_header] == ''
+            raise StandardError, "Error on CSV row #{i+1}. Missing required value: #{required_header}"
+          end
+        end
+
+        # Make sure that fixity_checksum_algorithm_name resolves to a ChecksumAlgorithm
+        fixity_checksum_algorithm = available_checksum_algorithms[row['fixity_checksum_algorithm_name'].upcase]
+        raise StandardError, "Error on CSV row #{i+1}. Could not resolve fixity_checksum_algorithm_name to a known ChecksumAlgorithm" if fixity_checksum_algorithm.nil?
+
+        # Make sure that transfer_checksum_algorithm_name resolves to a ChecksumAlgorithm
+        transfer_checksum_algorithm = available_checksum_algorithms[row['transfer_checksum_algorithm_name'].upcase]
+        raise StandardError, "Error on CSV row #{i+1}. Could not resolve transfer_checksum_algorithm_name to a known ChecksumAlgorithm" if transfer_checksum_algorithm.nil?
+
+        # Make sure that aws_storage_provider_id and gcp_storage_provider_id are numbers (anchored so values like "1a" are rejected)
+        raise StandardError, "Error on CSV row #{i+1}. aws_storage_provider_id is not a number" unless row['aws_storage_provider_id'] =~ /\A\d+\z/
+        raise StandardError, "Error on CSV row #{i+1}. gcp_storage_provider_id is not a number" unless row['gcp_storage_provider_id'] =~ /\A\d+\z/
+
+        # Make sure that aws_storage_provider_id and gcp_storage_provider_id resolve to real StorageProvider objects
+        aws_storage_provider = storage_providers[row['aws_storage_provider_id'].to_i]
+        gcp_storage_provider = storage_providers[row['gcp_storage_provider_id'].to_i]
+        raise StandardError, "Error on CSV row #{i+1}. Could not resolve aws_storage_provider_id to a known StorageProvider" if aws_storage_provider.nil?
+        raise StandardError, "Error on CSV row #{i+1}. Could not resolve gcp_storage_provider_id to a known StorageProvider" if gcp_storage_provider.nil?
+
+        if dry_run
+          puts "Row #{i+1} appears to be valid. No records were created because we are in dry_run mode."
+          break
+        end
+
+        # Create source object
+        source_object = SourceObject.create!(
+          path: row['source_object_path'],
+          object_size: row['object_size'].to_i,
+          fixity_checksum_algorithm: fixity_checksum_algorithm,
+          fixity_checksum_value: Atc::Utils::HexUtils.hex_to_bin(row['fixity_checksum_hex_value'])
+        )
+
+        # Create AWS StoredObject record
+        aws_stored_object = StoredObject.create!(
+          path: row['aws_path'],
+          source_object: source_object,
+          storage_provider: aws_storage_provider,
+          transfer_checksum_algorithm: transfer_checksum_algorithm,
+          transfer_checksum_value: Atc::Utils::HexUtils.hex_to_bin(row['transfer_checksum_hex_value']),
+          transfer_checksum_part_size: nil, # We leave this blank because we're using a single-part checksum value
+          transfer_checksum_part_count: nil, # We leave this blank because we're using a single-part checksum value
+          is_backfilled_entry: true
+        )
+
+        # Create GCP StoredObject record
+        gcp_stored_object = StoredObject.create!(
+          path: row['gcp_path'],
+          source_object: source_object,
+          storage_provider: gcp_storage_provider,
+          transfer_checksum_algorithm: transfer_checksum_algorithm,
+          transfer_checksum_value: Atc::Utils::HexUtils.hex_to_bin(row['transfer_checksum_hex_value']),
+          transfer_checksum_part_size: nil, # We leave this blank because we're using a single-part checksum value
+          transfer_checksum_part_count: nil, # We leave this blank because we're using a single-part checksum value
+          is_backfilled_entry: true
+        )
+
+        # Queue fixity verification job for the new AWS StoredObject
+        # NOTE: We do not do this for the GCP StoredObject because we are not verifying GCP StoredObjects at this time.
+        VerifyFixityJob.perform_later(aws_stored_object.id)
+      end
+
+      puts "\nDone!"
+    end
+  end
+end
diff --git a/lib/tasks/setup.rake b/lib/tasks/setup.rake
index f6ee981..a5b4d43 100644
--- a/lib/tasks/setup.rake
+++ b/lib/tasks/setup.rake
@@ -25,7 +25,10 @@ namespace :atc do
     [
       { name: 'SHA256', empty_binary_value: Digest::SHA256.new.digest },
       { name: 'SHA512', empty_binary_value: Digest::SHA512.new.digest },
-      { name: 'CRC32C', empty_binary_value: Digest::CRC32c.new.digest }
+      { name: 'CRC32C', empty_binary_value: Digest::CRC32c.new.digest },
+      # NOTE: If we add MD5 later and use it for the transfer_checksum_algorithm, we'll need to adjust the size of
+      # the transfer_checksum_value column because it currently holds a maximum of 4 bytes.
+      # { name: 'MD5', empty_binary_value: Digest::MD5.new.digest }
     ].each do |checksum_algorithm_args|
       if ChecksumAlgorithm.exists?(name: checksum_algorithm_args[:name])
         puts "#{Rainbow("ChecksumAlgorithm already exists (skipping): #{checksum_algorithm_args[:name]}").blue.bright}\n"

From 1f74f1014198820180c0763290b3091e65c5db73 Mon Sep 17 00:00:00 2001
From: Eric O
Date: Wed, 1 Oct 2025 14:56:36 -0400
Subject: [PATCH 2/2] Update to dry_run logic in backfill task

---
 Gemfile.lock                | 1 +
 lib/tasks/atc/backfill.rake | 4 +++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/Gemfile.lock b/Gemfile.lock
index 1c30ef7..7684236 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -500,6 +500,7 @@ PLATFORMS
   arm64-darwin-21
   arm64-darwin-22
   arm64-darwin-23
+  arm64-darwin-24
   x86_64-linux
 
 DEPENDENCIES
diff --git a/lib/tasks/atc/backfill.rake b/lib/tasks/atc/backfill.rake
index dfaceea..740e154 100644
--- a/lib/tasks/atc/backfill.rake
+++ b/lib/tasks/atc/backfill.rake
@@ -75,7 +75,7 @@ namespace :atc do
 
         if dry_run
           puts "Row #{i+1} appears to be valid. No records were created because we are in dry_run mode."
-          break
+          next
         end
 
         # Create source object
@@ -113,6 +113,8 @@ namespace :atc do
         # Queue fixity verification job for the new AWS StoredObject
         # NOTE: We do not do this for the GCP StoredObject because we are not verifying GCP StoredObjects at this time.
         VerifyFixityJob.perform_later(aws_stored_object.id)
+
+        puts "Processed row #{i+1}."
       end
 
       puts "\nDone!"
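
Usage note: the sketch below shows a minimal backfill CSV and the two ways to invoke the task. All file paths, bucket paths, and checksum values are hypothetical placeholders; the storage provider ids follow the "preservation aws is 0 / preservation gcp is 1" production notes in the task, but should be checked against your own StorageProvider records. With the PATCH 2/2 change applied, dry_run=true validates every row (next) instead of stopping after the first valid one (break).

    # backfill_manifest.csv -- header row plus one hypothetical data row
    source_object_path,object_size,fixity_checksum_algorithm_name,fixity_checksum_hex_value,transfer_checksum_algorithm_name,transfer_checksum_hex_value,aws_path,aws_storage_provider_id,gcp_path,gcp_storage_provider_id
    /aip/objects/example.tif,1048576,SHA256,9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,CRC32C,c1d04330,example-aws-bucket/aip/objects/example.tif,0,example-gcp-bucket/aip/objects/example.tif,1

    # Dry run: validate every row, create nothing
    bundle exec rake atc:backfill:source_and_stored_objects csv_path=backfill_manifest.csv dry_run=true

    # Real run: create SourceObject/StoredObject records and queue VerifyFixityJob for each AWS copy
    bundle exec rake atc:backfill:source_and_stored_objects csv_path=backfill_manifest.csv

The algorithm names must match ChecksumAlgorithm records seeded by lib/tasks/setup.rake (SHA256, SHA512, CRC32C); the transfer checksum must be a single-part value at most 4 bytes long, since transfer_checksum_value is a limit-4 binary column and part size/count are left nil.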