From 6bcbe8b3f3741b00f5a32965dc386268ce2a688a Mon Sep 17 00:00:00 2001 From: Kriti Jain Date: Tue, 28 Oct 2025 13:59:57 -0500 Subject: [PATCH 1/2] migrate to folio-s3 --- pom.xml | 19 +- .../batch/AbstractStorageStreamWriter.java | 15 +- .../java/org/folio/dew/batch/CsvWriter.java | 5 +- .../JobCompletionNotificationListener.java | 4 +- .../batch/marc/DataExportCsvItemReader.java | 11 +- .../batch/marc/DataExportCsvPartitioner.java | 9 +- .../properties/MinioClientProperties.java | 15 + .../dew/repository/AbstractFilesStorage.java | 321 ++++++++++ .../dew/repository/BaseFilesStorage.java | 570 ------------------ .../dew/repository/LocalFilesStorage.java | 2 +- .../dew/repository/RemoteFilesStorage.java | 126 +--- .../dew/repository/S3CompatibleResource.java | 4 +- .../dew/repository/S3CompatibleStorage.java | 15 - .../java/org/folio/dew/utils/Constants.java | 1 + .../java/org/folio/dew/utils/CsvHelper.java | 20 +- .../dew/AuthorityControlConsortiumTest.java | 2 +- .../org/folio/dew/AuthorityControlTest.java | 2 +- .../org/folio/dew/CirculationLogTest.java | 2 +- .../java/org/folio/dew/EHoldingsTest.java | 2 +- .../mapper/EdifactMapperTest.java | 1 - .../AuthorityControlCsvFileWriterTest.java | 8 +- .../eholdings/EHoldingsCsvFileWriterTest.java | 6 +- .../PresignedUrlControllerTest.java | 2 +- .../LocalFilesStorageAwsSdkComposingTest.java | 90 --- .../dew/repository/LocalFilesStorageTest.java | 54 +- .../repository/RemoteFilesStorageTest.java | 2 +- 26 files changed, 442 insertions(+), 866 deletions(-) create mode 100644 src/main/java/org/folio/dew/repository/AbstractFilesStorage.java delete mode 100644 src/main/java/org/folio/dew/repository/BaseFilesStorage.java delete mode 100644 src/main/java/org/folio/dew/repository/S3CompatibleStorage.java delete mode 100644 src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java diff --git a/pom.xml b/pom.xml index d674781c5..4cd8c10c6 100644 --- a/pom.xml +++ b/pom.xml @@ -47,11 +47,11 @@ ${project.basedir}/src/main/resources/swagger.api/refresh-presigned-url.yaml + 3.0.0-SNAPSHOT 10.0.0-SNAPSHOT 4.1.1 9.0.0 1.0.0 - 8.5.15 6.2.1 4.5.0 3.7.3 @@ -67,7 +67,6 @@ 2.27.2 2.9.1 1.17.6 - 2.29.47 1.3 5.2.0 @@ -105,6 +104,11 @@ folio-spring-cql ${folio-spring-cql.version} + + org.folio + folio-s3-client + ${folio-s3-client.version} + com.fasterxml.jackson.datatype jackson-datatype-jsr310 @@ -155,11 +159,6 @@ feign-jackson ${feign-jackson.version} - - io.minio - minio - ${minio.version} - org.apache.sshd @@ -218,12 +217,6 @@ ${marc4j.version} - - software.amazon.awssdk - s3 - ${aws.sdk.version} - - org.hibernate.validator hibernate-validator diff --git a/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java b/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java index 368775b26..dcd3cd016 100644 --- a/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java +++ b/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java @@ -1,12 +1,12 @@ package org.folio.dew.batch; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.folio.dew.error.FileOperationException; +import org.folio.dew.repository.AbstractFilesStorage; import org.folio.dew.repository.LocalFilesStorage; import org.folio.dew.repository.S3CompatibleResource; -import org.folio.dew.repository.S3CompatibleStorage; +import org.folio.s3.exception.S3ClientException; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor; @@ -14,15 +14,10 @@ import org.springframework.batch.item.file.transform.LineAggregator; import org.springframework.core.io.WritableResource; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; - -import static org.folio.dew.utils.Constants.LINE_SEPARATOR; -import static org.folio.dew.utils.Constants.LINE_SEPARATOR_REPLACEMENT; @Slf4j -public class AbstractStorageStreamWriter implements ItemWriter { +public class AbstractStorageStreamWriter implements ItemWriter { private WritableResource resource; private S storage; @@ -52,7 +47,7 @@ public AbstractStorageStreamWriter(String tempOutputFilePath, String columnHeade if (StringUtils.isNotBlank(columnHeaders)) { try { storage.write(tempOutputFilePath, (columnHeaders + '\n').getBytes(StandardCharsets.UTF_8)); - } catch (IOException e) { + } catch (S3ClientException e) { throw new FileOperationException(e); } } @@ -78,7 +73,7 @@ public LineAggregator getLineAggregator() { return lineAggregator; } - public S3CompatibleStorage getStorage() { + public AbstractFilesStorage getStorage() { return storage; } diff --git a/src/main/java/org/folio/dew/batch/CsvWriter.java b/src/main/java/org/folio/dew/batch/CsvWriter.java index 12b6b2091..2e3d4a6ce 100644 --- a/src/main/java/org/folio/dew/batch/CsvWriter.java +++ b/src/main/java/org/folio/dew/batch/CsvWriter.java @@ -1,10 +1,11 @@ package org.folio.dew.batch; +import org.folio.dew.repository.AbstractFilesStorage; + import lombok.extern.log4j.Log4j2; -import org.folio.dew.repository.S3CompatibleStorage; @Log4j2 -public class CsvWriter extends AbstractStorageStreamWriter { +public class CsvWriter extends AbstractStorageStreamWriter { public CsvWriter(String tempOutputFilePath, String columnHeaders, String[] extractedFieldNames, FieldProcessor fieldProcessor, R storage) { super(tempOutputFilePath, columnHeaders, extractedFieldNames, fieldProcessor, storage); diff --git a/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java b/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java index 0a5ce7076..35da6e5ba 100644 --- a/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java +++ b/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java @@ -83,13 +83,13 @@ private void processJobAfter(String jobId, JobParameters jobParameters) { if (StringUtils.isBlank(path) || StringUtils.isBlank(fileNameStart)) { return; } - var files = localFilesStorage.walk(path) + var files = localFilesStorage.listRecursive(path).stream() .filter(name -> FilenameUtils.getName(name).startsWith(fileNameStart)).collect(Collectors.toList()); if (files.isEmpty()) { return; } for (String f : files) { - localFilesStorage.delete(f); + localFilesStorage.removeRecursive(f); } log.info("Deleted temp files {} of job {}.", files, jobId); } diff --git a/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java b/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java index 26f823ed2..c009cf8d6 100644 --- a/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java +++ b/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java @@ -25,13 +25,10 @@ public DataExportCsvItemReader(String fileName, Long offset, Long limit, LocalFi @Override protected List getItems(int offset, int limit) { try { - try (var lines = localFilesStorage.lines(fileName)) { - return lines - .skip(offset) - .limit(limit) - .map(ItemIdentifier::new) - .collect(Collectors.toList()); - } + return localFilesStorage.readLines(fileName, offset, limit) + .stream() + .map(ItemIdentifier::new) + .collect(Collectors.toList()); } catch (Exception e) { throw new FileOperationException(e.getMessage()); } diff --git a/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java b/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java index a903bf0c3..b192f67a4 100644 --- a/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java +++ b/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java @@ -3,6 +3,9 @@ import lombok.extern.log4j.Log4j2; import org.folio.dew.batch.CsvPartitioner; import org.folio.dew.repository.LocalFilesStorage; +import org.folio.s3.exception.S3ClientException; + +import java.io.IOException; @Log4j2 public class DataExportCsvPartitioner extends CsvPartitioner { @@ -19,9 +22,9 @@ public DataExportCsvPartitioner(Long offset, Long limit, String tempOutputFilePa @Override protected Long getLimit() { - try (var lines = localFilesStorage.lines(fileName)) { - return lines.count(); - } catch (Exception e) { + try { + return localFilesStorage.numLines(fileName); + } catch (IOException | S3ClientException e) { log.error("Error reading file {}, reason: {}", fileName, e.getMessage()); return 0L; } diff --git a/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java b/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java index 574a36061..57650fc92 100644 --- a/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java +++ b/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java @@ -1,6 +1,7 @@ package org.folio.dew.config.properties; import lombok.Data; +import org.folio.s3.client.S3ClientProperties; @Data public class MinioClientProperties { @@ -50,4 +51,18 @@ public class MinioClientProperties { * Presigned url expiration time (in seconds). */ private int urlExpirationTimeInSeconds; + + public S3ClientProperties toS3ClientProperties() { + return S3ClientProperties + .builder() + .endpoint(endpoint) + .region(region) + .bucket(bucket) + .accessKey(accessKey) + .secretKey(secretKey) + .awsSdk(composeWithAwsSdk) + .forcePathStyle(forcePathStyle) + .subPath(subPath) + .build(); + } } diff --git a/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java new file mode 100644 index 000000000..9fc7f02bb --- /dev/null +++ b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java @@ -0,0 +1,321 @@ +package org.folio.dew.repository; + +import org.apache.commons.lang3.StringUtils; +import org.folio.dew.config.properties.MinioClientProperties; +import org.folio.s3.client.FolioS3Client; +import org.folio.s3.client.PutObjectAdditionalOptions; +import org.folio.s3.client.RemoteStorageWriter; +import org.folio.s3.client.S3ClientFactory; +import org.folio.s3.exception.S3ClientException; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.minio.http.Method; +import lombok.extern.log4j.Log4j2; + +import static org.folio.dew.utils.Constants.PATH_SEPARATOR; +import static org.folio.dew.utils.Constants.TMP_PATH_PREFIX; + +/** Custom extension of FolioS3Client with additional utility methods */ +@Log4j2 +public abstract class AbstractFilesStorage implements FolioS3Client { + + private final FolioS3Client client; + + private final MinioClientProperties properties; + + protected AbstractFilesStorage(MinioClientProperties properties) { + this.client = S3ClientFactory.getS3Client(properties.toS3ClientProperties()); + this.properties = properties; + + this.createBucketIfNotExists(); + } + + @Override + public String upload(String path, String filename) { + return client.upload(sanitizePath(path), filename); + } + + @Override + public String append(String path, InputStream is) { + return client.append(sanitizePath(path), is); + } + + public void append(String path, byte[] bytes) { + append(path, new ByteArrayInputStream(bytes)); + } + + @Override + public String write(String path, InputStream is) { + return client.write(sanitizePath(path), is); + } + + public String write(String path, byte[] bytes) { + return write(path, new ByteArrayInputStream(bytes)); + } + + @Override + public String write(String path, InputStream is, long size) { + return client.write(sanitizePath(path), is, size); + } + + @Override + public String write(String path, InputStream is, long size, PutObjectAdditionalOptions extraOptions) { + return client.write(sanitizePath(path), is, size, extraOptions); + } + + @Override + public String compose(String destination, List sourceKeys) { + return client.compose(sanitizePath(destination), sourceKeys.stream() + .map(this::sanitizePath) + .toList()); + } + + @Override + public String compose(String destination, List sourceKeys, PutObjectAdditionalOptions extraOptions) { + return client.compose(sanitizePath(destination), sourceKeys.stream() + .map(this::sanitizePath) + .toList(), extraOptions); + } + + @Override + public String remove(String path) { + return client.remove(sanitizePath(path)); + } + + @Override + public List remove(String... paths) { + return client.remove(Arrays.stream(paths) + .map(this::sanitizePath) + .toArray(String[]::new)); + } + + @Override + public InputStream read(String path) { + return client.read(sanitizePath(path)); + } + + @Override + public List list(String path) { + return client.list(sanitizePath(path)); + } + + @Override + public List list(String path, int maxKeys, String startAfter) { + return client.list(sanitizePath(path), maxKeys, startAfter); + } + + @Override + public List listRecursive(String path) { + return client.listRecursive(sanitizePath(path)); + } + + @Override + public long getSize(String path) { + return client.getSize(sanitizePath(path)); + } + + @Override + public RemoteStorageWriter getRemoteStorageWriter(String path, int size) { + return client.getRemoteStorageWriter(sanitizePath(path), size); + } + + public RemoteStorageWriter writer(String path) { + return getRemoteStorageWriter(path, 1024); + } + + @Override + public String getPresignedUrl(String path) { + return getPresignedUrl(path, Method.GET); + } + + @Override + public String getPresignedUrl(String path, Method method) { + return getPresignedUrl(path, method, properties.getUrlExpirationTimeInSeconds(), TimeUnit.SECONDS); + } + + @Override + public String getPresignedUrl(String path, Method method, int expiryTime, TimeUnit expiryUnit) { + return client.getPresignedUrl(sanitizePath(path), method, expiryTime, expiryUnit); + } + + @Override + public void createBucketIfNotExists() { + client.createBucketIfNotExists(); + } + + @Override + public String initiateMultipartUpload(String path) { + return client.initiateMultipartUpload(sanitizePath(path)); + } + + @Override + public String getPresignedMultipartUploadUrl(String path, String uploadId, int partNumber) { + return client.getPresignedMultipartUploadUrl(sanitizePath(path), uploadId, partNumber); + } + + @Override + public String uploadMultipartPart(String path, String uploadId, int partNumber, String filename) { + return client.uploadMultipartPart(sanitizePath(path), uploadId, partNumber, filename); + } + + @Override + public void abortMultipartUpload(String path, String uploadId) { + client.abortMultipartUpload(sanitizePath(path), uploadId); + } + + @Override + public void completeMultipartUpload(String path, String uploadId, List partETags) { + client.completeMultipartUpload(sanitizePath(path), uploadId, partETags); + } + + /** + * Verifies if file exists on storage + * + * @param path - the path to the file on S3-compatible storage + * @return true if file exists, otherwise - false + */ + public boolean exists(String path) { + return !list(path, 1, null).isEmpty(); + } + + /** + * Verifies if file doesn't exist on S3-compatible storage + * + * @param path - the path to the file + * @return true if file doesn't exist, otherwise - false + */ + public boolean notExists(String path) { + return !exists(path); + } + + /** + * Reads all the bytes from a file + * + * @param path - the path to the file on S3-compatible storage + * @return a byte array containing the bytes read from the file + * @throws IOException - if an I/O error occurs reading from the file + */ + public byte[] readAllBytes(String path) throws IOException { + try (InputStream is = read(path)) { + return is.readAllBytes(); + } catch (S3ClientException e) { + throw log.throwing(new IOException("Error reading file with path: " + path, e)); + } + } + + /** + * Read all lines from a file as a {@code List} + * + * @param path - the path to the file on S3-compatible storage + * @return the lines from the file as a {@code List} + * @throws IOException - if an I/O error occurs reading from the file + */ + public List readAllLines(String path) throws IOException { + try (InputStream is = read(path); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr)) { + return br.lines() + .toList(); + } catch (S3ClientException e) { + throw log.throwing(new IOException("Error reading file: " + path, e)); + } + } + + /** + * Read some lines from a file as a {@code List} + * + * @param path - the path to the file on S3-compatible storage + * @param skip - the number of lines to skip from start of file + * @param limit - the number of lines to read after skipping + * @return the lines from the file as a {@code List} + * @throws IOException - if an I/O error occurs reading from the file + */ + public List readLines(String path, int skip, int limit) throws IOException { + try (InputStream is = read(path); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr)) { + return br.lines() + .skip(skip) + .limit(limit) + .toList(); + } catch (S3ClientException e) { + throw log.throwing(new IOException("Error reading file: " + path, e)); + } + } + + /** + * Count the lines in a file + * + * @param path - the path to the file on S3-compatible storage + * @return the number of lines in the file + * @throws IOException - if an I/O error occurs reading from the file + */ + public long numLines(String path) throws IOException { + try (InputStream is = read(path); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr)) { + return br.lines() + .count(); + } catch (S3ClientException e) { + throw log.throwing(new IOException("Error reading file: " + path, e)); + } + } + + /** + * Read a number of non-blank lines from a file as a {@code Stream} + * + * @param path - the path to the file on S3-compatible storage + * @param num - the num of lines to read from start of file + * @return the lines from the file as a {@code Stream} + * @throws IOException - if an I/O error occurs reading from the file + */ + public List linesNumber(String path, int num) throws IOException { + try (InputStream is = read(path); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader reader = new BufferedReader(isr)) { + List list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + var line = reader.readLine(); + if (StringUtils.isNotBlank(line)) { + list.add(line); + } + } + return list; + } + } + + public void removeRecursive(String path) { + try { + remove(listRecursive(path).toArray(s -> new String[s])); + } catch (S3ClientException e) { + log.error("Cannot delete file: {}", path, e); + } + } + + /** + * Handle leading slashes in a provided path, if there is one. For legacy reasons, some paths for + * temporary local files may start with a slash, as they use filesystem-based naming. To deal with + * this, we append {@code TMP_PATH_PREFIX} to the start of such paths. These paths should never + * appear in completed job result filenames. + * + * @param path + * @return + */ + protected String sanitizePath(String path) { + if (path.startsWith(PATH_SEPARATOR)) { + String result = (TMP_PATH_PREFIX + path).replaceAll(PATH_SEPARATOR + "+", PATH_SEPARATOR); + log.info("Prefixed path '{}' for S3 operations, converted to '{}'", path, result); + return result; + } + return path; + } +} diff --git a/src/main/java/org/folio/dew/repository/BaseFilesStorage.java b/src/main/java/org/folio/dew/repository/BaseFilesStorage.java deleted file mode 100644 index 18b2fa899..000000000 --- a/src/main/java/org/folio/dew/repository/BaseFilesStorage.java +++ /dev/null @@ -1,570 +0,0 @@ -package org.folio.dew.repository; - -import io.minio.BucketExistsArgs; -import io.minio.ComposeObjectArgs; -import io.minio.ComposeSource; -import io.minio.GetObjectArgs; -import io.minio.ListObjectsArgs; -import io.minio.MakeBucketArgs; -import io.minio.MinioClient; -import io.minio.PutObjectArgs; -import io.minio.RemoveObjectArgs; -import io.minio.StatObjectArgs; -import io.minio.UploadObjectArgs; -import io.minio.credentials.IamAwsProvider; -import io.minio.credentials.Provider; -import io.minio.credentials.StaticProvider; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import lombok.extern.log4j.Log4j2; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; -import org.folio.dew.config.properties.MinioClientProperties; -import org.folio.dew.error.FileOperationException; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; -import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.net.URI; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -import static io.minio.ObjectWriteArgs.MIN_MULTIPART_SIZE; -import static java.lang.String.format; -import static org.folio.dew.utils.Constants.PATH_SEPARATOR; - -@Log4j2 -public class BaseFilesStorage implements S3CompatibleStorage { - - private static final String SET_VALUE = ""; - private static final String NOT_SET_VALUE = ""; - private final MinioClient client; - private S3Client s3Client; - private final String bucket; - private final String region; - private final String subPath; - - private final boolean isComposeWithAwsSdk; - - public BaseFilesStorage(MinioClientProperties properties) { - final String accessKey = properties.getAccessKey(); - final String endpoint = properties.getEndpoint(); - final String regionName = properties.getRegion(); - final String bucketName = properties.getBucket(); - final String secretKey = properties.getSecretKey(); - subPath = properties.getSubPath(); - isComposeWithAwsSdk = properties.isComposeWithAwsSdk(); - final boolean isForcePathStyle = properties.isForcePathStyle(); - log.info("Creating MinIO client endpoint {},region {},bucket {},accessKey {},secretKey {}, subPath {}, isComposedWithAwsSdk {}.", endpoint, regionName, bucketName, - StringUtils.isNotBlank(accessKey) ? SET_VALUE : NOT_SET_VALUE, StringUtils.isNotBlank(secretKey) ? SET_VALUE : NOT_SET_VALUE, - StringUtils.isNotBlank(subPath) ? SET_VALUE : NOT_SET_VALUE, isComposeWithAwsSdk); - - var builder = MinioClient.builder().endpoint(endpoint); - if (StringUtils.isNotBlank(regionName)) { - builder.region(regionName); - } - - Provider provider; - if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { - provider = new StaticProvider(accessKey, secretKey, null); - } else { - provider = new IamAwsProvider(null, null); - } - log.info("{} MinIO credentials provider created.", provider.getClass().getSimpleName()); - builder.credentialsProvider(provider); - - client = builder.build(); - - this.bucket = bucketName; - this.region = regionName; - - createBucketIfNotExists(); - - if (isComposeWithAwsSdk) { - AwsCredentialsProvider credentialsProvider; - - if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { - var awsCredentials = AwsBasicCredentials.create(accessKey, secretKey); - credentialsProvider = StaticCredentialsProvider.create(awsCredentials); - } else { - credentialsProvider = DefaultCredentialsProvider.create(); - } - - s3Client = S3Client.builder() - .forcePathStyle(isForcePathStyle) - .endpointOverride(URI.create(endpoint)) - .region(Region.of(regionName)) - .credentialsProvider(credentialsProvider) - .build(); - } - - } - - public MinioClient getMinioClient() { - return client; - } - - public void createBucketIfNotExists() { - try { - if (StringUtils.isNotBlank(bucket) && !client.bucketExists(BucketExistsArgs.builder().bucket(bucket).region(region).build())) { - client.makeBucket(MakeBucketArgs.builder() - .bucket(bucket) - .region(region) - .build()); - log.info("Created {} bucket.", bucket); - } else { - log.info("Bucket has already exist."); - } - } catch(Exception e) { - log.error("Error creating bucket: " + bucket, e); - } - } - - /** - * Upload file on S3-compatible storage - * - * @param path - the path to the file on S3-compatible storage - * @param filename – path to uploaded file - * @return the path to the file - * @throws IOException - if an I/O error occurs - */ - public String upload(String path, String filename) throws IOException { - path = getS3Path(path); - try { - return client.uploadObject(UploadObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path) - .filename(filename) - .build()) - .object(); - } catch (Exception e) { - throw new IOException("Cannot upload file: " + path, e); - } - } - - /** - * Writes bytes to a file on S3-compatible storage - * - * @param path - the path to the file on S3-compatible storage - * @param bytes – the byte array with the bytes to write - * @param headers - headers - * @return the path to the file - * @throws IOException - if an I/O error occurs - */ - public String write(String path, byte[] bytes, Map headers) throws IOException { - path = getS3Path(path); - if (isComposeWithAwsSdk) { - log.info("Writing with using AWS SDK client"); - s3Client.putObject(PutObjectRequest.builder().bucket(bucket) - .key(path).build(), - RequestBody.fromBytes(bytes)); - return path; - } else { - log.info("Writing with using Minio client"); - try(var is = new ByteArrayInputStream(bytes)) { - return client.putObject(PutObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path) - .headers(headers) - .stream(is, -1, MIN_MULTIPART_SIZE) - .build()) - .object(); - } catch (Exception e) { - throw new IOException("Cannot write file: " + path, e); - } - - } - } - - public String write(String path, byte[] bytes) throws IOException { - return write(path, bytes, new HashMap<>()); - } - - /** - * Writes file to a file on S3-compatible storage - * - * @param path - the path to the file on S3-compatible storage - * @param inputPath – path to the file to write - * @param headers - headers - * @return the path to the file - * @throws IOException - if an I/O error occurs - */ - public String writeFile(String path, Path inputPath, Map headers) throws IOException { - path = getS3Path(path); - if (isComposeWithAwsSdk) { - log.info("Writing file using AWS SDK client"); - s3Client.putObject(PutObjectRequest.builder().bucket(bucket) - .key(path).build(), - RequestBody.fromFile(inputPath)); - return path; - } else { - log.info("Writing file using Minio client"); - try (var is = Files.newInputStream(inputPath)) { - return client.putObject(PutObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path) - .headers(headers) - .stream(is, -1, MIN_MULTIPART_SIZE) - .build()) - .object(); - } catch (Exception e) { - throw new IOException("Cannot write file: " + path, e); - } - - } - } - - public String writeFile(String destPath, Path inputPath) throws IOException { - return writeFile(destPath, inputPath, new HashMap<>()); - } - - /** - * Appends byte[] to existing on the storage file. - * - * @param path - the path to the file on S3-compatible storage - * @param bytes - the byte array with the bytes to write - * @throws IOException if an I/O error occurs - */ - public void append(String path, byte[] bytes) throws IOException { - path = getS3Path(path); - try { - if (notExists(path)) { - log.info("Appending non-existing file"); - write(path, bytes); - } else { - var size = client.statObject(StatObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path).build()).size(); - - log.info("Appending to {} with size {}", path, size); - if (size > MIN_MULTIPART_SIZE) { - - if (isComposeWithAwsSdk) { - - var createMultipartUploadRequest = CreateMultipartUploadRequest.builder() - .bucket(bucket) - .key(path) - .build(); - - var uploadId = s3Client.createMultipartUpload(createMultipartUploadRequest).uploadId(); - - var uploadPartRequest1 = UploadPartCopyRequest.builder() - .sourceBucket(bucket) - .sourceKey(path) - .uploadId(uploadId) - .destinationBucket(bucket) - .destinationKey(path) - .partNumber(1).build(); - - var uploadPartRequest2 = UploadPartRequest.builder() - .bucket(bucket) - .key(path) - .uploadId(uploadId) - .partNumber(2).build(); - - var originalEtag = s3Client.uploadPartCopy(uploadPartRequest1).copyPartResult().eTag(); - var appendedEtag = s3Client.uploadPart(uploadPartRequest2, RequestBody.fromBytes(bytes)).eTag(); - - var original = CompletedPart.builder() - .partNumber(1) - .eTag(originalEtag).build(); - var appended = CompletedPart.builder() - .partNumber(2) - .eTag(appendedEtag).build(); - - var completedMultipartUpload = CompletedMultipartUpload.builder() - .parts(original, appended) - .build(); - - var completeMultipartUploadRequest = - CompleteMultipartUploadRequest.builder() - .bucket(bucket) - .key(path) - .uploadId(uploadId) - .multipartUpload(completedMultipartUpload) - .build(); - - s3Client.completeMultipartUpload(completeMultipartUploadRequest); - - } else { - - var temporaryFileName = path + "_temp"; - write(temporaryFileName, bytes); - - client.composeObject(ComposeObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path) - .sources(List.of(ComposeSource.builder() - .bucket(bucket) - .region(region) - .object(path) - .build(), - ComposeSource.builder() - .bucket(bucket) - .region(region) - .object(temporaryFileName) - .build())) - .build()); - - delete(temporaryFileName); - - } - - } else { - write(path, ArrayUtils.addAll(readAllBytes(path), bytes)); - } - } - } catch (Exception e) { - throw new IOException("Cannot append data for path: " + path, e); - } - } - - /** - * Deletes a file - * - * @param path - the path to the file to delete - * @throws FileOperationException if an I/O error occurs - */ - public void delete(String path) { - path = getS3Path(path); - try { - var paths = walk(path).collect(Collectors.toList()); - - paths.forEach(p -> { - try { - client.removeObject(RemoveObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(p) - .build()); - } catch (Exception e) { - log.error(format("Cannot delete file: %s", p), e.getMessage()); - } - }); - } catch (Exception e) { - throw new FileOperationException("Cannot delete file: " + path, e); - } - } - - /** - * Return a {@code Stream} that is lazily populated with {@code - * Path} by walking the file tree rooted at a given starting file. - * - * @param path - the path to the folder - * @return the {@link Stream} of the nested files - * @throws FileOperationException if an I/O error occurs - */ - public Stream walk(String path) { - return getInternalStructure(getS3Path(path), true); - } - - /** - * Verifies if file exists on storage - * - * @param path - the path to the file on S3-compatible storage - * @return true if file exists, otherwise - false - */ - public boolean exists(String path) { - path = getS3Path(path); - var iterator = client.listObjects(ListObjectsArgs.builder() - .bucket(bucket) - .region(region) - .prefix(path) - .maxKeys(1) - .build()) - .iterator(); - try { - return iterator.hasNext() && Objects.nonNull(iterator.next().get()); - } catch (Exception e) { - log.error("Error file existing verification, path: " + path, e); - return false; - } - } - - /** - * Verifies if file doesn't exist on S3-compatible storage - * @param path - the path to the file - * @return true if file doesn't exist, otherwise - false - */ - public boolean notExists(String path) { - return !exists(path); - } - - /** - * Opens a file, returning an input stream to read from the file - * - * @param path - the path to the file on S3-compatible storage - * @return a new input stream - * @throws IOException - if an I/O error occurs reading from the file - */ - public InputStream newInputStream(String path) throws IOException { - path = getS3Path(path); - try { - return client.getObject(GetObjectArgs.builder() - .bucket(bucket) - .region(region) - .object(path) - .build()); - } catch (Exception e) { - throw new IOException("Error creating input stream for path: " + path, e); - } - } - - /** - * Reads all the bytes from a file - * - * @param path - the path to the file on S3-compatible storage - * @return a byte array containing the bytes read from the file - * @throws IOException - if an I/O error occurs reading from the file - */ - public byte[] readAllBytes(String path) throws IOException { - try (var is = newInputStream(path)) { - return is.readAllBytes(); - } catch (Exception e) { - throw new IOException("Error reading file with path: " + path, e); - } - } - - /** - * Read all lines from a file as a {@code Stream} - * - * @param path - the path to the file on S3-compatible storage - * @return the lines from the file as a {@code Stream} - * @throws IOException - if an I/O error occurs reading from the file - */ - public Stream lines(String path) throws IOException { - return new BufferedReader(new InputStreamReader(newInputStream(path))).lines(); - } - - /** - * Read number lines from a file as a {@code Stream} - * - * @param path - the path to the file on S3-compatible storage - * @param num - the num of lines to read from start of file - * @return the lines from the file as a {@code Stream} - * @throws IOException - if an I/O error occurs reading from the file - */ - public List linesNumber(String path, int num) throws IOException { - try (var reader = new BufferedReader(new InputStreamReader(newInputStream(path)))) { - List list = new ArrayList<>(); - for (int i = 0; i < num; i++) { - var line = reader.readLine(); - if (StringUtils.isNotBlank(line)) { - list.add(line); - } - } - return list; - } - } - - /** - * Read all lines from a file - * - * @param path - the path to the file on S3-compatible storage - * @return - the lines from the file as a List - * @throws IOException - if an I/O error occurs reading from the file - */ - public List readAllLines(String path) throws IOException { - try (var lines = lines(path)) { - return lines.collect(Collectors.toList()); - } catch(Exception e) { - throw new IOException("Error reading file: " + path, e); - } - } - - public OutputStream newOutputStream(String path) { - - - return new OutputStream() { - - byte[] buffer = new byte[0]; - - @Override - public void write(int b) { - buffer = ArrayUtils.add(buffer, (byte) b); - } - - @Override - public void flush() { -// throw new NotImplementedException("Method isn't implemented yet"); - } - - @Override - public void close() { - try { - BaseFilesStorage.this.write(path, buffer); - } catch (IOException e) { - throw new FileOperationException("Error closing stream and writes bytes to path: " + path, e); - } finally { - buffer = new byte[0]; - } - } - }; - } - - public BufferedWriter writer(String path) { - return new BufferedWriter(new OutputStreamWriter(newOutputStream(path))); - } - - private Stream getInternalStructure(String path, boolean isRecursive) { - try { - return StreamSupport.stream(client.listObjects(ListObjectsArgs.builder() - .bucket(bucket) - .region(region) - .prefix(path) - .recursive(isRecursive) - .build()).spliterator(), false).map(item -> { - try { - return item.get().objectName(); - } catch (Exception e) { - throw new FileOperationException(e); - } - }); - } catch(Exception e) { - log.error("Cannot read folder: " + path, e); - return null; - } - } - - public String getS3Path(String path) { - if (StringUtils.isBlank(subPath) || StringUtils.startsWith(path, subPath + PATH_SEPARATOR)) { - return path; - } - if (path.startsWith(PATH_SEPARATOR)) { - return subPath + path; - } - return subPath + PATH_SEPARATOR + path; - } -} diff --git a/src/main/java/org/folio/dew/repository/LocalFilesStorage.java b/src/main/java/org/folio/dew/repository/LocalFilesStorage.java index f935c5963..6ca2a39bc 100644 --- a/src/main/java/org/folio/dew/repository/LocalFilesStorage.java +++ b/src/main/java/org/folio/dew/repository/LocalFilesStorage.java @@ -9,7 +9,7 @@ */ @Repository @Log4j2 -public class LocalFilesStorage extends BaseFilesStorage{ +public class LocalFilesStorage extends AbstractFilesStorage{ public LocalFilesStorage(LocalFilesStorageProperties properties) { super(properties); } diff --git a/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java b/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java index 065546ae1..fb3fd7c0b 100644 --- a/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java +++ b/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java @@ -1,148 +1,86 @@ package org.folio.dew.repository; -import io.minio.ComposeObjectArgs; -import io.minio.ComposeSource; -import io.minio.GetPresignedObjectUrlArgs; -import io.minio.ListObjectsArgs; -import io.minio.MinioClient; -import io.minio.ObjectWriteArgs; -import io.minio.RemoveObjectsArgs; -import io.minio.Result; -import io.minio.errors.ErrorResponseException; -import io.minio.errors.InsufficientDataException; -import io.minio.errors.InternalException; -import io.minio.errors.InvalidResponseException; -import io.minio.errors.ServerException; -import io.minio.errors.XmlParserException; -import io.minio.http.Method; -import io.minio.messages.DeleteError; -import io.minio.messages.DeleteObject; - -import java.io.IOException; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import io.minio.messages.Item; -import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.folio.dew.config.properties.RemoteFilesStorageProperties; +import org.folio.s3.client.PutObjectAdditionalOptions; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Repository; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import lombok.extern.log4j.Log4j2; @Repository @Log4j2 -public class RemoteFilesStorage extends BaseFilesStorage { +public class RemoteFilesStorage extends AbstractFilesStorage { public static final String CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME = "attachment"; - public static final String CONTENT_DISPOSITION_HEADER_WITH_FILENAME = CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME + "; filename=\"%s\""; + public static final String CONTENT_DISPOSITION_HEADER_WITH_FILENAME = CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME + + "; filename=\"%s\""; - private final MinioClient client; @Autowired private LocalFilesStorage localFilesStorage; - private final String bucket; - private final String region; - private final int urlExpirationTimeInSeconds; public RemoteFilesStorage(RemoteFilesStorageProperties properties) { super(properties); - this.bucket = properties.getBucket(); - this.region = properties.getRegion(); - this.urlExpirationTimeInSeconds = properties.getUrlExpirationTimeInSeconds(); - this.client = getMinioClient(); } - public String uploadObject(String object, String filename, String downloadFilename, String contentType, boolean isSourceShouldBeDeleted) - throws IOException { + public String uploadObject(String object, String filename, String downloadFilename, String contentType, + boolean isSourceShouldBeDeleted) throws IOException { log.info("Uploading object {},filename {},downloadFilename {},contentType {}.", object, filename, downloadFilename, - contentType); + contentType); - var result = write(object, localFilesStorage.readAllBytes(filename), prepareHeaders(downloadFilename, contentType)); + byte[] bytes = localFilesStorage.readAllBytes(filename); + var result = write(object, new ByteArrayInputStream(bytes), bytes.length, + prepareAdditionalOptions(downloadFilename, contentType)); if (isSourceShouldBeDeleted) { - localFilesStorage.delete(filename); + localFilesStorage.remove(filename); log.info("Deleted temp file {}.", filename); } return result; } - public boolean containsFile(String fileName) - throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, - InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { - fileName = getS3Path(fileName); - for (Result itemResult : client.listObjects(ListObjectsArgs.builder().bucket(bucket).prefix(getS3Path(fileName)).build())) { - if (fileName.equals(itemResult.get().objectName())) { - return true; - } - } - return false; + public boolean containsFile(String fileName) { + return this.exists(fileName); } - public String composeObject(String destObject, List sourceObjects, String downloadFilename, - String contentType) - throws IOException, InvalidKeyException, InvalidResponseException, InsufficientDataException, NoSuchAlgorithmException, - ServerException, InternalException, XmlParserException, ErrorResponseException { - destObject = getS3Path(destObject); - List sources = sourceObjects.stream() - .map(so -> ComposeSource.builder().bucket(bucket).object(getS3Path(so)).build()) - .collect(Collectors.toList()); - log.info("Composing object {},sources [{}],downloadFilename {},contentType {}.", destObject, - sources.stream().map(s -> String.format("bucket %s,object %s", s.bucket(), s.object())).collect(Collectors.joining(",")), - downloadFilename, contentType); - var result = client.composeObject( - createArgs(ComposeObjectArgs.builder().sources(sources), destObject, downloadFilename, contentType)).object(); + public String composeObject(String destObject, List sourceObjects, String downloadFilename, String contentType) { + log.info("Composing object {} from {}", destObject, sourceObjects); + String result = compose(destObject, sourceObjects, prepareAdditionalOptions(downloadFilename, contentType)); removeObjects(sourceObjects); return result; } - public Iterable> removeObjects(List objects) { + public void removeObjects(List objects) { log.info("Deleting objects [{}].", StringUtils.join(objects, ",")); - return client.removeObjects(RemoveObjectsArgs.builder() - .bucket(bucket) - .objects(objects.stream().map(this::getS3Path).map(DeleteObject::new).toList()) - .build()); + remove(objects.toArray(s -> new String[s])); } - public String objectToPresignedObjectUrl(String object) - throws IOException, InvalidKeyException, InvalidResponseException, InsufficientDataException, NoSuchAlgorithmException, - ServerException, InternalException, XmlParserException, ErrorResponseException { - String result = client.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder() - .method(Method.GET) - .bucket(bucket) - .object(getS3Path(object)) - .region(region) - .expiry(urlExpirationTimeInSeconds, TimeUnit.SECONDS) - .build()); + public String objectToPresignedObjectUrl(String object) { + String result = getPresignedUrl(object); log.info("Created presigned URL {}.", result); return result; } - private > T createArgs(B builder, String object, - String downloadFilename, String contentType) { - Map headers = prepareHeaders(downloadFilename, contentType); - return builder.headers(headers).object(object).bucket(bucket).build(); - } + private PutObjectAdditionalOptions prepareAdditionalOptions(String downloadFilename, String contentType) { + PutObjectAdditionalOptions.PutObjectAdditionalOptionsBuilder builder = PutObjectAdditionalOptions.builder(); - private Map prepareHeaders(String downloadFilename, String contentType) { - Map headers = new HashMap<>(2); if (StringUtils.isNotBlank(downloadFilename)) { - headers.put(HttpHeaders.CONTENT_DISPOSITION, String.format(CONTENT_DISPOSITION_HEADER_WITH_FILENAME, downloadFilename)); + builder = builder.contentDisposition(String.format(CONTENT_DISPOSITION_HEADER_WITH_FILENAME, downloadFilename)); } else { - headers.put(HttpHeaders.CONTENT_DISPOSITION, CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME); + builder = builder.contentDisposition(CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME); } if (StringUtils.isNotBlank(contentType)) { - headers.put(HttpHeaders.CONTENT_TYPE, contentType); + builder = builder.contentType(contentType); } - return headers; + + return builder.build(); } } diff --git a/src/main/java/org/folio/dew/repository/S3CompatibleResource.java b/src/main/java/org/folio/dew/repository/S3CompatibleResource.java index 1023610c2..56c4e506a 100644 --- a/src/main/java/org/folio/dew/repository/S3CompatibleResource.java +++ b/src/main/java/org/folio/dew/repository/S3CompatibleResource.java @@ -13,7 +13,7 @@ import java.net.URL; @AllArgsConstructor -public class S3CompatibleResource implements WritableResource { +public class S3CompatibleResource implements WritableResource { private String path; private R storage; @@ -78,6 +78,6 @@ public String getDescription() { @Override public InputStream getInputStream() throws IOException { - return storage.newInputStream(path); + return storage.read(path); } } diff --git a/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java b/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java deleted file mode 100644 index 524006341..000000000 --- a/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.folio.dew.repository; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; - -public interface S3CompatibleStorage { - String upload(String path, String filename) throws IOException; - void append(String path, byte[] bytes) throws IOException; - String write(String path, byte[] bytes) throws IOException; - String write(String path, byte[] bytes, Map headers) throws IOException; - boolean exists(String path); - InputStream newInputStream(String path) throws IOException; - byte[] readAllBytes(String path) throws IOException; -} diff --git a/src/main/java/org/folio/dew/utils/Constants.java b/src/main/java/org/folio/dew/utils/Constants.java index b300d4248..1044668e6 100644 --- a/src/main/java/org/folio/dew/utils/Constants.java +++ b/src/main/java/org/folio/dew/utils/Constants.java @@ -10,6 +10,7 @@ public class Constants { public static final String ROLLBACK_FILE = "rollBackFile"; public static final String PATH_SEPARATOR = "/"; + public static final String TMP_PATH_PREFIX = "tmp"; // public static final String MATCHED_RECORDS = "-Matched-Records-"; public static final String CHANGED_RECORDS = "-Changed-Records-"; public static final String UPDATED_PREFIX = "UPDATED-"; diff --git a/src/main/java/org/folio/dew/utils/CsvHelper.java b/src/main/java/org/folio/dew/utils/CsvHelper.java index 84ef0eb28..5c97e49a8 100644 --- a/src/main/java/org/folio/dew/utils/CsvHelper.java +++ b/src/main/java/org/folio/dew/utils/CsvHelper.java @@ -12,7 +12,7 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; -import org.folio.dew.repository.BaseFilesStorage; +import org.folio.dew.repository.AbstractFilesStorage; import java.io.BufferedReader; import java.io.IOException; @@ -28,8 +28,8 @@ public class CsvHelper { private static final int BATCH_SIZE = 1000; - public static List readRecordsFromStorage(R storage, String fileName, Class clazz, boolean skipHeaders) throws IOException { - try (var reader = new BufferedReader(new InputStreamReader(storage.newInputStream(fileName)))) { + public static List readRecordsFromStorage(R storage, String fileName, Class clazz, boolean skipHeaders) throws IOException { + try (var reader = new BufferedReader(new InputStreamReader(storage.read(fileName)))) { return new CsvToBeanBuilder(reader) .withType(clazz) .withSkipLines(skipHeaders ? 1 : 0) @@ -38,9 +38,9 @@ public static List readRecordsFromStorage(R s } } - public static List readRecordsFromRemoteFilesStorage(R storage, String fileName, int limit, Class clazz) + public static List readRecordsFromRemoteFilesStorage(R storage, String fileName, int limit, Class clazz) throws IOException { - try (var reader = new BufferedReader(new InputStreamReader(storage.newInputStream(fileName)))) { + try (var reader = new BufferedReader(new InputStreamReader(storage.read(fileName)))) { var linesString = reader.lines().skip(1).limit(limit).collect(Collectors.joining("\n")); return new CsvToBeanBuilder(new StringReader(linesString)) .withType(clazz) @@ -49,13 +49,13 @@ public static List readRecordsFromRemoteFiles } } - public static void saveRecordsToStorage(R storage, List beans, Class clazz, String fileName) + public static void saveRecordsToStorage(R storage, List beans, Class clazz, String fileName) throws CsvRequiredFieldEmptyException, CsvDataTypeMismatchException, IOException { var strategy = new RecordColumnMappingStrategy(); strategy.setType(clazz); if (storage.exists(fileName)) { - storage.delete(fileName); + storage.remove(fileName); } if (beans.size() > BATCH_SIZE) { @@ -86,10 +86,8 @@ public static void saveRecordsToStorage(R storag } } - public static long countLines(R storage, String path) throws IOException { - try (var lines = storage.lines(path)) { - return lines.count(); - } + public static long countLines(R storage, String path) throws IOException { + return storage.numLines(path); } diff --git a/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java b/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java index 03fa1184b..302529b6c 100644 --- a/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java +++ b/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java @@ -43,7 +43,7 @@ class AuthorityControlConsortiumTest extends BaseBatchTest { private static final String EXPECTED_AUTH_HEADING_UPDATE_OUTPUT = "src/test/resources/output/authority_control/auth_heading_update_consortium.csv"; private static final String EXPECTED_S3_FILE_PATH = - "remote/mod-data-export-worker/authority_control_export/consortium/"; + "mod-data-export-worker/authority_control_export/consortium/"; @Autowired private Job getAuthHeadingJob; @Autowired diff --git a/src/test/java/org/folio/dew/AuthorityControlTest.java b/src/test/java/org/folio/dew/AuthorityControlTest.java index a2efe5646..02ba88bb3 100644 --- a/src/test/java/org/folio/dew/AuthorityControlTest.java +++ b/src/test/java/org/folio/dew/AuthorityControlTest.java @@ -52,7 +52,7 @@ class AuthorityControlTest extends BaseBatchTest { "src/test/resources/output/authority_control/auth_heading_update.csv"; private static final String EXPECTED_AUTH_HEADING_UPDATE_EMPTY_OUTPUT = "src/test/resources/output/authority_control/auth_heading_update_empty.csv"; - private static final String EXPECTED_S3_FILE_PATH = "remote/mod-data-export-worker/authority_control_export/diku/"; + private static final String EXPECTED_S3_FILE_PATH = "mod-data-export-worker/authority_control_export/diku/"; @Autowired private Job getAuthHeadingJob; @Autowired diff --git a/src/test/java/org/folio/dew/CirculationLogTest.java b/src/test/java/org/folio/dew/CirculationLogTest.java index 991f3c991..faa021ea9 100644 --- a/src/test/java/org/folio/dew/CirculationLogTest.java +++ b/src/test/java/org/folio/dew/CirculationLogTest.java @@ -83,7 +83,7 @@ private void verifyFileOutput(JobExecution jobExecution) throws Exception { final ExecutionContext executionContext = jobExecution.getExecutionContext(); final String fileInStorage = (String) executionContext.get("outputFilesInStorage"); final String fileName = executionContext.getString(CIRCULATION_LOG_FILE_NAME); - final String expectedNameInStorage = "remote/" + fileName; + final String expectedNameInStorage = fileName; final FileSystemResource actualChargeFeesFinesOutput = actualFileOutput(fileInStorage); FileSystemResource expectedCharges = new FileSystemResource(EXPECTED_CIRCULATION_OUTPUT); diff --git a/src/test/java/org/folio/dew/EHoldingsTest.java b/src/test/java/org/folio/dew/EHoldingsTest.java index 671101195..221d76cc3 100644 --- a/src/test/java/org/folio/dew/EHoldingsTest.java +++ b/src/test/java/org/folio/dew/EHoldingsTest.java @@ -87,7 +87,7 @@ static void beforeAll() { "src/test/resources/output/eholdings_package_export_with_3_titles.csv"; private final static String EXPECTED_PACKAGE_WITH_SAME_TITLE_NAMES_OUTPUT = "src/test/resources/output/eholdings_package_export_with_same_title_names.csv"; - private static final String FILE_PATH = "remote/mod-data-export-worker/e_holdings_export/diku/"; + private static final String FILE_PATH = "mod-data-export-worker/e_holdings_export/diku/"; @Test @DisplayName("Run EHoldingsJob export resource without provider load successfully") diff --git a/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java b/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java index 009f29570..f5713d18d 100644 --- a/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java +++ b/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java @@ -35,7 +35,6 @@ import org.folio.dew.config.JacksonConfiguration; import org.folio.dew.domain.dto.CompositePurchaseOrder; import org.folio.dew.domain.dto.ExportType; -import org.folio.dew.domain.dto.Location; import org.folio.dew.domain.dto.Piece; import org.folio.dew.domain.dto.PieceCollection; import org.folio.dew.domain.dto.VendorEdiOrdersExportConfig; diff --git a/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java b/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java index 641f84dac..a58a20d0a 100644 --- a/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java +++ b/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java @@ -36,7 +36,7 @@ class AuthorityControlCsvFileWriterTest { void setUp() { authorityControlCsvFileWriter = new AuthorityControlCsvFileWriter(AuthUpdateHeadingExportFormat.class, TEMP_FILE, localFilesStorage, folioTenantService); - lenient().doNothing().when(localFilesStorage).append(anyString(), any()); + lenient().doNothing().when(localFilesStorage).append(anyString(), any(byte[].class)); } @Test @@ -46,7 +46,7 @@ void testWriteHeadersBeforeStepMethod() { authorityControlCsvFileWriter.beforeStep(); //Then - verify(localFilesStorage).append(eq(TEMP_FILE), any()); + verify(localFilesStorage).append(eq(TEMP_FILE), any(byte[].class)); } @Test @@ -58,7 +58,7 @@ void testWriteHeadersAfterStepMethod_whenStatsExist() { authorityControlCsvFileWriter.afterStep(); //Then - verify(localFilesStorage, never()).append(eq(TEMP_FILE), any()); + verify(localFilesStorage, never()).append(eq(TEMP_FILE), any(byte[].class)); } @Test @@ -86,6 +86,6 @@ void testWriteItemsMethod() { authorityControlCsvFileWriter.write(new Chunk<>(items)); //Then - verify(localFilesStorage).append(eq(TEMP_FILE), any()); + verify(localFilesStorage).append(eq(TEMP_FILE), any(byte[].class)); } } diff --git a/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java b/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java index ee9421d54..e8ef414b0 100644 --- a/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java +++ b/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java @@ -60,7 +60,7 @@ void setUp() { lenient().when(jobExecution.getExecutionContext()).thenReturn(executionContext); lenient().when(executionContext.getInt(anyString(), anyInt())).thenReturn(100); lenient().when(exportConfig.getRecordId()).thenReturn("recordId"); - lenient().doNothing().when(localFilesStorage).append(anyString(), any()); + lenient().doNothing().when(localFilesStorage).append(anyString(), any(byte[].class)); EHoldingsPackageExportFormat exportFormat = new EHoldingsPackageExportFormat(); exportFormat.setPackageId("packageId"); exportFormat.setPackageName("packageName"); @@ -82,7 +82,7 @@ void testBeforeStep(List packageFields, eHoldingsCsvFileWriter.beforeStep(stepExecution); - verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any()); + verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any(byte[].class)); verify(packageRepository, times(packageRepositoryInvocations)).findById(any()); verify(mapper, times(mapperInvocations)).convertToExportFormat(any(EHoldingsPackage.class)); } @@ -102,7 +102,7 @@ void testWriteMethod(List titleFields, int localFileStorageInvocations){ eHoldingsCsvFileWriter.write(new Chunk<>(items)); //Then - verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any()); + verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any(byte[].class)); } private static Stream provideParameters() { diff --git a/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java b/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java index ef3a3d83b..99d9f81e6 100644 --- a/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java +++ b/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java @@ -30,7 +30,7 @@ static void beforeAll() { void shouldRetrievePresignedUrl() throws Exception { var jobId = UUID.randomUUID(); var filePath = jobId + PATH_SEPARATOR + FilenameUtils.getName(PREVIEW_ITEM_DATA); - remoteFilesStorage.upload(filePath, PREVIEW_ITEM_DATA); + remoteFilesStorage.upload(PREVIEW_ITEM_DATA, filePath); var headers = defaultHeaders(); diff --git a/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java b/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java deleted file mode 100644 index 8527182f6..000000000 --- a/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.folio.dew.repository; - -import org.apache.commons.lang3.ArrayUtils; -import org.folio.dew.config.properties.LocalFilesStorageProperties; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; - -import io.minio.ObjectWriteArgs; -import lombok.extern.log4j.Log4j2; - -import static java.util.stream.Collectors.toList; -import static org.folio.dew.utils.Constants.PATH_SEPARATOR; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertTrue; - -@Log4j2 -@SpringBootTest(classes = { LocalFilesStorageProperties.class, LocalFilesStorage.class }, properties = { - "application.minio-local.compose-with-aws-sdk = true", "application.minio-local.force-path-style = true" }) -@EnableConfigurationProperties -class LocalFilesStorageAwsSdkComposingTest { - - @Autowired - private LocalFilesStorageProperties localFilesStorageProperties; - @Autowired - private LocalFilesStorage localFilesStorage; - - @ParameterizedTest - @DisplayName("Create file, update it (append bytes[]), read and delete") - @ValueSource(ints = { 1024, ObjectWriteArgs.MIN_MULTIPART_SIZE + 1 }) - void testWriteReadPatchDelete(int size) throws IOException { - - byte[] original = getRandomBytes(size); - var remoteFilePath = "CSV_Data.csv"; - var expectedS3Path = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR + remoteFilePath; - - assertThat(localFilesStorage.write(remoteFilePath, original), is(expectedS3Path)); - assertTrue(localFilesStorage.exists(remoteFilePath)); - - assertTrue(Objects.deepEquals(localFilesStorage.readAllBytes(remoteFilePath), original)); - assertTrue(Objects.deepEquals(localFilesStorage.lines(remoteFilePath) - .collect(toList()), localFilesStorage.readAllLines(remoteFilePath))); - - localFilesStorage.delete(remoteFilePath); - assertTrue(localFilesStorage.notExists(remoteFilePath)); - } - - @Test - @DisplayName("Append files with using AWS SDK workaround instead of MinIO client composeObject-method") - void testAppendFileParts() throws IOException { - - var name = "directory_1/directory_2/CSV_Data.csv"; - byte[] file = getRandomBytes(30000000); - var size = file.length; - - var first = Arrays.copyOfRange(file, 0, size / 3); - var second = Arrays.copyOfRange(file, size / 3, 2 * size / 3); - var third = Arrays.copyOfRange(file, 2 * size / 3, size); - - var expected = ArrayUtils.addAll(ArrayUtils.addAll(first, second), third); - - assertTrue(Objects.deepEquals(file, expected)); - - localFilesStorage.append(name, first); - localFilesStorage.append(name, second); - localFilesStorage.append(name, third); - - var result = localFilesStorage.readAllBytes(name); - - assertTrue(Objects.deepEquals(file, result)); - - } - - private byte[] getRandomBytes(int size) { - var original = new byte[size]; - ThreadLocalRandom.current() - .nextBytes(original); - return original; - } -} diff --git a/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java b/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java index 8205a7d6e..8d1916f44 100644 --- a/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java +++ b/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java @@ -3,6 +3,8 @@ import io.minio.ObjectWriteArgs; import lombok.extern.log4j.Log4j2; import org.folio.dew.config.properties.LocalFilesStorageProperties; +import org.folio.s3.client.RemoteStorageWriter; +import org.folio.s3.exception.S3ClientException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -11,7 +13,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; -import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -20,7 +21,6 @@ import static java.util.List.of; import static java.util.stream.Collectors.toList; -import static org.folio.dew.utils.Constants.PATH_SEPARATOR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -35,8 +35,6 @@ class LocalFilesStorageTest { private static final String NON_EXISTING_PATH = "non-existing-path"; - @Autowired - private LocalFilesStorageProperties localFilesStorageProperties; @Autowired private LocalFilesStorage localFilesStorage; @@ -45,19 +43,16 @@ class LocalFilesStorageTest { @ValueSource(ints = {1024, ObjectWriteArgs.MIN_MULTIPART_SIZE + 1 }) @DisplayName("Create files and read internal objects structure") void testWriteRead(int size) throws IOException { - var subPath = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR; byte[] content = getRandomBytes(size); var original = of("directory_1/CSV_Data_1.csv", "directory_1/directory_2/CSV_Data_2.csv", - "directory_1/directory_2/directory_3/CSV_Data_3.csv"); - var expectedS3Pathes = of(subPath + "directory_1/CSV_Data_1.csv", subPath + "directory_1/directory_2/CSV_Data_2.csv", - subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv"); + "directory_1/directory_2/directory_3/CSV_Data_3.csv"); List actual; try { actual = original.stream() .map(p -> { try { return localFilesStorage.write(p, content); - } catch (IOException ex) { + } catch (S3ClientException ex) { throw new RuntimeException(ex); } }) @@ -66,21 +61,19 @@ void testWriteRead(int size) throws IOException { throw new IOException(e); } - assertTrue(Objects.deepEquals(expectedS3Pathes, actual)); + assertTrue(Objects.deepEquals(original, actual)); - assertTrue(Objects.deepEquals(localFilesStorage.walk(subPath + "directory_1/") - .collect(toList()), - of(subPath + "directory_1/CSV_Data_1.csv", subPath + "directory_1/directory_2/CSV_Data_2.csv", - subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv"))); + assertTrue(Objects.deepEquals(localFilesStorage.listRecursive("directory_1/"), + of("directory_1/CSV_Data_1.csv", "directory_1/directory_2/CSV_Data_2.csv", + "directory_1/directory_2/directory_3/CSV_Data_3.csv"))); - assertTrue(Objects.deepEquals(localFilesStorage.walk(subPath + "directory_1/directory_2/") - .collect(toList()), - of(subPath + "directory_1/directory_2/CSV_Data_2.csv", subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv"))); + assertTrue(Objects.deepEquals(localFilesStorage.listRecursive("directory_1/directory_2/"), + of("directory_1/directory_2/CSV_Data_2.csv", "directory_1/directory_2/directory_3/CSV_Data_3.csv"))); original.forEach(p -> assertTrue(localFilesStorage.exists(p))); // Clean crated files - localFilesStorage.delete("directory_1"); + localFilesStorage.removeRecursive("directory_1"); original.forEach(p -> assertFalse(localFilesStorage.exists(p))); } @@ -91,21 +84,21 @@ void testWriteRead(int size) throws IOException { void testBufferedWriter(int size) { var path = "directory/resource.csv"; var expected = new String(getRandomBytes(size)); - try(BufferedWriter writer = localFilesStorage.writer(path)) { + try (RemoteStorageWriter writer = localFilesStorage.writer(path)) { writer.write(expected); - } catch (IOException e) { + } catch (S3ClientException e) { fail("Writer exception"); } - try(InputStream is = localFilesStorage.newInputStream(path)) { + try (InputStream is = localFilesStorage.read(path)) { var actual = new String(is.readAllBytes()); assertEquals(expected, actual); } catch (IOException e) { fail("Read resource exception"); } - // Clean crated files - localFilesStorage.delete(path); + // Clean created files + localFilesStorage.remove(path); } @ParameterizedTest @@ -116,21 +109,19 @@ void testWriteReadPatchDelete(int size) throws IOException { byte[] original = getRandomBytes(size); byte[] patch = getRandomBytes(size); var remoteFilePath = "directory_1/directory_2/CSV_Data.csv"; - var expectedS3FilePath = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR + remoteFilePath; + var expectedS3FilePath = remoteFilePath; assertThat(localFilesStorage.write(remoteFilePath, original), is(expectedS3FilePath)); assertTrue(localFilesStorage.exists(remoteFilePath)); assertTrue(Objects.deepEquals(localFilesStorage.readAllBytes(remoteFilePath), original)); - assertTrue(Objects.deepEquals(localFilesStorage.lines(remoteFilePath) - .collect(toList()), localFilesStorage.readAllLines(remoteFilePath))); localFilesStorage.append(remoteFilePath, patch); var patched = localFilesStorage.readAllBytes(remoteFilePath); assertThat(patched.length, is(original.length + patch.length)); - localFilesStorage.delete(remoteFilePath); + localFilesStorage.remove(remoteFilePath); assertTrue(localFilesStorage.notExists(remoteFilePath)); } @@ -138,14 +129,13 @@ void testWriteReadPatchDelete(int size) throws IOException { @DisplayName("Files operations on non-existing file") void testNonExistingFileOperations() { assertThrows(IOException.class, () -> localFilesStorage.readAllLines(NON_EXISTING_PATH)); - assertThrows(IOException.class, () -> localFilesStorage.lines(NON_EXISTING_PATH)); - assertThrows(IOException.class, () -> { - try(var is = localFilesStorage.newInputStream(NON_EXISTING_PATH)){ - log.info("InputStream setup"); + assertThrows(S3ClientException.class, () -> { + try (var is = localFilesStorage.read(NON_EXISTING_PATH)) { + // should fail } }); - localFilesStorage.delete(NON_EXISTING_PATH); + localFilesStorage.remove(NON_EXISTING_PATH); assertFalse(localFilesStorage.exists(NON_EXISTING_PATH)); } diff --git a/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java b/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java index 616ed4475..576a94635 100644 --- a/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java +++ b/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java @@ -28,7 +28,7 @@ void testContainsFile() { var uploadedPath = remoteFilesStorage.write(path, content); - assertEquals("remote/directory/data.csv", uploadedPath); + assertEquals("directory/data.csv", uploadedPath); assertTrue(remoteFilesStorage.containsFile(path)); assertTrue(remoteFilesStorage.containsFile(uploadedPath)); } From 373247bdbc92784233942f99aa1d070e8758e9c4 Mon Sep 17 00:00:00 2001 From: Kriti Jain Date: Wed, 29 Oct 2025 13:55:21 -0500 Subject: [PATCH 2/2] re --- .../dew/repository/AbstractFilesStorage.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java index 9fc7f02bb..934fc50a7 100644 --- a/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java +++ b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.SequenceInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,13 +46,23 @@ public String upload(String path, String filename) { } @Override - public String append(String path, InputStream is) { - return client.append(sanitizePath(path), is); + public String append(String path, InputStream newData) { + try { + if (!exists(path)) { + return write(path, newData); + } + InputStream existingData = read(path); + try (SequenceInputStream combinedStream = new SequenceInputStream(existingData, newData)) { + return write(path, combinedStream); + } + } catch (IOException e) { + throw log.throwing(new S3ClientException("Error appending data to file: " + path, e)); + } } public void append(String path, byte[] bytes) { append(path, new ByteArrayInputStream(bytes)); - } +} @Override public String write(String path, InputStream is) {