# frozen_string_literal: true

# Read-only JSON:API endpoint that lists Enrichment records, optionally
# filtered by DOI or client symbol, using keyset (cursor) pagination.
class EnrichmentsController < ApplicationController
  # Number of records returned per page.
  PAGE_SIZE = 25

  # GET /enrichments
  #
  # Query parameters:
  #   doi          - filter by DOI (upcased for case-insensitive matching)
  #   client_id    - filter by client symbol
  #   page[cursor] - opaque keyset cursor produced by this endpoint
  def index
    doi = params["doi"]&.upcase
    client_id = params["client_id"]
    cursor = params.dig("page", "cursor")

    base_enrichments = base_page_enrichments(doi, client_id)

    # First page by default; overwritten when a cursor token is supplied.
    cursor_page = 1

    enrichments =
      if cursor.present?
        cursor_updated_at, cursor_id, cursor_page = decode_cursor(cursor)
        base_enrichments.by_cursor(cursor_updated_at, cursor_id)
      else
        base_enrichments
      end

    enrichments = enrichments.order_by_cursor.limit(PAGE_SIZE).to_a

    options = {
      meta: build_meta(base_enrichments, cursor_page),
      links: build_paging_links(enrichments, doi, client_id, cursor_page)
    }

    render(json: EnrichmentSerializer.new(enrichments, options).serializable_hash, status: :ok)
  end

  private
    # Returns the base relation for the requested filter; DOI takes
    # precedence over client_id when both are supplied.
    def base_page_enrichments(doi, client_id)
      if doi.present?
        Enrichment.by_doi(doi)
      elsif client_id.present?
        Enrichment.by_client(client_id)
      else
        Enrichment.all
      end
    end

    # Encodes a cursor hash as URL-safe, unpadded Base64 JSON. Encoding a
    # plain hash of scalars cannot realistically fail, so any unexpected
    # error is allowed to surface as a 500 naturally.
    # (The previous rescue raised ActionController::InternalServerError,
    # a constant that does not exist in Rails and would itself NameError.)
    def encode_cursor(hash)
      Base64.urlsafe_encode64(hash.to_json, padding: false)
    end

    # Decodes a cursor token into [updated_at, id, page].
    # Raises ActionController::BadRequest for any malformed token.
    def decode_cursor(token)
      decoded_cursor = JSON.parse(Base64.urlsafe_decode64(token))
      cursor_updated_at = Time.iso8601(decoded_cursor.fetch("updated_at"))
      cursor_id = decoded_cursor.fetch("id").to_i
      # Tokens minted before page tracking default to page 1.
      cursor_page = decoded_cursor.fetch("page", 1).to_i

      Rails.logger.info("cursor_page: #{cursor_page}")

      [cursor_updated_at, cursor_id, cursor_page]
    rescue StandardError
      raise ActionController::BadRequest, "Invalid cursor"
    end

    # Pagination metadata: total rows matching the filter, total pages,
    # and the current (1-based) page.
    def build_meta(enrichments, cursor_page)
      enrichments_total = enrichments.count

      {
        total: enrichments_total,
        totalPages: (enrichments_total / PAGE_SIZE.to_f).ceil,
        page: cursor_page
      }
    end

    # Builds the absolute URL for the next page, preserving the active
    # filter. The cursor is always passed as page[cursor] so that #index
    # (which reads params.dig("page", "cursor")) can consume it, and the
    # filter parameter names match exactly what #index reads
    # (the previous version emitted bare "cursor" and "client-id", which
    # the controller never read back).
    def build_next_link(doi, client_id, next_cursor)
      base_link = request.original_url.split("?").first

      query = { "page" => { "cursor" => next_cursor } }
      query["doi"] = doi if doi.present?
      query["client_id"] = client_id if doi.blank? && client_id.present?

      # to_query percent-encodes values, so DOIs with reserved characters
      # round-trip safely.
      "#{base_link}?#{query.to_query}"
    end

    # self/next links. The next link is only emitted when the current page
    # is full, i.e. there may be more records after it.
    def build_paging_links(enrichments, doi, client_id, cursor_page)
      next_link = nil

      if enrichments.length == PAGE_SIZE
        last = enrichments.last
        next_cursor = encode_cursor(updated_at: last.updated_at.iso8601(6), id: last.id, page: cursor_page + 1)
        next_link = build_next_link(doi, client_id, next_cursor)
      end

      { self: request.original_url, next: next_link }
    end
