Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Unreleased v3.5.0

## 2026-01-19 v3.4.11

Sunflower backport - including folio-s3-library improvements

[Full Changelog](https://github.com/folio-org/mod-data-export-worker/compare/v3.4.10...v3.4.11)

## 2025-11-21 v3.4.10

[Full Changelog](https://github.com/folio-org/mod-data-export-worker/compare/v3.4.9...v3.4.10)
Expand Down
30 changes: 16 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<groupId>org.folio</groupId>
<artifactId>mod-data-export-worker</artifactId>
<description>Data Export Worker module</description>
<version>3.4.11-SNAPSHOT</version>
<version>3.4.12-SNAPSHOT</version>
<packaging>jar</packaging>

<licenses>
Expand Down Expand Up @@ -54,7 +54,6 @@
<folio-service-tools.version>4.1.1</folio-service-tools.version>
<folio-spring-cql.version>9.0.0</folio-spring-cql.version>
<folio-module-descriptor-validator.version>1.0.0</folio-module-descriptor-validator.version>
<minio.version>8.5.15</minio.version>
<openapi-generator.version>6.2.1</openapi-generator.version>
<commons-collections4.version>4.4</commons-collections4.version>
<hypersistence-utils-hibernate-63.version>3.7.3</hypersistence-utils-hibernate-63.version>
Expand All @@ -65,12 +64,13 @@

<!-- Test properties-->
<junit-extensions.version>2.4.0</junit-extensions.version>
<embedded-minio.version>2.3.1</embedded-minio.version>
<spring-cloud-starter-bootstrap.version>4.1.1</spring-cloud-starter-bootstrap.version>
<wiremock.version>2.27.2</wiremock.version>
<localstack.version>1.20.5</localstack.version>
<marc4j.version>2.9.1</marc4j.version>
<testcontainers.version>1.17.6</testcontainers.version>
<aws.sdk.version>2.29.47</aws.sdk.version>
<aws.sdk.version>2.29.6</aws.sdk.version>
<folio-s3-client.version>2.4.0</folio-s3-client.version>
<hamcrest-all.version>1.3</hamcrest-all.version>
<mockito-inline.version>5.2.0</mockito-inline.version>

Expand Down Expand Up @@ -160,12 +160,6 @@
<artifactId>feign-jackson</artifactId>
<version>${feign-jackson.version}</version>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
</dependency>

<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-spring-sftp</artifactId>
Expand Down Expand Up @@ -254,6 +248,12 @@
<version>${streamex.version}</version>
</dependency>

<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-s3-client</artifactId>
<version>${folio-s3-client.version}</version>
</dependency>

<!-- Test dependencies -->

<dependency>
Expand Down Expand Up @@ -343,10 +343,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.playtika.testcontainers</groupId>
<artifactId>embedded-minio</artifactId>
<version>${embedded-minio.version}</version>
<scope>test</scope>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${localstack.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
Expand Down Expand Up @@ -706,6 +706,8 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<argLine>
@{argLine} -Xmx2G -Duser.language=en -Duser.region=US
</argLine>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
package org.folio.dew.batch;

import static java.lang.System.lineSeparator;
import static org.folio.dew.utils.WriterHelper.enrichHoldingsJson;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermissions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.folio.dew.domain.dto.Formatable;
import org.folio.dew.domain.dto.HoldingsFormat;
import org.folio.dew.repository.S3CompatibleResource;
import org.folio.dew.repository.S3CompatibleStorage;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.core.io.WritableResource;

import java.nio.charset.StandardCharsets;

@Slf4j
public class AbstractStorageStreamAndJsonWriter<O, T extends Formatable<O>, S extends S3CompatibleStorage> extends AbstractStorageStreamWriter<T, S> {
public class AbstractStorageStreamAndJsonWriter<O, T extends Formatable<O>, S extends S3CompatibleStorage>
extends AbstractStorageStreamWriter<T, S> implements StepExecutionListener {

private final JacksonJsonObjectMarshaller<O> jacksonJsonObjectMarshaller;
private final ObjectMapper objectMapper;

private WritableResource jsonResource;

private Path csvTmpFile;
private Path jsonTmpFile;

public AbstractStorageStreamAndJsonWriter(String tempOutputFilePath, String columnHeaders, String[] extractedFieldNames, FieldProcessor fieldProcessor, S storage) {
super(tempOutputFilePath, columnHeaders, extractedFieldNames, fieldProcessor, storage);
setJsonResource(new S3CompatibleResource<>(tempOutputFilePath + ".json", storage));
Expand All @@ -33,26 +51,97 @@ public void setJsonResource(S3CompatibleResource<S> jsonResource) {
this.jsonResource = jsonResource;
}

/**
 * Prepares the local scratch files used to buffer CSV and JSON output for this step.
 *
 * <p>A private temp directory is created first; both scratch files are placed inside it.
 *
 * @param stepExecution the step execution that is about to start (not read here)
 * @throws IllegalStateException if the directory or either file cannot be created
 */
@Override
public void beforeStep(StepExecution stepExecution) {
  try {
    final Path workDir = createPrivateTmpDir();
    this.csvTmpFile = Files.createTempFile(workDir, "dew-csv-", ".tmp");
    this.jsonTmpFile = Files.createTempFile(workDir, "dew-json-", ".tmp");
  } catch (IOException ioe) {
    throw new IllegalStateException("Error creating tmp files for resources", ioe);
  }
}

/**
 * Creates a fresh "dew-" prefixed directory under the JVM temp dir ({@code java.io.tmpdir}).
 *
 * <p>When the default file system supports the "posix" attribute view, the directory is created
 * with owner-only permissions ({@code rwx------}); otherwise no explicit attributes are passed.
 *
 * @return path of the newly created directory
 * @throws IOException if the directory cannot be created
 */
private Path createPrivateTmpDir() throws IOException {
  Path baseDir = Path.of(System.getProperty("java.io.tmpdir"));

  boolean posixAvailable = FileSystems.getDefault().supportedFileAttributeViews().contains("posix");
  if (!posixAvailable) {
    // No POSIX permissions on this file system: fall back to the platform defaults.
    return Files.createTempDirectory(baseDir, "dew-");
  }

  FileAttribute<?> ownerOnly =
      PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwx------"));
  return Files.createTempDirectory(baseDir, "dew-", ownerOnly);
}


@Override
public void write(Chunk<? extends T> items) throws Exception {
var sb = new StringBuilder();
var json = new StringBuilder();

var iterator = items.iterator();
while (iterator.hasNext()) {
var item = iterator.next();
sb.append(super.getLineAggregator().aggregate(item)).append('\n');

if (item instanceof HoldingsFormat hf) {
json.append(enrichHoldingsJson(hf, objectMapper));
} else {
json.append(jacksonJsonObjectMarshaller.marshal(item.getOriginal()));

if (items.isEmpty()) {
return;
}

boolean jsonHasDataAlready = Files.size(jsonTmpFile) > 0;

try (BufferedWriter plainWriter = Files.newBufferedWriter(
csvTmpFile,
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
BufferedWriter jsonWriter = Files.newBufferedWriter(
jsonTmpFile,
StandardCharsets.UTF_8,
StandardOpenOption.APPEND)) {

if (jsonHasDataAlready) {
jsonWriter.append(lineSeparator());
}
if(iterator.hasNext()) {
json.append('\n');

var iterator = items.iterator();
while (iterator.hasNext()) {
var item = iterator.next();

plainWriter.append(super.getLineAggregator().aggregate(item))
.append(lineSeparator());

if (item instanceof HoldingsFormat hf) {
jsonWriter.append(enrichHoldingsJson(hf, objectMapper));
} else {
jsonWriter.append(jacksonJsonObjectMarshaller.marshal(item.getOriginal()));
}

if (iterator.hasNext()) {
jsonWriter.append(lineSeparator());
}
}
}
getStorage().append(getResource().getFilename(), sb.toString().getBytes(StandardCharsets.UTF_8));
getStorage().append(jsonResource.getFilename(), json.toString().getBytes(StandardCharsets.UTF_8));
}

/**
 * Uploads the locally buffered CSV and JSON output to storage when the step ends, then removes
 * the local scratch files and the private temp directory that contains them.
 *
 * <p>The CSV object is rewritten as its currently stored bytes followed by the buffered rows;
 * the JSON object is written from the buffered file alone.
 *
 * @param stepExecution the finished step execution
 * @return the step's own exit status on success, or {@code FAILED} (with the error message as
 *     exit description) when an {@link IOException} occurs during upload
 */
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
  // Resolve the scratch directory up front: beforeStep creates both temp files inside the same
  // private directory, and it must be removed after the files or it is leaked on every step.
  Path tmpDir = null;
  if (csvTmpFile != null) {
    tmpDir = csvTmpFile.getParent();
  } else if (jsonTmpFile != null) {
    tmpDir = jsonTmpFile.getParent();
  }
  try {
    getStorage().write(getResource().getFilename(),
        ArrayUtils.addAll(getStorage().readAllBytes(getResource().getFilename()), Files.readAllBytes(csvTmpFile)));
    getStorage().write(jsonResource.getFilename(), Files.readAllBytes(jsonTmpFile));
  } catch (IOException e) {
    log.error("Error uploading data to S3", e);
    return ExitStatus.FAILED.addExitDescription(e.getMessage());
  } finally {
    deleteTempFile(csvTmpFile);
    deleteTempFile(jsonTmpFile);
    // Directory goes last; if a file could not be deleted this only logs a warning.
    deleteTempFile(tmpDir);
  }
  return stepExecution.getExitStatus();
}

/**
 * Best-effort removal of a scratch path: a {@code null} path is ignored and a failed delete is
 * logged as a warning instead of being propagated.
 *
 * @param path the path to delete, may be {@code null}
 */
private void deleteTempFile(Path path) {
  if (path != null) {
    try {
      Files.deleteIfExists(path);
    } catch (IOException ex) {
      log.warn("Error deleting tmp-file {}", path, ex);
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.folio.dew.batch;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.dew.error.FileOperationException;
import org.folio.dew.repository.LocalFilesStorage;
Expand All @@ -16,10 +15,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.folio.dew.utils.Constants.LINE_SEPARATOR;
import static org.folio.dew.utils.Constants.LINE_SEPARATOR_REPLACEMENT;

@Slf4j
public class AbstractStorageStreamWriter<T, S extends S3CompatibleStorage> implements ItemWriter<T> {
Expand Down Expand Up @@ -84,10 +79,13 @@ public S3CompatibleStorage getStorage() {

/**
 * Adds one aggregated line per item to the stored output file.
 *
 * <p>The file's current bytes are read from storage, each item is rendered by the line
 * aggregator and terminated with a line feed, and the combined content is written back under
 * the same name in a single call.
 *
 * @param items chunk of items to serialize
 * @throws Exception if reading, aggregating, or writing fails
 */
@Override
public void write(Chunk<? extends T> items) throws Exception {
  var filename = resource.getFilename();
  // Decode explicitly as UTF-8 so the round trip matches the encoding used when writing back;
  // the platform default charset could corrupt non-ASCII content that is already stored.
  var content = new StringBuilder(new String(storage.readAllBytes(filename), StandardCharsets.UTF_8));
  for (T item : items) {
    content.append(lineAggregator.aggregate(item)).append('\n');
  }
  // One write of "existing + new"; a preceding append of the same buffer would be redundant.
  storage.write(filename, content.toString().getBytes(StandardCharsets.UTF_8));
}
}
3 changes: 1 addition & 2 deletions src/main/java/org/folio/dew/batch/CsvAndJsonListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.springframework.batch.item.Chunk;

import java.util.List;
import java.util.stream.Collectors;

public class CsvAndJsonListWriter<O, T extends Formatable<O>, R extends S3CompatibleStorage> extends AbstractStorageStreamWriter<List<T>, R> {
private final CsvAndJsonWriter<O, T, R> delegate;
Expand All @@ -19,7 +18,7 @@ public CsvAndJsonListWriter(String tempOutputFilePath, String columnHeaders, Str

/**
 * Flattens the incoming chunk of lists into a single chunk of items and passes it to the
 * delegate writer.
 *
 * @param lists chunk whose elements are lists of items
 * @throws Exception if the delegate writer fails
 */
@Override
public void write(Chunk<? extends List<T>> lists) throws Exception {
  // Single declaration only: Stream.toList() replaces the older Collectors.toList() form.
  var flattened = new Chunk<>(lists.getItems().stream().flatMap(List::stream).toList());
  delegate.write(flattened);
}
}
14 changes: 6 additions & 8 deletions src/main/java/org/folio/dew/batch/CsvFileAssembler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.dew.batch;

import java.util.Collection;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.FilenameUtils;
Expand All @@ -9,9 +10,6 @@
import org.springframework.batch.core.partition.support.StepExecutionAggregator;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.stream.Collectors;

@Component
@Log4j2
@RequiredArgsConstructor
Expand All @@ -24,28 +22,28 @@ public class CsvFileAssembler implements StepExecutionAggregator {
public void aggregate(StepExecution stepExecution, Collection<StepExecution> finishedStepExecutions) {
var csvFilePartObjectNames = finishedStepExecutions.stream()
.map(e -> e.getExecutionContext().getString(JobParameterNames.TEMP_OUTPUT_FILE_PATH))
.collect(Collectors.toList());
.toList();
var destCsvObject = FilenameUtils.getName(
stepExecution.getJobExecution().getJobParameters().getString(JobParameterNames.TEMP_OUTPUT_FILE_PATH) + ".csv");
try {
if ("CIRCULATION_LOG".equals(stepExecution.getJobExecution().getJobInstance().getJobName())) {
var csvUrl = remoteFilesStorage.composeObject(destCsvObject, csvFilePartObjectNames, null, TEXT_CSV);
var csvUrl = remoteFilesStorage.compose(destCsvObject, csvFilePartObjectNames, null, TEXT_CSV);
ExecutionContextUtils.addToJobExecutionContext(stepExecution, JobParameterNames.CIRCULATION_LOG_FILE_NAME, destCsvObject, ";");
ExecutionContextUtils.addToJobExecutionContext(stepExecution, JobParameterNames.OUTPUT_FILES_IN_STORAGE, csvUrl, ";");
} else {
var prefix = stepExecution.getJobExecution().getJobParameters().getString(JobParameterNames.JOB_ID) + "/";

destCsvObject = prefix + destCsvObject;
var csvUrl = remoteFilesStorage.objectToPresignedObjectUrl(
remoteFilesStorage.composeObject(destCsvObject, csvFilePartObjectNames, null, TEXT_CSV));
remoteFilesStorage.compose(destCsvObject, csvFilePartObjectNames, null, TEXT_CSV));

var jsonFilePartObjectNames = finishedStepExecutions.stream()
.map(e -> e.getExecutionContext().getString(JobParameterNames.TEMP_OUTPUT_FILE_PATH) + ".json")
.collect(Collectors.toList());
.toList();
var destJsonObject = prefix + FilenameUtils.getName(
stepExecution.getJobExecution().getJobParameters().getString(JobParameterNames.TEMP_OUTPUT_FILE_PATH) + ".json");
var jsonUrl = remoteFilesStorage.objectToPresignedObjectUrl(
remoteFilesStorage.composeObject(destJsonObject, jsonFilePartObjectNames, null, TEXT_CSV));
remoteFilesStorage.compose(destJsonObject, jsonFilePartObjectNames, null, TEXT_CSV));

ExecutionContextUtils.addToJobExecutionContext(stepExecution, JobParameterNames.OUTPUT_FILES_IN_STORAGE, csvUrl + ";;" + jsonUrl, ";");
}
Expand Down
Loading