From 09129dfdc896ce4128ddcd5ae4fd19c8deb33ce1 Mon Sep 17 00:00:00 2001 From: Armstrong Date: Mon, 22 Dec 2025 14:42:51 -0800 Subject: [PATCH] HDDS-8655: Optimize listKeys and framework iterator seek --- .../utils/db/RDBStoreByteArrayIterator.java | 13 +- .../apache/hadoop/hdds/utils/db/RDBTable.java | 11 +- .../apache/hadoop/hdds/utils/db/Table.java | 16 +- .../hadoop/hdds/utils/db/TypedTable.java | 18 ++- .../hadoop/hdds/utils/db/TestRDBStore.java | 138 ++++++++++++++++++ .../ozone/om/OmMetadataManagerImpl.java | 17 +-- 6 files changed, 195 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java index 67593f744e3c..f9301ac3d157 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java @@ -29,9 +29,18 @@ private static byte[] copyPrefix(byte[] prefix) { } RDBStoreByteArrayIterator(ManagedRocksIterator iterator, - RDBTable table, byte[] prefix, IteratorType type) { + RDBTable table, byte[] prefix, IteratorType type) { + this(iterator, table, prefix, type, null); + } + + RDBStoreByteArrayIterator(ManagedRocksIterator iterator, + RDBTable table, byte[] prefix, IteratorType type, byte[] seekKey) { super(iterator, table, copyPrefix(prefix), type); - seekToFirst(); + if (seekKey != null) { + seek(seekKey); + } else { + seekToFirst(); + } } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 045f020b2fe3..fe55b2e922ce 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -216,7 +216,14 @@ public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] e public KeyValueIterator iterator(byte[] prefix, IteratorType type) throws RocksDatabaseException { return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, - prefix, type); + prefix, type, null); + } + + @Override + public KeyValueIterator iterator(byte[] prefix, byte[] seek) + throws RocksDatabaseException { + return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, + prefix, IteratorType.KEY_AND_VALUE, seek); } KeyValueIterator iterator( @@ -282,7 +289,7 @@ public List> getRangeKVs( if (count < 0) { throw new IllegalArgumentException( - "Invalid count given " + count + ", count must be greater than 0"); + "Invalid count given " + count + ", count must be greater than 0"); } final List> result = new ArrayList<>(); try (KeyValueIterator it = iterator(prefix)) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 6904f22d7d8c..22fb35c3012f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -146,7 +146,7 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException * Deletes a range of keys from the metadata store. * * @param beginKey start metadata key - * @param endKey end metadata key + * @param endKey end metadata key */ void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException; @@ -177,6 +177,20 @@ default KeyValueIterator iterator(KEY prefix) throws RocksDatabaseEx KeyValueIterator iterator(KEY prefix, IteratorType type) throws RocksDatabaseException, CodecException; + /** + * Iterate the table with a seek key. + * + * @param prefix The prefix of the elements to be iterated. + * @param seek The key to seek to. + * @return an iterator. + */ + default KeyValueIterator iterator(KEY prefix, KEY seek) + throws RocksDatabaseException, CodecException { + KeyValueIterator iterator = iterator(prefix, IteratorType.KEY_AND_VALUE); + iterator.seek(seek); + return iterator; + } + /** * @param prefix The prefix of the elements to be iterated. * @return a key-only iterator diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 6d2fa3a99ffb..5fd1d1274b02 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -401,6 +401,20 @@ public KeyValueIterator iterator(KEY prefix, IteratorType type) } } + @Override + public KeyValueIterator iterator(KEY prefix, KEY seek) + throws RocksDatabaseException, CodecException { + if (supportCodecBuffer) { + KeyValueIterator iterator = iterator(prefix, IteratorType.KEY_AND_VALUE); + iterator.seek(seek); + return iterator; + } else { + final byte[] prefixBytes = encodeKey(prefix); + final byte[] seekBytes = encodeKey(seek); + return new TypedTableIterator(rawTable.iterator(prefixBytes, seekBytes)); + } + } + @Override public String getName() { return rawTable.getName(); @@ -573,7 +587,9 @@ abstract class RawIterator this.rawIterator = rawIterator; } - /** Covert the given key to the {@link RAW} type. */ + /** + * Covert the given key to the {@link RAW} type. + */ abstract AutoCloseSupplier convert(KEY key) throws CodecException; /** diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index cc59a3407ffe..8ce2868c8b59 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -38,6 +38,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; @@ -403,4 +409,136 @@ private void compareSstWithSameName(File checkpoint1, File checkpoint2) } } } + + @Test + public void testIteratorWithSeek() throws Exception { + final Table table = rdbStore.getTable(families.get(0)); + // Write keys: a1, a3, a5, b2, b4 + table.put(getBytesUtf16("a1"), getBytesUtf16("val1")); + table.put(getBytesUtf16("a3"), getBytesUtf16("val3")); + table.put(getBytesUtf16("a5"), getBytesUtf16("val5")); + table.put(getBytesUtf16("b2"), getBytesUtf16("val2")); + table.put(getBytesUtf16("b4"), getBytesUtf16("val4")); + + // Case 1: Seek to existing key, no prefix + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("a3"))) { + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey()); + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("a5"), iter.next().getKey()); + } + + // Case 2: Seek to non-existent key (should land on next greater), no prefix + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("a2"))) { + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey()); + } + + // Case 3: Seek past all keys, no prefix + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("z9"))) { + assertFalse(iter.hasNext()); + } + + // Case 4: Seek with prefix + try (TableIterator> iter = table.iterator(getBytesUtf16("b"), + getBytesUtf16("b3"))) { + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("b4"), iter.next().getKey()); + assertFalse(iter.hasNext()); + } + + // Case 5: Seek with prefix to start of prefix + try (TableIterator> iter = table.iterator(getBytesUtf16("b"), + getBytesUtf16("b2"))) { + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("b2"), iter.next().getKey()); + } + } + + @Test + public void testIteratorSeekEdgeCases() throws Exception { + final Table table = rdbStore.getTable(families.get(0)); + // Empty table check + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("a1"))) { + assertFalse(iter.hasNext()); + } + + table.put(getBytesUtf16("a1"), getBytesUtf16("val1")); + + // Seek before first key + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("00"))) { + assertTrue(iter.hasNext()); + assertArrayEquals(getBytesUtf16("a1"), iter.next().getKey()); + } + + // Seek after last key + try (TableIterator> iter = table.iterator(null, + getBytesUtf16("b1"))) { + assertFalse(iter.hasNext()); + } + } + + @Test + public void testConcurrentIteratorWithWrites() throws Exception { + final Table table = rdbStore.getTable(families.get(1)); + final int keyCount = 5000; + final CountDownLatch readyLatch = new CountDownLatch(1); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicLong writtenKeys = new AtomicLong(0); + + // Writer thread + ExecutorService executor = Executors.newFixedThreadPool(2); + Future writer = executor.submit(() -> { + readyLatch.countDown(); + startLatch.await(); + for (int i = 0; i < keyCount; i++) { + String key = String.format("key-%05d", i); + table.put(getBytesUtf16(key), getBytesUtf16("value-" + i)); + writtenKeys.incrementAndGet(); + if (i % 100 == 0) { + Thread.yield(); // Give reader a chance + } + } + return null; + }); + + // Reader thread (using the optimization) + Future reader = executor.submit(() -> { + readyLatch.countDown(); + startLatch.await(); + // Wait for some data to be written + while (writtenKeys.get() < 100) { + Thread.sleep(1); + } + + int seeks = 0; + for (int i = 0; i < 100; i++) { + // Randomly seek to a key that should exist (or be close to existing) + long targetId = (long) (Math.random() * writtenKeys.get()); + String seekKeyStr = String.format("key-%05d", targetId); + try (TableIterator> iter = table.iterator(null, + getBytesUtf16(seekKeyStr))) { + if (iter.hasNext()) { + assertNotNull(iter.next().getKey()); + } + seeks++; + } + } + assertTrue(seeks > 0); + return null; + }); + + readyLatch.await(); + startLatch.countDown(); + + writer.get(10, TimeUnit.SECONDS); + reader.get(10, TimeUnit.SECONDS); + + executor.shutdown(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index f9c31f052958..22bdc89a62f9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -193,7 +193,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager, // to ensure uniqueness of objectIDs. private final long omEpoch; - private Map tableMap = new HashMap<>(); + private final Map tableMap = new HashMap<>(); private final Map tableCacheMetricsMap = new HashMap<>(); private SnapshotChainManager snapshotChainManager; @@ -675,7 +675,7 @@ public String getMultipartKeyFSO(String volume, String bucket, String key, Strin final String fileName = OzoneFSUtils.getFileName(key); return getMultipartKey(volumeId, bucketId, parentId, - fileName, uploadId); + fileName, uploadId); } /** @@ -718,10 +718,7 @@ public boolean isVolumeEmpty(String volume) throws IOException { return false; } - if (isKeyPresentInTable(volumePrefix, bucketTable)) { - return false; // we found at least one key with this vol/ - } - return true; + return !isKeyPresentInTable(volumePrefix, bucketTable); // we found at least one key with this vol/ } /** @@ -1103,7 +1100,7 @@ public ListKeysResult listKeys(String volumeName, String bucketName, long readFromRDbStartNs, readFromRDbStopNs = 0; // Get maxKeys from DB if it has. try (TableIterator> - keyIter = getKeyTable(getBucketLayout()).iterator()) { + keyIter = getKeyTable(getBucketLayout()).iterator(null, seekKey)) { readFromRDbStartNs = Time.monotonicNowNanos(); KeyValue< String, OmKeyInfo > kv; keyIter.seek(seekKey); @@ -1875,11 +1872,7 @@ public boolean containsIncompleteMPUs(String volume, String bucket) } // Check in table - if (isKeyPresentInTable(keyPrefix, multipartInfoTable)) { - return true; - } - - return false; + return isKeyPresentInTable(keyPrefix, multipartInfoTable); } // NOTE: Update both getTableBucketPrefix(volume, bucket) & getTableBucketPrefix(tableName, volume, bucket)