From abee16f58af58f919c74dda49572fe018ea0728a Mon Sep 17 00:00:00 2001 From: psmagin Date: Fri, 12 Dec 2025 15:11:23 +0200 Subject: [PATCH] MODSOURMAN-1371 Remove quick-marc events handling logic --- NEWS.md | 1 + descriptors/ModuleDescriptor-template.json | 98 ------------ .../folio/rest/impl/ChangeManagerImpl.java | 22 --- .../java/org/folio/rest/impl/InitAPIImpl.java | 28 ++-- .../folio/services/ParsedRecordService.java | 21 --- .../services/ParsedRecordServiceImpl.java | 87 ----------- .../QuickMarcEventProducerService.java | 38 ----- .../QuickMarcEventProducerServiceImpl.java | 77 ---------- .../services/entity/MappingRuleCacheKey.java | 12 -- .../QuickMarcUpdateConsumersVerticle.java | 42 ----- .../QuickMarcUpdateKafkaHandler.java | 74 --------- .../verticle/consumers/util/QMEventTypes.java | 11 -- .../ChangeManagerParsedRecordsAPITest.java | 83 ---------- .../QuickMarcUpdateKafkaHandlerTest.java | 144 ------------------ ramls/change-manager.raml | 22 +-- 15 files changed, 11 insertions(+), 749 deletions(-) delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordService.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordServiceImpl.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerService.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/verticle/QuickMarcUpdateConsumersVerticle.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandler.java delete mode 100644 mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/util/QMEventTypes.java delete mode 100644 mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerParsedRecordsAPITest.java delete mode 100644 mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandlerTest.java diff --git a/NEWS.md b/NEWS.md index 65251eaae..189cacd7f 100644 --- a/NEWS.md +++ b/NEWS.md @@ -13,6 +13,7 @@ * [MODSOURMAN-1365](https://folio-org.atlassian.net/browse/MODSOURMAN-1365) Update bib mapping rule for Cumulative Index / Finding Aids notes * [MODSOURMAN-1260](https://folio-org.atlassian.net/browse/MODSOURMAN-1260) The 'The 004 tag of the Holdings doesn't have a link to the Bibliographic record' error message is not displayed on JSON data after importing file * [MODSOURMAN-1366](https://folio-org.atlassian.net/browse/MODSOURMAN-1366) Unable to update MARC authority record if the match point is 999 ff $i +* [MODSOURMAN-1371](https://folio-org.atlassian.net/browse/MODSOURMAN-1371) Remove quick-marc events handling logic ## 2025-03-13 v3.10.0 * [MODSOURMAN-1246](https://folio-org.atlassian.net/browse/MODSOURMAN-1246) Added data import completion notifications diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json index 43ef6c325..b9e935c42 100644 --- a/descriptors/ModuleDescriptor-template.json +++ b/descriptors/ModuleDescriptor-template.json @@ -399,92 +399,6 @@ } ] }, - { - "id": "source-manager-parsed-records", - "version": "4.0", - "handlers": [ - { - "methods": [ - "PUT" - ], - "pathPattern": "/change-manager/parsedRecords/{id}", - "permissionsRequired": [ - "change-manager.parsedrecords.put" - ], - "modulePermissions": [ - "mod-settings.global.read.stripes-core.prefs.manage", - "converter-storage.field-protection-settings.collection.get", - "converter-storage.jobprofilesnapshots.get", - "inventory-storage.alternative-title-types.collection.get", - "inventory-storage.authorities.collection.get", - "inventory-storage.authorities.item.get", - "inventory-storage.authorities.item.post", - "inventory-storage.authorities.item.put", - "inventory-storage.authorities.item.delete", - "inventory-storage.authority-note-types.collection.get", - "inventory-storage.authority-source-files.collection.get", - "inventory-storage.call-number-types.collection.get", - "inventory-storage.classification-types.collection.get", - "inventory-storage.contributor-name-types.collection.get", - "inventory-storage.contributor-types.collection.get", - "inventory-storage.electronic-access-relationships.collection.get", - "inventory-storage.holdings-note-types.collection.get", - "inventory-storage.holdings-sources.collection.get", - "inventory-storage.holdings-types.collection.get", - "inventory-storage.holdings.collection.get", - "inventory-storage.holdings.item.get", - "inventory-storage.holdings.item.post", - "inventory-storage.holdings.item.put", - "inventory-storage.identifier-types.collection.get", - "inventory-storage.ill-policies.collection.get", - "inventory-storage.instance-date-types.collection.get", - "inventory-storage.instance-formats.collection.get", - "inventory-storage.instance-note-types.collection.get", - "inventory-storage.instance-relationship-types.collection.get", - "inventory-storage.instance-statuses.collection.get", - "inventory-storage.instance-types.collection.get", - "inventory-storage.instances.collection.get", - "inventory-storage.instances.item.get", - "inventory-storage.instances.item.post", - "inventory-storage.instances.item.put", - "inventory-storage.instances.preceding-succeeding-titles.collection.put", - "inventory-storage.item-damaged-statuses.collection.get", - "inventory-storage.item-note-types.collection.get", - "inventory-storage.items.collection.get", - "inventory-storage.items.item.post", - "inventory-storage.items.item.put", - "inventory-storage.loan-types.collection.get", - "inventory-storage.loan-types.item.get", - "inventory-storage.locations.collection.get", - "inventory-storage.locations.item.get", - "inventory-storage.material-types.collection.get", - "inventory-storage.material-types.item.get", - "inventory-storage.modes-of-issuance.collection.get", - "inventory-storage.nature-of-content-terms.collection.get", - "inventory-storage.preceding-succeeding-titles.collection.get", - "inventory-storage.preceding-succeeding-titles.item.delete", - "inventory-storage.preceding-succeeding-titles.item.get", - "inventory-storage.preceding-succeeding-titles.item.post", - "inventory-storage.preceding-succeeding-titles.item.put", - "inventory-storage.statistical-code-types.collection.get", - "inventory-storage.statistical-codes.collection.get", - "inventory-storage.subject-sources.collection.get", - "inventory-storage.subject-types.collection.get", - "mapping-metadata.item.get", - "mapping-metadata.type.item.get", - "source-storage.snapshots.put", - "source-storage.verified.records", - "users.collection.get", - "orders.po-lines.collection.get", - "instance-authority-links.instances.collection.get", - "instance-authority-links.instances.collection.put", - "instance-authority.linking-rules.collection.get", - "source-storage.records.post", - "source-storage.records.put" - ] - } - ] - }, { "id": "metadata-provider-job-execution-logs", "version": "1.1", @@ -773,16 +687,6 @@ "displayName": "Change Manager - delete jobExecution and all associated records from SRS", "description": "Delete JobExecution and all associated records from SRS" }, - { - "permissionName": "change-manager.parsedrecords.get", - "displayName": "Change Manager - get parsed records by externalId", - "description": "Get parsed record" - }, - { - "permissionName": "change-manager.parsedrecords.put", - "displayName": "Change Manager - update parsed record by id", - "description": "Update parsed record" - }, { "permissionName": "mapping-rules.get", "displayName": "Mapping Rules provider - get default rules by tenant id", @@ -831,8 +735,6 @@ "change-manager.jobExecutions.item.put", "change-manager.records.post", "change-manager.records.delete", - "change-manager.parsedrecords.get", - "change-manager.parsedrecords.put", "mapping-rules.get", "mapping-rules.update", "mapping-rules.restore", diff --git a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/ChangeManagerImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/ChangeManagerImpl.java index 4d539795f..81cd49606 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/ChangeManagerImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/ChangeManagerImpl.java @@ -15,14 +15,12 @@ import org.folio.rest.jaxrs.model.InitJobExecutionsRqDto; import org.folio.rest.jaxrs.model.JobExecution; import org.folio.rest.jaxrs.model.JobProfileInfo; -import org.folio.rest.jaxrs.model.ParsedRecordDto; import org.folio.rest.jaxrs.model.RawRecordsDto; import org.folio.rest.jaxrs.model.StatusDto; import org.folio.rest.jaxrs.resource.ChangeManager; import org.folio.rest.tools.utils.TenantTool; import org.folio.services.ChunkProcessingService; import org.folio.services.JobExecutionService; -import org.folio.services.ParsedRecordService; import org.folio.spring.SpringContextUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -43,8 +41,6 @@ public class ChangeManagerImpl implements ChangeManager { @Autowired @Qualifier("eventDrivenChunkProcessingService") private ChunkProcessingService eventDrivenChunkProcessingService; - @Autowired - private ParsedRecordService parsedRecordService; private String tenantId; @@ -241,24 +237,6 @@ public void deleteChangeManagerJobExecutionsRecordsById(String id, Map okapiHeaders, - Handler> asyncResultHandler, Context vertxContext) { - vertxContext.runOnContext(v -> { - try { - LOGGER.debug("putChangeManagerParsedRecordsById:: id {}, parsedRecordId {}", id, entity.getId()); - parsedRecordService.updateRecord(entity, new OkapiConnectionParams(okapiHeaders, vertxContext.owner())) - .map(sentEventForProcessing -> PutChangeManagerParsedRecordsByIdResponse.respond202()) - .map(Response.class::cast) - .otherwise(ExceptionHelper::mapExceptionToResponse) - .onComplete(asyncResultHandler); - } catch (Exception e) { - LOGGER.warn(getMessage("putChangeManagerParsedRecordsById:: Failed to update parsed record with id {}", e, id)); - asyncResultHandler.handle(Future.succeededFuture(ExceptionHelper.mapExceptionToResponse(e))); - } - }); - } - private ParameterizedMessage getMessage(String pattern, Exception e, String... params) { return new ParameterizedMessage(pattern, new Object[] {params}, e); } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java index 72858d9d2..baa68a966 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java @@ -6,6 +6,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.ThreadingModel; import io.vertx.core.Vertx; import io.vertx.core.spi.VerticleFactory; import io.vertx.serviceproxy.ServiceBinder; @@ -22,7 +23,6 @@ import org.folio.verticle.DataImportInitConsumersVerticle; import org.folio.verticle.JobExecutionProgressVerticle; import org.folio.verticle.DataImportJournalBatchConsumerVerticle; -import org.folio.verticle.QuickMarcUpdateConsumersVerticle; import org.folio.verticle.RawMarcChunkConsumersVerticle; import org.folio.verticle.StoredRecordChunkConsumersVerticle; import org.folio.verticle.SpringVerticleFactory; @@ -57,9 +57,6 @@ public class InitAPIImpl implements InitAPI { @Value("${srm.kafka.JobExecutionProgressVerticle.instancesNumber:1}") private int jobExecutionProgressInstancesNumber; - @Value("${srm.kafka.QuickMarcUpdateConsumersVerticle.instancesNumber:1}") - private int quickMarcUpdateConsumerInstancesNumber; - @Value("${srm.kafka.JobExecutionDeletion.instancesNumber:1}") private int jobExecutionDeletionInstanceNumber; @@ -87,7 +84,7 @@ public void init(Vertx vertx, Context context, Handler> han handler.handle(Future.failedFuture(th)); LOGGER.warn("init:: Consumer Verticles were not started", th); }); - } catch (Throwable th) { + } catch (Exception th) { LOGGER.warn("init:: Error during module init", th); handler.handle(Future.failedFuture(th)); } @@ -110,47 +107,41 @@ private Future deployConsumersVerticles(Vertx vertx) { Promise deployDataImportConsumer = Promise.promise(); Promise deployDataImportJournalConsumer = Promise.promise(); Promise deployJobExecutionProgressConsumer = Promise.promise(); - Promise deployQuickMarcUpdateConsumer = Promise.promise(); Promise deployPeriodicDeleteJobExecution = Promise.promise(); vertx.deployVerticle(getVerticleName(verticleFactory, DataImportInitConsumersVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(initConsumerInstancesNumber), deployInitConsumer); vertx.deployVerticle(getVerticleName(verticleFactory, RawMarcChunkConsumersVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(rawMarcChunkConsumerInstancesNumber), deployRawMarcChunkConsumer); vertx.deployVerticle(getVerticleName(verticleFactory, StoredRecordChunkConsumersVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(storedMarcChunkConsumerInstancesNumber), deployStoredMarcChunkConsumer); vertx.deployVerticle(getVerticleName(verticleFactory, DataImportConsumersVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(dataImportConsumerInstancesNumber), deployDataImportConsumer); vertx.deployVerticle(getVerticleName(verticleFactory, DataImportJournalBatchConsumerVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(dataImportJournalConsumerInstancesNumber), deployDataImportJournalConsumer); vertx.deployVerticle(getVerticleName(verticleFactory, JobExecutionProgressVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(jobExecutionProgressInstancesNumber), deployJobExecutionProgressConsumer); - vertx.deployVerticle(getVerticleName(verticleFactory, QuickMarcUpdateConsumersVerticle.class), - new DeploymentOptions() - .setWorker(true) - .setInstances(quickMarcUpdateConsumerInstancesNumber), deployQuickMarcUpdateConsumer); - vertx.deployVerticle(getVerticleName(verticleFactory, PeriodicDeleteJobExecutionVerticle.class), new DeploymentOptions() - .setWorker(true) + .setThreadingModel(ThreadingModel.WORKER) .setInstances(jobExecutionDeletionInstanceNumber), deployPeriodicDeleteJobExecution); return GenericCompositeFuture.all(Arrays.asList( @@ -159,7 +150,6 @@ private Future deployConsumersVerticles(Vertx vertx) { deployStoredMarcChunkConsumer.future(), deployDataImportConsumer.future(), deployDataImportJournalConsumer.future(), - deployQuickMarcUpdateConsumer.future(), deployPeriodicDeleteJobExecution.future())); } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordService.java deleted file mode 100644 index 7e4936517..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordService.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.folio.services; - -import io.vertx.core.Future; -import org.folio.dataimport.util.OkapiConnectionParams; -import org.folio.rest.jaxrs.model.ParsedRecordDto; - -/** - * Service for processing parsed record - */ -public interface ParsedRecordService { - - /** - * Creates new Record in SRS with higher generation, updates status of the OLD Record and sends event with updated Record - * - * @param parsedRecordDto - parsed record DTO containing updated {@link org.folio.rest.jaxrs.model.ParsedRecord} - * @param params - OkapiConnectionParams - * @return future with true if succeeded - */ - Future updateRecord(ParsedRecordDto parsedRecordDto, OkapiConnectionParams params); - -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordServiceImpl.java deleted file mode 100644 index 9c8dd741e..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/ParsedRecordServiceImpl.java +++ /dev/null @@ -1,87 +0,0 @@ -package org.folio.services; - -import static io.vertx.core.Future.failedFuture; -import static java.lang.String.format; - -import static org.folio.verticle.consumers.util.QMEventTypes.QM_RECORD_UPDATED; - -import java.util.HashMap; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -import io.vertx.core.Future; -import io.vertx.core.json.Json; -import io.vertx.core.json.JsonObject; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import org.folio.dataimport.util.OkapiConnectionParams; -import org.folio.kafka.KafkaHeaderUtils; -import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters; -import org.folio.rest.jaxrs.model.ParsedRecordDto; -import org.folio.services.entity.MappingRuleCacheKey; -import org.folio.services.mappers.processor.MappingParametersProvider; - -@Log4j2 -@Service -public class ParsedRecordServiceImpl implements ParsedRecordService { - private static final AtomicInteger indexer = new AtomicInteger(); - - private final MappingParametersProvider mappingParametersProvider; - private final MappingRuleCache mappingRuleCache; - private final QuickMarcEventProducerService producerService; - - @Value("${srm.kafka.QuickMarcUpdateKafkaHandler.maxDistributionNum:100}") - private int maxDistributionNum; - - public ParsedRecordServiceImpl(MappingParametersProvider mappingParametersProvider, - MappingRuleCache mappingRuleCache, - QuickMarcEventProducerService producerService) { - this.mappingParametersProvider = mappingParametersProvider; - this.mappingRuleCache = mappingRuleCache; - this.producerService = producerService; - } - - @Override - public Future updateRecord(ParsedRecordDto parsedRecordDto, OkapiConnectionParams params) { - String snapshotId = UUID.randomUUID().toString(); - MappingRuleCacheKey cacheKey = new MappingRuleCacheKey(params.getTenantId(), parsedRecordDto.getRecordType()); - return mappingParametersProvider.get(snapshotId, params) - .compose(mappingParameters -> mappingRuleCache.get(cacheKey) - .compose(rulesOptional -> { - if (rulesOptional.isPresent()) { - return updateRecord(parsedRecordDto, snapshotId, mappingParameters, rulesOptional.get(), params); - } else { - var message = format("Can not send %s event, no mapping rules found for tenant %s", QM_RECORD_UPDATED.name(), - params.getTenantId()); - log.warn(message); - return failedFuture(message); - } - })); - } - - private Future updateRecord(ParsedRecordDto parsedRecordDto, String snapshotId, - MappingParameters mappingParameters, JsonObject mappingRules, - OkapiConnectionParams params) { - var tenantId = params.getTenantId(); - var key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum); - var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()); - var eventPayload = prepareEventPayload(parsedRecordDto, mappingRules, mappingParameters, snapshotId); - return producerService.sendEventWithZipping(eventPayload, QM_RECORD_UPDATED.name(), key, tenantId, kafkaHeaders); - } - - private String prepareEventPayload(ParsedRecordDto parsedRecordDto, JsonObject mappingRules, - MappingParameters mappingParameters, String snapshotId) { - HashMap eventPayload = new HashMap<>(); - eventPayload.put("PARSED_RECORD_DTO", Json.encode(parsedRecordDto)); - eventPayload.put("MAPPING_RULES", mappingRules.encode()); - eventPayload.put("MAPPING_PARAMS", Json.encode(mappingParameters)); - eventPayload.put("SNAPSHOT_ID", snapshotId); - eventPayload.put("RECORD_ID", parsedRecordDto.getParsedRecord().getId()); - eventPayload.put("RECORD_DTO_ID", parsedRecordDto.getId()); - eventPayload.put("RECORD_TYPE", parsedRecordDto.getRecordType().value()); - return Json.encode(eventPayload); - } - -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerService.java deleted file mode 100644 index 2524dee37..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerService.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.folio.services; - -import java.util.List; - -import io.vertx.core.Future; -import io.vertx.kafka.client.producer.KafkaHeader; - -/** - * Service for processing quick marc events - */ -public interface QuickMarcEventProducerService { - - /** - * Publishes an event with each of the passed records to the specified topic without zipping eventPayload - * - * @param eventPayload payload - * @param eventType event type - * @param key key with which the specified event - * @param tenantId tenant id - * @param kafkaHeaders kafka headers - * @return true if successful - */ - Future sendEvent(String eventPayload, String eventType, String key, String tenantId, - List kafkaHeaders); - - /** - * Publishes an event with each of the passed records to the specified topic with zipping eventPayload - * - * @param eventPayload payload - * @param eventType event type - * @param key key with which the specified event - * @param tenantId tenant id - * @param kafkaHeaders kafka headers - * @return true if successful - */ - Future sendEventWithZipping(String eventPayload, String eventType, String key, String tenantId, - List kafkaHeaders); -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java deleted file mode 100644 index 9228ce85c..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.folio.services; - -import static org.folio.services.util.EventHandlingUtil.createEvent; -import static org.folio.services.util.EventHandlingUtil.createProducer; -import static org.folio.services.util.EventHandlingUtil.createProducerRecord; -import static org.folio.services.util.EventHandlingUtil.createTopicName; -import static org.folio.verticle.consumers.util.QMEventTypes.QM_COMPLETED; -import static org.folio.verticle.consumers.util.QMEventTypes.QM_RECORD_UPDATED; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.vertx.core.Future; -import io.vertx.core.Promise; -import io.vertx.kafka.client.producer.KafkaHeader; -import io.vertx.kafka.client.producer.KafkaProducer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.springframework.stereotype.Service; - -import org.folio.kafka.KafkaConfig; - -@Service -public class QuickMarcEventProducerServiceImpl implements QuickMarcEventProducerService { - - private static final Logger LOGGER = LogManager.getLogger(); - private final KafkaConfig kafkaConfig; - private final Map> kafkaProducers = new HashMap<>(); - - public QuickMarcEventProducerServiceImpl(KafkaConfig kafkaConfig) { - this.kafkaConfig = kafkaConfig; - kafkaProducers.put(QM_RECORD_UPDATED.name(), createProducer(QM_RECORD_UPDATED.name(), kafkaConfig)); - kafkaProducers.put(QM_COMPLETED.name(), createProducer(QM_COMPLETED.name(), kafkaConfig)); - } - - @Override - public Future sendEvent(String eventPayload, String eventType, String key, String tenantId, - List kafkaHeaders) { - return sendEventInternal(eventPayload, eventType, key, tenantId, kafkaHeaders); - } - - @Override - public Future sendEventWithZipping(String eventPayload, String eventType, String key, String tenantId, - List kafkaHeaders) { - return sendEventInternal(eventPayload, eventType, key, tenantId, kafkaHeaders); - } - - private Future sendEventInternal(String eventPayload, String eventType, String key, String tenantId, - List kafkaHeaders) { - Promise promise = Promise.promise(); - try { - var event = createEvent(eventPayload, eventType, tenantId); - var topicName = createTopicName(eventType, tenantId, kafkaConfig); - var record = createProducerRecord(event, key, topicName, kafkaHeaders); - var producer = kafkaProducers.get(eventType); - if (producer != null) { - producer.write(record) - .onSuccess(unused -> { - LOGGER.info("sendEventInternal:: Event with type {} was sent to kafka", eventType); - promise.complete(true); - }) - .onFailure(throwable -> { - var cause = throwable.getCause(); - LOGGER.warn("sendEventInternal:: Error while send event {}: {}", eventType, cause); - promise.fail(cause); - }); - } else { - promise.fail("No producer found for event: " + eventType); - } - } catch (Exception e) { - LOGGER.warn("sendEventInternal:: error while sending event", e); - promise.fail(e); - } - return promise.future(); - } -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/entity/MappingRuleCacheKey.java b/mod-source-record-manager-server/src/main/java/org/folio/services/entity/MappingRuleCacheKey.java index a9130002c..eb5634558 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/entity/MappingRuleCacheKey.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/entity/MappingRuleCacheKey.java @@ -5,7 +5,6 @@ import org.folio.Record; import org.folio.rest.jaxrs.model.JournalRecord; -import org.folio.rest.jaxrs.model.ParsedRecordDto; @Getter @EqualsAndHashCode @@ -22,17 +21,6 @@ public MappingRuleCacheKey(String tenantId, Record.RecordType recordType) { } } - public MappingRuleCacheKey(String tenantId, ParsedRecordDto.RecordType recordType) { - this.tenantId = tenantId; - if (ParsedRecordDto.RecordType.MARC_BIB.equals(recordType)) { - this.recordType = Record.RecordType.MARC_BIB; - } else if (ParsedRecordDto.RecordType.MARC_HOLDING.equals(recordType)) { - this.recordType = Record.RecordType.MARC_HOLDING; - } else if (ParsedRecordDto.RecordType.MARC_AUTHORITY.equals(recordType)) { - this.recordType = Record.RecordType.MARC_AUTHORITY; - } - } - public MappingRuleCacheKey(String tenantId, JournalRecord.EntityType entityType) { this.tenantId = tenantId; if (JournalRecord.EntityType.MARC_BIBLIOGRAPHIC.equals(entityType)) { diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/QuickMarcUpdateConsumersVerticle.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/QuickMarcUpdateConsumersVerticle.java deleted file mode 100644 index fba51956e..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/QuickMarcUpdateConsumersVerticle.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.folio.verticle; - -import java.util.List; - -import org.springframework.beans.factory.annotation.Autowired; - -import org.folio.kafka.AsyncRecordHandler; -import org.folio.verticle.consumers.QuickMarcUpdateKafkaHandler; -import org.folio.verticle.consumers.util.QMEventTypes; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; - -import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE; - -/** - * Verticle to update quick mark. - * Marked with SCOPE_PROTOTYPE to support deploying more than 1 instance. - * @see org.folio.rest.impl.InitAPIImpl - */ -@Component -@Scope(SCOPE_PROTOTYPE) -public class QuickMarcUpdateConsumersVerticle extends AbstractConsumersVerticle { - - @Autowired - private QuickMarcUpdateKafkaHandler quickMarcUpdateKafkaHandler; - - @Override - public List getEvents() { - return List.of( - QMEventTypes.QM_ERROR.name(), - QMEventTypes.QM_INVENTORY_INSTANCE_UPDATED.name(), - QMEventTypes.QM_INVENTORY_HOLDINGS_UPDATED.name(), - QMEventTypes.QM_INVENTORY_AUTHORITY_UPDATED.name() - ); - } - - @Override - public AsyncRecordHandler getHandler() { - return this.quickMarcUpdateKafkaHandler; - } - -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandler.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandler.java deleted file mode 100644 index 3c0d00a98..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.folio.verticle.consumers; - -import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap; -import static org.folio.verticle.consumers.util.QMEventTypes.QM_COMPLETED; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.json.Json; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.producer.KafkaHeader; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; - -import org.folio.dataimport.util.OkapiConnectionParams; -import org.folio.kafka.AsyncRecordHandler; -import org.folio.rest.jaxrs.model.Event; -import org.folio.services.QuickMarcEventProducerService; -import org.folio.verticle.consumers.util.QmCompletedEventPayload; - -@Component -@Log4j2 -@Qualifier("QuickMarcUpdateKafkaHandler") -@RequiredArgsConstructor -public class QuickMarcUpdateKafkaHandler implements AsyncRecordHandler { - - private static final String RECORD_ID_KEY = "RECORD_ID"; - private static final String ERROR_KEY = "ERROR"; - - private final QuickMarcEventProducerService producerService; - private final Vertx vertx; - - @Override - public Future handle(KafkaConsumerRecord record) { - var event = Json.decodeValue(record.value(), Event.class); - var eventType = event.getEventType(); - - var kafkaHeaders = record.headers(); - var okapiConnectionParams = OkapiConnectionParams.createSystemUserConnectionParams( - kafkaHeadersToMap(kafkaHeaders), vertx); - var tenantId = okapiConnectionParams.getTenantId(); - - return getEventPayload(event) - .compose(eventPayload -> sendQmCompletedEvent(eventPayload, tenantId, kafkaHeaders)) - .compose(s -> Future.succeededFuture(record.key()), th -> { - log.warn("handle:: Update record state was failed while handle {} event", eventType); - return Future.failedFuture(th); - }); - } - - private Future> sendQmCompletedEvent(Map eventPayload, - String tenantId, List kafkaHeaders) { - var recordId = eventPayload.get(RECORD_ID_KEY); - var errorMessage = eventPayload.get(ERROR_KEY); - var qmCompletedEventPayload = new QmCompletedEventPayload(recordId, errorMessage); - return producerService.sendEvent(Json.encode(qmCompletedEventPayload), QM_COMPLETED.name(), null, tenantId, kafkaHeaders) - .map(v -> eventPayload); - } - - @SuppressWarnings("unchecked") - private Future> getEventPayload(Event event) { - try { - var eventPayload = Json.decodeValue(event.getEventPayload(), HashMap.class); - return Future.succeededFuture(eventPayload); - } catch (Exception e) { - return Future.failedFuture(e); - } - } -} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/util/QMEventTypes.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/util/QMEventTypes.java deleted file mode 100644 index 8fd51a5bc..000000000 --- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/util/QMEventTypes.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.folio.verticle.consumers.util; - -public enum QMEventTypes { - - QM_RECORD_UPDATED, - QM_ERROR, - QM_INVENTORY_INSTANCE_UPDATED, - QM_INVENTORY_HOLDINGS_UPDATED, - QM_INVENTORY_AUTHORITY_UPDATED, - QM_COMPLETED -} diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerParsedRecordsAPITest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerParsedRecordsAPITest.java deleted file mode 100644 index cb8f9c879..000000000 --- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerParsedRecordsAPITest.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.folio.rest.impl.changeManager; - -import static java.util.Collections.emptyList; -import static org.folio.KafkaUtil.checkKafkaEventSent; -import static org.folio.KafkaUtil.getKafkaHostAndPort; -import static org.folio.kafka.KafkaTopicNameHelper.formatTopicName; -import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace; - -import com.github.tomakehurst.wiremock.client.WireMock; -import io.restassured.RestAssured; -import io.vertx.core.json.Json; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import java.util.UUID; -import org.apache.http.HttpStatus; -import org.folio.kafka.KafkaConfig; -import org.folio.rest.impl.AbstractRestTest; -import org.folio.rest.jaxrs.model.ExternalIdsHolder; -import org.folio.rest.jaxrs.model.ParsedRecord; -import org.folio.rest.jaxrs.model.ParsedRecordDto; -import org.folio.verticle.consumers.util.QMEventTypes; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; - -@RunWith(VertxUnitRunner.class) -public class ChangeManagerParsedRecordsAPITest extends AbstractRestTest { - - private static final String KAFKA_HOST = "KAFKA_HOST"; - private static final String KAFKA_PORT = "KAFKA_PORT"; - private static final String KAFKA_ENV = "ENV"; - private static final String KAFKA_ENV_ID = "test-env"; - private static final String KAFKA_MAX_REQUEST_SIZE = "MAX_REQUEST_SIZE"; - private static final String PARSED_RECORDS_URL = "/change-manager/parsedRecords"; - - private KafkaConfig kafkaConfig; - - @Before - public void setUp() { - String[] hostAndPort = getKafkaHostAndPort(); - System.setProperty(KAFKA_HOST, hostAndPort[0]); - System.setProperty(KAFKA_PORT, hostAndPort[1]); - System.setProperty(KAFKA_ENV, KAFKA_ENV_ID); - System.setProperty(OKAPI_URL_ENV, OKAPI_URL); - System.setProperty(KAFKA_MAX_REQUEST_SIZE, "1048576"); - - kafkaConfig = KafkaConfig.builder() - .kafkaHost(hostAndPort[0]) - .kafkaPort(hostAndPort[1]) - .envId(KAFKA_ENV_ID) - .build(); - } - - @Test - public void shouldUpdateParsedRecordOnPut(TestContext testContext) { - Async async = testContext.async(); - - WireMock.stubFor(WireMock.get("/linking-rules/instance-authority") - .willReturn(WireMock.ok().withBody(Json.encode(emptyList())))); - - ParsedRecordDto parsedRecordDto = new ParsedRecordDto() - .withId(UUID.randomUUID().toString()) - .withParsedRecord(new ParsedRecord().withId(UUID.randomUUID().toString()) - .withContent("{\"leader\":\"01240cas a2200397 4500\",\"fields\":[]}")) - .withRecordType(ParsedRecordDto.RecordType.MARC_HOLDING) - .withExternalIdsHolder(new ExternalIdsHolder().withInstanceId(UUID.randomUUID().toString())); - - RestAssured.given() - .spec(spec) - .body(parsedRecordDto) - .when() - .put(PARSED_RECORDS_URL + "/" + parsedRecordDto.getId()) - .then() - .statusCode(HttpStatus.SC_ACCEPTED); - - String observeTopic = - formatTopicName(kafkaConfig.getEnvId(), getDefaultNameSpace(), TENANT_ID, QMEventTypes.QM_RECORD_UPDATED.name()); - checkKafkaEventSent(observeTopic, 1); - - async.complete(); - } -} diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandlerTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandlerTest.java deleted file mode 100644 index ba4cf107d..000000000 --- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/QuickMarcUpdateKafkaHandlerTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package org.folio.verticle.consumers; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import static org.folio.rest.RestVerticle.OKAPI_HEADER_TENANT; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.json.Json; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.producer.KafkaHeader; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import org.folio.rest.jaxrs.model.Event; -import org.folio.services.QuickMarcEventProducerService; -import org.folio.verticle.consumers.util.QMEventTypes; -import org.folio.verticle.consumers.util.QmCompletedEventPayload; - -@RunWith(MockitoJUnitRunner.class) -public class QuickMarcUpdateKafkaHandlerTest { - - private static final String TENANT_ID = "test"; - - @Mock - private QuickMarcEventProducerService producerService; - @Mock - private KafkaConsumerRecord kafkaRecord; - - @Captor - private ArgumentCaptor qmCompletedEventCaptor; - - private final Vertx vertx = Vertx.vertx(); - private QuickMarcUpdateKafkaHandler quickMarcHandler; - - @Before - public void setUp() throws Exception { - quickMarcHandler = new QuickMarcUpdateKafkaHandler(producerService, vertx); - } - - @Test - public void shouldUpdateRecordStateAndSendEventOnHandleQmInventoryInstanceUpdated() { - var recordId = UUID.randomUUID().toString(); - var recordDtoId = UUID.randomUUID().toString(); - var kafkaHeaders = List.of(KafkaHeader.header(OKAPI_HEADER_TENANT.toLowerCase(), TENANT_ID)); - - Map eventPayload = new HashMap<>(); - eventPayload.put("RECORD_ID", recordId); - eventPayload.put("RECORD_DTO_ID", recordDtoId); - - Event event = new Event() - .withId(UUID.randomUUID().toString()) - .withEventType(QMEventTypes.QM_INVENTORY_INSTANCE_UPDATED.name()) - .withEventPayload(Json.encode(eventPayload)); - - when(kafkaRecord.value()).thenReturn(Json.encode(event)); - when(kafkaRecord.headers()).thenReturn(kafkaHeaders); - when(producerService.sendEvent(anyString(), anyString(), isNull(), anyString(), anyList())) - .thenReturn(Future.succeededFuture(true)); - - var future = quickMarcHandler.handle(kafkaRecord); - assertTrue(future.succeeded()); - verify(producerService, times(1)) - .sendEvent(qmCompletedEventCaptor.capture(), eq(QMEventTypes.QM_COMPLETED.name()), isNull(), eq(TENANT_ID), - eq(kafkaHeaders)); - - var actualEventPayload = Json.decodeValue(qmCompletedEventCaptor.getValue(), QmCompletedEventPayload.class); - assertEquals(recordId, actualEventPayload.getRecordId()); - assertNull(actualEventPayload.getErrorMessage()); - } - - @Test - public void shouldUpdateRecordStateAndSendEventOnHandleQmError() { - var errorMessage = "random error"; - var recordId = UUID.randomUUID().toString(); - var recordDtoId = UUID.randomUUID().toString(); - var kafkaHeaders = List.of(KafkaHeader.header(OKAPI_HEADER_TENANT.toLowerCase(), TENANT_ID)); - - Map eventPayload = new HashMap<>(); - eventPayload.put("RECORD_ID", recordId); - eventPayload.put("RECORD_DTO_ID", recordDtoId); - eventPayload.put("ERROR", errorMessage); - - Event event = new Event() - .withId(UUID.randomUUID().toString()) - .withEventType(QMEventTypes.QM_ERROR.name()) - .withEventPayload(Json.encode(eventPayload)); - - when(kafkaRecord.value()).thenReturn(Json.encode(event)); - when(kafkaRecord.headers()).thenReturn(kafkaHeaders); - when(producerService.sendEvent(anyString(), anyString(), isNull(), anyString(), anyList())) - .thenReturn(Future.succeededFuture(true)); - - var future = quickMarcHandler.handle(kafkaRecord); - assertTrue(future.succeeded()); - verify(producerService, times(1)) - .sendEvent(qmCompletedEventCaptor.capture(), eq(QMEventTypes.QM_COMPLETED.name()), isNull(), eq(TENANT_ID), - eq(kafkaHeaders)); - - var actualEventPayload = Json.decodeValue(qmCompletedEventCaptor.getValue(), QmCompletedEventPayload.class); - assertEquals(recordId, actualEventPayload.getRecordId()); - assertEquals(errorMessage, actualEventPayload.getErrorMessage()); - } - - @Test - public void shouldReturnFailedFutureWhenHandleEncodedEventPayload() { - var recordId = UUID.randomUUID().toString(); - var kafkaHeaders = List.of(KafkaHeader.header(OKAPI_HEADER_TENANT.toLowerCase(), TENANT_ID)); - - Map eventPayload = new HashMap<>(); - eventPayload.put("RECORD_ID", recordId); - String encodedEventPayload = Base64.getEncoder().encodeToString(Json.encode(eventPayload).getBytes()); - - Event event = new Event() - .withId(UUID.randomUUID().toString()) - .withEventType(QMEventTypes.QM_INVENTORY_INSTANCE_UPDATED.name()) - .withEventPayload(encodedEventPayload); - - when(kafkaRecord.value()).thenReturn(Json.encode(event)); - when(kafkaRecord.headers()).thenReturn(kafkaHeaders); - - var future = quickMarcHandler.handle(kafkaRecord); - assertTrue(future.failed()); - } -} diff --git a/ramls/change-manager.raml b/ramls/change-manager.raml index 79669dc38..3e5f0162d 100644 --- a/ramls/change-manager.raml +++ b/ramls/change-manager.raml @@ -217,24 +217,4 @@ resourceTypes: description: "Internal server error" body: text/plain: - example: "Internal server error" - /parsedRecords: - /{id}: - description: Update ParsedRecord by id - put: - is: [validate] - body: - application/json: - type: parsedRecordDto - responses: - 202: - 400: - description: "Bad request" - body: - text/plain: - example: "Bad request" - 500: - description: "Internal server error" - body: - text/plain: - example: "Internal server error" + example: "Internal server error" \ No newline at end of file