From a3d209f4d6276e7c5e5205b3a5e2a44541d204db Mon Sep 17 00:00:00 2001 From: kateyeo Date: Sat, 31 Dec 2022 12:03:53 -0800 Subject: [PATCH 01/14] Rename IdempotentImportExecutorExtension --- ...ecutorExtension.java => IdempotentExecutor.java} | 3 +-- ...torLoader.java => IdempotentExecutorLoader.java} | 13 ++++++------- .../datatransferproject/transfer/WorkerMain.java | 4 ++-- 3 files changed, 9 insertions(+), 11 deletions(-) rename portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/{IdempotentImportExecutorExtension.java => IdempotentExecutor.java} (68%) rename portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/{IdempotentImportExecutorLoader.java => IdempotentExecutorLoader.java} (61%) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java similarity index 68% rename from portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java rename to portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java index bf514c92e..330472752 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java @@ -2,8 +2,7 @@ import org.datatransferproject.api.launcher.BootExtension; import org.datatransferproject.api.launcher.ExtensionContext; -import org.datatransferproject.api.launcher.Monitor; -public interface IdempotentImportExecutorExtension extends BootExtension { +public interface IdempotentExecutor extends BootExtension { IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext); } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorLoader.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutorLoader.java similarity index 61% rename from portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorLoader.java rename to portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutorLoader.java index 02cd3336f..306e5c334 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorLoader.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutorLoader.java @@ -3,20 +3,19 @@ import com.google.common.collect.ImmutableList; import java.util.ServiceLoader; import org.datatransferproject.api.launcher.ExtensionContext; -import org.datatransferproject.api.launcher.Monitor; -public class IdempotentImportExecutorLoader { +public class IdempotentExecutorLoader { public static IdempotentImportExecutor load(ExtensionContext extensionContext) { - ImmutableList.Builder builder = ImmutableList.builder(); - ServiceLoader.load(IdempotentImportExecutorExtension.class) + ImmutableList.Builder builder = ImmutableList.builder(); + ServiceLoader.load(IdempotentExecutor.class) .iterator() .forEachRemaining(builder::add); - ImmutableList executors = builder.build(); + ImmutableList executors = builder.build(); if (executors.isEmpty()) { return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); } else if (executors.size() == 1) { - IdempotentImportExecutorExtension extension = executors.get(0); + IdempotentExecutor extension = executors.get(0); extension.initialize(); return extension.getIdempotentImportExecutor(extensionContext); } else { @@ -24,6 +23,6 @@ public static IdempotentImportExecutor load(ExtensionContext extensionContext) { } } - private IdempotentImportExecutorLoader() { + private IdempotentExecutorLoader() { } } diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/WorkerMain.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/WorkerMain.java index c665d52ee..9a395b42c 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/WorkerMain.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/WorkerMain.java @@ -44,7 +44,7 @@ import org.datatransferproject.spi.transfer.extension.TransferExtension; import org.datatransferproject.spi.transfer.hooks.JobHooks; import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; -import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorLoader; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentExecutorLoader; import org.datatransferproject.spi.transfer.provider.TransferCompatibilityProvider; import org.datatransferproject.spi.transfer.security.SecurityExtension; import org.datatransferproject.spi.transfer.security.SecurityExtensionLoader; @@ -104,7 +104,7 @@ public void initialize() { monitor.info(() -> "Using SecurityExtension: " + securityExtension.getClass().getName()); IdempotentImportExecutor idempotentImportExecutor = - IdempotentImportExecutorLoader.load(extensionContext); + IdempotentExecutorLoader.load(extensionContext); monitor.info( () -> "Using IdempotentImportExecutor: " + idempotentImportExecutor.getClass().getName()); From c45b23edd54b56b1960b573671c31ee5a613ed73 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Tue, 4 Apr 2023 00:30:52 -0700 Subject: [PATCH 02/14] Add getRetryingIdempotentImportExecutor --- .../InMemoryIdempotentImportExecutorExtension.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index c07902fbc..4edd427b5 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -14,6 +14,11 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); } + @Override + public IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext) { + return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); + } + @Override public void initialize() { } From df22f1fb3b5bc0484a3e4616dfe34f496a0f69ab Mon Sep 17 00:00:00 2001 From: kateyeo Date: Tue, 4 Apr 2023 01:59:40 -0700 Subject: [PATCH 03/14] Fix file renames --- ...entExecutor.java => IdempotentImportExecutorExtension.java} | 3 ++- ...ExecutorLoader.java => IdempotentImportExecutorLoader.java} | 0 2 files changed, 2 insertions(+), 1 deletion(-) rename portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/{IdempotentExecutor.java => IdempotentImportExecutorExtension.java} (68%) rename portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/{IdempotentExecutorLoader.java => IdempotentImportExecutorLoader.java} (100%) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java similarity index 68% rename from portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java rename to portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java index 330472752..bf514c92e 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutor.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java @@ -2,7 +2,8 @@ import org.datatransferproject.api.launcher.BootExtension; import org.datatransferproject.api.launcher.ExtensionContext; +import org.datatransferproject.api.launcher.Monitor; -public interface IdempotentExecutor extends BootExtension { +public interface IdempotentImportExecutorExtension extends BootExtension { IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext); } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutorLoader.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorLoader.java similarity index 100% rename from portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentExecutorLoader.java rename to portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorLoader.java From 984399bfe1938ba3d61c66db040550a4c3efba68 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Wed, 5 Apr 2023 21:56:55 -0700 Subject: [PATCH 04/14] Add getRetryingIdempotentImportExecutor --- .../GoogleCloudIdempotentImportExecutorExtension.java | 6 ++++++ .../IdempotentImportExecutorExtension.java | 1 + 2 files changed, 7 insertions(+) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java index 335742240..802b936db 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java @@ -25,6 +25,12 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext } } + @Override + public IdempotentImportExecutor getRetryingIdempotentImportExecutor( + ExtensionContext extensionContext) { + return null; + } + @Override public void initialize() { } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java index bf514c92e..12c1cf3b5 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java @@ -6,4 +6,5 @@ public interface IdempotentImportExecutorExtension extends BootExtension { IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext); + IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext); } From 20793f0d82eaf9e0429288e3cbe5e251e8efe879 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Fri, 7 Apr 2023 14:56:32 -0700 Subject: [PATCH 05/14] Add default implementation for getRetryingIdempotentImportExecutor to IdempotentImportExecutorExtension --- .../GoogleCloudIdempotentImportExecutorExtension.java | 6 ------ .../IdempotentImportExecutorExtension.java | 5 ++++- .../InMemoryIdempotentImportExecutorExtension.java | 4 ---- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java index 802b936db..335742240 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutorExtension.java @@ -25,12 +25,6 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext } } - @Override - public IdempotentImportExecutor getRetryingIdempotentImportExecutor( - ExtensionContext extensionContext) { - return null; - } - @Override public void initialize() { } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java index 12c1cf3b5..0e3cc23b4 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/IdempotentImportExecutorExtension.java @@ -6,5 +6,8 @@ public interface IdempotentImportExecutorExtension extends BootExtension { IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext); - IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext); + public default IdempotentImportExecutor getRetryingIdempotentImportExecutor( + ExtensionContext extensionContext) { + return null; + } } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index 4edd427b5..28f38a86a 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -14,10 +14,6 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); } - @Override - public IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext) { - return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); - } @Override public void initialize() { From ca0301fb83bd95af2791d0b7c1d06ec798cc3501 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Fri, 7 Apr 2023 15:00:23 -0700 Subject: [PATCH 06/14] Remove new line --- .../InMemoryIdempotentImportExecutorExtension.java | 1 - 1 file changed, 1 deletion(-) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index 28f38a86a..c07902fbc 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -14,7 +14,6 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); } - @Override public void initialize() { } From 04763659e8675be5dacdf56200d1681bd80b60f4 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Tue, 25 Apr 2023 10:55:39 -0700 Subject: [PATCH 07/14] Add RetryingInMemoryIdempotentImportExecutor --- ...moryIdempotentImportExecutorExtension.java | 9 ++ ...ryingInMemoryIdempotentImportExecutor.java | 146 ++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index c07902fbc..87322a156 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -1,7 +1,9 @@ package org.datatransferproject.spi.transfer.idempotentexecutor; +import com.google.inject.Provider; import org.datatransferproject.api.launcher.ExtensionContext; import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; /** * ImMemory Implementation of IdempotentImportExecutor. @@ -9,11 +11,18 @@ public class InMemoryIdempotentImportExecutorExtension implements IdempotentImportExecutorExtension { + private Provider retryStrategyLibraryProvider; + @Override public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext) { return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); } + @Override + public IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext){ + return new RetryingInMemoryIdempotentImportExecutor(extensionContext.getMonitor(), retryStrategyLibraryProvider); + } + @Override public void initialize() { } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java new file mode 100644 index 000000000..89a4d1bfd --- /dev/null +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java @@ -0,0 +1,146 @@ +/* + * Copyright 2023 The Data Transfer Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.datatransferproject.spi.transfer.idempotentexecutor; + +import static java.lang.String.format; + +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import java.io.IOException; +import java.io.Serializable; +import java.time.Clock; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.transfer.JobMetadata; +import org.datatransferproject.types.transfer.errors.ErrorDetail; +import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; +import org.datatransferproject.types.transfer.retry.RetryingCallable; + +/** A {@link IdempotentImportExecutor} that stores known values in memory. */ +public class RetryingInMemoryIdempotentImportExecutor implements IdempotentImportExecutor { + + private final Map knownValues = new HashMap<>(); + private final Map errors = new HashMap<>(); + private final Map recentErrors = new HashMap<>(); + private final Monitor monitor; + private UUID jobId; + + private final Provider retryStrategyLibraryProvider; + + public RetryingInMemoryIdempotentImportExecutor( + Monitor monitor, Provider retryStrategyLibraryProvider) { + this.monitor = monitor; + this.retryStrategyLibraryProvider = retryStrategyLibraryProvider; + } + + @Override + public T executeAndSwallowIOExceptions( + String idempotentId, String itemName, Callable callable) throws Exception { + try { + return executeOrThrowException(idempotentId, itemName, callable); + } catch (IOException e) { + // Note all errors are logged in executeOrThrowException so no need to re-log them here. + return null; + } + } + + @Override + @SuppressWarnings("unchecked") + public T executeOrThrowException( + String idempotentId, String itemName, Callable callable) throws Exception { + String jobIdPrefix = "Job " + jobId + ": "; + + RetryingCallable retryingCallable = + new RetryingCallable<>( + callable, + retryStrategyLibraryProvider.get(), + Clock.systemUTC(), + monitor, + JobMetadata.getDataType(), + JobMetadata.getExportService()); + + if (knownValues.containsKey(idempotentId)) { + monitor.debug( + () -> + jobIdPrefix + + format("Using cached key %s from cache for %s", idempotentId, itemName)); + return (T) knownValues.get(idempotentId); + } + try { + T result = retryingCallable.call(); + knownValues.put(idempotentId, result); + monitor.debug( + () -> jobIdPrefix + format("Storing key %s in cache for %s", idempotentId, itemName)); + errors.remove(idempotentId); + return result; + } catch (Exception e) { + ErrorDetail errorDetail = + ErrorDetail.builder() + .setId(idempotentId) + .setTitle(itemName) + .setException(Throwables.getStackTraceAsString(e)) + .build(); + errors.put(idempotentId, errorDetail); + recentErrors.put(idempotentId, errorDetail); + monitor.severe(() -> jobIdPrefix + "Problem with importing item: " + errorDetail); + throw e; + } + } + + @Override + @SuppressWarnings("unchecked") + public T getCachedValue(String idempotentId) { + if (!knownValues.containsKey(idempotentId)) { + throw new IllegalArgumentException( + idempotentId + + " is not a known key, known keys: " + + Joiner.on(", ").join(knownValues.keySet())); + } + return (T) knownValues.get(idempotentId); + } + + @Override + public boolean isKeyCached(String idempotentId) { + return knownValues.containsKey(idempotentId); + } + + @Override + public Collection getErrors() { + return ImmutableList.copyOf(errors.values()); + } + + @Override + public void setJobId(UUID jobId) { + this.jobId = jobId; + } + + @Override + public Collection getRecentErrors() { + return ImmutableList.copyOf(recentErrors.values()); + } + + @Override + public void resetRecentErrors() { + recentErrors.clear(); + } +} From 8888777cedf63244fa7854195874146fb2931d85 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Thu, 27 Apr 2023 17:19:40 -0700 Subject: [PATCH 08/14] Remove provider for retryStrategyLibrary and get from extensionContext instead --- .../InMemoryIdempotentImportExecutorExtension.java | 4 +--- .../RetryingInMemoryIdempotentImportExecutor.java | 10 ++++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index 87322a156..9ed2cdfe4 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -11,8 +11,6 @@ public class InMemoryIdempotentImportExecutorExtension implements IdempotentImportExecutorExtension { - private Provider retryStrategyLibraryProvider; - @Override public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext extensionContext) { return new InMemoryIdempotentImportExecutor(extensionContext.getMonitor()); @@ -20,7 +18,7 @@ public IdempotentImportExecutor getIdempotentImportExecutor(ExtensionContext ext @Override public IdempotentImportExecutor getRetryingIdempotentImportExecutor(ExtensionContext extensionContext){ - return new RetryingInMemoryIdempotentImportExecutor(extensionContext.getMonitor(), retryStrategyLibraryProvider); + return new RetryingInMemoryIdempotentImportExecutor(extensionContext.getMonitor(), extensionContext.getSetting("retryLibrary", null)); } @Override diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java index 89a4d1bfd..bf049f947 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java @@ -21,7 +21,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.inject.Provider; import java.io.IOException; import java.io.Serializable; import java.time.Clock; @@ -44,13 +43,12 @@ public class RetryingInMemoryIdempotentImportExecutor implements IdempotentImpor private final Map recentErrors = new HashMap<>(); private final Monitor monitor; private UUID jobId; - - private final Provider retryStrategyLibraryProvider; + private final RetryStrategyLibrary retryStrategyLibrary; public RetryingInMemoryIdempotentImportExecutor( - Monitor monitor, Provider retryStrategyLibraryProvider) { + Monitor monitor, RetryStrategyLibrary retryStrategyLibrary) { this.monitor = monitor; - this.retryStrategyLibraryProvider = retryStrategyLibraryProvider; + this.retryStrategyLibrary = retryStrategyLibrary; } @Override @@ -73,7 +71,7 @@ public T executeOrThrowException( RetryingCallable retryingCallable = new RetryingCallable<>( callable, - retryStrategyLibraryProvider.get(), + retryStrategyLibrary, Clock.systemUTC(), monitor, JobMetadata.getDataType(), From 7d6d21285d2b3e428a23e65534ac1f6aac60b009 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Thu, 27 Apr 2023 17:22:15 -0700 Subject: [PATCH 09/14] Remove unused import --- .../InMemoryIdempotentImportExecutorExtension.java | 1 - 1 file changed, 1 deletion(-) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index 9ed2cdfe4..aad7c543d 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -1,6 +1,5 @@ package org.datatransferproject.spi.transfer.idempotentexecutor; -import com.google.inject.Provider; import org.datatransferproject.api.launcher.ExtensionContext; import org.datatransferproject.api.launcher.Monitor; import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; From 32b4a62ddbb81727bc0e98ab04d90b9c1c0b6e4e Mon Sep 17 00:00:00 2001 From: kateyeo Date: Thu, 27 Apr 2023 17:29:40 -0700 Subject: [PATCH 10/14] Update InMemoryIdempotentImportExecutorExtension.java --- .../InMemoryIdempotentImportExecutorExtension.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java index aad7c543d..4966ed135 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/InMemoryIdempotentImportExecutorExtension.java @@ -1,8 +1,6 @@ package org.datatransferproject.spi.transfer.idempotentexecutor; import org.datatransferproject.api.launcher.ExtensionContext; -import org.datatransferproject.api.launcher.Monitor; -import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; /** * ImMemory Implementation of IdempotentImportExecutor. From 0dbddbfce14c45c7ad5782411c234c327db92dcb Mon Sep 17 00:00:00 2001 From: kateyeo Date: Fri, 28 Apr 2023 17:13:36 -0700 Subject: [PATCH 11/14] Update build.gradle --- portability-spi-transfer/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/portability-spi-transfer/build.gradle b/portability-spi-transfer/build.gradle index 3f9679462..48ee8d7a4 100644 --- a/portability-spi-transfer/build.gradle +++ b/portability-spi-transfer/build.gradle @@ -24,6 +24,7 @@ dependencies { compile project(':portability-types-transfer') compile project(':portability-api-launcher') + compile project(':portability-transfer') compile('org.apache.commons:commons-lang3:3.11') } From e2e1ed3df68902552ae1feec1b82413ac8215fee Mon Sep 17 00:00:00 2001 From: kateyeo Date: Tue, 2 May 2023 11:40:25 -0700 Subject: [PATCH 12/14] Remove dependency on JobMetadata --- portability-spi-transfer/build.gradle | 1 - .../RetryingInMemoryIdempotentImportExecutor.java | 5 +---- .../types/transfer/retry/RetryingCallable.java | 8 ++++++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/portability-spi-transfer/build.gradle b/portability-spi-transfer/build.gradle index 48ee8d7a4..3f9679462 100644 --- a/portability-spi-transfer/build.gradle +++ b/portability-spi-transfer/build.gradle @@ -24,7 +24,6 @@ dependencies { compile project(':portability-types-transfer') compile project(':portability-api-launcher') - compile project(':portability-transfer') compile('org.apache.commons:commons-lang3:3.11') } diff --git a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java index bf049f947..5dd9c49de 100644 --- a/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java +++ b/portability-spi-transfer/src/main/java/org/datatransferproject/spi/transfer/idempotentexecutor/RetryingInMemoryIdempotentImportExecutor.java @@ -30,7 +30,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import org.datatransferproject.api.launcher.Monitor; -import org.datatransferproject.transfer.JobMetadata; import org.datatransferproject.types.transfer.errors.ErrorDetail; import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; import org.datatransferproject.types.transfer.retry.RetryingCallable; @@ -73,9 +72,7 @@ public T executeOrThrowException( callable, retryStrategyLibrary, Clock.systemUTC(), - monitor, - JobMetadata.getDataType(), - JobMetadata.getExportService()); + monitor); if (knownValues.containsKey(idempotentId)) { monitor.debug( diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java index ad89f277e..9fa9ff9a2 100644 --- a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java @@ -58,6 +58,14 @@ public RetryingCallable( this.attempts = 0; } + public RetryingCallable( + Callable callable, + RetryStrategyLibrary retryStrategyLibrary, + Clock clock, + Monitor monitor) { + this(callable, retryStrategyLibrary, clock, monitor, null, null); + } + /** * Tries to call the {@link Callable} given the class's {@link RetryStrategyLibrary}. * From 09845eb43cb573f8b07718176beb88e43e01fc83 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Wed, 10 May 2023 13:21:20 -0700 Subject: [PATCH 13/14] Add RetryingIdempotentImportExecutor to GooglePhotosImporter --- .../google/GoogleTransferExtension.java | 8 ++++++- .../google/photos/GooglePhotosImporter.java | 22 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java index 9d325cad9..8cfc40ccf 100644 --- a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java +++ b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java @@ -37,6 +37,8 @@ import org.datatransferproject.datatransfer.google.videos.GoogleVideosImporter; import org.datatransferproject.spi.cloud.storage.AppCredentialStore; import org.datatransferproject.spi.cloud.storage.JobStore; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension; import org.datatransferproject.types.common.models.DataVertical; import org.datatransferproject.spi.transfer.extension.TransferExtension; import org.datatransferproject.spi.transfer.provider.Exporter; @@ -107,6 +109,9 @@ public void initialize(ExtensionContext context) { GoogleCredentialFactory credentialFactory = new GoogleCredentialFactory(httpTransport, jsonFactory, appCredentials, monitor); + IdempotentImportExecutor idempotentImportExecutor = context.getService( + IdempotentImportExecutorExtension.class).getRetryingIdempotentImportExecutor(context); + ImmutableMap.Builder importerBuilder = ImmutableMap.builder(); importerBuilder.put(BLOBS, new DriveImporter(credentialFactory, jobStore, monitor)); importerBuilder.put(CONTACTS, new GoogleContactsImporter(credentialFactory)); @@ -120,7 +125,8 @@ public void initialize(ExtensionContext context) { jobStore, jsonFactory, monitor, - context.getSetting("googleWritesPerSecond", 1.0))); + context.getSetting("googleWritesPerSecond", 1.0), + idempotentImportExecutor)); importerBuilder.put(VIDEOS, new GoogleVideosImporter(appCredentials, jobStore, monitor)); importerMap = importerBuilder.build(); diff --git a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java index 2db0be2d6..720dc6b73 100644 --- a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java +++ b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java @@ -79,6 +79,8 @@ public class GooglePhotosImporter private final GooglePhotosInterface photosInterface; private final HashMap multilingualStrings = new HashMap<>(); + private IdempotentImportExecutor retryingIdempotentExecutor; + public GooglePhotosImporter( GoogleCredentialFactory credentialFactory, JobStore jobStore, @@ -95,6 +97,21 @@ public GooglePhotosImporter( monitor, writesPerSecond); } + public GooglePhotosImporter( + GoogleCredentialFactory credentialFactory, + JobStore jobStore, + JsonFactory jsonFactory, + Monitor monitor, + double writesPerSecond, + IdempotentImportExecutor retryingIdempotentExecutor) { + this( + credentialFactory, + jobStore, + jsonFactory, + monitor, + writesPerSecond); + this.retryingIdempotentExecutor = retryingIdempotentExecutor; + } @VisibleForTesting GooglePhotosImporter( @@ -131,10 +148,11 @@ public ImportResult importItem( // Nothing to do return ImportResult.OK; } - GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, idempotentImportExecutor, authData); + IdempotentImportExecutor executor = retryingIdempotentExecutor == null ? idempotentImportExecutor : retryingIdempotentExecutor; + GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, executor, authData); for (PhotoAlbum album : data.getAlbums()) { - idempotentImportExecutor.executeAndSwallowIOExceptions( + executor.executeAndSwallowIOExceptions( album.getId(), album.getName(), () -> importSingleAlbum(jobId, authData, album)); } long bytes = importPhotos(data.getPhotos(), gPhotosUpload); From ac4cb8048abc95907f0a3bb12ed5ee0ac676f939 Mon Sep 17 00:00:00 2001 From: kateyeo Date: Wed, 24 May 2023 11:12:09 -0700 Subject: [PATCH 14/14] Add flag guarded retrying idempotent executor to flickr importer --- .../flickr/FlickrTransferExtension.java | 13 ++++++++++-- .../flickr/photos/FlickrPhotosImporter.java | 21 ++++++++++++++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/FlickrTransferExtension.java b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/FlickrTransferExtension.java index b524a00e7..1d9aa466a 100644 --- a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/FlickrTransferExtension.java +++ b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/FlickrTransferExtension.java @@ -28,6 +28,8 @@ import org.datatransferproject.datatransfer.flickr.photos.FlickrPhotosImporter; import org.datatransferproject.spi.cloud.storage.AppCredentialStore; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension; import org.datatransferproject.types.common.models.DataVertical; import org.datatransferproject.spi.transfer.extension.TransferExtension; import org.datatransferproject.spi.transfer.provider.Exporter; @@ -36,6 +38,7 @@ import org.datatransferproject.types.transfer.serviceconfig.TransferServiceConfig; public class FlickrTransferExtension implements TransferExtension { + private static final String SERVICE_ID = "flickr"; private static final String FLICKR_KEY = "FLICKR_KEY"; private static final String FLICKR_SECRET = "FLICKR_SECRET"; @@ -69,7 +72,9 @@ public String getServiceId() { @Override public void initialize(ExtensionContext context) { - if (initialized) return; + if (initialized) { + return; + } jobStore = context.getService(TemporaryPerJobDataStore.class); Monitor monitor = context.getMonitor(); @@ -89,7 +94,11 @@ public void initialize(ExtensionContext context) { TransferServiceConfig serviceConfig = context.getService(TransferServiceConfig.class); - importer = new FlickrPhotosImporter(appCredentials, jobStore, monitor, serviceConfig); + IdempotentImportExecutor idempotentImportExecutor = context.getService( + IdempotentImportExecutorExtension.class).getRetryingIdempotentImportExecutor(context); + boolean enableRetrying = context.getSetting("enableRetrying", false); + + importer = new FlickrPhotosImporter(appCredentials, jobStore, monitor, serviceConfig, idempotentImportExecutor, enableRetrying); exporter = new FlickrPhotosExporter(appCredentials, serviceConfig); initialized = true; } diff --git a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/photos/FlickrPhotosImporter.java b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/photos/FlickrPhotosImporter.java index 1fbf4db9d..6115bacce 100644 --- a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/photos/FlickrPhotosImporter.java +++ b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/photos/FlickrPhotosImporter.java @@ -59,6 +59,10 @@ public class FlickrPhotosImporter implements Importer