diff --git a/pom.xml b/pom.xml
index d674781c5..4cd8c10c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,11 +47,11 @@
${project.basedir}/src/main/resources/swagger.api/refresh-presigned-url.yaml
+ 3.0.0-SNAPSHOT
10.0.0-SNAPSHOT
4.1.1
9.0.0
1.0.0
- 8.5.15
6.2.1
4.5.0
3.7.3
@@ -67,7 +67,6 @@
2.27.2
2.9.1
1.17.6
- 2.29.47
1.3
5.2.0
@@ -105,6 +104,11 @@
folio-spring-cql
${folio-spring-cql.version}
+
+ org.folio
+ folio-s3-client
+ ${folio-s3-client.version}
+
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
@@ -155,11 +159,6 @@
feign-jackson
${feign-jackson.version}
-
- io.minio
- minio
- ${minio.version}
-
org.apache.sshd
@@ -218,12 +217,6 @@
${marc4j.version}
-
- software.amazon.awssdk
- s3
- ${aws.sdk.version}
-
-
org.hibernate.validator
hibernate-validator
diff --git a/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java b/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java
index 368775b26..dcd3cd016 100644
--- a/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java
+++ b/src/main/java/org/folio/dew/batch/AbstractStorageStreamWriter.java
@@ -1,12 +1,12 @@
package org.folio.dew.batch;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.dew.error.FileOperationException;
+import org.folio.dew.repository.AbstractFilesStorage;
import org.folio.dew.repository.LocalFilesStorage;
import org.folio.dew.repository.S3CompatibleResource;
-import org.folio.dew.repository.S3CompatibleStorage;
+import org.folio.s3.exception.S3ClientException;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
@@ -14,15 +14,10 @@
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.core.io.WritableResource;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import static org.folio.dew.utils.Constants.LINE_SEPARATOR;
-import static org.folio.dew.utils.Constants.LINE_SEPARATOR_REPLACEMENT;
@Slf4j
-public class AbstractStorageStreamWriter implements ItemWriter {
+public class AbstractStorageStreamWriter implements ItemWriter {
private WritableResource resource;
private S storage;
@@ -52,7 +47,7 @@ public AbstractStorageStreamWriter(String tempOutputFilePath, String columnHeade
if (StringUtils.isNotBlank(columnHeaders)) {
try {
storage.write(tempOutputFilePath, (columnHeaders + '\n').getBytes(StandardCharsets.UTF_8));
- } catch (IOException e) {
+ } catch (S3ClientException e) {
throw new FileOperationException(e);
}
}
@@ -78,7 +73,7 @@ public LineAggregator getLineAggregator() {
return lineAggregator;
}
- public S3CompatibleStorage getStorage() {
+ public AbstractFilesStorage getStorage() {
return storage;
}
diff --git a/src/main/java/org/folio/dew/batch/CsvWriter.java b/src/main/java/org/folio/dew/batch/CsvWriter.java
index 12b6b2091..2e3d4a6ce 100644
--- a/src/main/java/org/folio/dew/batch/CsvWriter.java
+++ b/src/main/java/org/folio/dew/batch/CsvWriter.java
@@ -1,10 +1,11 @@
package org.folio.dew.batch;
+import org.folio.dew.repository.AbstractFilesStorage;
+
import lombok.extern.log4j.Log4j2;
-import org.folio.dew.repository.S3CompatibleStorage;
@Log4j2
-public class CsvWriter extends AbstractStorageStreamWriter {
+public class CsvWriter extends AbstractStorageStreamWriter {
public CsvWriter(String tempOutputFilePath, String columnHeaders, String[] extractedFieldNames, FieldProcessor fieldProcessor, R storage) {
super(tempOutputFilePath, columnHeaders, extractedFieldNames, fieldProcessor, storage);
diff --git a/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java b/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java
index 0a5ce7076..35da6e5ba 100644
--- a/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java
+++ b/src/main/java/org/folio/dew/batch/JobCompletionNotificationListener.java
@@ -83,13 +83,13 @@ private void processJobAfter(String jobId, JobParameters jobParameters) {
if (StringUtils.isBlank(path) || StringUtils.isBlank(fileNameStart)) {
return;
}
- var files = localFilesStorage.walk(path)
+ var files = localFilesStorage.listRecursive(path).stream()
.filter(name -> FilenameUtils.getName(name).startsWith(fileNameStart)).collect(Collectors.toList());
if (files.isEmpty()) {
return;
}
for (String f : files) {
- localFilesStorage.delete(f);
+ localFilesStorage.removeRecursive(f);
}
log.info("Deleted temp files {} of job {}.", files, jobId);
}
diff --git a/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java b/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java
index 26f823ed2..c009cf8d6 100644
--- a/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java
+++ b/src/main/java/org/folio/dew/batch/marc/DataExportCsvItemReader.java
@@ -25,13 +25,10 @@ public DataExportCsvItemReader(String fileName, Long offset, Long limit, LocalFi
@Override
protected List getItems(int offset, int limit) {
try {
- try (var lines = localFilesStorage.lines(fileName)) {
- return lines
- .skip(offset)
- .limit(limit)
- .map(ItemIdentifier::new)
- .collect(Collectors.toList());
- }
+ return localFilesStorage.readLines(fileName, offset, limit)
+ .stream()
+ .map(ItemIdentifier::new)
+ .collect(Collectors.toList());
} catch (Exception e) {
throw new FileOperationException(e.getMessage());
}
diff --git a/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java b/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java
index a903bf0c3..b192f67a4 100644
--- a/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java
+++ b/src/main/java/org/folio/dew/batch/marc/DataExportCsvPartitioner.java
@@ -3,6 +3,9 @@
import lombok.extern.log4j.Log4j2;
import org.folio.dew.batch.CsvPartitioner;
import org.folio.dew.repository.LocalFilesStorage;
+import org.folio.s3.exception.S3ClientException;
+
+import java.io.IOException;
@Log4j2
public class DataExportCsvPartitioner extends CsvPartitioner {
@@ -19,9 +22,9 @@ public DataExportCsvPartitioner(Long offset, Long limit, String tempOutputFilePa
@Override
protected Long getLimit() {
- try (var lines = localFilesStorage.lines(fileName)) {
- return lines.count();
- } catch (Exception e) {
+ try {
+ return localFilesStorage.numLines(fileName);
+ } catch (IOException | S3ClientException e) {
log.error("Error reading file {}, reason: {}", fileName, e.getMessage());
return 0L;
}
diff --git a/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java b/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java
index 574a36061..57650fc92 100644
--- a/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java
+++ b/src/main/java/org/folio/dew/config/properties/MinioClientProperties.java
@@ -1,6 +1,7 @@
package org.folio.dew.config.properties;
import lombok.Data;
+import org.folio.s3.client.S3ClientProperties;
@Data
public class MinioClientProperties {
@@ -50,4 +51,18 @@ public class MinioClientProperties {
* Presigned url expiration time (in seconds).
*/
private int urlExpirationTimeInSeconds;
+
+ public S3ClientProperties toS3ClientProperties() {
+ return S3ClientProperties
+ .builder()
+ .endpoint(endpoint)
+ .region(region)
+ .bucket(bucket)
+ .accessKey(accessKey)
+ .secretKey(secretKey)
+ .awsSdk(composeWithAwsSdk)
+ .forcePathStyle(forcePathStyle)
+ .subPath(subPath)
+ .build();
+ }
}
diff --git a/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java
new file mode 100644
index 000000000..934fc50a7
--- /dev/null
+++ b/src/main/java/org/folio/dew/repository/AbstractFilesStorage.java
@@ -0,0 +1,332 @@
+package org.folio.dew.repository;
+
+import org.apache.commons.lang3.StringUtils;
+import org.folio.dew.config.properties.MinioClientProperties;
+import org.folio.s3.client.FolioS3Client;
+import org.folio.s3.client.PutObjectAdditionalOptions;
+import org.folio.s3.client.RemoteStorageWriter;
+import org.folio.s3.client.S3ClientFactory;
+import org.folio.s3.exception.S3ClientException;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.SequenceInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.minio.http.Method;
+import lombok.extern.log4j.Log4j2;
+
+import static org.folio.dew.utils.Constants.PATH_SEPARATOR;
+import static org.folio.dew.utils.Constants.TMP_PATH_PREFIX;
+
+/** Custom extension of FolioS3Client with additional utility methods */
+@Log4j2
+public abstract class AbstractFilesStorage implements FolioS3Client {
+
+ private final FolioS3Client client;
+
+ private final MinioClientProperties properties;
+
+ protected AbstractFilesStorage(MinioClientProperties properties) {
+ this.client = S3ClientFactory.getS3Client(properties.toS3ClientProperties());
+ this.properties = properties;
+
+ this.createBucketIfNotExists();
+ }
+
+ @Override
+ public String upload(String path, String filename) {
+ return client.upload(sanitizePath(path), filename);
+ }
+
+ @Override
+ public String append(String path, InputStream newData) {
+ try {
+ if (!exists(path)) {
+ return write(path, newData);
+ }
+ InputStream existingData = read(path);
+ try (SequenceInputStream combinedStream = new SequenceInputStream(existingData, newData)) {
+ return write(path, combinedStream);
+ }
+ } catch (IOException e) {
+ throw log.throwing(new S3ClientException("Error appending data to file: " + path, e));
+ }
+ }
+
+ public void append(String path, byte[] bytes) {
+ append(path, new ByteArrayInputStream(bytes));
+}
+
+ @Override
+ public String write(String path, InputStream is) {
+ return client.write(sanitizePath(path), is);
+ }
+
+ public String write(String path, byte[] bytes) {
+ return write(path, new ByteArrayInputStream(bytes));
+ }
+
+ @Override
+ public String write(String path, InputStream is, long size) {
+ return client.write(sanitizePath(path), is, size);
+ }
+
+ @Override
+ public String write(String path, InputStream is, long size, PutObjectAdditionalOptions extraOptions) {
+ return client.write(sanitizePath(path), is, size, extraOptions);
+ }
+
+ @Override
+ public String compose(String destination, List sourceKeys) {
+ return client.compose(sanitizePath(destination), sourceKeys.stream()
+ .map(this::sanitizePath)
+ .toList());
+ }
+
+ @Override
+ public String compose(String destination, List sourceKeys, PutObjectAdditionalOptions extraOptions) {
+ return client.compose(sanitizePath(destination), sourceKeys.stream()
+ .map(this::sanitizePath)
+ .toList(), extraOptions);
+ }
+
+ @Override
+ public String remove(String path) {
+ return client.remove(sanitizePath(path));
+ }
+
+ @Override
+ public List remove(String... paths) {
+ return client.remove(Arrays.stream(paths)
+ .map(this::sanitizePath)
+ .toArray(String[]::new));
+ }
+
+ @Override
+ public InputStream read(String path) {
+ return client.read(sanitizePath(path));
+ }
+
+ @Override
+ public List list(String path) {
+ return client.list(sanitizePath(path));
+ }
+
+ @Override
+ public List list(String path, int maxKeys, String startAfter) {
+ return client.list(sanitizePath(path), maxKeys, startAfter);
+ }
+
+ @Override
+ public List listRecursive(String path) {
+ return client.listRecursive(sanitizePath(path));
+ }
+
+ @Override
+ public long getSize(String path) {
+ return client.getSize(sanitizePath(path));
+ }
+
+ @Override
+ public RemoteStorageWriter getRemoteStorageWriter(String path, int size) {
+ return client.getRemoteStorageWriter(sanitizePath(path), size);
+ }
+
+ public RemoteStorageWriter writer(String path) {
+ return getRemoteStorageWriter(path, 1024);
+ }
+
+ @Override
+ public String getPresignedUrl(String path) {
+ return getPresignedUrl(path, Method.GET);
+ }
+
+ @Override
+ public String getPresignedUrl(String path, Method method) {
+ return getPresignedUrl(path, method, properties.getUrlExpirationTimeInSeconds(), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public String getPresignedUrl(String path, Method method, int expiryTime, TimeUnit expiryUnit) {
+ return client.getPresignedUrl(sanitizePath(path), method, expiryTime, expiryUnit);
+ }
+
+ @Override
+ public void createBucketIfNotExists() {
+ client.createBucketIfNotExists();
+ }
+
+ @Override
+ public String initiateMultipartUpload(String path) {
+ return client.initiateMultipartUpload(sanitizePath(path));
+ }
+
+ @Override
+ public String getPresignedMultipartUploadUrl(String path, String uploadId, int partNumber) {
+ return client.getPresignedMultipartUploadUrl(sanitizePath(path), uploadId, partNumber);
+ }
+
+ @Override
+ public String uploadMultipartPart(String path, String uploadId, int partNumber, String filename) {
+ return client.uploadMultipartPart(sanitizePath(path), uploadId, partNumber, filename);
+ }
+
+ @Override
+ public void abortMultipartUpload(String path, String uploadId) {
+ client.abortMultipartUpload(sanitizePath(path), uploadId);
+ }
+
+ @Override
+ public void completeMultipartUpload(String path, String uploadId, List partETags) {
+ client.completeMultipartUpload(sanitizePath(path), uploadId, partETags);
+ }
+
+ /**
+ * Verifies if file exists on storage
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @return true if file exists, otherwise - false
+ */
+ public boolean exists(String path) {
+ return !list(path, 1, null).isEmpty();
+ }
+
+ /**
+ * Verifies if file doesn't exist on S3-compatible storage
+ *
+ * @param path - the path to the file
+ * @return true if file doesn't exist, otherwise - false
+ */
+ public boolean notExists(String path) {
+ return !exists(path);
+ }
+
+ /**
+ * Reads all the bytes from a file
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @return a byte array containing the bytes read from the file
+ * @throws IOException - if an I/O error occurs reading from the file
+ */
+ public byte[] readAllBytes(String path) throws IOException {
+ try (InputStream is = read(path)) {
+ return is.readAllBytes();
+ } catch (S3ClientException e) {
+ throw log.throwing(new IOException("Error reading file with path: " + path, e));
+ }
+ }
+
+ /**
+ * Read all lines from a file as a {@code List}
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @return the lines from the file as a {@code List}
+ * @throws IOException - if an I/O error occurs reading from the file
+ */
+ public List readAllLines(String path) throws IOException {
+ try (InputStream is = read(path);
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr)) {
+ return br.lines()
+ .toList();
+ } catch (S3ClientException e) {
+ throw log.throwing(new IOException("Error reading file: " + path, e));
+ }
+ }
+
+ /**
+ * Read some lines from a file as a {@code List}
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @param skip - the number of lines to skip from start of file
+ * @param limit - the number of lines to read after skipping
+ * @return the lines from the file as a {@code List}
+ * @throws IOException - if an I/O error occurs reading from the file
+ */
+ public List readLines(String path, int skip, int limit) throws IOException {
+ try (InputStream is = read(path);
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr)) {
+ return br.lines()
+ .skip(skip)
+ .limit(limit)
+ .toList();
+ } catch (S3ClientException e) {
+ throw log.throwing(new IOException("Error reading file: " + path, e));
+ }
+ }
+
+ /**
+ * Count the lines in a file
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @return the number of lines in the file
+ * @throws IOException - if an I/O error occurs reading from the file
+ */
+ public long numLines(String path) throws IOException {
+ try (InputStream is = read(path);
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr)) {
+ return br.lines()
+ .count();
+ } catch (S3ClientException e) {
+ throw log.throwing(new IOException("Error reading file: " + path, e));
+ }
+ }
+
+ /**
+ * Read a number of non-blank lines from a file as a {@code Stream}
+ *
+ * @param path - the path to the file on S3-compatible storage
+ * @param num - the num of lines to read from start of file
+ * @return the lines from the file as a {@code Stream}
+ * @throws IOException - if an I/O error occurs reading from the file
+ */
+ public List linesNumber(String path, int num) throws IOException {
+ try (InputStream is = read(path);
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader reader = new BufferedReader(isr)) {
+ List list = new ArrayList<>();
+ for (int i = 0; i < num; i++) {
+ var line = reader.readLine();
+ if (StringUtils.isNotBlank(line)) {
+ list.add(line);
+ }
+ }
+ return list;
+ }
+ }
+
+ public void removeRecursive(String path) {
+ try {
+ remove(listRecursive(path).toArray(s -> new String[s]));
+ } catch (S3ClientException e) {
+ log.error("Cannot delete file: {}", path, e);
+ }
+ }
+
+ /**
+ * Handle leading slashes in a provided path, if there is one. For legacy reasons, some paths for
+ * temporary local files may start with a slash, as they use filesystem-based naming. To deal with
+ * this, we append {@code TMP_PATH_PREFIX} to the start of such paths. These paths should never
+ * appear in completed job result filenames.
+ *
+ * @param path
+ * @return
+ */
+ protected String sanitizePath(String path) {
+ if (path.startsWith(PATH_SEPARATOR)) {
+ String result = (TMP_PATH_PREFIX + path).replaceAll(PATH_SEPARATOR + "+", PATH_SEPARATOR);
+ log.info("Prefixed path '{}' for S3 operations, converted to '{}'", path, result);
+ return result;
+ }
+ return path;
+ }
+}
diff --git a/src/main/java/org/folio/dew/repository/BaseFilesStorage.java b/src/main/java/org/folio/dew/repository/BaseFilesStorage.java
deleted file mode 100644
index 18b2fa899..000000000
--- a/src/main/java/org/folio/dew/repository/BaseFilesStorage.java
+++ /dev/null
@@ -1,570 +0,0 @@
-package org.folio.dew.repository;
-
-import io.minio.BucketExistsArgs;
-import io.minio.ComposeObjectArgs;
-import io.minio.ComposeSource;
-import io.minio.GetObjectArgs;
-import io.minio.ListObjectsArgs;
-import io.minio.MakeBucketArgs;
-import io.minio.MinioClient;
-import io.minio.PutObjectArgs;
-import io.minio.RemoveObjectArgs;
-import io.minio.StatObjectArgs;
-import io.minio.UploadObjectArgs;
-import io.minio.credentials.IamAwsProvider;
-import io.minio.credentials.Provider;
-import io.minio.credentials.StaticProvider;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.folio.dew.config.properties.MinioClientProperties;
-import org.folio.dew.error.FileOperationException;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
-import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
-import software.amazon.awssdk.services.s3.model.UploadPartRequest;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import static io.minio.ObjectWriteArgs.MIN_MULTIPART_SIZE;
-import static java.lang.String.format;
-import static org.folio.dew.utils.Constants.PATH_SEPARATOR;
-
-@Log4j2
-public class BaseFilesStorage implements S3CompatibleStorage {
-
- private static final String SET_VALUE = "";
- private static final String NOT_SET_VALUE = "";
- private final MinioClient client;
- private S3Client s3Client;
- private final String bucket;
- private final String region;
- private final String subPath;
-
- private final boolean isComposeWithAwsSdk;
-
- public BaseFilesStorage(MinioClientProperties properties) {
- final String accessKey = properties.getAccessKey();
- final String endpoint = properties.getEndpoint();
- final String regionName = properties.getRegion();
- final String bucketName = properties.getBucket();
- final String secretKey = properties.getSecretKey();
- subPath = properties.getSubPath();
- isComposeWithAwsSdk = properties.isComposeWithAwsSdk();
- final boolean isForcePathStyle = properties.isForcePathStyle();
- log.info("Creating MinIO client endpoint {},region {},bucket {},accessKey {},secretKey {}, subPath {}, isComposedWithAwsSdk {}.", endpoint, regionName, bucketName,
- StringUtils.isNotBlank(accessKey) ? SET_VALUE : NOT_SET_VALUE, StringUtils.isNotBlank(secretKey) ? SET_VALUE : NOT_SET_VALUE,
- StringUtils.isNotBlank(subPath) ? SET_VALUE : NOT_SET_VALUE, isComposeWithAwsSdk);
-
- var builder = MinioClient.builder().endpoint(endpoint);
- if (StringUtils.isNotBlank(regionName)) {
- builder.region(regionName);
- }
-
- Provider provider;
- if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
- provider = new StaticProvider(accessKey, secretKey, null);
- } else {
- provider = new IamAwsProvider(null, null);
- }
- log.info("{} MinIO credentials provider created.", provider.getClass().getSimpleName());
- builder.credentialsProvider(provider);
-
- client = builder.build();
-
- this.bucket = bucketName;
- this.region = regionName;
-
- createBucketIfNotExists();
-
- if (isComposeWithAwsSdk) {
- AwsCredentialsProvider credentialsProvider;
-
- if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
- var awsCredentials = AwsBasicCredentials.create(accessKey, secretKey);
- credentialsProvider = StaticCredentialsProvider.create(awsCredentials);
- } else {
- credentialsProvider = DefaultCredentialsProvider.create();
- }
-
- s3Client = S3Client.builder()
- .forcePathStyle(isForcePathStyle)
- .endpointOverride(URI.create(endpoint))
- .region(Region.of(regionName))
- .credentialsProvider(credentialsProvider)
- .build();
- }
-
- }
-
- public MinioClient getMinioClient() {
- return client;
- }
-
- public void createBucketIfNotExists() {
- try {
- if (StringUtils.isNotBlank(bucket) && !client.bucketExists(BucketExistsArgs.builder().bucket(bucket).region(region).build())) {
- client.makeBucket(MakeBucketArgs.builder()
- .bucket(bucket)
- .region(region)
- .build());
- log.info("Created {} bucket.", bucket);
- } else {
- log.info("Bucket has already exist.");
- }
- } catch(Exception e) {
- log.error("Error creating bucket: " + bucket, e);
- }
- }
-
- /**
- * Upload file on S3-compatible storage
- *
- * @param path - the path to the file on S3-compatible storage
- * @param filename – path to uploaded file
- * @return the path to the file
- * @throws IOException - if an I/O error occurs
- */
- public String upload(String path, String filename) throws IOException {
- path = getS3Path(path);
- try {
- return client.uploadObject(UploadObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .filename(filename)
- .build())
- .object();
- } catch (Exception e) {
- throw new IOException("Cannot upload file: " + path, e);
- }
- }
-
- /**
- * Writes bytes to a file on S3-compatible storage
- *
- * @param path - the path to the file on S3-compatible storage
- * @param bytes – the byte array with the bytes to write
- * @param headers - headers
- * @return the path to the file
- * @throws IOException - if an I/O error occurs
- */
- public String write(String path, byte[] bytes, Map headers) throws IOException {
- path = getS3Path(path);
- if (isComposeWithAwsSdk) {
- log.info("Writing with using AWS SDK client");
- s3Client.putObject(PutObjectRequest.builder().bucket(bucket)
- .key(path).build(),
- RequestBody.fromBytes(bytes));
- return path;
- } else {
- log.info("Writing with using Minio client");
- try(var is = new ByteArrayInputStream(bytes)) {
- return client.putObject(PutObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .headers(headers)
- .stream(is, -1, MIN_MULTIPART_SIZE)
- .build())
- .object();
- } catch (Exception e) {
- throw new IOException("Cannot write file: " + path, e);
- }
-
- }
- }
-
- public String write(String path, byte[] bytes) throws IOException {
- return write(path, bytes, new HashMap<>());
- }
-
- /**
- * Writes file to a file on S3-compatible storage
- *
- * @param path - the path to the file on S3-compatible storage
- * @param inputPath – path to the file to write
- * @param headers - headers
- * @return the path to the file
- * @throws IOException - if an I/O error occurs
- */
- public String writeFile(String path, Path inputPath, Map headers) throws IOException {
- path = getS3Path(path);
- if (isComposeWithAwsSdk) {
- log.info("Writing file using AWS SDK client");
- s3Client.putObject(PutObjectRequest.builder().bucket(bucket)
- .key(path).build(),
- RequestBody.fromFile(inputPath));
- return path;
- } else {
- log.info("Writing file using Minio client");
- try (var is = Files.newInputStream(inputPath)) {
- return client.putObject(PutObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .headers(headers)
- .stream(is, -1, MIN_MULTIPART_SIZE)
- .build())
- .object();
- } catch (Exception e) {
- throw new IOException("Cannot write file: " + path, e);
- }
-
- }
- }
-
- public String writeFile(String destPath, Path inputPath) throws IOException {
- return writeFile(destPath, inputPath, new HashMap<>());
- }
-
- /**
- * Appends byte[] to existing on the storage file.
- *
- * @param path - the path to the file on S3-compatible storage
- * @param bytes - the byte array with the bytes to write
- * @throws IOException if an I/O error occurs
- */
- public void append(String path, byte[] bytes) throws IOException {
- path = getS3Path(path);
- try {
- if (notExists(path)) {
- log.info("Appending non-existing file");
- write(path, bytes);
- } else {
- var size = client.statObject(StatObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path).build()).size();
-
- log.info("Appending to {} with size {}", path, size);
- if (size > MIN_MULTIPART_SIZE) {
-
- if (isComposeWithAwsSdk) {
-
- var createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
- .bucket(bucket)
- .key(path)
- .build();
-
- var uploadId = s3Client.createMultipartUpload(createMultipartUploadRequest).uploadId();
-
- var uploadPartRequest1 = UploadPartCopyRequest.builder()
- .sourceBucket(bucket)
- .sourceKey(path)
- .uploadId(uploadId)
- .destinationBucket(bucket)
- .destinationKey(path)
- .partNumber(1).build();
-
- var uploadPartRequest2 = UploadPartRequest.builder()
- .bucket(bucket)
- .key(path)
- .uploadId(uploadId)
- .partNumber(2).build();
-
- var originalEtag = s3Client.uploadPartCopy(uploadPartRequest1).copyPartResult().eTag();
- var appendedEtag = s3Client.uploadPart(uploadPartRequest2, RequestBody.fromBytes(bytes)).eTag();
-
- var original = CompletedPart.builder()
- .partNumber(1)
- .eTag(originalEtag).build();
- var appended = CompletedPart.builder()
- .partNumber(2)
- .eTag(appendedEtag).build();
-
- var completedMultipartUpload = CompletedMultipartUpload.builder()
- .parts(original, appended)
- .build();
-
- var completeMultipartUploadRequest =
- CompleteMultipartUploadRequest.builder()
- .bucket(bucket)
- .key(path)
- .uploadId(uploadId)
- .multipartUpload(completedMultipartUpload)
- .build();
-
- s3Client.completeMultipartUpload(completeMultipartUploadRequest);
-
- } else {
-
- var temporaryFileName = path + "_temp";
- write(temporaryFileName, bytes);
-
- client.composeObject(ComposeObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .sources(List.of(ComposeSource.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .build(),
- ComposeSource.builder()
- .bucket(bucket)
- .region(region)
- .object(temporaryFileName)
- .build()))
- .build());
-
- delete(temporaryFileName);
-
- }
-
- } else {
- write(path, ArrayUtils.addAll(readAllBytes(path), bytes));
- }
- }
- } catch (Exception e) {
- throw new IOException("Cannot append data for path: " + path, e);
- }
- }
-
- /**
- * Deletes a file
- *
- * @param path - the path to the file to delete
- * @throws FileOperationException if an I/O error occurs
- */
- public void delete(String path) {
- path = getS3Path(path);
- try {
- var paths = walk(path).collect(Collectors.toList());
-
- paths.forEach(p -> {
- try {
- client.removeObject(RemoveObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(p)
- .build());
- } catch (Exception e) {
- log.error(format("Cannot delete file: %s", p), e.getMessage());
- }
- });
- } catch (Exception e) {
- throw new FileOperationException("Cannot delete file: " + path, e);
- }
- }
-
- /**
- * Return a {@code Stream} that is lazily populated with {@code
- * Path} by walking the file tree rooted at a given starting file.
- *
- * @param path - the path to the folder
- * @return the {@link Stream} of the nested files
- * @throws FileOperationException if an I/O error occurs
- */
- public Stream walk(String path) {
- return getInternalStructure(getS3Path(path), true);
- }
-
- /**
- * Verifies if file exists on storage
- *
- * @param path - the path to the file on S3-compatible storage
- * @return true if file exists, otherwise - false
- */
- public boolean exists(String path) {
- path = getS3Path(path);
- var iterator = client.listObjects(ListObjectsArgs.builder()
- .bucket(bucket)
- .region(region)
- .prefix(path)
- .maxKeys(1)
- .build())
- .iterator();
- try {
- return iterator.hasNext() && Objects.nonNull(iterator.next().get());
- } catch (Exception e) {
- log.error("Error file existing verification, path: " + path, e);
- return false;
- }
- }
-
- /**
- * Verifies if file doesn't exist on S3-compatible storage
- * @param path - the path to the file
- * @return true if file doesn't exist, otherwise - false
- */
- public boolean notExists(String path) {
- return !exists(path);
- }
-
- /**
- * Opens a file, returning an input stream to read from the file
- *
- * @param path - the path to the file on S3-compatible storage
- * @return a new input stream
- * @throws IOException - if an I/O error occurs reading from the file
- */
- public InputStream newInputStream(String path) throws IOException {
- path = getS3Path(path);
- try {
- return client.getObject(GetObjectArgs.builder()
- .bucket(bucket)
- .region(region)
- .object(path)
- .build());
- } catch (Exception e) {
- throw new IOException("Error creating input stream for path: " + path, e);
- }
- }
-
- /**
- * Reads all the bytes from a file
- *
- * @param path - the path to the file on S3-compatible storage
- * @return a byte array containing the bytes read from the file
- * @throws IOException - if an I/O error occurs reading from the file
- */
- public byte[] readAllBytes(String path) throws IOException {
- try (var is = newInputStream(path)) {
- return is.readAllBytes();
- } catch (Exception e) {
- throw new IOException("Error reading file with path: " + path, e);
- }
- }
-
- /**
- * Read all lines from a file as a {@code Stream}
- *
- * @param path - the path to the file on S3-compatible storage
- * @return the lines from the file as a {@code Stream}
- * @throws IOException - if an I/O error occurs reading from the file
- */
- public Stream lines(String path) throws IOException {
- return new BufferedReader(new InputStreamReader(newInputStream(path))).lines();
- }
-
- /**
- * Read number lines from a file as a {@code Stream}
- *
- * @param path - the path to the file on S3-compatible storage
- * @param num - the num of lines to read from start of file
- * @return the lines from the file as a {@code Stream}
- * @throws IOException - if an I/O error occurs reading from the file
- */
- public List linesNumber(String path, int num) throws IOException {
- try (var reader = new BufferedReader(new InputStreamReader(newInputStream(path)))) {
- List list = new ArrayList<>();
- for (int i = 0; i < num; i++) {
- var line = reader.readLine();
- if (StringUtils.isNotBlank(line)) {
- list.add(line);
- }
- }
- return list;
- }
- }
-
- /**
- * Read all lines from a file
- *
- * @param path - the path to the file on S3-compatible storage
- * @return
- the lines from the file as a List
- * @throws IOException - if an I/O error occurs reading from the file
- */
- public List readAllLines(String path) throws IOException {
- try (var lines = lines(path)) {
- return lines.collect(Collectors.toList());
- } catch(Exception e) {
- throw new IOException("Error reading file: " + path, e);
- }
- }
-
- public OutputStream newOutputStream(String path) {
-
-
- return new OutputStream() {
-
- byte[] buffer = new byte[0];
-
- @Override
- public void write(int b) {
- buffer = ArrayUtils.add(buffer, (byte) b);
- }
-
- @Override
- public void flush() {
-// throw new NotImplementedException("Method isn't implemented yet");
- }
-
- @Override
- public void close() {
- try {
- BaseFilesStorage.this.write(path, buffer);
- } catch (IOException e) {
- throw new FileOperationException("Error closing stream and writes bytes to path: " + path, e);
- } finally {
- buffer = new byte[0];
- }
- }
- };
- }
-
- public BufferedWriter writer(String path) {
- return new BufferedWriter(new OutputStreamWriter(newOutputStream(path)));
- }
-
- private Stream getInternalStructure(String path, boolean isRecursive) {
- try {
- return StreamSupport.stream(client.listObjects(ListObjectsArgs.builder()
- .bucket(bucket)
- .region(region)
- .prefix(path)
- .recursive(isRecursive)
- .build()).spliterator(), false).map(item -> {
- try {
- return item.get().objectName();
- } catch (Exception e) {
- throw new FileOperationException(e);
- }
- });
- } catch(Exception e) {
- log.error("Cannot read folder: " + path, e);
- return null;
- }
- }
-
- public String getS3Path(String path) {
- if (StringUtils.isBlank(subPath) || StringUtils.startsWith(path, subPath + PATH_SEPARATOR)) {
- return path;
- }
- if (path.startsWith(PATH_SEPARATOR)) {
- return subPath + path;
- }
- return subPath + PATH_SEPARATOR + path;
- }
-}
diff --git a/src/main/java/org/folio/dew/repository/LocalFilesStorage.java b/src/main/java/org/folio/dew/repository/LocalFilesStorage.java
index f935c5963..6ca2a39bc 100644
--- a/src/main/java/org/folio/dew/repository/LocalFilesStorage.java
+++ b/src/main/java/org/folio/dew/repository/LocalFilesStorage.java
@@ -9,7 +9,7 @@
*/
@Repository
@Log4j2
-public class LocalFilesStorage extends BaseFilesStorage{
+public class LocalFilesStorage extends AbstractFilesStorage{
public LocalFilesStorage(LocalFilesStorageProperties properties) {
super(properties);
}
diff --git a/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java b/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java
index 065546ae1..fb3fd7c0b 100644
--- a/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java
+++ b/src/main/java/org/folio/dew/repository/RemoteFilesStorage.java
@@ -1,148 +1,86 @@
package org.folio.dew.repository;
-import io.minio.ComposeObjectArgs;
-import io.minio.ComposeSource;
-import io.minio.GetPresignedObjectUrlArgs;
-import io.minio.ListObjectsArgs;
-import io.minio.MinioClient;
-import io.minio.ObjectWriteArgs;
-import io.minio.RemoveObjectsArgs;
-import io.minio.Result;
-import io.minio.errors.ErrorResponseException;
-import io.minio.errors.InsufficientDataException;
-import io.minio.errors.InternalException;
-import io.minio.errors.InvalidResponseException;
-import io.minio.errors.ServerException;
-import io.minio.errors.XmlParserException;
-import io.minio.http.Method;
-import io.minio.messages.DeleteError;
-import io.minio.messages.DeleteObject;
-
-import java.io.IOException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import io.minio.messages.Item;
-import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.folio.dew.config.properties.RemoteFilesStorageProperties;
+import org.folio.s3.client.PutObjectAdditionalOptions;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Repository;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
@Repository
@Log4j2
-public class RemoteFilesStorage extends BaseFilesStorage {
+public class RemoteFilesStorage extends AbstractFilesStorage {
public static final String CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME = "attachment";
- public static final String CONTENT_DISPOSITION_HEADER_WITH_FILENAME = CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME + "; filename=\"%s\"";
+ public static final String CONTENT_DISPOSITION_HEADER_WITH_FILENAME = CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME
+ + "; filename=\"%s\"";
- private final MinioClient client;
@Autowired
private LocalFilesStorage localFilesStorage;
- private final String bucket;
- private final String region;
- private final int urlExpirationTimeInSeconds;
public RemoteFilesStorage(RemoteFilesStorageProperties properties) {
super(properties);
- this.bucket = properties.getBucket();
- this.region = properties.getRegion();
- this.urlExpirationTimeInSeconds = properties.getUrlExpirationTimeInSeconds();
- this.client = getMinioClient();
}
- public String uploadObject(String object, String filename, String downloadFilename, String contentType, boolean isSourceShouldBeDeleted)
- throws IOException {
+ public String uploadObject(String object, String filename, String downloadFilename, String contentType,
+ boolean isSourceShouldBeDeleted) throws IOException {
log.info("Uploading object {},filename {},downloadFilename {},contentType {}.", object, filename, downloadFilename,
- contentType);
+ contentType);
- var result = write(object, localFilesStorage.readAllBytes(filename), prepareHeaders(downloadFilename, contentType));
+ byte[] bytes = localFilesStorage.readAllBytes(filename);
+ var result = write(object, new ByteArrayInputStream(bytes), bytes.length,
+ prepareAdditionalOptions(downloadFilename, contentType));
if (isSourceShouldBeDeleted) {
- localFilesStorage.delete(filename);
+ localFilesStorage.remove(filename);
log.info("Deleted temp file {}.", filename);
}
return result;
}
- public boolean containsFile(String fileName)
- throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException,
- InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
- fileName = getS3Path(fileName);
- for (Result- itemResult : client.listObjects(ListObjectsArgs.builder().bucket(bucket).prefix(getS3Path(fileName)).build())) {
- if (fileName.equals(itemResult.get().objectName())) {
- return true;
- }
- }
- return false;
+ public boolean containsFile(String fileName) {
+ return this.exists(fileName);
}
- public String composeObject(String destObject, List sourceObjects, String downloadFilename,
- String contentType)
- throws IOException, InvalidKeyException, InvalidResponseException, InsufficientDataException, NoSuchAlgorithmException,
- ServerException, InternalException, XmlParserException, ErrorResponseException {
- destObject = getS3Path(destObject);
- List sources = sourceObjects.stream()
- .map(so -> ComposeSource.builder().bucket(bucket).object(getS3Path(so)).build())
- .collect(Collectors.toList());
- log.info("Composing object {},sources [{}],downloadFilename {},contentType {}.", destObject,
- sources.stream().map(s -> String.format("bucket %s,object %s", s.bucket(), s.object())).collect(Collectors.joining(",")),
- downloadFilename, contentType);
- var result = client.composeObject(
- createArgs(ComposeObjectArgs.builder().sources(sources), destObject, downloadFilename, contentType)).object();
+ public String composeObject(String destObject, List sourceObjects, String downloadFilename, String contentType) {
+ log.info("Composing object {} from {}", destObject, sourceObjects);
+ String result = compose(destObject, sourceObjects, prepareAdditionalOptions(downloadFilename, contentType));
removeObjects(sourceObjects);
return result;
}
- public Iterable> removeObjects(List objects) {
+ public void removeObjects(List objects) {
log.info("Deleting objects [{}].", StringUtils.join(objects, ","));
- return client.removeObjects(RemoveObjectsArgs.builder()
- .bucket(bucket)
- .objects(objects.stream().map(this::getS3Path).map(DeleteObject::new).toList())
- .build());
+ remove(objects.toArray(s -> new String[s]));
}
- public String objectToPresignedObjectUrl(String object)
- throws IOException, InvalidKeyException, InvalidResponseException, InsufficientDataException, NoSuchAlgorithmException,
- ServerException, InternalException, XmlParserException, ErrorResponseException {
- String result = client.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
- .method(Method.GET)
- .bucket(bucket)
- .object(getS3Path(object))
- .region(region)
- .expiry(urlExpirationTimeInSeconds, TimeUnit.SECONDS)
- .build());
+ public String objectToPresignedObjectUrl(String object) {
+ String result = getPresignedUrl(object);
log.info("Created presigned URL {}.", result);
return result;
}
- private > T createArgs(B builder, String object,
- String downloadFilename, String contentType) {
- Map headers = prepareHeaders(downloadFilename, contentType);
- return builder.headers(headers).object(object).bucket(bucket).build();
- }
+ private PutObjectAdditionalOptions prepareAdditionalOptions(String downloadFilename, String contentType) {
+ PutObjectAdditionalOptions.PutObjectAdditionalOptionsBuilder builder = PutObjectAdditionalOptions.builder();
- private Map prepareHeaders(String downloadFilename, String contentType) {
- Map headers = new HashMap<>(2);
if (StringUtils.isNotBlank(downloadFilename)) {
- headers.put(HttpHeaders.CONTENT_DISPOSITION, String.format(CONTENT_DISPOSITION_HEADER_WITH_FILENAME, downloadFilename));
+ builder = builder.contentDisposition(String.format(CONTENT_DISPOSITION_HEADER_WITH_FILENAME, downloadFilename));
} else {
- headers.put(HttpHeaders.CONTENT_DISPOSITION, CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME);
+ builder = builder.contentDisposition(CONTENT_DISPOSITION_HEADER_WITHOUT_FILENAME);
}
if (StringUtils.isNotBlank(contentType)) {
- headers.put(HttpHeaders.CONTENT_TYPE, contentType);
+ builder = builder.contentType(contentType);
}
- return headers;
+
+ return builder.build();
}
}
diff --git a/src/main/java/org/folio/dew/repository/S3CompatibleResource.java b/src/main/java/org/folio/dew/repository/S3CompatibleResource.java
index 1023610c2..56c4e506a 100644
--- a/src/main/java/org/folio/dew/repository/S3CompatibleResource.java
+++ b/src/main/java/org/folio/dew/repository/S3CompatibleResource.java
@@ -13,7 +13,7 @@
import java.net.URL;
@AllArgsConstructor
-public class S3CompatibleResource implements WritableResource {
+public class S3CompatibleResource implements WritableResource {
private String path;
private R storage;
@@ -78,6 +78,6 @@ public String getDescription() {
@Override
public InputStream getInputStream() throws IOException {
- return storage.newInputStream(path);
+ return storage.read(path);
}
}
diff --git a/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java b/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java
deleted file mode 100644
index 524006341..000000000
--- a/src/main/java/org/folio/dew/repository/S3CompatibleStorage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.folio.dew.repository;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-
-public interface S3CompatibleStorage {
- String upload(String path, String filename) throws IOException;
- void append(String path, byte[] bytes) throws IOException;
- String write(String path, byte[] bytes) throws IOException;
- String write(String path, byte[] bytes, Map headers) throws IOException;
- boolean exists(String path);
- InputStream newInputStream(String path) throws IOException;
- byte[] readAllBytes(String path) throws IOException;
-}
diff --git a/src/main/java/org/folio/dew/utils/Constants.java b/src/main/java/org/folio/dew/utils/Constants.java
index b300d4248..1044668e6 100644
--- a/src/main/java/org/folio/dew/utils/Constants.java
+++ b/src/main/java/org/folio/dew/utils/Constants.java
@@ -10,6 +10,7 @@ public class Constants {
public static final String ROLLBACK_FILE = "rollBackFile";
public static final String PATH_SEPARATOR = "/";
+ public static final String TMP_PATH_PREFIX = "tmp"; //
public static final String MATCHED_RECORDS = "-Matched-Records-";
public static final String CHANGED_RECORDS = "-Changed-Records-";
public static final String UPDATED_PREFIX = "UPDATED-";
diff --git a/src/main/java/org/folio/dew/utils/CsvHelper.java b/src/main/java/org/folio/dew/utils/CsvHelper.java
index 84ef0eb28..5c97e49a8 100644
--- a/src/main/java/org/folio/dew/utils/CsvHelper.java
+++ b/src/main/java/org/folio/dew/utils/CsvHelper.java
@@ -12,7 +12,7 @@
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
-import org.folio.dew.repository.BaseFilesStorage;
+import org.folio.dew.repository.AbstractFilesStorage;
import java.io.BufferedReader;
import java.io.IOException;
@@ -28,8 +28,8 @@
public class CsvHelper {
private static final int BATCH_SIZE = 1000;
- public static List readRecordsFromStorage(R storage, String fileName, Class clazz, boolean skipHeaders) throws IOException {
- try (var reader = new BufferedReader(new InputStreamReader(storage.newInputStream(fileName)))) {
+ public static List readRecordsFromStorage(R storage, String fileName, Class clazz, boolean skipHeaders) throws IOException {
+ try (var reader = new BufferedReader(new InputStreamReader(storage.read(fileName)))) {
return new CsvToBeanBuilder(reader)
.withType(clazz)
.withSkipLines(skipHeaders ? 1 : 0)
@@ -38,9 +38,9 @@ public static List readRecordsFromStorage(R s
}
}
- public static List readRecordsFromRemoteFilesStorage(R storage, String fileName, int limit, Class clazz)
+ public static List readRecordsFromRemoteFilesStorage(R storage, String fileName, int limit, Class clazz)
throws IOException {
- try (var reader = new BufferedReader(new InputStreamReader(storage.newInputStream(fileName)))) {
+ try (var reader = new BufferedReader(new InputStreamReader(storage.read(fileName)))) {
var linesString = reader.lines().skip(1).limit(limit).collect(Collectors.joining("\n"));
return new CsvToBeanBuilder(new StringReader(linesString))
.withType(clazz)
@@ -49,13 +49,13 @@ public static List readRecordsFromRemoteFiles
}
}
- public static void saveRecordsToStorage(R storage, List beans, Class clazz, String fileName)
+ public static void saveRecordsToStorage(R storage, List beans, Class clazz, String fileName)
throws CsvRequiredFieldEmptyException, CsvDataTypeMismatchException, IOException {
var strategy = new RecordColumnMappingStrategy();
strategy.setType(clazz);
if (storage.exists(fileName)) {
- storage.delete(fileName);
+ storage.remove(fileName);
}
if (beans.size() > BATCH_SIZE) {
@@ -86,10 +86,8 @@ public static void saveRecordsToStorage(R storag
}
}
- public static long countLines(R storage, String path) throws IOException {
- try (var lines = storage.lines(path)) {
- return lines.count();
- }
+ public static long countLines(R storage, String path) throws IOException {
+ return storage.numLines(path);
}
diff --git a/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java b/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java
index 03fa1184b..302529b6c 100644
--- a/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java
+++ b/src/test/java/org/folio/dew/AuthorityControlConsortiumTest.java
@@ -43,7 +43,7 @@ class AuthorityControlConsortiumTest extends BaseBatchTest {
private static final String EXPECTED_AUTH_HEADING_UPDATE_OUTPUT =
"src/test/resources/output/authority_control/auth_heading_update_consortium.csv";
private static final String EXPECTED_S3_FILE_PATH =
- "remote/mod-data-export-worker/authority_control_export/consortium/";
+ "mod-data-export-worker/authority_control_export/consortium/";
@Autowired
private Job getAuthHeadingJob;
@Autowired
diff --git a/src/test/java/org/folio/dew/AuthorityControlTest.java b/src/test/java/org/folio/dew/AuthorityControlTest.java
index a2efe5646..02ba88bb3 100644
--- a/src/test/java/org/folio/dew/AuthorityControlTest.java
+++ b/src/test/java/org/folio/dew/AuthorityControlTest.java
@@ -52,7 +52,7 @@ class AuthorityControlTest extends BaseBatchTest {
"src/test/resources/output/authority_control/auth_heading_update.csv";
private static final String EXPECTED_AUTH_HEADING_UPDATE_EMPTY_OUTPUT =
"src/test/resources/output/authority_control/auth_heading_update_empty.csv";
- private static final String EXPECTED_S3_FILE_PATH = "remote/mod-data-export-worker/authority_control_export/diku/";
+ private static final String EXPECTED_S3_FILE_PATH = "mod-data-export-worker/authority_control_export/diku/";
@Autowired
private Job getAuthHeadingJob;
@Autowired
diff --git a/src/test/java/org/folio/dew/CirculationLogTest.java b/src/test/java/org/folio/dew/CirculationLogTest.java
index 991f3c991..faa021ea9 100644
--- a/src/test/java/org/folio/dew/CirculationLogTest.java
+++ b/src/test/java/org/folio/dew/CirculationLogTest.java
@@ -83,7 +83,7 @@ private void verifyFileOutput(JobExecution jobExecution) throws Exception {
final ExecutionContext executionContext = jobExecution.getExecutionContext();
final String fileInStorage = (String) executionContext.get("outputFilesInStorage");
final String fileName = executionContext.getString(CIRCULATION_LOG_FILE_NAME);
- final String expectedNameInStorage = "remote/" + fileName;
+ final String expectedNameInStorage = fileName;
final FileSystemResource actualChargeFeesFinesOutput = actualFileOutput(fileInStorage);
FileSystemResource expectedCharges = new FileSystemResource(EXPECTED_CIRCULATION_OUTPUT);
diff --git a/src/test/java/org/folio/dew/EHoldingsTest.java b/src/test/java/org/folio/dew/EHoldingsTest.java
index 671101195..221d76cc3 100644
--- a/src/test/java/org/folio/dew/EHoldingsTest.java
+++ b/src/test/java/org/folio/dew/EHoldingsTest.java
@@ -87,7 +87,7 @@ static void beforeAll() {
"src/test/resources/output/eholdings_package_export_with_3_titles.csv";
private final static String EXPECTED_PACKAGE_WITH_SAME_TITLE_NAMES_OUTPUT =
"src/test/resources/output/eholdings_package_export_with_same_title_names.csv";
- private static final String FILE_PATH = "remote/mod-data-export-worker/e_holdings_export/diku/";
+ private static final String FILE_PATH = "mod-data-export-worker/e_holdings_export/diku/";
@Test
@DisplayName("Run EHoldingsJob export resource without provider load successfully")
diff --git a/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java b/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java
index 009f29570..f5713d18d 100644
--- a/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java
+++ b/src/test/java/org/folio/dew/batch/acquisitions/mapper/EdifactMapperTest.java
@@ -35,7 +35,6 @@
import org.folio.dew.config.JacksonConfiguration;
import org.folio.dew.domain.dto.CompositePurchaseOrder;
import org.folio.dew.domain.dto.ExportType;
-import org.folio.dew.domain.dto.Location;
import org.folio.dew.domain.dto.Piece;
import org.folio.dew.domain.dto.PieceCollection;
import org.folio.dew.domain.dto.VendorEdiOrdersExportConfig;
diff --git a/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java b/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java
index 641f84dac..a58a20d0a 100644
--- a/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java
+++ b/src/test/java/org/folio/dew/batch/authoritycontrol/AuthorityControlCsvFileWriterTest.java
@@ -36,7 +36,7 @@ class AuthorityControlCsvFileWriterTest {
void setUp() {
authorityControlCsvFileWriter = new AuthorityControlCsvFileWriter(AuthUpdateHeadingExportFormat.class, TEMP_FILE,
localFilesStorage, folioTenantService);
- lenient().doNothing().when(localFilesStorage).append(anyString(), any());
+ lenient().doNothing().when(localFilesStorage).append(anyString(), any(byte[].class));
}
@Test
@@ -46,7 +46,7 @@ void testWriteHeadersBeforeStepMethod() {
authorityControlCsvFileWriter.beforeStep();
//Then
- verify(localFilesStorage).append(eq(TEMP_FILE), any());
+ verify(localFilesStorage).append(eq(TEMP_FILE), any(byte[].class));
}
@Test
@@ -58,7 +58,7 @@ void testWriteHeadersAfterStepMethod_whenStatsExist() {
authorityControlCsvFileWriter.afterStep();
//Then
- verify(localFilesStorage, never()).append(eq(TEMP_FILE), any());
+ verify(localFilesStorage, never()).append(eq(TEMP_FILE), any(byte[].class));
}
@Test
@@ -86,6 +86,6 @@ void testWriteItemsMethod() {
authorityControlCsvFileWriter.write(new Chunk<>(items));
//Then
- verify(localFilesStorage).append(eq(TEMP_FILE), any());
+ verify(localFilesStorage).append(eq(TEMP_FILE), any(byte[].class));
}
}
diff --git a/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java b/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java
index ee9421d54..e8ef414b0 100644
--- a/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java
+++ b/src/test/java/org/folio/dew/batch/eholdings/EHoldingsCsvFileWriterTest.java
@@ -60,7 +60,7 @@ void setUp() {
lenient().when(jobExecution.getExecutionContext()).thenReturn(executionContext);
lenient().when(executionContext.getInt(anyString(), anyInt())).thenReturn(100);
lenient().when(exportConfig.getRecordId()).thenReturn("recordId");
- lenient().doNothing().when(localFilesStorage).append(anyString(), any());
+ lenient().doNothing().when(localFilesStorage).append(anyString(), any(byte[].class));
EHoldingsPackageExportFormat exportFormat = new EHoldingsPackageExportFormat();
exportFormat.setPackageId("packageId");
exportFormat.setPackageName("packageName");
@@ -82,7 +82,7 @@ void testBeforeStep(List packageFields,
eHoldingsCsvFileWriter.beforeStep(stepExecution);
- verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any());
+ verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any(byte[].class));
verify(packageRepository, times(packageRepositoryInvocations)).findById(any());
verify(mapper, times(mapperInvocations)).convertToExportFormat(any(EHoldingsPackage.class));
}
@@ -102,7 +102,7 @@ void testWriteMethod(List titleFields, int localFileStorageInvocations){
eHoldingsCsvFileWriter.write(new Chunk<>(items));
//Then
- verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any());
+ verify(localFilesStorage, times(localFileStorageInvocations)).append(anyString(), any(byte[].class));
}
private static Stream provideParameters() {
diff --git a/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java b/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java
index ef3a3d83b..99d9f81e6 100644
--- a/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java
+++ b/src/test/java/org/folio/dew/controller/PresignedUrlControllerTest.java
@@ -30,7 +30,7 @@ static void beforeAll() {
void shouldRetrievePresignedUrl() throws Exception {
var jobId = UUID.randomUUID();
var filePath = jobId + PATH_SEPARATOR + FilenameUtils.getName(PREVIEW_ITEM_DATA);
- remoteFilesStorage.upload(filePath, PREVIEW_ITEM_DATA);
+ remoteFilesStorage.upload(PREVIEW_ITEM_DATA, filePath);
var headers = defaultHeaders();
diff --git a/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java b/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java
deleted file mode 100644
index 8527182f6..000000000
--- a/src/test/java/org/folio/dew/repository/LocalFilesStorageAwsSdkComposingTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.folio.dew.repository;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.folio.dew.config.properties.LocalFilesStorageProperties;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.SpringBootTest;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
-
-import io.minio.ObjectWriteArgs;
-import lombok.extern.log4j.Log4j2;
-
-import static java.util.stream.Collectors.toList;
-import static org.folio.dew.utils.Constants.PATH_SEPARATOR;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Log4j2
-@SpringBootTest(classes = { LocalFilesStorageProperties.class, LocalFilesStorage.class }, properties = {
- "application.minio-local.compose-with-aws-sdk = true", "application.minio-local.force-path-style = true" })
-@EnableConfigurationProperties
-class LocalFilesStorageAwsSdkComposingTest {
-
- @Autowired
- private LocalFilesStorageProperties localFilesStorageProperties;
- @Autowired
- private LocalFilesStorage localFilesStorage;
-
- @ParameterizedTest
- @DisplayName("Create file, update it (append bytes[]), read and delete")
- @ValueSource(ints = { 1024, ObjectWriteArgs.MIN_MULTIPART_SIZE + 1 })
- void testWriteReadPatchDelete(int size) throws IOException {
-
- byte[] original = getRandomBytes(size);
- var remoteFilePath = "CSV_Data.csv";
- var expectedS3Path = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR + remoteFilePath;
-
- assertThat(localFilesStorage.write(remoteFilePath, original), is(expectedS3Path));
- assertTrue(localFilesStorage.exists(remoteFilePath));
-
- assertTrue(Objects.deepEquals(localFilesStorage.readAllBytes(remoteFilePath), original));
- assertTrue(Objects.deepEquals(localFilesStorage.lines(remoteFilePath)
- .collect(toList()), localFilesStorage.readAllLines(remoteFilePath)));
-
- localFilesStorage.delete(remoteFilePath);
- assertTrue(localFilesStorage.notExists(remoteFilePath));
- }
-
- @Test
- @DisplayName("Append files with using AWS SDK workaround instead of MinIO client composeObject-method")
- void testAppendFileParts() throws IOException {
-
- var name = "directory_1/directory_2/CSV_Data.csv";
- byte[] file = getRandomBytes(30000000);
- var size = file.length;
-
- var first = Arrays.copyOfRange(file, 0, size / 3);
- var second = Arrays.copyOfRange(file, size / 3, 2 * size / 3);
- var third = Arrays.copyOfRange(file, 2 * size / 3, size);
-
- var expected = ArrayUtils.addAll(ArrayUtils.addAll(first, second), third);
-
- assertTrue(Objects.deepEquals(file, expected));
-
- localFilesStorage.append(name, first);
- localFilesStorage.append(name, second);
- localFilesStorage.append(name, third);
-
- var result = localFilesStorage.readAllBytes(name);
-
- assertTrue(Objects.deepEquals(file, result));
-
- }
-
- private byte[] getRandomBytes(int size) {
- var original = new byte[size];
- ThreadLocalRandom.current()
- .nextBytes(original);
- return original;
- }
-}
diff --git a/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java b/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java
index 8205a7d6e..8d1916f44 100644
--- a/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java
+++ b/src/test/java/org/folio/dew/repository/LocalFilesStorageTest.java
@@ -3,6 +3,8 @@
import io.minio.ObjectWriteArgs;
import lombok.extern.log4j.Log4j2;
import org.folio.dew.config.properties.LocalFilesStorageProperties;
+import org.folio.s3.client.RemoteStorageWriter;
+import org.folio.s3.exception.S3ClientException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -11,7 +13,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.SpringBootTest;
-import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -20,7 +21,6 @@
import static java.util.List.of;
import static java.util.stream.Collectors.toList;
-import static org.folio.dew.utils.Constants.PATH_SEPARATOR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,8 +35,6 @@
class LocalFilesStorageTest {
private static final String NON_EXISTING_PATH = "non-existing-path";
- @Autowired
- private LocalFilesStorageProperties localFilesStorageProperties;
@Autowired
private LocalFilesStorage localFilesStorage;
@@ -45,19 +43,16 @@ class LocalFilesStorageTest {
@ValueSource(ints = {1024, ObjectWriteArgs.MIN_MULTIPART_SIZE + 1 })
@DisplayName("Create files and read internal objects structure")
void testWriteRead(int size) throws IOException {
- var subPath = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR;
byte[] content = getRandomBytes(size);
var original = of("directory_1/CSV_Data_1.csv", "directory_1/directory_2/CSV_Data_2.csv",
- "directory_1/directory_2/directory_3/CSV_Data_3.csv");
- var expectedS3Pathes = of(subPath + "directory_1/CSV_Data_1.csv", subPath + "directory_1/directory_2/CSV_Data_2.csv",
- subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv");
+ "directory_1/directory_2/directory_3/CSV_Data_3.csv");
List actual;
try {
actual = original.stream()
.map(p -> {
try {
return localFilesStorage.write(p, content);
- } catch (IOException ex) {
+ } catch (S3ClientException ex) {
throw new RuntimeException(ex);
}
})
@@ -66,21 +61,19 @@ void testWriteRead(int size) throws IOException {
throw new IOException(e);
}
- assertTrue(Objects.deepEquals(expectedS3Pathes, actual));
+ assertTrue(Objects.deepEquals(original, actual));
- assertTrue(Objects.deepEquals(localFilesStorage.walk(subPath + "directory_1/")
- .collect(toList()),
- of(subPath + "directory_1/CSV_Data_1.csv", subPath + "directory_1/directory_2/CSV_Data_2.csv",
- subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv")));
+ assertTrue(Objects.deepEquals(localFilesStorage.listRecursive("directory_1/"),
+ of("directory_1/CSV_Data_1.csv", "directory_1/directory_2/CSV_Data_2.csv",
+ "directory_1/directory_2/directory_3/CSV_Data_3.csv")));
- assertTrue(Objects.deepEquals(localFilesStorage.walk(subPath + "directory_1/directory_2/")
- .collect(toList()),
- of(subPath + "directory_1/directory_2/CSV_Data_2.csv", subPath + "directory_1/directory_2/directory_3/CSV_Data_3.csv")));
+ assertTrue(Objects.deepEquals(localFilesStorage.listRecursive("directory_1/directory_2/"),
+ of("directory_1/directory_2/CSV_Data_2.csv", "directory_1/directory_2/directory_3/CSV_Data_3.csv")));
original.forEach(p -> assertTrue(localFilesStorage.exists(p)));
// Clean crated files
- localFilesStorage.delete("directory_1");
+ localFilesStorage.removeRecursive("directory_1");
original.forEach(p -> assertFalse(localFilesStorage.exists(p)));
}
@@ -91,21 +84,21 @@ void testWriteRead(int size) throws IOException {
void testBufferedWriter(int size) {
var path = "directory/resource.csv";
var expected = new String(getRandomBytes(size));
- try(BufferedWriter writer = localFilesStorage.writer(path)) {
+ try (RemoteStorageWriter writer = localFilesStorage.writer(path)) {
writer.write(expected);
- } catch (IOException e) {
+ } catch (S3ClientException e) {
fail("Writer exception");
}
- try(InputStream is = localFilesStorage.newInputStream(path)) {
+ try (InputStream is = localFilesStorage.read(path)) {
var actual = new String(is.readAllBytes());
assertEquals(expected, actual);
} catch (IOException e) {
fail("Read resource exception");
}
- // Clean crated files
- localFilesStorage.delete(path);
+ // Clean created files
+ localFilesStorage.remove(path);
}
@ParameterizedTest
@@ -116,21 +109,19 @@ void testWriteReadPatchDelete(int size) throws IOException {
byte[] original = getRandomBytes(size);
byte[] patch = getRandomBytes(size);
var remoteFilePath = "directory_1/directory_2/CSV_Data.csv";
- var expectedS3FilePath = localFilesStorageProperties.getSubPath() + PATH_SEPARATOR + remoteFilePath;
+ var expectedS3FilePath = remoteFilePath;
assertThat(localFilesStorage.write(remoteFilePath, original), is(expectedS3FilePath));
assertTrue(localFilesStorage.exists(remoteFilePath));
assertTrue(Objects.deepEquals(localFilesStorage.readAllBytes(remoteFilePath), original));
- assertTrue(Objects.deepEquals(localFilesStorage.lines(remoteFilePath)
- .collect(toList()), localFilesStorage.readAllLines(remoteFilePath)));
localFilesStorage.append(remoteFilePath, patch);
var patched = localFilesStorage.readAllBytes(remoteFilePath);
assertThat(patched.length, is(original.length + patch.length));
- localFilesStorage.delete(remoteFilePath);
+ localFilesStorage.remove(remoteFilePath);
assertTrue(localFilesStorage.notExists(remoteFilePath));
}
@@ -138,14 +129,13 @@ void testWriteReadPatchDelete(int size) throws IOException {
@DisplayName("Files operations on non-existing file")
void testNonExistingFileOperations() {
assertThrows(IOException.class, () -> localFilesStorage.readAllLines(NON_EXISTING_PATH));
- assertThrows(IOException.class, () -> localFilesStorage.lines(NON_EXISTING_PATH));
- assertThrows(IOException.class, () -> {
- try(var is = localFilesStorage.newInputStream(NON_EXISTING_PATH)){
- log.info("InputStream setup");
+ assertThrows(S3ClientException.class, () -> {
+ try (var is = localFilesStorage.read(NON_EXISTING_PATH)) {
+ // should fail
}
});
- localFilesStorage.delete(NON_EXISTING_PATH);
+ localFilesStorage.remove(NON_EXISTING_PATH);
assertFalse(localFilesStorage.exists(NON_EXISTING_PATH));
}
diff --git a/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java b/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java
index 616ed4475..576a94635 100644
--- a/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java
+++ b/src/test/java/org/folio/dew/repository/RemoteFilesStorageTest.java
@@ -28,7 +28,7 @@ void testContainsFile() {
var uploadedPath = remoteFilesStorage.write(path, content);
- assertEquals("remote/directory/data.csv", uploadedPath);
+ assertEquals("directory/data.csv", uploadedPath);
assertTrue(remoteFilesStorage.containsFile(path));
assertTrue(remoteFilesStorage.containsFile(uploadedPath));
}