end
# frozen_string_literal: true

# Processes one batch of JSONL enrichment lines (enqueued by
# lib/tasks/enrichment.rake) and persists an Enrichment record for each
# line that targets an existing DataCite DOI and yields valid metadata.
class EnrichmentBatchProcessJob < ApplicationJob
  include ErrorSerializable

  queue_as :enrichment_batch_process_job

  # Number of worker threads used for line processing.
  THREAD_COUNT = 5

  # lines    - Array of JSON strings, one enrichment document per entry.
  # filename - name of the source file; stored on each Enrichment and
  #            included in log messages.
  def perform(lines, filename)
    # NOTE: the previous prefix interpolated the literal "#(unknown)";
    # it is meant to identify the source file being ingested.
    log_prefix = "EnrichmentBatchProcessJob (#{filename})"

    # Process the lines in parallel to speed up ingestion.
    Parallel.each(lines, in_threads: THREAD_COUNT) do |line|
      # with_connection ensures the connection is explicitly checked out and
      # returned to the pool after each iteration, preventing connection
      # pool exhaustion.
      ActiveRecord::Base.connection_pool.with_connection do
        process_line(line, filename, log_prefix)
      end
    end
  end

  private
    # Validates and persists a single enrichment line. Every failure mode
    # logs an error and skips the line rather than aborting the batch.
    def process_line(line, filename, log_prefix)
      begin
        parsed_line = JSON.parse(line)
      rescue JSON::ParserError => e
        Rails.logger.error("#{log_prefix}: Failed to parse line: #{e.message}")
        return
      end

      # We only create enrichments for DOIs that exist and which have an
      # agency of 'datacite'.
      uid = parsed_line["doi"]&.upcase
      doi = Doi.find_by(doi: uid, agency: "datacite")

      if doi.blank?
        Rails.logger.error("#{log_prefix}: Doi #{uid} does not exist")
        return
      end

      if doi.enrichment_field(parsed_line["field"]).nil?
        Rails.logger.error("#{log_prefix}: Unsupported enrichment field #{parsed_line["field"]} for DOI #{uid}")
        return
      end

      # only_validate makes DOI validation behave as it would on save
      # without actually persisting the mutated record.
      doi.only_validate = true

      enrichment = Enrichment.new(
        filename: filename,
        doi: uid,
        contributors: parsed_line["contributors"],
        resources: parsed_line["resources"],
        field: parsed_line["field"],
        action: parsed_line["action"],
        original_value: parsed_line["originalValue"],
        enriched_value: parsed_line["enrichedValue"]
      )

      apply_error = doi.apply_enrichment(parsed_line)

      if apply_error.present?
        Rails.logger.error("#{log_prefix}: Failed to apply enrichment for DOI #{uid}, #{apply_error}")
        return
      end

      unless doi.valid?
        errors = serialize_errors(doi.errors, uid: enrichment.doi)
        Rails.logger.error("#{log_prefix}: Enrichment does not generate valid metadata: #{errors}")
        return
      end

      unless enrichment.save
        errors = enrichment.errors.full_messages.join(";")
        Rails.logger.error("#{log_prefix}: Failed to save enrichment for DOI #{uid}: #{errors}")
      end
    end
