delete(String path) {
+ return metadataCache.get().delete(path);
+ }
+
+ @Override
+ public void invalidate(String path) {
+ metadataCache.get().invalidate(path);
+ }
+
+ @Override
+ public void invalidateAll() {
+ metadataCache.get().invalidateAll();
+ }
+
+ @Override
+ public void refresh(String path) {
+ metadataCache.get().refresh(path);
+ }
+
+ void handleSwitchToTargetStore() {
+ if (clazz != null) {
+ metadataCache.set(dualMetadataStore.targetStore.getMetadataCache(clazz, cacheConfig));
+ } else if (typeRef != null) {
+ metadataCache.set(dualMetadataStore.targetStore.getMetadataCache(typeRef, cacheConfig));
+ } else {
+ metadataCache.set(dualMetadataStore.targetStore.getMetadataCache(cacheName, serde, cacheConfig));
+ }
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
new file mode 100644
index 0000000000000..3e56f105e7e47
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.migration.MigrationPhase;
+import org.apache.pulsar.common.migration.MigrationState;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataSerde;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+
+/**
+ * Wrapper around a metadata store that provides transparent migration capability.
+ *
+ * When migration is not active, all operations are forwarded to the source store.
+ * When migration starts (detected via flag in source store), this wrapper:
+ *
+ * - Initializes connection to target store
+ * - Recreates ephemeral nodes in target store
+ * - Routes reads/writes based on migration phase
+ *
+ */
+@Slf4j
+public class DualMetadataStore implements MetadataStoreExtended {
+
+ @Getter
+ final MetadataStoreExtended sourceStore;
+ volatile MetadataStoreExtended targetStore = null;
+
+ private volatile MigrationState migrationState = MigrationState.NOT_STARTED;
+
+ private final MetadataStoreConfig config;
+ private String participantId;
+ private final Set localEphemeralPaths = ConcurrentHashMap.newKeySet();
+
+ private final ScheduledExecutorService executor;
+
+ private final MetadataCache migrationStateCache;
+
+ private final Set> listeners = ConcurrentHashMap.newKeySet();
+ private final Set> sessionListeners = ConcurrentHashMap.newKeySet();
+
+ private final AtomicInteger pendingSourceWrites = new AtomicInteger();
+
+ private final Set> caches = ConcurrentHashMap.newKeySet();
+
+ private static final IllegalStateException READ_ONLY_STATE_EXCEPTION =
+ new IllegalStateException("Write operations not allowed during migrations");
+
+ public DualMetadataStore(MetadataStore sourceStore, MetadataStoreConfig config) throws MetadataStoreException {
+ this.sourceStore = (MetadataStoreExtended) sourceStore;
+ this.config = config;
+ this.executor = new ScheduledThreadPoolExecutor(1,
+ new DefaultThreadFactory("pulsar-dual-metadata-store", true));
+ this.migrationStateCache = sourceStore.getMetadataCache(MigrationState.class);
+
+ if (sourceStore instanceof MetadataStoreLifecycle msl) {
+ msl.initializeCluster();
+ }
+
+ readCurrentState();
+ registerAsParticipant();
+
+ // Watch for migration events
+ watchForMigrationEvents();
+ }
+
+ private void readCurrentState() throws MetadataStoreException {
+ try {
+ // Read the current state
+ var initialState = migrationStateCache.get(MigrationState.MIGRATION_FLAG_PATH).get();
+ initialState.ifPresent(state -> this.migrationState = state);
+
+ if (migrationState.getPhase() == MigrationPhase.COMPLETED) {
+ initializeTargetStore(migrationState.getTargetUrl());
+ }
+
+ } catch (Exception e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ private void registerAsParticipant() throws MetadataStoreException {
+ try {
+ // Register ourselves as participant in an eventual migration
+ Stat stat = this.sourceStore.put(MigrationState.PARTICIPANTS_PATH + "/id-", new byte[0],
+ Optional.empty(), EnumSet.of(CreateOption.Sequential, CreateOption.Ephemeral)).get();
+ participantId = stat.getPath();
+ log.info("Participant metadata store created: {}", participantId);
+ } catch (Throwable e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ private void watchForMigrationEvents() {
+ // Register listener for migration-related paths
+ sourceStore.registerListener(notification -> {
+ if (!MigrationState.MIGRATION_FLAG_PATH.equals(notification.getPath())) {
+ return;
+ }
+
+ migrationStateCache.get(MigrationState.MIGRATION_FLAG_PATH)
+ .thenAccept(migrationState -> {
+ this.migrationState = migrationState.orElse(MigrationState.NOT_STARTED);
+
+ switch (this.migrationState.getPhase()) {
+ case PREPARATION -> executor.execute(this::handleMigrationStart);
+ case COMPLETED -> executor.execute(this::handleMigrationComplete);
+ case FAILED -> executor.execute(this::handleMigrationFailed);
+ default -> {
+ // no-op
+ }
+ }
+ });
+ });
+ }
+
+ private void handleMigrationStart() {
+ try {
+ log.info("=== Starting Metadata Migration Preparation ===");
+ log.info("Target metadata store URL: {}", migrationState.getTargetUrl());
+
+ // Mark the session as lost so that all the component will avoid trying to make metadata writes
+ // for anything that can be deferred (eg: ledgers rollovers)
+ sessionListeners.forEach(listener -> listener.accept(SessionEvent.SessionLost));
+
+ // Initialize target store
+ initializeTargetStore(migrationState.getTargetUrl());
+
+ this.recreateEphemeralNodesInTarget();
+
+ // Acknowledge preparation by deleting the participant id
+ sourceStore.delete(participantId, Optional.empty()).get();
+
+ log.info("=== Migration Preparation Complete ===");
+
+ } catch (Exception e) {
+ log.error("Failed during migration preparation", e);
+ }
+ }
+
+ private void handleMigrationComplete() {
+ log.info("=== Metadata Migration Complete ===");
+
+ caches.forEach(DualMetadataCache::handleSwitchToTargetStore);
+ listeners.forEach(targetStore::registerListener);
+ sessionListeners.forEach(targetStore::registerSessionListener);
+
+ sessionListeners.forEach(listener -> listener.accept(SessionEvent.SessionReestablished));
+ }
+
+ private void handleMigrationFailed() {
+ log.info("=== Metadata Migration Failed ===");
+ sessionListeners.forEach(listener -> listener.accept(SessionEvent.SessionReestablished));
+ }
+
+
+ private synchronized void initializeTargetStore(String targetUrl) throws MetadataStoreException {
+ if (this.targetStore != null) {
+ return;
+ }
+
+ log.info("Initializing target metadata store: {}", targetUrl);
+ this.targetStore = (MetadataStoreExtended) MetadataStoreFactoryImpl.create(
+ targetUrl,
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(config.getSessionTimeoutMillis())
+ .batchingEnabled(config.isBatchingEnabled())
+ .batchingMaxDelayMillis(config.getBatchingMaxDelayMillis())
+ .batchingMaxOperations(config.getBatchingMaxOperations())
+ .batchingMaxSizeKb(config.getBatchingMaxSizeKb())
+ .build()
+ );
+
+ log.info("Target store initialized successfully");
+ }
+
+ private void recreateEphemeralNodesInTarget() throws Exception {
+ log.info("Found {} local ephemeral nodes to recreate", localEphemeralPaths.size());
+ var futures = localEphemeralPaths.stream()
+ .map(path ->
+ sourceStore.get(path)
+ .thenCompose(ogr ->
+ ogr.map(gr -> targetStore.put(path, gr.getValue(), Optional.empty(),
+ EnumSet.of(CreateOption.Ephemeral)))
+ .orElse(
+ CompletableFuture.completedFuture(null))
+ )
+ ).toList();
+
+ FutureUtil.waitForAll(futures).get();
+ }
+
+ @Override
+ public CompletableFuture> get(String path) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> sourceStore.get(path);
+ case COMPLETED -> targetStore.get(path);
+ };
+ }
+
+ @Override
+ public CompletableFuture> getChildren(String path) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> sourceStore.getChildren(path);
+ case COMPLETED -> targetStore.getChildren(path);
+ };
+ }
+
+ @Override
+ public CompletableFuture> getChildrenFromStore(String path) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> sourceStore.getChildrenFromStore(path);
+ case COMPLETED -> targetStore.getChildrenFromStore(path);
+ };
+ }
+
+ @Override
+ public CompletableFuture exists(String path) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> sourceStore.exists(path);
+ case COMPLETED -> targetStore.exists(path);
+ };
+ }
+
+ @Override
+ public CompletableFuture put(String path, byte[] value, Optional expectedVersion) {
+ return put(path, value, expectedVersion, EnumSet.noneOf(CreateOption.class));
+ }
+
+ @Override
+ public CompletableFuture put(String path, byte[] value, Optional expectedVersion,
+ EnumSet options) {
+ switch (migrationState.getPhase()) {
+ case NOT_STARTED, FAILED -> {
+ // Track ephemeral nodes
+ if (options.contains(CreateOption.Ephemeral)) {
+ localEphemeralPaths.add(path);
+ }
+
+ // Track pending writes
+ pendingSourceWrites.incrementAndGet();
+ var future = sourceStore.put(path, value, expectedVersion, options);
+ future.whenComplete((result, e) -> pendingSourceWrites.decrementAndGet());
+ return future;
+ }
+
+ case PREPARATION, COPYING -> {
+ return CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
+ }
+
+ case COMPLETED -> {
+ return targetStore.put(path, value, expectedVersion, options);
+ }
+
+ default -> throw new IllegalStateException("Invalid phase " + migrationState.getPhase());
+ }
+ }
+
+ @Override
+ public CompletableFuture delete(String path, Optional expectedVersion) {
+ switch (migrationState.getPhase()) {
+ case NOT_STARTED, FAILED -> {
+ localEphemeralPaths.remove(path);
+
+ pendingSourceWrites.incrementAndGet();
+ var future = sourceStore.delete(path, expectedVersion);
+ future.whenComplete((result, e) -> pendingSourceWrites.decrementAndGet());
+ return future;
+ }
+
+ case PREPARATION, COPYING -> {
+ return CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
+ }
+
+ case COMPLETED -> {
+ return targetStore.delete(path, expectedVersion);
+ }
+
+ default -> throw new IllegalStateException("Invalid phase " + migrationState.getPhase());
+ }
+ }
+
+ @Override
+ public CompletableFuture deleteRecursive(String path) {
+ switch (migrationState.getPhase()) {
+ case NOT_STARTED, FAILED -> {
+ pendingSourceWrites.incrementAndGet();
+ var future = sourceStore.deleteRecursive(path);
+ future.whenComplete((result, e) -> pendingSourceWrites.decrementAndGet());
+ return future;
+ }
+ case PREPARATION, COPYING -> {
+ return CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
+ }
+ case COMPLETED -> {
+ return targetStore.deleteRecursive(path);
+ }
+
+ default -> throw new IllegalStateException("Invalid phase " + migrationState.getPhase());
+ }
+ }
+
+ @Override
+ public void registerListener(Consumer listener) {
+ switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> {
+ listeners.add(listener);
+ sourceStore.registerListener(listener);
+ }
+
+ case COMPLETED -> targetStore.registerListener(listener);
+ }
+ }
+
+ @Override
+ public void registerSessionListener(Consumer listener) {
+ switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED -> {
+ sessionListeners.add(listener);
+ sourceStore.registerSessionListener(listener);
+ }
+
+ case COMPLETED -> targetStore.registerSessionListener(listener);
+ }
+ }
+
+ @Override
+ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig cacheConfig) {
+ var cache = new DualMetadataCache<>(this, clazz, null, null, null, cacheConfig);
+ caches.add(cache);
+ return cache;
+ }
+
+ @Override
+ public MetadataCache getMetadataCache(TypeReference typeRef, MetadataCacheConfig cacheConfig) {
+ var cache = new DualMetadataCache<>(this, null, typeRef, null, null, cacheConfig);
+ caches.add(cache);
+ return cache;
+ }
+
+ @Override
+ public MetadataCache getMetadataCache(String cacheName, MetadataSerde serde,
+ MetadataCacheConfig cacheConfig) {
+ var cache = new DualMetadataCache<>(this, null, null, cacheName, serde, cacheConfig);
+ caches.add(cache);
+ return cache;
+ }
+
+ @Override
+ public Optional getMetadataEventSynchronizer() {
+ return sourceStore.getMetadataEventSynchronizer();
+ }
+
+ @Override
+ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
+ sourceStore.updateMetadataEventSynchronizer(synchronizer);
+ }
+
+ @Override
+ public CompletableFuture handleMetadataEvent(MetadataEvent event) {
+ return sourceStore.handleMetadataEvent(event);
+ }
+
+ @Override
+ public void close() throws Exception {
+ log.info("Closing DualMetadataStore");
+
+ // Close target store first (if exists)
+ if (targetStore != null) {
+ try {
+ targetStore.close();
+ log.info("Target store closed");
+ } catch (Exception e) {
+ log.error("Error closing target store", e);
+ }
+ }
+
+ // Close source store
+ try {
+ sourceStore.close();
+ log.info("Source store closed");
+ } catch (Exception e) {
+ log.error("Error closing source store", e);
+ }
+
+ executor.shutdownNow();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f56d6c6941f1e..011508567e515 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -685,6 +685,11 @@ public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStor
if (metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
metadataURL = metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length());
}
- return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+
+ // Create the ZK metadata store
+ ZKMetadataStore zkStore = new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+
+ // Wrap with DualMetadataStore to enable migration capability
+ return new DualMetadataStore(zkStore, metadataStoreConfig);
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
index a4c52134a8a75..2b7dbfce72807 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.metadata.impl.oxia;
+import io.oxia.client.api.AsyncOxiaClient;
+import io.oxia.client.api.OxiaClientBuilder;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import lombok.NonNull;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -29,7 +33,7 @@ public class OxiaMetadataStoreProvider implements MetadataStoreProvider {
// declare the specific namespace to avoid any changes in the future.
public static final String DefaultNamespace = "default";
- public static final String OXIA_SCHEME = "oxia";
+ public static final String OXIA_SCHEME = "oxia";
public static final String OXIA_SCHEME_IDENTIFIER = OXIA_SCHEME + ":";
@Override
@@ -72,4 +76,16 @@ Pair getServiceAddressAndNamespace(String metadataURL)
}
return Pair.of(split[0], split[1]);
}
-}
+
+ public AsyncOxiaClient getOxiaClient(String metadataURL) throws MetadataStoreException {
+ var pair = getServiceAddressAndNamespace(metadataURL);
+ try {
+ return OxiaClientBuilder.create(pair.getLeft())
+ .namespace(pair.getRight())
+ .batchLinger(Duration.of(100, ChronoUnit.MILLIS))
+ .asyncClient().get();
+ } catch (Exception e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
index c06fbe3cc07ae..37a2e77084ffe 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
@@ -52,8 +52,8 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
-import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.jspecify.annotations.Nullable;
@Slf4j
@@ -161,8 +161,8 @@ public MetadataStoreTableViewImpl(@NonNull Class clazz,
.asyncReloadConsumer(this::consumeAsyncReload)
.build());
store.registerListener(this::handleNotification);
- if (store instanceof AbstractMetadataStore abstractMetadataStore) {
- abstractMetadataStore.registerSessionListener(this::handleSessionEvent);
+ if (store instanceof MetadataStoreExtended metadataStoreExtended) {
+ metadataStoreExtended.registerSessionListener(this::handleSessionEvent);
} else {
// Since ServiceUnitStateMetadataStoreTableViewImpl has checked the configuration that named
// "zookeeperSessionExpiredPolicy", skip to print the duplicated log here.
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index 9e8c5a54a5d91..ae7de3a47e913 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -99,7 +99,7 @@ public void testPeriodicBookieCheckInterval() throws Exception {
getBookie(0),
getBookie(1))));
long underReplicatedLedger = -1;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
if (underReplicatedLedger != -1) {
break;
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 1d741c551ddb9..146e4f0dd58ca 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -30,6 +30,7 @@
import org.apache.bookkeeper.util.TestUtils;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
@@ -242,7 +243,9 @@ private ZooKeeper getZk(PulsarMetadataClientDriver pulsarMetadataClientDriver) t
(PulsarLedgerManagerFactory) pulsarMetadataClientDriver.getLedgerManagerFactory();
Field field = pulsarLedgerManagerFactory.getClass().getDeclaredField("store");
field.setAccessible(true);
- ZKMetadataStore zkMetadataStore = (ZKMetadataStore) field.get(pulsarLedgerManagerFactory);
+
+ DualMetadataStore store = (DualMetadataStore) field.get(pulsarLedgerManagerFactory);
+ ZKMetadataStore zkMetadataStore = (ZKMetadataStore) store.getSourceStore();
return zkMetadataStore.getZkClient();
}
}
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index d78bc4c3a4622..008817195cc63 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -87,6 +87,7 @@
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -1242,7 +1243,8 @@ private ZooKeeper getZk(PulsarMetadataClientDriver pulsarMetadataClientDriver) t
(PulsarLedgerManagerFactory) pulsarMetadataClientDriver.getLedgerManagerFactory();
Field field = pulsarLedgerManagerFactory.getClass().getDeclaredField("store");
field.setAccessible(true);
- ZKMetadataStore zkMetadataStore = (ZKMetadataStore) field.get(pulsarLedgerManagerFactory);
+ DualMetadataStore store = (DualMetadataStore) field.get(pulsarLedgerManagerFactory);
+ ZKMetadataStore zkMetadataStore = (ZKMetadataStore) store.getSourceStore();
return zkMetadataStore.getZkClient();
}
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataCacheTest.java
new file mode 100644
index 0000000000000..88890466b7844
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataCacheTest.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.migration.MigrationPhase;
+import org.apache.pulsar.common.migration.MigrationState;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DualMetadataCacheTest extends BaseMetadataStoreTest {
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ static class TestObject {
+ String name;
+ int value;
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testCacheGetInNotStartedPhase() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ // Create object via source store
+ String path = prefix + "/test-obj";
+ TestObject obj = new TestObject("test", 42);
+ cache.create(path, obj).join();
+
+ // Read via cache
+ Optional result = cache.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getName(), "test");
+ assertEquals(result.get().getValue(), 42);
+ }
+
+ @Test
+ public void testCacheGetWithStatsInNotStartedPhase() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ // Create object
+ String path = prefix + "/test-obj";
+ TestObject obj = new TestObject("test", 42);
+ cache.create(path, obj).join();
+
+ // Read with stats
+ Optional> result = cache.getWithStats(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getValue().getName(), "test");
+ assertEquals(result.get().getValue().getValue(), 42);
+ assertNotNull(result.get().getStat());
+ }
+
+ @Test
+ public void testCacheGetIfCached() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Initially not cached
+ Optional cached = cache.getIfCached(path);
+ assertEquals(cached, Optional.empty());
+
+ // Create object
+ TestObject obj = new TestObject("test", 42);
+ cache.create(path, obj).join();
+
+ // Now should be cached
+ Optional cachedAfter = cache.getIfCached(path);
+ assertTrue(cachedAfter.isPresent());
+ assertEquals(cachedAfter.get().getName(), "test");
+ }
+
+ @Test
+ public void testCacheGetChildren() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ // Create multiple objects
+ cache.create(prefix + "/child1", new TestObject("obj1", 1)).join();
+ cache.create(prefix + "/child2", new TestObject("obj2", 2)).join();
+ cache.create(prefix + "/child3", new TestObject("obj3", 3)).join();
+
+ // Get children
+ var children = cache.getChildren(prefix).join();
+ assertEquals(children.size(), 3);
+ assertTrue(children.contains("child1"));
+ assertTrue(children.contains("child2"));
+ assertTrue(children.contains("child3"));
+ }
+
+ @Test
+ public void testCacheExists() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Initially doesn't exist
+ assertFalse(cache.exists(path).join());
+
+ // Create object
+ cache.create(path, new TestObject("test", 42)).join();
+
+ // Now exists
+ assertTrue(cache.exists(path).join());
+ }
+
+ @Test
+ public void testCacheReadModifyUpdateOrCreate() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Create new object via readModifyUpdateOrCreate
+ TestObject result1 = cache.readModifyUpdateOrCreate(path, optObj -> {
+ assertFalse(optObj.isPresent());
+ return new TestObject("created", 100);
+ }).join();
+
+ assertEquals(result1.getName(), "created");
+ assertEquals(result1.getValue(), 100);
+
+ // Modify existing object
+ TestObject result2 = cache.readModifyUpdateOrCreate(path, optObj -> {
+ assertTrue(optObj.isPresent());
+ TestObject existing = optObj.get();
+ return new TestObject(existing.getName() + "-modified", existing.getValue() + 1);
+ }).join();
+
+ assertEquals(result2.getName(), "created-modified");
+ assertEquals(result2.getValue(), 101);
+ }
+
+ @Test
+ public void testCacheReadModifyUpdate() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Create initial object
+ cache.create(path, new TestObject("initial", 1)).join();
+
+ // Modify it
+ TestObject result = cache.readModifyUpdate(path, obj -> {
+ return new TestObject(obj.getName() + "-updated", obj.getValue() * 2);
+ }).join();
+
+ assertEquals(result.getName(), "initial-updated");
+ assertEquals(result.getValue(), 2);
+
+ // Verify persisted
+ Optional verified = cache.get(path).join();
+ assertTrue(verified.isPresent());
+ assertEquals(verified.get().getName(), "initial-updated");
+ assertEquals(verified.get().getValue(), 2);
+ }
+
+ @Test
+ public void testCachePutWithOptions() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+ TestObject obj = new TestObject("test", 42);
+
+ // Put without options (creates or updates)
+ cache.put(path, obj, EnumSet.noneOf(CreateOption.class)).join();
+
+ // Verify
+ Optional result = cache.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getName(), "test");
+ }
+
+ @Test
+ public void testCacheDelete() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Create object
+ cache.create(path, new TestObject("test", 42)).join();
+ assertTrue(cache.exists(path).join());
+
+ // Delete
+ cache.delete(path).join();
+ assertFalse(cache.exists(path).join());
+ }
+
+ @Test
+ public void testCacheInvalidate() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Create and cache object
+ cache.create(path, new TestObject("test", 42)).join();
+ cache.get(path).join(); // Ensure it's cached
+
+ // Verify cached
+ Optional cached = cache.getIfCached(path);
+ assertTrue(cached.isPresent());
+
+ // Invalidate
+ cache.invalidate(path);
+
+ // Should not be in cache anymore (but still in store)
+ Optional cachedAfterInvalidate = cache.getIfCached(path);
+ assertEquals(cachedAfterInvalidate, Optional.empty());
+
+ // But should still exist in store
+ Optional fromStore = cache.get(path).join();
+ assertTrue(fromStore.isPresent());
+ }
+
+ @Test
+ public void testCacheInvalidateAll() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ // Create multiple objects
+ cache.create(prefix + "/obj1", new TestObject("test1", 1)).join();
+ cache.create(prefix + "/obj2", new TestObject("test2", 2)).join();
+ cache.create(prefix + "/obj3", new TestObject("test3", 3)).join();
+
+ // Load into cache
+ cache.get(prefix + "/obj1").join();
+ cache.get(prefix + "/obj2").join();
+ cache.get(prefix + "/obj3").join();
+
+ // Invalidate all
+ cache.invalidateAll();
+
+ // None should be cached
+ assertEquals(cache.getIfCached(prefix + "/obj1"), Optional.empty());
+ assertEquals(cache.getIfCached(prefix + "/obj2"), Optional.empty());
+ assertEquals(cache.getIfCached(prefix + "/obj3"), Optional.empty());
+ }
+
+ @Test
+ public void testCacheRefresh() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ String path = prefix + "/test-obj";
+
+ // Create object
+ cache.create(path, new TestObject("test", 42)).join();
+ cache.get(path).join(); // Ensure it's cached
+
+ // Modify directly in store (bypassing cache)
+ sourceStore.put(path,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(new TestObject("modified", 100)),
+ Optional.empty()).join();
+
+ // Refresh cache
+ cache.refresh(path);
+
+ // Wait a bit for refresh to complete
+ Thread.sleep(200);
+
+ // Should get updated value
+ Optional result = cache.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getName(), "modified");
+ assertEquals(result.get().getValue(), 100);
+ }
+
+ @Test
+ public void testCacheSwitchToTargetStore() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String targetUrl = "memory:" + UUID.randomUUID();
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class);
+
+ // Create object in source
+ String path = prefix + "/test-obj";
+ cache.create(path, new TestObject("source-obj", 1)).join();
+
+ // Verify it exists in source
+ assertTrue(cache.exists(path).join());
+
+ // Trigger migration - first PREPARATION
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ // Then COMPLETED phase
+ MigrationState completedState = new MigrationState(MigrationPhase.COMPLETED, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+ Optional.empty()).join();
+
+ // Wait for dual store to switch and caches to update
+ Thread.sleep(1000);
+
+ // Create object in target via cache (should use target store now)
+ String targetPath = prefix + "/target-obj";
+ cache.create(targetPath, new TestObject("target-obj", 2)).join();
+
+ // Verify it exists in target store directly
+ Optional targetResult = targetStore.get(targetPath).join()
+ .map(gr -> gr.getValue());
+ assertTrue(targetResult.isPresent());
+ TestObject targetObj = ObjectMapperFactory.getMapper().reader().readValue(targetResult.get(), TestObject.class);
+ assertEquals(targetObj.getName(), "target-obj");
+ assertEquals(targetObj.getValue(), 2);
+ }
+
+ @Test
+ public void testMultipleCachesWithDifferentTypes() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Create caches for different types
+ MetadataCache objCache = dualStore.getMetadataCache(TestObject.class);
+ MetadataCache strCache = dualStore.getMetadataCache(String.class);
+
+ // Use both caches
+ objCache.create(prefix + "/obj", new TestObject("test", 42)).join();
+ strCache.create(prefix + "/str", "test-string").join();
+
+ // Verify both work
+ Optional objResult = objCache.get(prefix + "/obj").join();
+ assertTrue(objResult.isPresent());
+ assertEquals(objResult.get().getName(), "test");
+
+ Optional strResult = strCache.get(prefix + "/str").join();
+ assertTrue(strResult.isPresent());
+ assertEquals(strResult.get(), "test-string");
+ }
+
+ @Test
+ public void testCacheWithCustomConfig() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Create cache with custom config
+ MetadataCacheConfig cacheConfig = MetadataCacheConfig.builder()
+ .refreshAfterWriteMillis(1000)
+ .build();
+
+ MetadataCache cache = dualStore.getMetadataCache(TestObject.class, cacheConfig);
+
+ // Use the cache
+ String path = prefix + "/test-obj";
+ cache.create(path, new TestObject("test", 42)).join();
+
+ Optional result = cache.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getName(), "test");
+ }
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
new file mode 100644
index 0000000000000..65306b2155efa
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.migration.MigrationPhase;
+import org.apache.pulsar.common.migration.MigrationState;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DualMetadataStoreTest extends BaseMetadataStoreTest {
+
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testNotStartedPhaseRoutesToSource() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Write should go to source store
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ dualStore.put(path, data, Optional.empty()).join();
+
+ // Verify data in source store
+ Optional result = sourceStore.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "test-data");
+
+ // Read should come from source store
+ Optional readResult = dualStore.get(path).join();
+ assertTrue(readResult.isPresent());
+ assertEquals(new String(readResult.get().getValue(), StandardCharsets.UTF_8), "test-data");
+ }
+
+ @Test
+ public void testPreparationPhaseBlocksWrites() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Set migration state to PREPARATION
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION,
+ "memory:" + UUID.randomUUID());
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ // Wait for dual store to detect migration
+ Thread.sleep(500);
+
+ // Writes should be blocked
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ try {
+ dualStore.put(path, data, Optional.empty()).join();
+ fail("Should have thrown IllegalStateException");
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalStateException);
+ assertTrue(e.getCause().getMessage().contains("Write operations not allowed during migrations"));
+ }
+
+ // Reads should still work
+ Optional result = dualStore.get(path).join();
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testCopyingPhaseBlocksWrites() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Set migration state to COPYING
+ MigrationState copyingState = new MigrationState(MigrationPhase.COPYING,
+ "memory:" + UUID.randomUUID());
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(copyingState),
+ Optional.empty()).join();
+
+ // Wait for dual store to detect migration
+ Thread.sleep(500);
+
+ // Writes should be blocked
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ try {
+ dualStore.put(path, data, Optional.empty()).join();
+ fail("Should have thrown IllegalStateException");
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalStateException);
+ assertTrue(e.getCause().getMessage().contains("Write operations not allowed during migration"));
+ }
+ }
+
+ @Test
+ public void testCompletedPhaseRoutesToTarget() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStore store =
+ MetadataStoreFactory.create(zks.getConnectionString(), MetadataStoreConfig.builder().build());
+
+ String oxiaService = "oxia://" + getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(oxiaService,
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+
+ // Set migration state to PREPARATION
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION, oxiaService);
+ store.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ // Set migration state to COMPLETED
+ MigrationState completedState = new MigrationState(MigrationPhase.COMPLETED, oxiaService);
+ store.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+ Optional.empty()).join();
+
+ // Wait for dual store to detect migration and initialize target
+ Thread.sleep(1000);
+
+ // Write should go to target store
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ store.put(path, data, Optional.empty()).join();
+
+ // Verify data in target store
+ Optional targetResult = targetStore.get(path).join();
+ assertTrue(targetResult.isPresent());
+ assertEquals(new String(targetResult.get().getValue(), StandardCharsets.UTF_8), "test-data");
+
+ // Read should come from target store
+ Optional readResult = store.get(path).join();
+ assertTrue(readResult.isPresent());
+ assertEquals(new String(readResult.get().getValue(), StandardCharsets.UTF_8), "test-data");
+ }
+
+ @Test
+ public void testFailedPhaseRoutesToSource() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ // Set migration state to FAILED
+ MigrationState failedState = new MigrationState(MigrationPhase.FAILED,
+ "memory:" + UUID.randomUUID());
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(failedState),
+ Optional.empty()).join();
+
+ // Wait for dual store to detect migration
+ Thread.sleep(500);
+
+ // Write should go to source store (migration failed)
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ dualStore.put(path, data, Optional.empty()).join();
+
+ // Verify data in source store
+ Optional result = sourceStore.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "test-data");
+ }
+
+ @Test
+ public void testSessionLostEventDuringPreparation() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ CountDownLatch sessionLostLatch = new CountDownLatch(1);
+ AtomicReference receivedEvent = new AtomicReference<>();
+
+ dualStore.registerSessionListener(event -> {
+ receivedEvent.set(event);
+ if (event == SessionEvent.SessionLost) {
+ sessionLostLatch.countDown();
+ }
+ });
+
+ // Set migration state to PREPARATION to trigger SessionLost
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION,
+ "memory:" + UUID.randomUUID());
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ // Wait for SessionLost event
+ assertTrue(sessionLostLatch.await(5, TimeUnit.SECONDS));
+ assertEquals(receivedEvent.get(), SessionEvent.SessionLost);
+ }
+
+ @Test
+ public void testSessionReestablishedEventOnCompletion() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore =
+ new ZKMetadataStore(zks.getConnectionString(), MetadataStoreConfig.builder().build(), true);
+
+ String targetUrl = "memory:" + UUID.randomUUID();
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ CountDownLatch sessionReestablishedLatch = new CountDownLatch(1);
+ List receivedEvents = new ArrayList<>();
+
+ dualStore.registerSessionListener(event -> {
+ receivedEvents.add(event);
+ if (event == SessionEvent.SessionReestablished) {
+ sessionReestablishedLatch.countDown();
+ }
+ });
+
+ // First trigger PREPARATION (SessionLost)
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ Thread.sleep(500);
+
+ // Then trigger COMPLETED (SessionReestablished)
+ MigrationState completedState = new MigrationState(MigrationPhase.COMPLETED, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+ Optional.empty()).join();
+
+ // Wait for SessionReestablished event
+ assertTrue(sessionReestablishedLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(receivedEvents.contains(SessionEvent.SessionLost));
+ assertTrue(receivedEvents.contains(SessionEvent.SessionReestablished));
+ }
+
+ @Test
+ public void testSessionReestablishedEventOnFailure() throws Exception {
+ MetadataStore zkStore =
+ new ZKMetadataStore(zks.getConnectionString(), MetadataStoreConfig.builder().build(), true);
+
+ @Cleanup
+ MetadataStoreExtended dualStore = new DualMetadataStore(zkStore, MetadataStoreConfig.builder().build());
+
+ CountDownLatch sessionReestablishedLatch = new CountDownLatch(1);
+ List receivedEvents = new ArrayList<>();
+
+ dualStore.registerSessionListener(event -> {
+ receivedEvents.add(event);
+ if (event == SessionEvent.SessionReestablished) {
+ sessionReestablishedLatch.countDown();
+ }
+ });
+
+ // First trigger PREPARATION (SessionLost)
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION,
+ "memory:" + UUID.randomUUID());
+ zkStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ Thread.sleep(500);
+
+ // Then trigger FAILED (SessionReestablished)
+ MigrationState failedState = new MigrationState(MigrationPhase.FAILED,
+ "memory:" + UUID.randomUUID());
+ zkStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(failedState),
+ Optional.empty()).join();
+
+ // Wait for SessionReestablished event
+ assertTrue(sessionReestablishedLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(receivedEvents.contains(SessionEvent.SessionLost));
+ assertTrue(receivedEvents.contains(SessionEvent.SessionReestablished));
+ }
+
+ @Test
+ public void testParticipantRegistration() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ // Verify participant registration node exists
+ List participants = sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).join();
+ log.info("participants: {}", participants);
+ assertEquals(participants.size(), 1);
+ assertTrue(participants.get(0).startsWith("id-"));
+ }
+
+ @Test
+ public void testDeleteOperationRouting() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore dualStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+ assertEquals(dualStore.getClass(), DualMetadataStore.class);
+
+ // Create a key in NOT_STARTED phase
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ dualStore.put(path, data, Optional.empty()).join();
+
+ // Delete should work in NOT_STARTED phase
+ dualStore.delete(path, Optional.empty()).join();
+ assertFalse(dualStore.exists(path).join());
+ }
+
+ @Test
+ public void testParticipantRegistrationWithChroot() throws Exception {
+ // Test with chroot path to ensure participant registration works
+ String chrootPath = "/test-chroot-" + UUID.randomUUID().toString().substring(0, 8);
+ String zkConnectString = zks.getConnectionString() + chrootPath;
+
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zkConnectString,
+ MetadataStoreConfig.builder().build());
+
+ // Verify participant registration node exists under chroot
+ List participants = sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).join();
+ log.info("Participants in chroot {}: {}", chrootPath, participants);
+ assertEquals(participants.size(), 1);
+ assertTrue(participants.get(0).startsWith("id-"));
+
+ // Verify the parent path was created
+ assertTrue(sourceStore.exists(MigrationState.PARTICIPANTS_PATH).join());
+ }
+
+ @Test
+ public void testExistsOperationRouting() throws Exception {
+ String prefix = newKey();
+ @Cleanup
+ MetadataStore sourceStore = new ZKMetadataStore(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build(), false);
+
+ String targetUrl = "memory:" + UUID.randomUUID();
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(sourceStore,
+ MetadataStoreConfig.builder().build());
+
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+
+ // Create in source
+ sourceStore.put(path, data, Optional.empty()).join();
+
+ // Exists should check source in NOT_STARTED phase
+ assertTrue(dualStore.exists(path).join());
+
+ // First trigger PREPARATION (SessionLost)
+ MigrationState preparationState = new MigrationState(MigrationPhase.PREPARATION, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ // Switch to COMPLETED phase
+ MigrationState completedState = new MigrationState(MigrationPhase.COMPLETED, targetUrl);
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+ Optional.empty()).join();
+
+ Thread.sleep(1000);
+
+ // Create in target
+ String targetPath = prefix + "/target-key";
+ targetStore.put(targetPath, data, Optional.empty()).join();
+
+ // Exists should check target in COMPLETED phase
+ assertTrue(dualStore.exists(targetPath).join());
+ }
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
new file mode 100644
index 0000000000000..466a3b2aadb0b
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.migration.MigrationPhase;
+import org.apache.pulsar.common.migration.MigrationState;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class MigrationCoordinatorTest extends BaseMetadataStoreTest {
+
+ protected String getOxiaServerConnectString() {
+ return "oxia://" + super.getOxiaServerConnectString();
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testPersistentDataCopy() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStoreExtended sourceStore =
+ (MetadataStoreExtended) MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl, MetadataStoreConfig.builder().build());
+
+ // Create persistent nodes
+ String key1 = prefix + "/persistent/key1";
+ String key2 = prefix + "/persistent/key2";
+ String key3 = prefix + "/persistent/nested/key3";
+
+ sourceStore.put(key1, "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ sourceStore.put(key2, "value2".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ sourceStore.put(key3, "value3".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ // Create ephemeral node (should NOT be copied)
+ String ephemeralKey = prefix + "/ephemeral/key";
+ sourceStore.put(ephemeralKey, "ephemeral-value".getBytes(StandardCharsets.UTF_8),
+ Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+
+ // Start migration
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ // Verify persistent nodes were copied
+ Optional target1 = targetStore.get(key1).join();
+ assertTrue(target1.isPresent());
+ assertEquals(new String(target1.get().getValue(), StandardCharsets.UTF_8), "value1");
+
+ Optional target2 = targetStore.get(key2).join();
+ assertTrue(target2.isPresent());
+ assertEquals(new String(target2.get().getValue(), StandardCharsets.UTF_8), "value2");
+
+ Optional target3 = targetStore.get(key3).join();
+ assertTrue(target3.isPresent());
+ assertEquals(new String(target3.get().getValue(), StandardCharsets.UTF_8), "value3");
+
+ // Verify ephemeral node is in the target store
+ Optional targetEphemeral = targetStore.get(ephemeralKey).join();
+ assertTrue(targetEphemeral.isPresent());
+ assertEquals(new String(targetEphemeral.get().getValue(), StandardCharsets.UTF_8), "ephemeral-value");
+ }
+
+ @Test
+ public void testVersionPreservation() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ // Create a node and update it multiple times to get a specific version
+ String key = prefix + "/versioned-key";
+ sourceStore.put(key, "v1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ sourceStore.put(key, "v2".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ sourceStore.put(key, "v3".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ // Get the version from source
+ Optional sourceResult = sourceStore.get(key).join();
+ assertTrue(sourceResult.isPresent());
+ long sourceVersion = sourceResult.get().getStat().getVersion();
+
+ // Start migration
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ // Verify version and modification count were preserved in target
+ Optional targetResult = targetStore.get(key).join();
+ assertTrue(targetResult.isPresent());
+ assertEquals(targetResult.get().getStat().getVersion(), sourceVersion);
+ assertEquals(new String(targetResult.get().getValue(), StandardCharsets.UTF_8), "v3");
+ }
+
+ @Test
+ public void testEmptyMetadataMigration() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ // Start migration with empty metadata
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+ }
+
+ @Test
+ public void testLargeDatasetMigration() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ // Create a larger dataset (100 nodes)
+ int nodeCount = 100;
+ for (int i = 0; i < nodeCount; i++) {
+ String key = prefix + "/data/node-" + i;
+ String value = "value-" + i;
+ sourceStore.put(key, value.getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ }
+
+ long startTime = System.currentTimeMillis();
+
+ // Start migration
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ long duration = System.currentTimeMillis() - startTime;
+ log.info("Migration of {} nodes completed in {} ms", nodeCount, duration);
+
+ // Verify all nodes were copied
+ for (int i = 0; i < nodeCount; i++) {
+ String key = prefix + "/data/node-" + i;
+ Optional targetResult = targetStore.get(key).join();
+ assertTrue(targetResult.isPresent(), "Node " + key + " should exist in target");
+ assertEquals(new String(targetResult.get().getValue(), StandardCharsets.UTF_8),
+ "value-" + i);
+ }
+ }
+
+ @Test
+ public void testNestedPathMigration() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ // Create nested paths
+ sourceStore.put(prefix + "/level1/key1", "value1".getBytes(StandardCharsets.UTF_8),
+ Optional.empty()).join();
+ sourceStore.put(prefix + "/level1/level2/key2", "value2".getBytes(StandardCharsets.UTF_8),
+ Optional.empty()).join();
+ sourceStore.put(prefix + "/level1/level2/level3/key3",
+ "value3".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ // Start migration
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ // Verify all nested paths were copied
+ Optional target1 = targetStore.get(prefix + "/level1/key1").join();
+ assertTrue(target1.isPresent());
+ assertEquals(new String(target1.get().getValue(), StandardCharsets.UTF_8), "value1");
+
+ Optional target2 = targetStore.get(prefix + "/level1/level2/key2").join();
+ assertTrue(target2.isPresent());
+ assertEquals(new String(target2.get().getValue(), StandardCharsets.UTF_8), "value2");
+
+ Optional target3 = targetStore.get(prefix + "/level1/level2/level3/key3").join();
+ assertTrue(target3.isPresent());
+ assertEquals(new String(target3.get().getValue(), StandardCharsets.UTF_8), "value3");
+ }
+
+ @Test
+ public void testMigrationStateStructure() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ // Start migration
+ MigrationCoordinator coordinator = new MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ // Verify migration state structure
+ Optional result = sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+
+ assertNotNull(state.getPhase());
+ assertNotNull(state.getTargetUrl());
+ assertEquals(state.getTargetUrl(), targetUrl);
+
+ // Phase should be PREPARATION or COPYING or COMPLETED
+ assertTrue(state.getPhase() == MigrationPhase.PREPARATION
+ || state.getPhase() == MigrationPhase.COPYING
+ || state.getPhase() == MigrationPhase.COMPLETED);
+ }
+}