Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ private static byte[] copyPrefix(byte[] prefix) {
}

RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
RDBTable table, byte[] prefix, IteratorType type) {
RDBTable table, byte[] prefix, IteratorType type) {
this(iterator, table, prefix, type, null);
}

RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
RDBTable table, byte[] prefix, IteratorType type, byte[] seekKey) {
super(iterator, table, copyPrefix(prefix), type);
seekToFirst();
if (seekKey != null) {
seek(seekKey);
} else {
seekToFirst();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,14 @@ public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] e
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
prefix, type);
prefix, type, null);
}

@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, byte[] seek)
throws RocksDatabaseException {
return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
prefix, IteratorType.KEY_AND_VALUE, seek);
}

KeyValueIterator<CodecBuffer, CodecBuffer> iterator(
Expand Down Expand Up @@ -282,7 +289,7 @@ public List<KeyValue<byte[], byte[]>> getRangeKVs(

if (count < 0) {
throw new IllegalArgumentException(
"Invalid count given " + count + ", count must be greater than 0");
"Invalid count given " + count + ", count must be greater than 0");
}
final List<KeyValue<byte[], byte[]>> result = new ArrayList<>();
try (KeyValueIterator<byte[], byte[]> it = iterator(prefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
* Deletes a range of keys from the metadata store.
*
* @param beginKey start metadata key
* @param endKey end metadata key
* @param endKey end metadata key
*/
void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException;

Expand Down Expand Up @@ -177,6 +177,20 @@ default KeyValueIterator<KEY, VALUE> iterator(KEY prefix) throws RocksDatabaseEx
KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type)
throws RocksDatabaseException, CodecException;

/**
* Iterate the table with a seek key.
*
* @param prefix The prefix of the elements to be iterated.
* @param seek The key to seek to.
* @return an iterator.
*/
default KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KEY seek)
throws RocksDatabaseException, CodecException {
KeyValueIterator<KEY, VALUE> iterator = iterator(prefix, IteratorType.KEY_AND_VALUE);
iterator.seek(seek);
return iterator;
}

/**
* @param prefix The prefix of the elements to be iterated.
* @return a key-only iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,20 @@ public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type)
}
}

@Override
public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KEY seek)
throws RocksDatabaseException, CodecException {
if (supportCodecBuffer) {
KeyValueIterator<KEY, VALUE> iterator = iterator(prefix, IteratorType.KEY_AND_VALUE);
iterator.seek(seek);
return iterator;
} else {
final byte[] prefixBytes = encodeKey(prefix);
final byte[] seekBytes = encodeKey(seek);
return new TypedTableIterator(rawTable.iterator(prefixBytes, seekBytes));
}
}

@Override
public String getName() {
return rawTable.getName();
Expand Down Expand Up @@ -573,7 +587,9 @@ abstract class RawIterator<RAW>
this.rawIterator = rawIterator;
}

/** Covert the given key to the {@link RAW} type. */
/**
* Covert the given key to the {@link RAW} type.
*/
abstract AutoCloseSupplier<RAW> convert(KEY key) throws CodecException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
Expand Down Expand Up @@ -403,4 +409,136 @@ private void compareSstWithSameName(File checkpoint1, File checkpoint2)
}
}
}

@Test
public void testIteratorWithSeek() throws Exception {
final Table<byte[], byte[]> table = rdbStore.getTable(families.get(0));
// Write keys: a1, a3, a5, b2, b4
table.put(getBytesUtf16("a1"), getBytesUtf16("val1"));
table.put(getBytesUtf16("a3"), getBytesUtf16("val3"));
table.put(getBytesUtf16("a5"), getBytesUtf16("val5"));
table.put(getBytesUtf16("b2"), getBytesUtf16("val2"));
table.put(getBytesUtf16("b4"), getBytesUtf16("val4"));

// Case 1: Seek to existing key, no prefix
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("a3"))) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey());
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("a5"), iter.next().getKey());
}

// Case 2: Seek to non-existent key (should land on next greater), no prefix
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("a2"))) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey());
}

// Case 3: Seek past all keys, no prefix
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("z9"))) {
assertFalse(iter.hasNext());
}

// Case 4: Seek with prefix
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(getBytesUtf16("b"),
getBytesUtf16("b3"))) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("b4"), iter.next().getKey());
assertFalse(iter.hasNext());
}