end
+ case action + when "insert" + self[field] ||= [] + self[field] << enrichment["enriched_value"] + when "update" + if self[field] == enrichment["original_value"] + self[field] = enrichment["enriched_value"] + else + error = "Original value does not match current value for update action" + end + when "updateChild" + success = false + + self[field].each_with_index do |item, index| + if item == enrichment["original_value"] + self[field][index] = enrichment["enriched_value"] + success = true + break + end + end + + error = "Original value not found for updateChild action" unless success + when "deleteChild" + success = false + + self[field] ||= [] + self[field].each_with_index do |item, index| + if item == enrichment["original_value"] + self[field].delete_at(index) + success = true + end + end + + error = "Original value not found for deleteChild action" unless success + end + + error + end + + def enrichment_field(field) + field_mapping = { + "alternateIdentifiers" => "alternate_identifiers", + "creators" => "creators", + "titles" => "titles", + "publisher" => "publisher", + "publicationYear" => "publication_year", + "subjects" => "subjects", + "contributors" => "contributors", + "dates" => "dates", + "language" => "language", + "types" => "types", + "relatedIdentifiers" => "related_identifiers", + "relatedItems" => "related_items", + "sizes" => "sizes", + "formats" => "formats", + "version" => "version", + "rightsList" => "rights_list", + "descriptions" => "descriptions", + "geoLocations" => "geo_locations", + "fundingReferences" => "funding_references" + } + + field_mapping.fetch(field, nil) + end +end diff --git a/app/models/doi.rb b/app/models/doi.rb index a08b6608d..d4452e337 100644 --- a/app/models/doi.rb +++ b/app/models/doi.rb @@ -35,6 +35,8 @@ class Doi < ApplicationRecord include Elasticsearch::Model + include Enrichable + aasm whiny_transitions: false do # draft is initial state for new DOIs. 
class Enrichment < ApplicationRecord
  # Each enrichment targets a Doi row through the shared DOI string
  # (enrichments.doi <-> dois.doi) rather than a numeric foreign key.
  belongs_to :doi_record,
             class_name: "Doi",
             foreign_key: :doi, # enrichments.doi
             primary_key: :doi, # dois.doi
             optional: false

  has_one :client, through: :doi_record

  validate :validate_json_schema

  scope :by_doi, ->(doi) { where(doi: doi) }

  # Filter by the owning client's symbol (clients live in the legacy
  # "datacentre" table, hence the table name in the where clause).
  scope :by_client, ->(client_id) { joins(doi_record: :client).where(datacentre: { symbol: client_id }) }

  # Keyset pagination: all rows strictly "after" the (updated_at, id)
  # cursor when walking in descending order.
  scope :by_cursor, lambda { |updated_at, id|
    where(
      "(enrichments.updated_at < ?) OR (enrichments.updated_at = ? AND enrichments.id < ?)",
      updated_at,
      updated_at,
      id
    )
  }

  scope :order_by_cursor, -> { order(updated_at: :desc, id: :desc) }

  class << self
    # Memoized JSONSchemer instance built from the enrichment JSON schema.
    def enrichment_schemer
      @enrichment_schemer ||= begin
        schema_path = Rails.root.join("app/models/schemas/enrichment/enrichment.json")
        JSONSchemer.schema(JSON.parse(File.read(schema_path)))
      end
    end
  end

  private
    # Validates the record against the enrichment JSON schema, surfacing
    # every schema violation as a single error on :base.
    def validate_json_schema
      schema_errors = self.class.enrichment_schemer.validate(to_enrichment_hash).to_a
      return if schema_errors.empty?

      messages = schema_errors.map { |schema_error| schema_error["message"] || schema_error.inspect }
      errors.add(:base, "Validation failed: #{messages.join('; ')}")
    end

    # Camel-cased hash representation matching the JSON schema; nil
    # attributes are dropped so optional keys validate correctly.
    def to_enrichment_hash
      {
        "doi" => doi,
        "contributors" => contributors,
        "resources" => resources,
        "field" => field,
        "action" => action,
        "originalValue" => original_value,
        "enrichedValue" => enriched_value
      }.compact
    end
