diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index b5687c87dd..2c019fa5b8 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -74,9 +74,12 @@ public class StorageConfig extends MapConfig { static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY = "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"; - // Internal config to clean storeDirs of a store on container start. This is used to benchmark bootstrap performance. + // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance. static final String CLEAN_LOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.clean.on.container.start"; + // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance. + static final String RETAIN_NONLOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.retain.on.container.start"; + public StorageConfig(Config config) { super(config); } @@ -266,4 +269,13 @@ public int getNumPersistentStores() { public boolean getCleanLoggedStoreDirsOnStart(String storeName) { return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false); } + + /** + * Helper method to get if nonlogged store dirs should not be deleted on container start. + * @param storeName + * @return + */ + public boolean getRetainNonloggedStoreDirsOnStart(String storeName) { + return getBoolean(String.format(RETAIN_NONLOGGED_STOREDIRS_ON_START, storeName), false); + } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index 70c61746e8..c276e7fed7 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -132,7 +132,7 @@ private void cleanBaseDirsAndReadOffsetFiles() { storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString()); - if (nonLoggedStorePartitionDir.exists()) { + if (nonLoggedStorePartitionDir.exists() || !storageConfig.getRetainNonloggedStoreDirsOnStart(storeName)) { LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString()); fileUtil.rm(nonLoggedStorePartitionDir); } diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 689d431cbd..f4f26838a8 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -214,15 +214,18 @@ static StoreActions getStoreActions( return; } - // persistent but non-logged stores are always deleted - if (storageEngine.getStoreProperties().isPersistedToDisk() && - !storageEngine.getStoreProperties().isLoggedStore()) { - File currentDir = storageManagerUtil.getTaskStoreDir( - nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); - LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.", - currentDir, storeName, taskName); - storeDirsToDelete.put(storeName, currentDir); - // persistent but non-logged stores should not have checkpoint dirs + // persistent but non-logged stores are always deleted unless retain.on.container.start config is set + if (storageEngine.getStoreProperties().isPersistedToDisk() && !storageEngine.getStoreProperties().isLoggedStore()) { + File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); + + if (!new StorageConfig(config).getRetainNonloggedStoreDirsOnStart(storeName)) { + LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.", + currentDir, storeName, taskName); + storeDirsToDelete.put(storeName, currentDir); + // persistent but non-logged stores should not have checkpoint dirs + } else { + LOG.info("Retaining current directory: {} for store: {} in task: {}", currentDir, storeName, taskName); + } return; } @@ -243,17 +246,6 @@ static StoreActions getStoreActions( timeSinceLastCheckpointInMs = System.currentTimeMillis() - checkpointedChangelogOffset.getCheckpointId().getMillis(); } - - // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed - if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig( - config).getCleanLoggedStoreDirsOnStart(storeName)) { - File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); - LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName); - storeDirsToDelete.put(storeName, currentDir); - LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset); - storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset)); - return; - } // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(