From 7fce0f124bfd9343c3542384a79c18596903536e Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Fri, 7 Apr 2023 10:43:58 -0700 Subject: [PATCH] Wait for pending commits to finish during shutdown. --- .../samza/container/SamzaContainer.scala | 24 +++++++++---------- .../apache/samza/container/TaskInstance.scala | 11 ++++++++- .../BlobStoreStateBackendIntegrationTest.java | 1 + 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 3d38474700..436c3ccf8f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -1077,18 +1077,6 @@ class SamzaContainer( } } - if (commitThreadPool != null) { - info("Shutting down task commit thread pool") - try { - commitThreadPool.shutdown() - if(!commitThreadPool.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) { - commitThreadPool.shutdownNow() - } - } catch { - case e: Exception => error(e.getMessage, e) - } - } - if (timerExecutor != null) { info("Shutting down timer executor") try { @@ -1102,6 +1090,18 @@ class SamzaContainer( } taskInstances.values.foreach(_.shutdownTask) + + if (commitThreadPool != null) { + info("Shutting down task commit thread pool") + try { + commitThreadPool.shutdown() + if(!commitThreadPool.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) { + commitThreadPool.shutdownNow() + } + } catch { + case e: Exception => error(e.getMessage, e) + } + } } def shutdownStores { diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 564c473441..2d05f6cb8c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -523,19 +523,28 @@ class TaskInstance( } def shutdownTask { + // wait for in-flight commits to complete + val numTasksInContainer = containerContext.getContainerModel.getTasks.size() + val waitTimeMillis = taskConfig.getShutdownMs / numTasksInContainer + val acquired = commitInProgress.tryAcquire(waitTimeMillis, TimeUnit.MILLISECONDS) + if (!acquired) { + info("Pending commit did not complete within %d ms. Proceeding with shutdown." format waitTimeMillis) + } + if (commitManager != null) { debug("Shutting down commit manager for taskName: %s" format taskName) commitManager.close() } else { debug("Skipping commit manager shutdown for taskName: %s" format taskName) } + applicationTaskContextOption.foreach(applicationTaskContext => { debug("Stopping application-defined task context for taskName: %s" format taskName) applicationTaskContext.stop() }) + if (task.isInstanceOf[ClosableTask]) { debug("Shutting down stream task for taskName: %s" format taskName) - task.asInstanceOf[ClosableTask].close } else { debug("Skipping stream task shutdown for taskName: %s" format taskName) diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java index 1779ac735e..8a3af47d09 100644 --- a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java @@ -106,6 +106,7 @@ public static Collection data() { put(TaskConfig.COMMIT_MS, "-1"); // manual commit only put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits + put(TaskConfig.TASK_SHUTDOWN_MS, "10000"); // wait for pending commits to complete. // override store level state backend for in memory stores to use Kafka changelogs put(String.format(StorageConfig.STORE_BACKUP_FACTORIES, IN_MEMORY_STORE_NAME),