end
+ }, + "contributors": { + "type": "array", + "minItems": 1, + "description": "The source entities of the enrichment represented as an array of contributors", + "items": { + "type": "object", + "additionalProperties": false, + "required": ["name", "contributorType"], + "properties": { + "name": { + "type": "string" + }, + "nameType": { + "$ref": "#/definitions/nameType" + }, + "givenName": { + "type": ["string", "null"] + }, + "familyName": { + "type": ["string", "null"] + }, + "lang": { + "type": ["string", "null"] + }, + "affiliation": { + "$ref": "#/definitions/affiliations" + }, + "nameIdentifiers": { + "$ref": "#/definitions/nameIdentifiers" + }, + "contributorType": { + "$ref": "#/definitions/contributorTypes" + } + } + } + }, + "resources": { + "type": "array", + "minItems": 1, + "description": "The processes that produced the enrichment represented as an array of relatedIdentifiers", + "items": { + "type": "object", + "additionalProperties": false, + "required": [ + "relatedIdentifier", + "relationType", + "relatedIdentifierType" + ], + "properties": { + "relatedIdentifier": { + "type": "string" + }, + "relationType": { + "$ref": "#/definitions/relationTypes" + }, + "relatedIdentifierType": { + "$ref": "#/definitions/relatedIdentifierTypes" + }, + "relatedMetadataScheme": { + "type": "string" + }, + "schemeUri": { + "type": "string" + }, + "resourceTypeGeneral": { + "$ref": "#/definitions/resourceTypeGeneral" + } + } + } + }, + "field": { + "type": "string", + "description": "The top-level field to enrich.", + "enum": [ + "alternateIdentifiers", + "creators", + "titles", + "publisher", + "publicationYear", + "subjects", + "contributors", + "dates", + "language", + "types", + "relatedIdentifiers", + "relatedItems", + "sizes", + "formats", + "version", + "rightsList", + "descriptions", + "geoLocations", + "fundingReferences" + ] + }, + "action": { + "type": "string", + "description": "The action that the enrichment performs. 
The available actions are: update (change the entire top-level \"field\" from the \"originalValue\" to the \"enrichedValue\") updateChild (change a child object within the top-level \"field\" from the \"originalValue\" to the \"enrichedValue\") insert (insert the object in \"enrichedValue\" into the top-level \"field\") deleteChild (remove the object in \"originalValue\" within the top-level field specified in \"field\")", + "enum": ["update", "updateChild", "insert", "deleteChild"] + }, + "originalValue": { + "description": "When the action is update, updateChild, or deleteChild, the original value of the field or child value. Otherwise, this field is empty.", + "oneOf": [ + { + "type": "object" + }, + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "array" + } + ] + }, + "enrichedValue": { + "description": "When the action is update, updateChild, or insert, the enriched value of the field or child value. Otherwise, this field is empty.", + "oneOf": [ + { + "type": "object" + }, + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "array" + } + ] + } + }, + "required": ["doi", "contributors", "resources", "action", "field"], + "allOf": [ + { + "if": { + "properties": { + "action": { + "enum": ["update", "updateChild", "deleteChild"] + } + } + }, + "then": { + "required": ["originalValue"] + } + }, + { + "if": { + "properties": { + "action": { + "enum": ["update", "updateChild", "insert"] + } + } + }, + "then": { + "required": ["enrichedValue"] + } + } + ], + "definitions": { + "nameType": { + "anyOf": [ + { + "type": "string", + "enum": ["Organizational", "Personal"] + }, + { + "type": "null" + } + ] + }, + "affiliations": { + "type": "array", + "items": { + "type": "object", + "required": ["name"], + "additionalProperties": false, + "properties": { + "affiliationIdentifier": { + "type": ["string", "null"] + }, + "affiliationIdentifierScheme": { + "type": ["string", "null"] + }, + "name": { + "type": "string" + }, + 
"schemeUri": { + "type": ["string", "null"] + } + } + } + }, + "nameIdentifiers": { + "type": "array", + "items": { + "type": "object", + "required": ["nameIdentifier", "nameIdentifierScheme"], + "additionalProperties": false, + "properties": { + "schemeUri": { + "type": ["string", "null"] + }, + "nameIdentifier": { + "type": "string" + }, + "nameIdentifierScheme": { + "type": "string" + } + } + } + }, + "relationTypes": { + "type": "string", + "enum": [ + "IsCitedBy", + "Cites", + "IsSupplementTo", + "IsSupplementedBy", + "IsContinuedBy", + "Continues", + "IsDescribedBy", + "Describes", + "HasMetadata", + "IsMetadataFor", + "HasVersion", + "IsVersionOf", + "IsNewVersionOf", + "IsPreviousVersionOf", + "IsPartOf", + "HasPart", + "IsPublishedIn", + "IsReferencedBy", + "References", + "IsDocumentedBy", + "Documents", + "IsCompiledBy", + "Compiles", + "IsVariantFormOf", + "IsOriginalFormOf", + "IsIdenticalTo", + "IsReviewedBy", + "Reviews", + "IsDerivedFrom", + "IsSourceOf", + "IsRequiredBy", + "Requires", + "IsObsoletedBy", + "Obsoletes", + "IsCollectedBy", + "Collects", + "IsTranslationOf", + "HasTranslation" + ] + }, + "relatedIdentifierTypes": { + "type": "string", + "enum": [ + "ARK", + "arXiv", + "bibcode", + "CSTR", + "DOI", + "EAN13", + "EISSN", + "Handle", + "IGSN", + "ISBN", + "ISSN", + "ISTC", + "LISSN", + "LSID", + "PMID", + "PURL", + "RRID", + "UPC", + "URL", + "URN", + "w3id" + ] + }, + "resourceTypeGeneral": { + "type": "string", + "enum": [ + "Audiovisual", + "Award", + "Book", + "BookChapter", + "Collection", + "ComputationalNotebook", + "ConferencePaper", + "ConferenceProceeding", + "DataPaper", + "Dataset", + "Dissertation", + "Event", + "Image", + "InteractiveResource", + "Instrument", + "Journal", + "JournalArticle", + "Model", + "OutputManagementPlan", + "PeerReview", + "PhysicalObject", + "Preprint", + "Project", + "Report", + "Service", + "Software", + "Sound", + "Standard", + "StudyRegistration", + "Text", + "Workflow", + "Other" + ] + }, + 
"contributorTypes": { + "type": "string", + "enum": [ + "ContactPerson", + "DataCollector", + "DataCurator", + "DataManager", + "Distributor", + "Editor", + "HostingInstitution", + "Producer", + "ProjectLeader", + "ProjectManager", + "ProjectMember", + "RegistrationAgency", + "RegistrationAuthority", + "RelatedPerson", + "Researcher", + "ResearchGroup", + "RightsHolder", + "Sponsor", + "Supervisor", + "Translator", + "WorkPackageLeader", + "Other" + ] + } + } +} diff --git a/app/serializers/enrichment_serializer.rb b/app/serializers/enrichment_serializer.rb new file mode 100644 index 000000000..84a7d122e --- /dev/null +++ b/app/serializers/enrichment_serializer.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +class EnrichmentSerializer + include JSONAPI::Serializer + + set_key_transform :camel_lower + set_type :enrichments + set_id :id + + attributes :doi, + :contributors, + :resources, + :field, + :action, + :original_value, + :enriched_value + + # Ensure the DOI value is always serialized in lowercase + attribute :doi do |object| + object.doi&.downcase + end + + # Map created_at to created + attribute :created, &:created_at + + # Map updated_at to updated + attribute :updated, &:updated_at +end diff --git a/config/application.rb b/config/application.rb index b537f6269..5c790a924 100644 --- a/config/application.rb +++ b/config/application.rb @@ -69,6 +69,7 @@ ENV["METADATA_STORAGE_BUCKET_NAME"] ||= "" ENV["MONTHLY_DATAFILE_BUCKET"] ||= "monthly-datafile.stage.datacite.org" ENV["MONTHLY_DATAFILE_ACCESS_ROLE"] ||= "" +ENV["ENRICHMENTS_INGESTION_FILES_BUCKET_NAME"] ||= "" module Lupo class Application < Rails::Application diff --git a/config/routes.rb b/config/routes.rb index c6dfce4fa..c9f65f344 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -195,6 +195,8 @@ resources :activities end + resources :enrichments, only: %i[index] + resources :reference_repositories, path: "reference-repositories", only: %i[index show], constraints: { id: /.+/ } resources 
:client_prefixes, path: "client-prefixes" diff --git a/config/shoryuken.yml b/config/shoryuken.yml index 201cad06c..a5276ee01 100644 --- a/config/shoryuken.yml +++ b/config/shoryuken.yml @@ -24,3 +24,7 @@ groups: queues: - events_other_doi_job - events_other_doi_by_id_job + enrichments_group: + concurrency: 2 + queues: + - enrichment_batch_process_job diff --git a/db/migrate/20260120083752_create_enrichments.rb b/db/migrate/20260120083752_create_enrichments.rb new file mode 100644 index 000000000..d66fe779b --- /dev/null +++ b/db/migrate/20260120083752_create_enrichments.rb @@ -0,0 +1,20 @@ +class CreateEnrichments < ActiveRecord::Migration[7.2] + disable_departure! + + def change + create_table :enrichments, options: "DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci" do |t| + t.string :doi, null: false + t.json :contributors, null: false + t.json :resources, null: false + t.string :field, null: false + t.string :action, null: false + t.json :original_value, null: true + t.json :enriched_value, null: true + t.string :filename, null: true + + t.timestamps + end + + add_index :enrichments, [:doi, :updated_at, :id], order: { updated_at: :desc, id: :desc } + end +end diff --git a/db/schema.rb b/db/schema.rb index 989c76f8b..f67b504d5 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. 
-ActiveRecord::Schema[7.2].define(version: 2025_11_10_121400) do +ActiveRecord::Schema[7.2].define(version: 2026_01_20_083752) do create_table "active_storage_attachments", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "name", limit: 191, null: false t.string "record_type", null: false @@ -28,7 +28,7 @@ t.text "metadata" t.bigint "byte_size", null: false t.string "checksum", limit: 191 - t.datetime "created_at", precision: nil, null: false + t.datetime "created_at", null: false t.string "service_name", null: false t.index ["key"], name: "index_active_storage_blobs_on_key", unique: true end @@ -41,7 +41,7 @@ create_table "allocator", charset: "utf8mb3", force: :cascade do |t| t.string "system_email", null: false - t.datetime "created", precision: nil + t.datetime "created" t.integer "doi_quota_allowed", null: false t.integer "doi_quota_used", null: false t.binary "is_active", limit: 1 @@ -49,7 +49,7 @@ t.string "password" t.string "role_name" t.string "symbol", null: false - t.datetime "updated", precision: nil + t.datetime "updated" t.integer "version" t.text "comments", size: :long t.string "experiments" @@ -57,7 +57,7 @@ t.string "region" t.string "country_code" t.string "website" - t.datetime "deleted_at", precision: nil + t.datetime "deleted_at" t.date "joined" t.string "logo" t.string "focus_area", limit: 191 @@ -81,7 +81,7 @@ t.string "logo_file_name" t.string "logo_content_type" t.bigint "logo_file_size" - t.datetime "logo_updated_at", precision: nil + t.datetime "logo_updated_at" t.integer "doi_estimate", default: 0, null: false t.index ["deleted_at"], name: "index_allocator_deleted_at" t.index ["organization_type"], name: "index_allocator_organization_type" @@ -115,8 +115,8 @@ create_table "client_prefixes", charset: "utf8mb3", force: :cascade do |t| t.bigint "client_id", null: false t.bigint "prefix_id", null: false - t.datetime "created_at", precision: nil - t.datetime "updated_at", precision: nil + t.datetime 
"created_at" + t.datetime "updated_at" t.bigint "provider_prefix_id" t.string "uid", null: false t.index ["client_id"], name: "FK13A1B3BA47B5F5FF" @@ -132,9 +132,9 @@ t.string "family_name" t.string "email" t.json "role_name" - t.datetime "created_at", precision: nil - t.datetime "updated_at", precision: nil - t.datetime "deleted_at", precision: nil + t.datetime "created_at" + t.datetime "updated_at" + t.datetime "deleted_at" t.index ["deleted_at"], name: "index_contacts_deleted_at" t.index ["email"], name: "index_contacts_email" t.index ["uid"], name: "index_contacts_uid" @@ -143,7 +143,7 @@ create_table "datacentre", charset: "utf8mb3", force: :cascade do |t| t.text "comments", size: :long t.string "system_email", null: false - t.datetime "created", precision: nil + t.datetime "created" t.integer "doi_quota_allowed", null: false t.integer "doi_quota_used", null: false t.text "domains" @@ -152,11 +152,11 @@ t.string "password" t.string "role_name" t.string "symbol", null: false - t.datetime "updated", precision: nil + t.datetime "updated" t.integer "version" t.bigint "allocator", null: false t.string "experiments" - t.datetime "deleted_at", precision: nil + t.datetime "deleted_at" t.string "re3data_id" t.text "url" t.string "software", limit: 191 @@ -182,18 +182,18 @@ end create_table "dataset", charset: "utf8mb3", force: :cascade do |t| - t.datetime "created", precision: nil + t.datetime "created" t.string "doi", null: false t.binary "is_active", limit: 1, null: false t.binary "is_ref_quality", limit: 1 t.integer "last_landing_page_status" - t.datetime "last_landing_page_status_check", precision: nil + t.datetime "last_landing_page_status_check" t.json "last_landing_page_status_result" t.string "last_metadata_status" - t.datetime "updated", precision: nil + t.datetime "updated" t.integer "version" t.bigint "datacentre", null: false - t.datetime "minted", precision: nil + t.datetime "minted" t.text "url" t.text "last_landing_page" t.string 
"last_landing_page_content_type" @@ -239,6 +239,20 @@ t.index ["url"], name: "index_dataset_on_url", length: 100 end + create_table "enrichments", charset: "utf8mb3", force: :cascade do |t| + t.string "doi", null: false + t.json "contributors", null: false + t.json "resources", null: false + t.string "field", null: false + t.string "action", null: false + t.json "original_value" + t.json "enriched_value" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.string "filename" + t.index ["doi", "updated_at", "id"], name: "index_enrichments_on_doi_and_updated_at_and_id", order: { updated_at: :desc, id: :desc } + end + create_table "events", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.text "uuid", null: false t.text "subj_id", null: false @@ -251,8 +265,8 @@ t.text "source_token" t.datetime "created_at", precision: 3, null: false t.datetime "updated_at", precision: 3, null: false - t.datetime "indexed_at", precision: nil, default: "1970-01-01 00:00:00", null: false - t.datetime "occurred_at", precision: nil + t.datetime "indexed_at", default: "1970-01-01 00:00:00", null: false + t.datetime "occurred_at" t.string "message_action", limit: 191, default: "create", null: false t.string "relation_type_id", limit: 191 t.text "subj", size: :medium @@ -273,9 +287,9 @@ end create_table "media", charset: "utf8mb3", force: :cascade do |t| - t.datetime "created", precision: nil + t.datetime "created" t.string "media_type", limit: 80 - t.datetime "updated", precision: nil + t.datetime "updated" t.text "url", null: false t.integer "version" t.bigint "dataset", null: false @@ -286,7 +300,7 @@ end create_table "metadata", charset: "utf8mb3", force: :cascade do |t| - t.datetime "created", precision: nil + t.datetime "created" t.integer "metadata_version" t.integer "version" t.binary "xml", size: :medium @@ -299,7 +313,7 @@ end create_table "prefixes", charset: "utf8mb3", force: :cascade do |t| - t.datetime "created_at", 
precision: nil + t.datetime "created_at" t.string "uid", limit: 80, null: false t.index ["uid"], name: "prefix", unique: true end @@ -307,8 +321,8 @@ create_table "provider_prefixes", charset: "utf8mb3", force: :cascade do |t| t.bigint "provider_id", null: false t.bigint "prefix_id", null: false - t.datetime "created_at", precision: nil - t.datetime "updated_at", precision: nil + t.datetime "created_at" + t.datetime "updated_at" t.string "uid", null: false t.index ["prefix_id"], name: "FKE7FBD674AF86A1C7" t.index ["provider_id"], name: "FKE7FBD67446EBD781" @@ -318,8 +332,8 @@ create_table "reference_repositories", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "client_id" t.string "re3doi" - t.datetime "created_at", precision: nil, null: false - t.datetime "updated_at", precision: nil, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false end add_foreign_key "active_storage_variant_records", "active_storage_blobs", column: "blob_id" diff --git a/docker-compose.localstack.yml b/docker-compose.localstack.yml index 0dc4aa53f..da930eb29 100644 --- a/docker-compose.localstack.yml +++ b/docker-compose.localstack.yml @@ -9,6 +9,11 @@ services: - ELASTIC_PASSWORD=AnUnsecurePassword123 - PASSENGER_MAX_POOL_SIZE=10 - PASSENGER_MIN_INSTANCES=1 + - ENRICHMENTS_INGESTION_FILES_BUCKET_NAME=enrichments-ingestion-files + - AWS_REGION=us-east-1 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + - AWS_ENDPOINT_URL=http://localstack:4566 image: ghcr.io/datacite/lupo:main build: . 
# frozen_string_literal: true