// Case 5: Seek with prefix to start of prefix
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(getBytesUtf16("b"),
getBytesUtf16("b2"))) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("b2"), iter.next().getKey());
}
}

@Test
public void testIteratorSeekEdgeCases() throws Exception {
final Table<byte[], byte[]> table = rdbStore.getTable(families.get(0));
// Empty table check
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("a1"))) {
assertFalse(iter.hasNext());
}

table.put(getBytesUtf16("a1"), getBytesUtf16("val1"));

// Seek before first key
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("00"))) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16("a1"), iter.next().getKey());
}

// Seek after last key
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16("b1"))) {
assertFalse(iter.hasNext());
}
}

@Test
public void testConcurrentIteratorWithWrites() throws Exception {
final Table<byte[], byte[]> table = rdbStore.getTable(families.get(1));
final int keyCount = 5000;
final CountDownLatch readyLatch = new CountDownLatch(1);
final CountDownLatch startLatch = new CountDownLatch(1);
final AtomicLong writtenKeys = new AtomicLong(0);

// Writer thread
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Void> writer = executor.submit(() -> {
readyLatch.countDown();
startLatch.await();
for (int i = 0; i < keyCount; i++) {
String key = String.format("key-%05d", i);
table.put(getBytesUtf16(key), getBytesUtf16("value-" + i));
writtenKeys.incrementAndGet();
if (i % 100 == 0) {
Thread.yield(); // Give reader a chance
}
}
return null;
});

// Reader thread (using the optimization)
Future<Void> reader = executor.submit(() -> {
readyLatch.countDown();
startLatch.await();
// Wait for some data to be written
while (writtenKeys.get() < 100) {
Thread.sleep(1);
}

int seeks = 0;
for (int i = 0; i < 100; i++) {
// Randomly seek to a key that should exist (or be close to existing)
long targetId = (long) (Math.random() * writtenKeys.get());
String seekKeyStr = String.format("key-%05d", targetId);
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null,
getBytesUtf16(seekKeyStr))) {
if (iter.hasNext()) {
assertNotNull(iter.next().getKey());
}
seeks++;
}
}
assertTrue(seeks > 0);
return null;
});

readyLatch.await();
startLatch.countDown();

writer.get(10, TimeUnit.SECONDS);
reader.get(10, TimeUnit.SECONDS);

executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
// to ensure uniqueness of objectIDs.
private final long omEpoch;

private Map<String, Table> tableMap = new HashMap<>();
private final Map<String, Table> tableMap = new HashMap<>();
private final Map<String, TableCacheMetrics> tableCacheMetricsMap =
new HashMap<>();
private SnapshotChainManager snapshotChainManager;
Expand Down Expand Up @@ -675,7 +675,7 @@ public String getMultipartKeyFSO(String volume, String bucket, String key, Strin

final String fileName = OzoneFSUtils.getFileName(key);
return getMultipartKey(volumeId, bucketId, parentId,
fileName, uploadId);
fileName, uploadId);
}

/**
Expand Down Expand Up @@ -718,10 +718,7 @@ public boolean isVolumeEmpty(String volume) throws IOException {
return false;
}

if (isKeyPresentInTable(volumePrefix, bucketTable)) {
return false; // we found at least one key with this vol/
}
return true;
return !isKeyPresentInTable(volumePrefix, bucketTable); // we found at least one key with this vol/
}

/**
Expand Down Expand Up @@ -1103,7 +1100,7 @@ public ListKeysResult listKeys(String volumeName, String bucketName,
long readFromRDbStartNs, readFromRDbStopNs = 0;
// Get maxKeys from DB if it has.
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyIter = getKeyTable(getBucketLayout()).iterator()) {
keyIter = getKeyTable(getBucketLayout()).iterator(null, seekKey)) {
readFromRDbStartNs = Time.monotonicNowNanos();
KeyValue< String, OmKeyInfo > kv;
keyIter.seek(seekKey);
Expand Down Expand Up @@ -1875,11 +1872,7 @@ public boolean containsIncompleteMPUs(String volume, String bucket)
}

// Check in table
if (isKeyPresentInTable(keyPrefix, multipartInfoTable)) {
return true;
}

return false;
return isKeyPresentInTable(keyPrefix, multipartInfoTable);
}

// NOTE: Update both getTableBucketPrefix(volume, bucket) & getTableBucketPrefix(tableName, volume, bucket)
Expand Down