namespace :enrichment do
  desc "Process JSONL from S3 and enqueue batches sized by bytes (256KB message size limit)"
  # Example commands:
  #   bundle exec rake enrichment:batch_process_file KEY=02022026_test_ingestion_file.jsonl
  #   bundle exec rake enrichment:batch_process_file KEY=preprint_matching_enrichments_datacite_format_1000.jsonl
  task batch_process_file: :environment do
    bucket = ENV["ENRICHMENTS_INGESTION_FILES_BUCKET_NAME"]
    key = ENV["KEY"]

    abort("ENRICHMENTS_INGESTION_FILES_BUCKET_NAME is not set") if bucket.blank?
    abort("KEY is not set") if key.blank?

    # SQS messages are capped at 256KB, so stay well below that to leave
    # room for JSON-array encoding and job metadata overhead.
    max_batch_bytes = 150_000

    puts("Begin ingestion for s3://#{bucket}/#{key} (max_batch_bytes=#{max_batch_bytes})")

    s3 = Aws::S3::Client.new(force_path_style: true)

    buffer = +""
    line_no = 0

    batch_lines = []
    batch_bytes = 0

    # Enqueue the current batch (if any) and reset the accumulators.
    flush = lambda do
      next if batch_lines.empty?

      EnrichmentBatchProcessJob.perform_later(batch_lines, key)

      batch_lines.clear
      batch_bytes = 0
    end

    # Add one stripped JSONL line to the batch, flushing first when it
    # would push the batch over the byte cap. Shared by the streaming loop
    # and the trailing-line handling below, which previously duplicated
    # this logic.
    append_line = lambda do |line|
      next if line.empty?

      line_no += 1

      # +1 for the newline separator, as slack for JSON array encoding.
      line_bytes = line.bytesize + 1

      # A single line that can never fit in one message is a hard error.
      # (The old message referenced a nonexistent MAX_BATCH_BYTES name.)
      if line_bytes > max_batch_bytes
        raise "Single JSONL line at #{line_no} is #{line_bytes} bytes, exceeds max_batch_bytes=#{max_batch_bytes}."
      end

      # If adding this line would exceed the cap, flush the current batch first.
      flush.call if (batch_bytes + line_bytes) > max_batch_bytes

      batch_lines << line
      batch_bytes += line_bytes
    end

    # Stream the object and split on newlines without loading it whole.
    s3.get_object(bucket: bucket, key: key) do |chunk|
      buffer << chunk

      while (idx = buffer.index("\n"))
        raw = buffer.slice!(0..idx).delete_suffix("\n")
        append_line.call(raw.strip)
      end
    end

    # The file might not end with a newline; handle the trailing remainder.
    append_line.call(buffer.strip)

    flush.call
    puts("Finished ingestion for s3://#{bucket}/#{key} (lines_seen=#{line_no})")
  end
end
expect(doi.enrichment_field("publicationYear")).to(eq("publication_year")) + end + + it "maps subjects to subjects" do + expect(doi.enrichment_field("subjects")).to(eq("subjects")) + end + + it "maps contributors to contributors" do + expect(doi.enrichment_field("contributors")).to(eq("contributors")) + end + + it "maps dates to dates" do + expect(doi.enrichment_field("dates")).to(eq("dates")) + end + + it "maps language to language" do + expect(doi.enrichment_field("language")).to(eq("language")) + end + + it "maps types to types" do + expect(doi.enrichment_field("types")).to(eq("types")) + end + + it "maps relatedIdentifiers to related_identifiers" do + expect(doi.enrichment_field("relatedIdentifiers")).to(eq("related_identifiers")) + end + + it "maps relatedItems to related_items" do + expect(doi.enrichment_field("relatedItems")).to(eq("related_items")) + end + + it "maps sizes to sizes" do + expect(doi.enrichment_field("sizes")).to(eq("sizes")) + end + + it "maps formats to formats" do + expect(doi.enrichment_field("formats")).to(eq("formats")) + end + + it "maps version to version" do + expect(doi.enrichment_field("version")).to(eq("version")) + end + + it "maps rightsList to rights_list" do + expect(doi.enrichment_field("rightsList")).to(eq("rights_list")) + end + + it "maps descriptions to descriptions" do + expect(doi.enrichment_field("descriptions")).to(eq("descriptions")) + end + + it "maps geoLocations to geo_locations" do + expect(doi.enrichment_field("geoLocations")).to(eq("geo_locations")) + end + + it "maps fundingReferences to funding_references" do + expect(doi.enrichment_field("fundingReferences")).to(eq("funding_references")) + end + end + + describe "#apply_enrichment" do + describe "when field is invalid" do + let(:doi) { create(:doi, aasm_state: "findable", agency: "datacite") } + + it "raises ArgumentError" do + enrichment = { + "action": "insert", + "field": "invalid_field", + "enriched_value": "foo" + } + expect { 
doi.apply_enrichment(enrichment) } + .to(raise_error(ArgumentError, "Invalid enrichment field: invalid_field")) + end + end + end + end end