diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java index 3623c1f8e66d1..0d46cbdc48821 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java @@ -17,7 +17,9 @@ package org.apache.ignite; +import java.util.Collection; import org.apache.ignite.lang.IgniteFuture; +import org.jetbrains.annotations.Nullable; /** * This interface provides functionality for creating cluster-wide cache data snapshots. @@ -48,4 +50,16 @@ public interface IgniteSnapshot { * @return Future which will be completed when cancel operation finished. */ public IgniteFuture cancelSnapshot(String name); + + /** + * Restore cache group(s) from the snapshot. + *

+ * NOTE: Cache groups to be restored from the snapshot must not be present in the cluster; if they are present, + * they must be destroyed by the user (e.g. with {@link IgniteCache#destroy()}) before starting this operation. + * + * @param name Snapshot name. + * @param cacheGroupNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot. + * @return Future which will be completed when restore operation finished. + */ + public IgniteFuture restoreSnapshot(String name, @Nullable Collection cacheGroupNames); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java index fa487a0554dfa..4f8fdba44cbd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -139,7 +139,10 @@ public enum IgniteFeatures { CACHE_GROUP_KEY_CHANGE(47), /** Collecting performance statistics. */ - PERFORMANCE_STATISTICS(48); + PERFORMANCE_STATISTICS(48), + + /** Restore cache group from the snapshot. */ + SNAPSHOT_RESTORE_CACHE_GROUP(49); /** * Unique feature identifier. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index ffe51c8796f94..236051a974ee5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.IgniteFeatures; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.managers.systemview.walker.CacheViewWalker; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; @@ -582,6 +584,28 @@ public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTop DiscoveryDataClusterState state = ctx.state().clusterState(); if (state.active() && !state.transition()) { + Set restartIds = new HashSet<>(F.viewReadOnly( + batch.requests(), DynamicCacheChangeRequest::restartId, req -> req.start() && req.restartId() != null)); + + assert restartIds.size() <= 1 : batch.requests(); + + Collection nodes = ctx.cache().context().snapshotMgr().cacheStartRequiredAliveNodes(F.first(restartIds)); + + for (UUID nodeId : nodes) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node != null && CU.baselineNode(node, state) && ctx.discovery().alive(node)) + continue; + + ClusterTopologyCheckedException err = + new ClusterTopologyCheckedException("Required node has left the
cluster [nodeId=" + nodeId + ']'); + + for (DynamicCacheChangeRequest req : batch.requests()) + ctx.cache().completeCacheStartFuture(req, false, err); + + return false; + } + ExchangeActions exchangeActions = new ExchangeActions(); CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, @@ -593,6 +617,9 @@ public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTop assert !exchangeActions.empty() : exchangeActions; batch.exchangeActions(exchangeActions); + + if (!nodes.isEmpty()) + exchangeActions.cacheStartRequiredAliveNodes(nodes); } return res.needExchange; @@ -1007,6 +1034,16 @@ else if (encMgr.masterKeyDigest() != null && } } + if (err == null && req.restartId() == null) { + IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr(); + + if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) { + err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " + + "currently being restored from a snapshot [cache=" + cacheName + + (ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']'); + } + } + if (err != null) { if (persistedCfgs) res.errs.add(err); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index cbe7df41b81ab..8736a88d53c66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -43,6 +44,12 @@ public class ExchangeActions { /** */ private Map cachesToStart; + /** + * Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when starting + * the cache(s), the whole procedure is rolled back. + */ + private Collection cacheStartRequiredAliveNodes; + /** */ private Map cachesToStop; @@ -319,6 +326,23 @@ public boolean cacheGroupStarting(int grpId) { return false; } + /** + * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when + * starting the cache(s), the whole procedure is rolled back. + */ + public Collection cacheStartRequiredAliveNodes() { + return cacheStartRequiredAliveNodes == null ? Collections.emptyList() : cacheStartRequiredAliveNodes; + } + + /** + * @param cacheStartRequiredAliveNodes Server nodes on which a successful start of the cache(s) is required, if any + * of these nodes fails when starting the cache(s), the whole procedure is + * rolled back. + */ + public void cacheStartRequiredAliveNodes(Collection cacheStartRequiredAliveNodes) { + this.cacheStartRequiredAliveNodes = new ArrayList<>(cacheStartRequiredAliveNodes); + } + /** * @param grpDesc Group descriptor. * @param destroy Destroy flag. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 507a35ad5cdb9..00b56b83cb632 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -4268,6 +4268,9 @@ else if (msg0 instanceof WalStateFinishMessage) if (res == null) res = validateRestartingCaches(node); + if (res == null) + res = validateRestoringCaches(node); + return res; } @@ -4294,6 +4297,20 @@ private IgniteNodeValidationResult validateRestartingCaches(ClusterNode node) { return null; } + /** + * @param node Joining node to validate. + * @return Node validation result if there was an issue with the joining node, {@code null} otherwise. + */ + private IgniteNodeValidationResult validateRestoringCaches(ClusterNode node) { + if (ctx.cache().context().snapshotMgr().isRestoring()) { + String msg = "Joining node during caches restore is not allowed [joiningNodeId=" + node.id() + ']'; + + return new IgniteNodeValidationResult(node.id(), msg); + } + + return null; + } + /** * @return Keep static cache configuration flag. If {@code true}, static cache configuration will override * configuration persisted on disk. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index f3495ac74516a..cb513f3b0a67c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -293,6 +293,8 @@ public GridCacheSharedContext( stateAwareMgrs.add(snpMgr); + stateAwareMgrs.add(snapshotMgr); + for (PluginProvider prv : kernalCtx.plugins().allProviders()) if (prv instanceof IgniteChangeGlobalStateSupport) stateAwareMgrs.add(((IgniteChangeGlobalStateSupport)prv)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 4d613d9aafc0b..15619d398bb37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -32,6 +32,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.BooleanSupplier; import javax.cache.CacheException; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.IgniteFeatures; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.UnregisteredBinaryTypeException; @@ -992,6 +994,42 @@ public BinaryMetadata binaryMetadata(int typeId) throws BinaryObjectException { } } + /** {@inheritDoc} */ + 
@Override public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException { + if (!metadataDir.exists()) + return; + + try { + ConcurrentMap metaCache = new ConcurrentHashMap<>(); + + new BinaryMetadataFileStore(metaCache, ctx, log, metadataDir) + .restoreMetadata(); + + Collection metadata = F.viewReadOnly(metaCache.values(), BinaryMetadataHolder::metadata); + + // Check the compatibility of the binary metadata. + for (BinaryMetadata newMeta : metadata) { + BinaryMetadata oldMeta = binaryMetadata(newMeta.typeId()); + + if (oldMeta != null) + BinaryUtils.mergeMetadata(oldMeta, newMeta, null); + } + + // Update cluster metadata. + for (BinaryMetadata newMeta : metadata) { + if (stopChecker.getAsBoolean()) + return; + + if (Thread.interrupted()) + throw new IgniteInterruptedCheckedException("Thread has been interrupted."); + + addMeta(newMeta.typeId(), newMeta.wrap(binaryContext()), false); + } + } catch (BinaryObjectException e) { + throw new IgniteCheckedException(e); + } + } + /** {@inheritDoc} */ @Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException { A.notNullOrEmpty(typeName, "enum type name"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e63d78aaf9301..6ceab2f83f35e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -299,7 +299,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private boolean forceAffReassignment; /** Exception that was thrown during init phase on local node. */ - private Exception exchangeLocE; + private volatile Exception exchangeLocE; /** Exchange exceptions from all participating nodes. 
*/ private final Map exchangeGlobalExceptions = new ConcurrentHashMap<>(); @@ -5126,6 +5126,12 @@ public void onNodeLeft(final ClusterNode node) { if (crd0 == null) finishState = new FinishState(null, initialVersion(), null); + + if (dynamicCacheStartExchange() && exchangeLocE == null && + exchActions.cacheStartRequiredAliveNodes().contains(node.id())) { + exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE = new ClusterTopologyCheckedException( + "Required node has left the cluster [nodeId=" + node.id() + ']')); + } } if (crd0 == null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index a7c682dc32013..24954a40888ab 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -974,29 +974,37 @@ public void readConfigurationFiles(List> ccfgs, Arrays.sort(files); for (File file : files) { - if (file.isDirectory()) { - if (file.getName().startsWith(CACHE_DIR_PREFIX)) { - File conf = new File(file, CACHE_DATA_FILENAME); + if (file.isDirectory()) + readCacheConfigurations(file, ccfgs); + } - if (conf.exists() && conf.length() > 0) { - StoredCacheData cacheData = readCacheData(conf); + return ccfgs; + } - String cacheName = cacheData.config().getName(); + /** + * @param dir Cache (group) directory. + * @param ccfgs Cache configurations. + * @throws IgniteCheckedException If failed. + */ + public void readCacheConfigurations(File dir, Map ccfgs) throws IgniteCheckedException { + if (dir.getName().startsWith(CACHE_DIR_PREFIX)) { + File conf = new File(dir, CACHE_DATA_FILENAME); - if (!ccfgs.containsKey(cacheName)) - ccfgs.put(cacheName, cacheData); - else { - U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file " - + file.getName()); - } - } + if (conf.exists() && conf.length() > 0) { + StoredCacheData cacheData = readCacheData(conf); + + String cacheName = cacheData.config().getName(); + + if (!ccfgs.containsKey(cacheName)) + ccfgs.put(cacheName, cacheData); + else { + U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file " + + dir.getName()); } - else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX)) - readCacheGroupCaches(file, ccfgs); } } - - return ccfgs; + else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX)) + readCacheGroupCaches(dir, ccfgs); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index d23b5843579f9..90304bb0758be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -51,7 +51,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiFunction; @@ -105,6 +104,7 @@ import 
org.apache.ignite.internal.processors.cache.tree.DataRow; import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.task.GridInternal; @@ -121,7 +121,6 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteThrowableFunction; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -129,6 +128,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.resources.IgniteInstanceResource; @@ -188,7 +188,7 @@ * */ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter - implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener { + implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener, IgniteChangeGlobalStateSupport { /** File with delta pages suffix. */ public static final String DELTA_SUFFIX = ".delta"; @@ -258,6 +258,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Marshaller. */ private final Marshaller marsh; + /** Distributed process to restore cache group from the snapshot. */ + private final SnapshotRestoreProcess restoreCacheGrpProc; + /** Resolved persistent data storage settings. */ private volatile PdsFolderSettings pdsSettings; @@ -315,6 +318,8 @@ public IgniteSnapshotManager(GridKernalContext ctx) { this::processLocalSnapshotEndStageResult); marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName()); + + restoreCacheGrpProc = new SnapshotRestoreProcess(ctx); } /** @@ -404,12 +409,14 @@ public static String partDeltaFileName(int partId) { for (SnapshotFutureTask sctx : locSnpTasks.values()) { if (sctx.sourceNodeId().equals(leftNodeId) || (snpReq != null && - snpReq.snpName.equals(sctx.snapshotName()) && - snpReq.bltNodes.contains(leftNodeId))) { + snpReq.snapshotName().equals(sctx.snapshotName()) && + snpReq.nodes().contains(leftNodeId))) { sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " + "One of baseline nodes left the cluster: " + leftNodeId)); } } + + restoreCacheGrpProc.onNodeLeft(leftNodeId); } } finally { @@ -423,6 +430,8 @@ public static String partDeltaFileName(int partId) { busyLock.block(); try { + restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping.")); + // Try stop all snapshot processing if not yet. for (SnapshotFutureTask sctx : locSnpTasks.values()) sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG)); @@ -450,6 +459,16 @@ public static String partDeltaFileName(int partId) { } } + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext kctx) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) { + restoreCacheGrpProc.interrupt(new IgniteCheckedException("The cluster has been deactivated.")); + } + /** * @param snpDir Snapshot dir. * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id). @@ -547,7 +566,7 @@ private IgniteInternalFuture initLocalSnapshotStartSt "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']')); } - Set leftNodes = new HashSet<>(req.bltNodes); + Set leftNodes = new HashSet<>(req.nodes()); leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE), F.node2id())); @@ -556,7 +575,9 @@ private IgniteInternalFuture initLocalSnapshotStartSt "prior to snapshot operation start: " + leftNodes)); } - Set leftGrps = new HashSet<>(req.grpIds); + List grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId)); + + Set leftGrps = new HashSet<>(grpIds); leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet()); if (!leftGrps.isEmpty()) { @@ -568,7 +589,7 @@ private IgniteInternalFuture initLocalSnapshotStartSt // Prepare collection of pairs group and appropriate cache partition to be snapshot. // Cache group context may be 'null' on some nodes e.g. a node filter is set. - for (Integer grpId : req.grpIds) { + for (Integer grpId : grpIds) { if (cctx.cache().cacheGroup(grpId) == null) continue; @@ -580,10 +601,10 @@ private IgniteInternalFuture initLocalSnapshotStartSt if (parts.isEmpty()) task0 = new GridFinishedFuture<>(Collections.emptySet()); else { - task0 = registerSnapshotTask(req.snpName, - req.srcNodeId, + task0 = registerSnapshotTask(req.snapshotName(), + req.operationalNodeId(), parts, - locSndrFactory.apply(req.snpName)); + locSndrFactory.apply(req.snapshotName())); clusterSnpReq = req; } @@ -593,11 +614,11 @@ private IgniteInternalFuture initLocalSnapshotStartSt throw F.wrap(fut.error()); try { - Set blts = req.bltNodes.stream() + Set blts = req.nodes().stream() .map(n -> cctx.discovery().node(n).consistentId().toString()) .collect(Collectors.toSet()); - File smf = new File(snapshotLocalDir(req.snpName), snapshotMetaFileName(cctx.localNode().consistentId().toString())); + File smf = new File(snapshotLocalDir(req.snapshotName()), snapshotMetaFileName(cctx.localNode().consistentId().toString())); if (smf.exists()) throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath())); @@ -606,12 +627,12 @@ private IgniteInternalFuture initLocalSnapshotStartSt try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) { U.marshal(marsh, - new SnapshotMetadata(req.rqId, - req.snpName, + new SnapshotMetadata(req.requestId(), + req.snapshotName(), cctx.localNode().consistentId().toString(), pdsSettings.folderName(), cctx.gridConfig().getDataStorageConfiguration().getPageSize(), - req.grpIds, + grpIds, blts, fut.result()), out); @@ -640,7 +661,7 @@ private void processLocalSnapshotStartStageResult(UUID id, Map e instanceof IgniteFutureCancelledCheckedException); - if (snpReq == null || !snpReq.rqId.equals(id)) { + if (snpReq == null || !snpReq.requestId().equals(id)) { synchronized (snpOpMux) { if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) { if (cancelled) { @@ -659,18 +680,18 @@ private void processLocalSnapshotStartStageResult(UUID id, Map missed = new HashSet<>(snpReq.bltNodes); + Set missed = new HashSet<>(snpReq.nodes()); missed.removeAll(res.keySet()); 
missed.removeAll(err.keySet()); if (cancelled) { - snpReq.err = new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " + - "has been cancelled by external process [err=" + err + ", missed=" + missed + ']'); + snpReq.error(new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " + + "has been cancelled by external process [err=" + err + ", missed=" + missed + ']')); } else if (!F.isEmpty(err) || !missed.isEmpty()) { - snpReq.err = new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " + + snpReq.error(new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " + "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " + - "[err=" + err + ", missed=" + missed + ']'); + "[err=" + err + ", missed=" + missed + ']')); } endSnpProc.start(UUID.randomUUID(), snpReq); @@ -686,8 +707,8 @@ private IgniteInternalFuture initLocalSnapshotEndStag return new GridFinishedFuture<>(new SnapshotOperationResponse()); try { - if (req.err != null) - deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName()); + if (req.error() != null) + deleteSnapshot(snapshotLocalDir(req.snapshotName()), pdsSettings.folderName()); removeLastMetaStorageKey(); } @@ -709,26 +730,26 @@ private void processLocalSnapshotEndStageResult(UUID id, Map endFail = new HashSet<>(snpReq.bltNodes); + Set endFail = new HashSet<>(snpReq.nodes()); endFail.removeAll(res.keySet()); clusterSnpReq = null; synchronized (snpOpMux) { if (clusterSnpFut != null) { - if (endFail.isEmpty() && snpReq.err == null) { + if (endFail.isEmpty() && snpReq.error() == null) { clusterSnpFut.onDone(); if (log.isInfoEnabled()) log.info(SNAPSHOT_FINISHED_MSG + snpReq); } - else if (snpReq.err == null) { + else if (snpReq.error() == null) { clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " + "Local snapshot tasks may not finished completely or finalizing results fails " + "[fail=" + endFail + ", err=" + err + ']')); } else - clusterSnpFut.onDone(snpReq.err); + clusterSnpFut.onDone(snpReq.error()); clusterSnpFut = null; } @@ -747,6 +768,38 @@ public boolean isSnapshotCreating() { } } + /** + * Check if snapshot restore process is currently running. + * + * @return {@code True} if the snapshot restore operation is in progress. + */ + public boolean isRestoring() { + return restoreCacheGrpProc.isRestoring(); + } + + /** + * @param restoreId Restore process ID. + * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when + * starting the cache(s), the whole procedure is rolled back. + */ + public Set cacheStartRequiredAliveNodes(@Nullable IgniteUuid restoreId) { + if (restoreId == null) + return Collections.emptySet(); + + return restoreCacheGrpProc.cacheStartRequiredAliveNodes(restoreId); + } + + /** + * Check if the cache or group with the specified name is currently being restored from the snapshot. + * + * @param cacheName Cache name. + * @param grpName Cache group name. + * @return {@code True} if the cache or group with the specified name is being restored. + */ + public boolean isRestoring(String cacheName, @Nullable String grpName) { + return restoreCacheGrpProc.isRestoring(cacheName, grpName); + } + /** * @return List of all known snapshots on the local node. 
*/ @@ -829,34 +882,20 @@ public IgniteInternalFuture checkSnapshot(String name) { A.notNullOrEmpty(name, "Snapshot name cannot be null or empty."); A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_"); - GridKernalContext kctx0 = cctx.kernalContext(); GridFutureAdapter res = new GridFutureAdapter<>(); - kctx0.security().authorize(ADMIN_SNAPSHOT); - - Collection bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE), - (node) -> CU.baselineNode(node, kctx0.state().clusterState())); - - kctx0.task().setThreadContext(TC_SKIP_AUTH, true); - kctx0.task().setThreadContext(TC_SUBGRID, bltNodes); - - kctx0.task().execute(SnapshotMetadataCollectorTask.class, name) - .listen(f0 -> { + collectSnapshotMetadata(name).listen(f0 -> { if (f0.error() == null) { Map> metas = f0.result(); - kctx0.task().setThreadContext(TC_SKIP_AUTH, true); - kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet())); - - kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas) - .listen(f1 -> { - if (f1.error() == null) - res.onDone(f1.result()); - else if (f1.error() instanceof IgniteSnapshotVerifyException) - res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions())); - else - res.onDone(f1.error()); - }); + runSnapshotVerification(metas).listen(f1 -> { + if (f1.error() == null) + res.onDone(f1.result()); + else if (f1.error() instanceof IgniteSnapshotVerifyException) + res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions())); + else + res.onDone(f1.error()); + }); } else { if (f0.error() instanceof IgniteSnapshotVerifyException) @@ -869,6 +908,37 @@ else if (f1.error() instanceof IgniteSnapshotVerifyException) return res; } + /** + * @param name Snapshot name. + * @return Future with snapshot metadata obtained from nodes. + */ + IgniteInternalFuture>> collectSnapshotMetadata(String name) { + GridKernalContext kctx0 = cctx.kernalContext(); + + kctx0.security().authorize(ADMIN_SNAPSHOT); + + Collection bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE), + (node) -> CU.baselineNode(node, kctx0.state().clusterState())); + + kctx0.task().setThreadContext(TC_SKIP_AUTH, true); + kctx0.task().setThreadContext(TC_SUBGRID, bltNodes); + + return kctx0.task().execute(SnapshotMetadataCollectorTask.class, name); + } + + /** + * @param metas Nodes snapshot metadata. + * @return Future with the verification results. + */ + IgniteInternalFuture runSnapshotVerification(Map> metas) { + GridKernalContext kctx0 = cctx.kernalContext(); + + kctx0.task().setThreadContext(TC_SKIP_AUTH, true); + kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet())); + + return kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas); + } + /** * @param snpName Snapshot name. * @param folderName Directory name for cache group. @@ -1012,16 +1082,19 @@ public List readSnapshotMetadatas(String snpName) { if (localSnapshotNames().contains(name)) throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists on local node."); + if (isRestoring()) + throw new IgniteException("Snapshot operation has been rejected. 
Cache group restore operation is currently in progress."); + snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name); clusterSnpFut = snpFut0; lastSeenSnpFut = snpFut0; } - List grps = cctx.cache().persistentGroups().stream() + List grps = cctx.cache().persistentGroups().stream() .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER) .filter(g -> !g.config().isEncryptionEnabled()) - .map(CacheGroupDescriptor::groupId) + .map(CacheGroupDescriptor::cacheOrGroupName) .collect(Collectors.toList()); List srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE); @@ -1039,7 +1112,8 @@ public List readSnapshotMetadatas(String snpName) { grps, new HashSet<>(F.viewReadOnly(srvNodes, F.node2id(), - (node) -> CU.baselineNode(node, clusterState))))); + (node) -> CU.baselineNode(node, clusterState))) + )); String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']'; @@ -1061,6 +1135,15 @@ public List readSnapshotMetadatas(String snpName) { } } + /** {@inheritDoc} */ + @Override public IgniteFuture restoreSnapshot(String name, @Nullable Collection grpNames) { + A.notNullOrEmpty(name, "Snapshot name cannot be null or empty."); + A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_"); + A.ensure(grpNames == null || !grpNames.isEmpty(), "List of cache group names cannot be empty."); + + return restoreCacheGrpProc.start(name, grpNames); + } + /** {@inheritDoc} */ @Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException { synchronized (snpOpMux) { @@ -1075,6 +1158,8 @@ public List readSnapshotMetadatas(String snpName) { /** {@inheritDoc} */ @Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) throws IgniteCheckedException { + restoreCacheGrpProc.cleanup(); + // Snapshot which has not been completed due to the local node crashed must be deleted. String snpName = (String)metaStorage.read(SNP_RUNNING_KEY); @@ -1111,13 +1196,13 @@ public static boolean isSnapshotOperation(DiscoveryEvent evt) { SnapshotOperationRequest snpReq = clusterSnpReq; - SnapshotFutureTask task = locSnpTasks.get(snpReq.snpName); + SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName()); if (task == null) return; if (task.start()) { - cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snpName)); + cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snapshotName())); // Schedule task on a checkpoint and wait when it starts. try { @@ -1305,7 +1390,7 @@ private void recordSnapshotEvent(String snpName, String msg, int type) { /** * @return The executor used to run snapshot tasks. */ - Executor snapshotExecutorService() { + ExecutorService snapshotExecutorService() { assert snpRunner != null; return snpRunner; @@ -1318,6 +1403,13 @@ void ioFactory(FileIOFactory ioFactory) { this.ioFactory = ioFactory; } + /** + * @return Factory to create IO interface over a page stores. + */ + FileIOFactory ioFactory() { + return ioFactory; + } + /** * @return Relative configured path of persistence data storage directory for the local node. * Example: {@code snapshotWorkDir/db/IgniteNodeName0} @@ -1775,49 +1867,6 @@ public LocalSnapshotSender(String snpName) { } } - /** Snapshot start request for {@link DistributedProcess} initiate message. */ - private static class SnapshotOperationRequest implements Serializable { - /** Serial version uid. 
*/ - private static final long serialVersionUID = 0L; - - /** Unique snapshot request id. */ - private final UUID rqId; - - /** Source node id which trigger request. */ - private final UUID srcNodeId; - - /** Snapshot name. */ - private final String snpName; - - /** The list of cache groups to include into snapshot. */ - @GridToStringInclude - private final List grpIds; - - /** The list of affected by snapshot operation baseline nodes. */ - @GridToStringInclude - private final Set bltNodes; - - /** Exception occurred during snapshot operation processing. */ - private volatile IgniteCheckedException err; - - /** - * @param snpName Snapshot name. - * @param grpIds Cache groups to include into snapshot. - */ - public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String snpName, List grpIds, Set bltNodes) { - this.rqId = rqId; - this.srcNodeId = srcNodeId; - this.snpName = snpName; - this.grpIds = grpIds; - this.bltNodes = bltNodes; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SnapshotOperationRequest.class, this); - } - } - /** */ private static class SnapshotOperationResponse implements Serializable { /** Serial version uid. */ @@ -1858,18 +1907,18 @@ public SnapshotStartDiscoveryMessage( } /** */ - private static class ClusterSnapshotFuture extends GridFutureAdapter { + protected static class ClusterSnapshotFuture extends GridFutureAdapter { /** Unique snapshot request id. */ - private final UUID rqId; + final UUID rqId; /** Snapshot name. */ - private final String name; + final String name; /** Snapshot start time. */ - private final long startTime; + final long startTime; /** Snapshot finish time. */ - private volatile long endTime; + volatile long endTime; /** * Default constructor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java new file mode 100644 index 0000000000000..177133fe1f7dc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot operation start request for {@link DistributedProcess} initiate message. + */ +public class SnapshotOperationRequest implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Request ID. */ + private final UUID reqId; + + /** Snapshot name. */ + private final String snpName; + + /** Baseline node IDs that must be alive to complete the operation. */ + @GridToStringInclude + private final Set nodes; + + /** List of cache group names. */ + @GridToStringInclude + private final Collection grps; + + /** Operational node ID. */ + private final UUID opNodeId; + + /** Exception occurred during snapshot operation processing. */ + private volatile Throwable err; + + /** + * @param reqId Request ID. + * @param opNodeId Operational node ID. + * @param snpName Snapshot name. + * @param grps List of cache group names. + * @param nodes Baseline node IDs that must be alive to complete the operation. + */ + public SnapshotOperationRequest( + UUID reqId, + UUID opNodeId, + String snpName, + @Nullable Collection grps, + Set nodes + ) { + this.reqId = reqId; + this.opNodeId = opNodeId; + this.snpName = snpName; + this.grps = grps; + this.nodes = nodes; + } + + /** + * @return Request ID. + */ + public UUID requestId() { + return reqId; + } + + /** + * @return Snapshot name. + */ + public String snapshotName() { + return snpName; + } + + /** + * @return List of cache group names. + */ + public @Nullable Collection groups() { + return grps; + } + + /** + * @return Baseline node IDs that must be alive to complete the operation. + */ + public Set nodes() { + return nodes; + } + + /** + * @return Operational node ID. + */ + public UUID operationalNodeId() { + return opNodeId; + } + + /** + * @return Exception occurred during snapshot operation processing. + */ + public Throwable error() { + return err; + } + + /** + * @param err Exception occurred during snapshot operation processing. + */ + public void error(Throwable err) { + this.err = err; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SnapshotOperationRequest.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java new file mode 100644 index 0000000000000..038561bef39ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -0,0 +1,931 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP; +import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath; +import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START; + +/** + * Distributed process to restore cache group from the snapshot. + */ +public class SnapshotRestoreProcess { + /** Temporary cache directory prefix. */ + public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_"; + + /** Reject operation message. */ + private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. "; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Cache group restore prepare phase. */ + private final DistributedProcess> prepareRestoreProc; + + /** Cache group restore cache start phase. */ + private final DistributedProcess cacheStartProc; + + /** Cache group restore rollback phase. */ + private final DistributedProcess rollbackRestoreProc; + + /** Logger. */ + private final IgniteLogger log; + + /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */ + private volatile ClusterSnapshotFuture fut; + + /** Snapshot restore operation context. */ + private volatile SnapshotRestoreContext opCtx; + + /** + * @param ctx Kernal context. + */ + public SnapshotRestoreProcess(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + prepareRestoreProc = new DistributedProcess<>( + ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare); + + cacheStartProc = new DistributedProcess<>( + ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart); + + rollbackRestoreProc = new DistributedProcess<>( + ctx, RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK, this::rollback, this::finishRollback); + } + + /** + * Cleanup temporary directories if any exists. + * + * @throws IgniteCheckedException If it was not possible to delete some temporary directory. + */ + protected void cleanup() throws IgniteCheckedException { + FilePageStoreManager pageStore = (FilePageStoreManager)ctx.cache().context().pageStore(); + + File dbDir = pageStore.workDir(); + + for (File dir : dbDir.listFiles(dir -> dir.isDirectory() && dir.getName().startsWith(TMP_CACHE_DIR_PREFIX))) { + if (!U.delete(dir)) { + throw new IgniteCheckedException("Unable to remove temporary directory, " + + "try deleting it manually [dir=" + dir + ']'); + } + } + } + + /** + * Start cache group restore operation. + * + * @param snpName Snapshot name. + * @param cacheGrpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot. + * @return Future that will be completed when the restore operation is complete and the cache groups are started. 
+ */ + public IgniteFuture start(String snpName, @Nullable Collection cacheGrpNames) { + ClusterSnapshotFuture fut0; + + try { + if (ctx.clientNode()) + throw new IgniteException(OP_REJECT_MSG + "Client and daemon nodes can not perform this operation."); + + DiscoveryDataClusterState clusterState = ctx.state().clusterState(); + + if (clusterState.state() != ClusterState.ACTIVE || clusterState.transition()) + throw new IgniteException(OP_REJECT_MSG + "The cluster should be active."); + + if (!clusterState.hasBaselineTopology()) + throw new IgniteException(OP_REJECT_MSG + "The baseline topology is not configured for cluster."); + + if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP)) + throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation."); + + if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) + throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress."); + + synchronized (this) { + if (isRestoring() || fut != null) + throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed."); + + fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); + + fut0 = fut; + } + } + catch (IgniteException e) { + return new IgniteFinishedFutureImpl<>(e); + } + + ctx.cache().context().snapshotMgr().collectSnapshotMetadata(snpName).listen( + f -> { + if (f.error() != null) { + finishProcess(fut0.rqId, f.error()); + + return; + } + + Set dataNodes = new HashSet<>(); + Set snpBltNodes = null; + Map> metas = f.result(); + Map reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() : + cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v)); + + for (Map.Entry> entry : metas.entrySet()) { + SnapshotMetadata meta = F.first(entry.getValue()); + + assert meta != null : entry.getKey().id(); + + if (!entry.getKey().consistentId().equals(meta.consistentId())) + continue; + + if (snpBltNodes == null) + snpBltNodes = new HashSet<>(meta.baselineNodes()); + + dataNodes.add(entry.getKey().id()); + + reqGrpIds.keySet().removeAll(meta.partitions().keySet()); + } + + if (snpBltNodes == null) { + finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " + + "has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); + + return; + } + + if (!reqGrpIds.isEmpty()) { + finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " + + "found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); + + return; + } + + Collection bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE), + node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState())); + + snpBltNodes.removeAll(bltNodes); + + if (!snpBltNodes.isEmpty()) { + finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " + + "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']')); + + return; + } + + ctx.cache().context().snapshotMgr().runSnapshotVerification(metas).listen( + f0 -> { + if (f0.error() != null) { + finishProcess(fut0.rqId, f0.error()); + + return; + } + + IdleVerifyResultV2 res = f0.result(); + + if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) { + StringBuilder sb = new StringBuilder(); + + res.print(sb::append, true); + + finishProcess(fut0.rqId, new IgniteException(sb.toString())); + + return; + } + + 
SnapshotOperationRequest req = new SnapshotOperationRequest( + fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes); + + prepareRestoreProc.start(req.requestId(), req); + } + ); + } + ); + + return new IgniteFutureImpl<>(fut0); + } + + /** + * Check if snapshot restore process is currently running. + * + * @return {@code True} if the snapshot restore operation is in progress. + */ + public boolean isRestoring() { + return isRestoring(null, null); + } + + /** + * Check if the cache or group with the specified name is currently being restored from the snapshot. + * + * @param cacheName Cache name. + * @param grpName Cache group name. + * @return {@code True} if the cache or group with the specified name is currently being restored. + */ + public boolean isRestoring(@Nullable String cacheName, @Nullable String grpName) { + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0 == null) + return false; + + if (cacheName == null) + return true; + + Map cacheCfgs = opCtx0.cfgs; + + int cacheId = CU.cacheId(cacheName); + + if (cacheCfgs.containsKey(cacheId)) + return true; + + for (File grpDir : opCtx0.dirs) { + String locGrpName = FilePageStoreManager.cacheGroupName(grpDir); + + if (grpName != null) { + if (cacheName.equals(locGrpName)) + return true; + + if (CU.cacheId(locGrpName) == CU.cacheId(grpName)) + return true; + } + else if (CU.cacheId(locGrpName) == cacheId) + return true; + } + + return false; + } + + /** + * @param reqId Request ID. + * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when + * starting the cache(s), the whole procedure is rolled back. + */ + public Set cacheStartRequiredAliveNodes(IgniteUuid reqId) { + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId)) + return Collections.emptySet(); + + return Collections.unmodifiableSet(opCtx0.nodes); + } + + /** + * Finish local cache group restore process. + * + * @param reqId Request ID. + */ + private void finishProcess(UUID reqId) { + finishProcess(reqId, null); + } + + /** + * Finish local cache group restore process. + * + * @param reqId Request ID. + * @param err Error, if any. + */ + private void finishProcess(UUID reqId, @Nullable Throwable err) { + if (err != null) + log.error("Failed to restore snapshot cache group [reqId=" + reqId + ']', err); + else if (log.isInfoEnabled()) + log.info("Successfully restored cache group(s) from the snapshot [reqId=" + reqId + ']'); + + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0 != null && reqId.equals(opCtx0.reqId)) + opCtx = null; + + synchronized (this) { + ClusterSnapshotFuture fut0 = fut; + + if (fut0 != null && reqId.equals(fut0.rqId)) { + fut = null; + + ctx.getSystemExecutorService().submit(() -> fut0.onDone(null, err)); + } + } + } + + /** + * Node left callback. + * + * @param leftNodeId Left node ID. + */ + public void onNodeLeft(UUID leftNodeId) { + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) { + opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG + + "Required node has left the cluster [nodeId=" + leftNodeId + ']')); + } + } + + /** + * Abort the currently running restore procedure (if any). + * + * @param reason Interruption reason. 
+ */ + public void interrupt(Exception reason) { + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0 == null) + return; + + opCtx0.err.compareAndSet(null, reason); + + IgniteFuture stopFut; + + synchronized (this) { + stopFut = opCtx0.stopFut; + } + + if (stopFut != null) + stopFut.get(); + } + + /** + * Ensures that a cache with the specified name does not exist locally. + * + * @param name Cache name. + */ + private void ensureCacheAbsent(String name) { + int id = CU.cacheId(name); + + if (ctx.cache().cacheGroupDescriptors().containsKey(id) || ctx.cache().cacheDescriptor(id) != null) { + throw new IgniteIllegalStateException("Cache \"" + name + + "\" should be destroyed manually before performing the restore operation."); + } + } + + /** + * @param req Request to prepare cache group restore from the snapshot. + * @return Result future. + */ + private IgniteInternalFuture> prepare(SnapshotOperationRequest req) { + if (ctx.clientNode()) + return new GridFinishedFuture<>(); + + try { + DiscoveryDataClusterState state = ctx.state().clusterState(); + + if (state.state() != ClusterState.ACTIVE || state.transition()) + throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active."); + + if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) + throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress."); + + for (UUID nodeId : req.nodes()) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null || !CU.baselineNode(node, state) || !ctx.discovery().alive(node)) { + throw new IgniteCheckedException( + OP_REJECT_MSG + "Required node has left the cluster [nodeId=" + nodeId + ']'); + } + } + + opCtx = prepareContext(req); + + SnapshotRestoreContext opCtx0 = opCtx; + + if (opCtx0.dirs.isEmpty()) + return new GridFinishedFuture<>(); + + // Ensure that shared cache groups have no conflicts.
+ for (StoredCacheData cfg : opCtx0.cfgs.values()) { + ensureCacheAbsent(cfg.config().getName()); + + if (!F.isEmpty(cfg.config().getGroupName())) + ensureCacheAbsent(cfg.config().getGroupName()); + } + + if (log.isInfoEnabled()) { + log.info("Starting local snapshot restore operation" + + " [reqId=" + req.requestId() + + ", snapshot=" + req.snapshotName() + + ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']'); + } + + Consumer errHnd = (ex) -> opCtx.err.compareAndSet(null, ex); + BooleanSupplier stopChecker = () -> opCtx.err.get() != null; + GridFutureAdapter> retFut = new GridFutureAdapter<>(); + + if (ctx.isStopping()) + throw new NodeStoppingException("Node is stopping."); + + opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null)); + + restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd) + .thenAccept(res -> { + try { + Throwable err = opCtx.err.get(); + + if (err != null) + throw err; + + for (File src : opCtx0.dirs) + Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE); + } + catch (Throwable t) { + log.error("Unable to restore cache group(s) from the snapshot " + + "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t); + + retFut.onDone(t); + + return; + } + + retFut.onDone(new ArrayList<>(opCtx.cfgs.values())); + }); + + return retFut; + } + catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) { + log.error("Unable to restore cache group(s) from the snapshot " + + "[reqId=" + req.requestId() + ", snapshot=" + req.snapshotName() + ']', e); + + return new GridFinishedFuture<>(e); + } + } + + /** + * @param cacheDir Cache directory. + * @return Temporary directory. + */ + private File formatTmpDirName(File cacheDir) { + return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName()); + } + + /** + * Copy partition files and update binary metadata. + * + * @param snpName Snapshot name. + * @param dirs Cache directories to restore from the snapshot. + * @param updateMeta Update binary metadata flag. + * @param stopChecker Process interrupt checker. + * @param errHnd Error handler. + * @throws IgniteCheckedException If failed. 
+ */ + private CompletableFuture restoreAsync( + String snpName, + Collection dirs, + boolean updateMeta, + BooleanSupplier stopChecker, + Consumer errHnd + ) throws IgniteCheckedException { + IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr(); + String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName(); + + List> futs = new ArrayList<>(); + + if (updateMeta) { + File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName); + + futs.add(CompletableFuture.runAsync(() -> { + try { + ctx.cacheObjects().updateMetadata(binDir, stopChecker); + } + catch (Throwable t) { + errHnd.accept(t); + } + }, snapshotMgr.snapshotExecutorService())); + } + + for (File cacheDir : dirs) { + File tmpCacheDir = formatTmpDirName(cacheDir); + File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName), + Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString()); + + assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir; + + for (File snpFile : snpCacheDir.listFiles()) { + futs.add(CompletableFuture.runAsync(() -> { + if (stopChecker.getAsBoolean()) + return; + + try { + if (Thread.interrupted()) + throw new IgniteInterruptedCheckedException("Thread has been interrupted."); + + File target = new File(tmpCacheDir, snpFile.getName()); + + if (log.isDebugEnabled()) { + log.debug("Copying file from the snapshot " + + "[snapshot=" + snpName + + ", src=" + snpFile + + ", target=" + target + "]"); + } + + IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length()); + } + catch (Throwable t) { + errHnd.accept(t); + } + }, ctx.cache().context().snapshotMgr().snapshotExecutorService())); + } + } + + int futsSize = futs.size(); + + return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize])); + } + + /** + * @param req Request to prepare cache group restore from the snapshot. + * @return Snapshot restore operation context. + * @throws IgniteCheckedException If failed. + */ + private SnapshotRestoreContext prepareContext(SnapshotOperationRequest req) throws IgniteCheckedException { + if (opCtx != null) { + throw new IgniteCheckedException(OP_REJECT_MSG + + "The previous snapshot restore operation was not completed."); + } + + GridCacheSharedContext cctx = ctx.cache().context(); + + SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName())); + + if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString())) + return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap()); + + if (meta.pageSize() != cctx.database().pageSize()) { + throw new IgniteCheckedException("Incompatible memory page size " + + "[snapshotPageSize=" + meta.pageSize() + + ", local=" + cctx.database().pageSize() + + ", snapshot=" + req.snapshotName() + + ", nodeId=" + cctx.localNodeId() + ']'); + } + + List cacheDirs = new ArrayList<>(); + Map cfgsByName = new HashMap<>(); + FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore(); + + // Collect the cache configurations and prepare a temporary directory for copying files. 
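+ // Only the cache groups requested in the operation are processed; an empty list of group names
+ // means that every cache group found in the snapshot will be restored.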
+ for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName())) { + String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir); + + if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName)) + continue; + + File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName); + + if (cacheDir.exists()) { + if (!cacheDir.isDirectory()) { + throw new IgniteCheckedException("Unable to restore cache group, file with required directory " + + "name already exists [group=" + grpName + ", file=" + cacheDir + ']'); + } + + if (cacheDir.list().length > 0) { + throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " + + "[group=" + grpName + ", dir=" + cacheDir + ']'); + } + + if (!cacheDir.delete()) { + throw new IgniteCheckedException("Unable to remove empty cache directory " + + "[group=" + grpName + ", dir=" + cacheDir + ']'); + } + } + + File tmpCacheDir = formatTmpDirName(cacheDir); + + if (tmpCacheDir.exists()) { + throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " + + "[group=" + grpName + ", dir=" + tmpCacheDir + ']'); + } + + if (!tmpCacheDir.mkdir()) { + throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " + + "[group=" + grpName + ", dir=" + tmpCacheDir + ']'); + } + + cacheDirs.add(cacheDir); + + pageStore.readCacheConfigurations(snpCacheDir, cfgsByName); + } + + Map cfgsById = + cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v)); + + return new SnapshotRestoreContext(req, cacheDirs, cfgsById); + } + + /** + * @param reqId Request ID. + * @param res Results. + * @param errs Errors. + */ + private void finishPrepare(UUID reqId, Map> res, Map errs) { + if (ctx.clientNode()) + return; + + SnapshotRestoreContext opCtx0 = opCtx; + + Exception failure = F.first(errs.values()); + + assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId(); + + if (opCtx0 == null || !reqId.equals(opCtx0.reqId)) { + finishProcess(reqId, failure); + + return; + } + + if (failure == null) + failure = checkNodeLeft(opCtx0.nodes, res.keySet()); + + // Context has been created - should rollback changes cluster-wide. + if (failure != null) { + opCtx0.err.compareAndSet(null, failure); + + if (U.isLocalNodeCoordinator(ctx.discovery())) + rollbackRestoreProc.start(reqId, reqId); + + return; + } + + Map globalCfgs = new HashMap<>(); + + for (List storedCfgs : res.values()) { + if (storedCfgs == null) + continue; + + for (StoredCacheData cacheData : storedCfgs) + globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData); + } + + opCtx0.cfgs = globalCfgs; + + if (U.isLocalNodeCoordinator(ctx.discovery())) + cacheStartProc.start(reqId, reqId); + } + + /** + * @param reqId Request ID. + * @return Result future. 
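+ * Completed immediately on client nodes and on non-coordinator nodes; the restored caches
+ * are started by the coordinator node only.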
+ */
+ private IgniteInternalFuture cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return new GridFinishedFuture<>();
+
+ Collection ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+ }
+
+ // We set the topology node IDs required to successfully start the cache; if any of the required nodes
+ // leaves the cluster during the cache startup, the whole procedure will be rolled back.
+ return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStart(UUID reqId, Map res, Map errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ if (failure == null) {
+ finishProcess(reqId);
+
+ return;
+ }
+
+ opCtx0.err.compareAndSet(null, failure);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param reqNodes Set of required topology nodes.
+ * @param respNodes Set of responding topology nodes.
+ * @return Error, if no response was received from the required topology node.
+ */
+ private Exception checkNodeLeft(Set reqNodes, Set respNodes) {
+ if (!respNodes.containsAll(reqNodes)) {
+ Set leftNodes = new HashSet<>(reqNodes);
+
+ leftNodes.removeAll(respNodes);
+
+ return new ClusterTopologyCheckedException(OP_REJECT_MSG +
+ "Required node has left the cluster [nodeIds=" + leftNodes + ']');
+ }
+
+ return null;
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
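+ * The future is completed when the local directories created during the restore attempt
+ * (both the temporary ones and those already moved into place) have been removed.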
+ */
+ private IgniteInternalFuture rollback(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (F.isEmpty(opCtx0.dirs))
+ return new GridFinishedFuture<>();
+
+ GridFutureAdapter retFut = new GridFutureAdapter<>();
+
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+
+ try {
+ ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+ if (log.isInfoEnabled()) {
+ log.info("Removing restored cache directories [reqId=" + reqId +
+ ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+ }
+
+ IgniteCheckedException ex = null;
+
+ for (File cacheDir : opCtx0.dirs) {
+ File tmpCacheDir = formatTmpDirName(cacheDir);
+
+ if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+ log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+
+ ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
+ }
+
+ if (cacheDir.exists() && !U.delete(cacheDir)) {
+ log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+
+ ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
+ }
+ }
+
+ if (ex != null)
+ retFut.onDone(ex);
+ else
+ retFut.onDone(true);
+ });
+ }
+ catch (RejectedExecutionException e) {
+ log.error("Unable to perform rollback routine, task has been rejected " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+
+ retFut.onDone(e);
+ }
+ }
+
+ return retFut;
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishRollback(UUID reqId, Map res, Map errs) {
+ if (ctx.clientNode())
+ return;
+
+ if (!errs.isEmpty()) {
+ log.warning("Some nodes were unable to complete the rollback routine, check the local log " +
+ "files for more information [nodeIds=" + errs.keySet() + ']');
+ }
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (!res.keySet().containsAll(opCtx0.nodes)) {
+ Set leftNodes = new HashSet<>(opCtx0.nodes);
+
+ leftNodes.removeAll(res.keySet());
+
+ log.warning("Some of the nodes left the cluster and were unable to complete the rollback" +
+ " operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
+ }
+
+ finishProcess(reqId, opCtx0.err.get());
+ }
+
+ /**
+ * Cache group restore from snapshot operation context.
+ */
+ private static class SnapshotRestoreContext {
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** Baseline node IDs that must be alive to complete the operation. */
+ private final Set nodes;
+
+ /** List of restored cache group directories. */
+ private final Collection dirs;
+
+ /** The exception that led to the interruption of the process. */
+ private final AtomicReference err = new AtomicReference<>();
+
+ /** Cache ID to configuration mapping. */
+ private volatile Map cfgs;
+
+ /** Graceful shutdown future. */
+ private volatile IgniteFuture stopFut;
+
+ /**
+ * @param req Request to prepare cache group restore from the snapshot.
+ * @param dirs Cache group directories to be restored from the snapshot.
+ * @param cfgs Cache ID to configuration mapping.
+ */ + protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection dirs, + Map cfgs) { + reqId = req.requestId(); + snpName = req.snapshotName(); + nodes = new HashSet<>(req.nodes()); + + this.dirs = dirs; + this.cfgs = cfgs; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 7ccfee02d7d4f..170a3c09c2845 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; +import java.util.function.BooleanSupplier; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -306,6 +307,15 @@ public void updateMetadata(int typeId, String typeName, @Nullable String affKeyF */ public void saveMetadata(Collection types, File dir); + /** + * Merge the binary metadata files stored in the specified directory. + * + * @param metadataDir Directory containing binary metadata files. + * @param stopChecker Process interrupt checker. + * @throws IgniteCheckedException If failed. + */ + public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException; + /** * @param typeName Type name. * @param ord ordinal. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index 7d9e6ae391482..730b9e131c6db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -445,6 +445,21 @@ public enum DistributedProcessType { /** * Rotate performance statistics. */ - PERFORMANCE_STATISTICS_ROTATE + PERFORMANCE_STATISTICS_ROTATE, + + /** + * Cache group restore prepare phase. + */ + RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, + + /** + * Cache group restore cache start phase. + */ + RESTORE_CACHE_GROUP_SNAPSHOT_START, + + /** + * Cache group restore rollback phase. + */ + RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 74976b13efc88..b873f7d0217dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -111,7 +111,7 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { protected final List locEvts = new CopyOnWriteArrayList<>(); /** Configuration for the 'default' cache. */ - protected volatile CacheConfiguration dfltCacheCfg; + protected volatile CacheConfiguration dfltCacheCfg; /** Enable default data region persistence. 
*/ protected boolean persistence = true; @@ -124,6 +124,9 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()); + if (dfltCacheCfg != null) + cfg.setCacheConfiguration(dfltCacheCfg); + return cfg.setConsistentId(igniteInstanceName) .setCommunicationSpi(new TestRecordingCommunicationSpi()) .setDataStorageConfiguration(new DataStorageConfiguration() @@ -132,7 +135,6 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { .setPersistenceEnabled(persistence)) .setCheckpointFrequency(3000) .setPageSize(DFLT_PAGE_SIZE)) - .setCacheConfiguration(dfltCacheCfg) .setClusterStateOnStart(INACTIVE) .setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT) .setDiscoverySpi(discoSpi); @@ -185,7 +187,7 @@ protected void waitForEvents(List evts) throws IgniteInterruptedChecked * @param ccfg Default cache configuration. * @return Cache configuration. */ - protected static CacheConfiguration txCacheConfig(CacheConfiguration ccfg) { + protected CacheConfiguration txCacheConfig(CacheConfiguration ccfg) { return ccfg.setCacheMode(CacheMode.PARTITIONED) .setBackups(2) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) @@ -239,7 +241,7 @@ public static Optional searchDirectoryRecursively(Path path, String dir) t * @return Ignite instance. * @throws Exception If fails. */ - protected IgniteEx startGridWithCache(CacheConfiguration ccfg, int keys) throws Exception { + protected IgniteEx startGridWithCache(CacheConfiguration ccfg, int keys) throws Exception { return startGridsWithCache(1, ccfg, keys); } @@ -250,7 +252,7 @@ protected IgniteEx startGridWithCache(CacheConfiguration ccfg, * @return Ignite instance. * @throws Exception If fails. */ - protected IgniteEx startGridsWithCache(int grids, CacheConfiguration ccfg, int keys) throws Exception { + protected IgniteEx startGridsWithCache(int grids, CacheConfiguration ccfg, int keys) throws Exception { dfltCacheCfg = ccfg; return startGridsWithCache(grids, keys, Integer::new, ccfg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java new file mode 100644 index 0000000000000..62f4619edb8b6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.function.Function; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.internal.IgniteEx; + +/** + * Snapshot restore test base. + */ +public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest { + /** Timeout. */ + protected static final long TIMEOUT = 15_000; + + /** Cache value builder. */ + protected abstract Function valueBuilder(); + + /** + * @param nodesCnt Nodes count. + * @param keysCnt Number of keys to create. + * @return Ignite coordinator instance. + * @throws Exception if failed. + */ + protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt) throws Exception { + return startGridsWithSnapshot(nodesCnt, keysCnt, false); + } + + /** + * @param nodesCnt Nodes count. + * @param keysCnt Number of keys to create. + * @param startClient {@code True} to start an additional client node. + * @return Ignite coordinator instance. + * @throws Exception if failed. + */ + protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt, boolean startClient) throws Exception { + IgniteEx ignite = startGridsWithCache(nodesCnt, keysCnt, valueBuilder(), dfltCacheCfg); + + if (startClient) + ignite = startClientGrid("client"); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + ignite.cache(dfltCacheCfg.getName()).destroy(); + + awaitPartitionMapExchange(); + + return ignite; + } + + /** + * @param cache Cache. + * @param keysCnt Expected number of keys. + */ + protected void assertCacheKeys(IgniteCache cache, int keysCnt) { + assertEquals(keysCnt, cache.size()); + + for (int i = 0; i < keysCnt; i++) + assertEquals(valueBuilder().apply(i), cache.get(i)); + } + + /** */ + protected class BinaryValueBuilder implements Function { + /** Binary type name. */ + private final String typeName; + + /** + * @param typeName Binary type name. + */ + BinaryValueBuilder(String typeName) { + this.typeName = typeName; + } + + /** {@inheritDoc} */ + @Override public Object apply(Integer key) { + BinaryObjectBuilder builder = grid(0).binary().builder(typeName); + + builder.setField("id", key); + builder.setField("name", String.valueOf(key)); + + return builder.build(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java new file mode 100644 index 0000000000000..f3359f1e4d36e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntSupplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.IgniteSnapshot; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cache.CacheExistsException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType; +import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START; +import static 
org.apache.ignite.testframework.GridTestUtils.runAsync; + +/** + * Snapshot restore tests. + */ +public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotRestoreBaseTest { + /** Type name used for binary and SQL. */ + private static final String TYPE_NAME = "CustomType"; + + /** Cache 1 name. */ + private static final String CACHE1 = "cache1"; + + /** Cache 2 name. */ + private static final String CACHE2 = "cache2"; + + /** Default shared cache group name. */ + private static final String SHARED_GRP = "shared"; + + /** Cache value builder. */ + private Function valBuilder = String::valueOf; + + /** {@inheritDoc} */ + @Override protected Function valueBuilder() { + return valBuilder; + } + + /** @throws Exception If failed. */ + @Test + public void testRestoreAllGroups() throws Exception { + CacheConfiguration cacheCfg1 = + txCacheConfig(new CacheConfiguration(CACHE1)).setGroupName(SHARED_GRP); + + CacheConfiguration cacheCfg2 = + txCacheConfig(new CacheConfiguration(CACHE2)).setGroupName(SHARED_GRP); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, + dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + ignite.cache(CACHE1).destroy(); + ignite.cache(CACHE2).destroy(); + ignite.cache(DEFAULT_CACHE_NAME).destroy(); + + awaitPartitionMapExchange(); + + // Restore all cache groups. + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE); + assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE); + } + + /** @throws Exception If failed. */ + @Test + public void testStartClusterSnapshotRestoreMultipleThreadsSameNode() throws Exception { + checkStartClusterSnapshotRestoreMultithreaded(() -> 0); + } + + /** @throws Exception If failed. */ + @Test + public void testStartClusterSnapshotRestoreMultipleThreadsDiffNode() throws Exception { + AtomicInteger nodeIdx = new AtomicInteger(); + + checkStartClusterSnapshotRestoreMultithreaded(nodeIdx::getAndIncrement); + } + + /** + * @param nodeIdxSupplier Ignite node index supplier. + */ + private void checkStartClusterSnapshotRestoreMultithreaded(IntSupplier nodeIdxSupplier) throws Exception { + Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + AtomicInteger successCnt = new AtomicInteger(); + AtomicInteger failCnt = new AtomicInteger(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(() -> { + try { + nodeIdxSupplier.getAsInt(); + + grid(nodeIdxSupplier.getAsInt()).snapshot().restoreSnapshot( + SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT); + + successCnt.incrementAndGet(); + } + catch (Exception e) { + failCnt.incrementAndGet(); + } + }, 2, "runner"); + + fut.get(TIMEOUT); + + assertEquals(1, successCnt.get()); + assertEquals(1, failCnt.get()); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + } + + /** @throws Exception If failed. 
*/ + @Test + public void testCreateSnapshotDuringRestore() throws Exception { + Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0)); + + discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch); + + IgniteFuture fut = + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + discoSpi.waitBlocked(TIMEOUT); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> grid(1).snapshot().createSnapshot("NEW_SNAPSHOT").get(TIMEOUT), + IgniteException.class, + "Cache group restore operation is currently in progress." + ); + + discoSpi.unblock(); + + fut.get(TIMEOUT); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + } + + /** + * Ensures that the cache doesn't start if one of the baseline nodes fails. + * + * @throws Exception If failed. + */ + @Test + public void testNodeLeftDuringCacheStartOnExchangeInit() throws Exception { + startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true); + + BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0)); + + discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch); + + IgniteFuture fut = + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + discoSpi.waitBlocked(TIMEOUT); + + stopGrid(2, true); + + discoSpi.unblock(); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null); + + ensureCacheAbsent(dfltCacheCfg); + } + + /** + * Ensures that the cache is not started if non-coordinator node left during the exchange. + * + * @throws Exception If failed. + */ + @Test + public void testNodeLeftDuringCacheStartOnExchangeFinish() throws Exception { + checkNodeLeftOnExchangeFinish( + false, ClusterTopologyCheckedException.class, "Required node has left the cluster"); + } + + /** + * Ensures that the cache is not started if the coordinator left during the exchange. + * + * @throws Exception If failed. + */ + @Test + public void testCrdLeftDuringCacheStartOnExchangeFinish() throws Exception { + checkNodeLeftOnExchangeFinish( + true, IgniteCheckedException.class, "Operation has been cancelled (node is stopping)"); + } + + /** + * @param crdStop {@code True} to stop coordinator node. + * @param expCls Expected exception class. + * @param expMsg Expected exception message. + * @throws Exception If failed. + */ + private void checkNodeLeftOnExchangeFinish( + boolean crdStop, + Class expCls, + String expMsg + ) throws Exception { + startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true); + + TestRecordingCommunicationSpi node1spi = TestRecordingCommunicationSpi.spi(grid(1)); + TestRecordingCommunicationSpi node2spi = TestRecordingCommunicationSpi.spi(grid(2)); + + node1spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); + node2spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); + + IgniteFuture fut = + grid(1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + node1spi.waitForBlocked(); + node2spi.waitForBlocked(); + + stopGrid(crdStop ? 0 : 2, true); + + node1spi.stopBlock(); + + if (crdStop) + node2spi.stopBlock(); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), expCls, expMsg); + + awaitPartitionMapExchange(); + + ensureCacheAbsent(dfltCacheCfg); + } + + /** @throws Exception If failed. 
*/ + @Test + public void testClusterSnapshotRestoreRejectOnInActiveCluster() throws Exception { + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + ignite.cluster().state(ClusterState.INACTIVE); + + IgniteFuture fut = + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + GridTestUtils.assertThrowsAnyCause( + log, () -> fut.get(TIMEOUT), IgniteException.class, "The cluster should be active"); + } + + /** @throws Exception If failed. */ + @Test + public void testClusterSnapshotRestoreOnSmallerTopology() throws Exception { + startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true); + + stopGrid(1); + + resetBaselineTopology(); + + IgniteFuture fut = + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null); + + ensureCacheAbsent(dfltCacheCfg); + } + + /** @throws Exception If failed. */ + @Test + public void testRestoreSharedCacheGroup() throws Exception { + CacheConfiguration cacheCfg1 = + txCacheConfig(new CacheConfiguration(CACHE1)).setGroupName(SHARED_GRP); + + CacheConfiguration cacheCfg2 = + txCacheConfig(new CacheConfiguration(CACHE2)).setGroupName(SHARED_GRP); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, cacheCfg1, cacheCfg2); + + ignite.cluster().state(ClusterState.ACTIVE); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + ignite.cache(CACHE1).destroy(); + + awaitPartitionMapExchange(); + + IgniteSnapshot snp = ignite.snapshot(); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> snp.restoreSnapshot(SNAPSHOT_NAME, Arrays.asList(CACHE1, CACHE2)).get(TIMEOUT), + IllegalArgumentException.class, + "Cache group(s) was not found in the snapshot" + ); + + ignite.cache(CACHE2).destroy(); + + awaitPartitionMapExchange(); + + snp.restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(SHARED_GRP)).get(TIMEOUT); + + assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE); + assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE); + } + + /** @throws Exception If failed. */ + @Test + public void testIncompatibleMetasUpdate() throws Exception { + valBuilder = new BinaryValueBuilder(TYPE_NAME); + + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME); + + ignite.context().cacheObjects().removeType(typeId); + + BinaryObject[] objs = new BinaryObject[CACHE_KEYS_RANGE]; + + IgniteCache cache1 = createCacheWithBinaryType(ignite, "cache1", n -> { + BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME); + + builder.setField("id", n); + + objs[n] = builder.build(); + + return objs[n]; + }); + + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT); + + // Ensure that existing type has been updated. + BinaryType type = ignite.context().cacheObjects().metadata(typeId); + + assertTrue(type.fieldNames().contains("name")); + + for (int i = 0; i < CACHE_KEYS_RANGE; i++) + assertEquals(objs[i], cache1.get(i)); + + cache1.destroy(); + + grid(0).cache(DEFAULT_CACHE_NAME).destroy(); + + ignite.context().cacheObjects().removeType(typeId); + + // Create cache with incompatible binary type. 
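+ // The 'id' field is now written as a UUID instead of an int, so merging the binary metadata from
+ // the snapshot is expected to fail with a BinaryObjectException.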
+ cache1 = createCacheWithBinaryType(ignite, "cache1", n -> { + BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME); + + builder.setField("id", UUID.randomUUID()); + + objs[n] = builder.build(); + + return objs[n]; + }); + + IgniteFuture fut0 = + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), BinaryObjectException.class, null); + + ensureCacheAbsent(dfltCacheCfg); + + for (int i = 0; i < CACHE_KEYS_RANGE; i++) + assertEquals(objs[i], cache1.get(i)); + } + + /** + * @param ignite Ignite. + * @param cacheName Cache name. + * @param valBuilder Binary value builder. + * @return Created cache. + */ + private IgniteCache createCacheWithBinaryType( + Ignite ignite, + String cacheName, + Function valBuilder + ) { + IgniteCache cache = ignite.createCache(new CacheConfiguration<>(cacheName)).withKeepBinary(); + + for (int i = 0; i < CACHE_KEYS_RANGE; i++) + cache.put(i, valBuilder.apply(i)); + + return cache; + } + + /** + * @throws Exception if failed + */ + @Test + public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception { + checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class, + "Cache start failed. A cache or group with the same name is currently being restored from a snapshot"); + } + + /** + * @throws Exception if failed + */ + @Test + public void testParallelCacheStartWithTheSameNameOnStart() throws Exception { + checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_START, CacheExistsException.class, + "Failed to start cache (a cache with the same name is already started):"); + } + + /** + * @param procType The type of distributed process on which communication is blocked. + * @throws Exception if failed. + */ + private void checkCacheStartWithTheSameName( + DistributedProcessType procType, + Class expCls, + String expMsg + ) throws Exception { + dfltCacheCfg = txCacheConfig(new CacheConfiguration(CACHE1)).setGroupName(SHARED_GRP); + + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1)); + + IgniteFuture fut = waitForBlockOnRestore(spi, procType, SHARED_GRP); + + GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(SHARED_GRP), IgniteCheckedException.class, null); + + GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(CACHE1), expCls, expMsg); + + spi.stopBlock(); + + fut.get(TIMEOUT); + + assertCacheKeys(grid(0).cache(CACHE1), CACHE_KEYS_RANGE); + } + + /** @throws Exception If failed. */ + @Test + public void testNodeFailDuringRestore() throws Exception { + startGridsWithSnapshot(4, CACHE_KEYS_RANGE); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3)); + + IgniteFuture fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME); + + IgniteInternalFuture fut0 = runAsync(() -> stopGrid(3, true)); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> fut.get(TIMEOUT), + ClusterTopologyCheckedException.class, + "Required node has left the cluster" + ); + + fut0.get(TIMEOUT); + + awaitPartitionMapExchange(); + + ensureCacheAbsent(dfltCacheCfg); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> startGrid(3), + IgniteSpiException.class, + "to add the node to cluster - remove directories with the caches" + ); + } + + /** @throws Exception If failed. 
*/ + @Test + public void testNodeFailDuringFilesCopy() throws Exception { + dfltCacheCfg.setCacheMode(CacheMode.REPLICATED); + + startGridsWithSnapshot(3, CACHE_KEYS_RANGE); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2)); + CountDownLatch stopLatch = new CountDownLatch(1); + + spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage && + ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); + + String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME, + PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString(); + + grid(2).context().cache().context().snapshotMgr().ioFactory( + new CustomFileIOFactory(new RandomAccessFileIOFactory(), + file -> { + if (file.getPath().endsWith(failingFilePath)) { + stopLatch.countDown(); + + throw new RuntimeException("Test exception"); + } + })); + + File node2dbDir = ((FilePageStoreManager)grid(2).context().cache().context().pageStore()). + cacheWorkDir(dfltCacheCfg).getParentFile(); + + IgniteInternalFuture stopFut = runAsync(() -> { + U.await(stopLatch, TIMEOUT, TimeUnit.MILLISECONDS); + + stopGrid(2, true); + + return null; + }); + + IgniteFuture fut = + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)); + + stopFut.get(TIMEOUT); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null); + + File[] files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX)); + assertEquals("A temp directory with potentially corrupted files must exist.", 1, files.length); + + ensureCacheAbsent(dfltCacheCfg); + + dfltCacheCfg = null; + + startGrid(2); + + files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX)); + assertEquals("A temp directory should be removed at node startup", 0, files.length); + } + + /** @throws Exception If failed. */ + @Test + public void testNodeJoinDuringRestore() throws Exception { + Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1)); + + IgniteFuture fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> startGrid(2), + IgniteSpiException.class, + "Joining node during caches restore is not allowed" + ); + + spi.stopBlock(); + + fut.get(TIMEOUT); + + IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); + + assertTrue(cache.indexReadyFuture().isDone()); + + assertCacheKeys(cache, CACHE_KEYS_RANGE); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testClusterStateChangeActiveReadonlyOnPrepare() throws Exception { + checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, + IgniteException.class, "Failed to perform start cache operation (cluster is in read-only mode)"); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testClusterStateChangeActiveReadonlyOnCacheStart() throws Exception { + checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testClusterDeactivateOnPrepare() throws Exception { + checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, + IgniteException.class, "The cluster has been deactivated."); + } + + /** + * @throws Exception if failed. 
+ */ + @Test + public void testClusterDeactivateOnCacheStart() throws Exception { + checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null); + } + + /** + * @param state Cluster state. + * @param procType The type of distributed process on which communication is blocked. + * @param exCls Expected exception class. + * @param expMsg Expected exception message. + * @throws Exception if failed. + */ + private void checkClusterStateChange( + ClusterState state, + DistributedProcessType procType, + @Nullable Class exCls, + @Nullable String expMsg + ) throws Exception { + int nodesCnt = 2; + + Ignite ignite = startGridsWithSnapshot(nodesCnt, CACHE_KEYS_RANGE, true); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(nodesCnt - 1)); + + IgniteFuture fut = waitForBlockOnRestore(spi, procType, DEFAULT_CACHE_NAME); + + ignite.cluster().state(state); + + spi.stopBlock(); + + if (exCls == null) { + fut.get(TIMEOUT); + + ignite.cluster().state(ClusterState.ACTIVE); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + + return; + } + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), exCls, expMsg); + + ignite.cluster().state(ClusterState.ACTIVE); + + ensureCacheAbsent(dfltCacheCfg); + + String cacheName = DEFAULT_CACHE_NAME; + + grid(nodesCnt - 1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(cacheName)).get(TIMEOUT); + + assertCacheKeys(ignite.cache(cacheName), CACHE_KEYS_RANGE); + } + + /** + * @param ccfg Cache configuration. + * @throws IgniteCheckedException if failed. + */ + private void ensureCacheAbsent(CacheConfiguration ccfg) throws IgniteCheckedException { + String cacheName = ccfg.getName(); + + for (Ignite ignite : G.allGrids()) { + GridKernalContext kctx = ((IgniteEx)ignite).context(); + + if (kctx.clientNode()) + continue; + + CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName)); + + assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc); + + GridTestUtils.waitForCondition( + () -> !kctx.cache().context().snapshotMgr().isRestoring(), + TIMEOUT); + + File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg); + + String errMsg = String.format("%s, dir=%s, exists=%b, files=%s", + ignite.name(), dir, dir.exists(), Arrays.toString(dir.list())); + + assertTrue(errMsg, !dir.exists() || dir.list().length == 0); + } + } + + /** + * @param spi Test communication spi. + * @param restorePhase The type of distributed process on which communication is blocked. + * @param grpName Cache group name. + * @return Snapshot restore future. + * @throws InterruptedException if interrupted. + */ + private IgniteFuture waitForBlockOnRestore( + TestRecordingCommunicationSpi spi, + DistributedProcessType restorePhase, + String grpName + ) throws InterruptedException { + spi.blockMessages((node, msg) -> + msg instanceof SingleNodeMessage && ((SingleNodeMessage)msg).type() == restorePhase.ordinal()); + + IgniteFuture fut = + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName)); + + spi.waitForBlocked(); + + return fut; + } + + /** + * Custom I/O factory to preprocessing created files. + */ + private static class CustomFileIOFactory implements FileIOFactory { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Delegate factory. */ + private final FileIOFactory delegate; + + /** Preprocessor for created files. 
*/ + private final Consumer hnd; + + /** + * @param delegate Delegate factory. + * @param hnd Preprocessor for created files. + */ + public CustomFileIOFactory(FileIOFactory delegate, Consumer hnd) { + this.delegate = delegate; + this.hnd = hnd; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegate = this.delegate.create(file, modes); + + hnd.accept(file); + + return delegate; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java index 14cd9d74a1845..fbbe62ec0c6cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CommonPoolStarvationCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersistenceSslTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest; @@ -99,6 +100,7 @@ IgniteClusterSnapshotSelfTest.class, IgniteClusterSnapshotCheckTest.class, IgniteSnapshotMXBeanTest.class, + IgniteClusterSnapshotRestoreSelfTest.class, IgniteClusterIdTagTest.class, FullyConnectedComponentSearcherTest.class, diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java index e0aee42e3270a..7f4165d243e59 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java @@ -119,7 +119,7 @@ public SelfNodeFilter(UUID nodeId) { * @param cacheName Cache name. * @return Cache configuration. 
*/ - private static CacheConfiguration txFilteredCache(String cacheName) { + private CacheConfiguration txFilteredCache(String cacheName) { return txCacheConfig(new CacheConfiguration(cacheName)) .setCacheMode(CacheMode.REPLICATED) .setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName()))); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java new file mode 100644 index 0000000000000..17f495c02046b --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Objects; +import java.util.function.Function; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryBasicNameMapper; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.junit.Test; + +/** + * Cluster snapshot restore tests verifying SQL and indexing. + */ +public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterSnapshotRestoreBaseTest { + /** Type name used for binary and SQL. */ + private static final String TYPE_NAME = IndexedObject.class.getName(); + + /** Number of cache keys to pre-create at node start. */ + private static final int CACHE_KEYS_RANGE = 10_000; + + /** Cache value builder. 
*/ + private Function valBuilder = new BinaryValueBuilder(TYPE_NAME); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration txCacheConfig(CacheConfiguration ccfg) { + return super.txCacheConfig(ccfg).setSqlIndexMaxInlineSize(255).setSqlSchema("PUBLIC") + .setQueryEntities(Collections.singletonList(new QueryEntity() + .setKeyType(Integer.class.getName()) + .setValueType(TYPE_NAME) + .setFields(new LinkedHashMap<>(F.asMap("id", Integer.class.getName(), "name", String.class.getName()))) + .setIndexes(Collections.singletonList(new QueryIndex("id"))))); + } + + /** {@inheritDoc} */ + @Override protected Function valueBuilder() { + return valBuilder; + } + + /** @throws Exception If failed. */ + @Test + public void testBasicClusterSnapshotRestore() throws Exception { + valBuilder = new IndexedValueBuilder(); + + IgniteEx client = startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true); + + grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT); + + assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + } + + /** @throws Exception If failed. */ + @Test + public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception { + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + // Remove metadata. + int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME); + + ignite.context().cacheObjects().removeType(typeId); + + forceCheckpoint(); + + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE); + + for (Ignite grid : G.allGrids()) + assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId)); + } + + /** @throws Exception If failed. */ + @Test + public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception { + int nodesCnt = 4; + + startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg); + + grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + startGrid(nodesCnt - 2); + + IgniteEx ignite = startGrid(nodesCnt - 1); + + resetBaselineTopology(); + + awaitPartitionMapExchange(); + + ignite.cache(DEFAULT_CACHE_NAME).destroy(); + + awaitPartitionMapExchange(); + + // Remove metadata. + int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME); + + ignite.context().cacheObjects().removeType(typeId); + + forceCheckpoint(); + + // Restore from an empty node. + ignite.snapshot().restoreSnapshot( + SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT); + + awaitPartitionMapExchange(); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE); + } + + /** {@inheritDoc} */ + @Override protected void assertCacheKeys(IgniteCache cache, int keysCnt) { + super.assertCacheKeys(cache, keysCnt); + + String tblName = new BinaryBasicNameMapper(true).typeName(TYPE_NAME); + + for (Ignite grid : G.allGrids()) { + GridKernalContext ctx = ((IgniteEx)grid).context(); + + String nodeId = ctx.localNodeId().toString(); + + assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone()); + + // Make sure no index rebuild happened. + assertEquals("nodeId=" + nodeId, + 0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed()); + + GridQueryProcessor qry = ((IgniteEx)grid).context().query(); + + // Make sure SQL works fine. 
+ assertEquals("nodeId=" + nodeId, (long)keysCnt, qry.querySqlFields(new SqlFieldsQuery( + "SELECT count(*) FROM " + tblName), true).getAll().get(0).get(0)); + + // Make sure the index is in use. + String explainPlan = (String)qry.querySqlFields(new SqlFieldsQuery( + "explain SELECT * FROM " + tblName + " WHERE id < 10"), true).getAll().get(0).get(0); + + assertTrue("nodeId=" + nodeId + "\n" + explainPlan, explainPlan.contains("ID_ASC_IDX")); + } + } + + /** */ + private static class IndexedValueBuilder implements Function { + /** {@inheritDoc} */ + @Override public Object apply(Integer key) { + return new IndexedObject(key, "Person number #" + key); + } + } + + /** */ + private static class IndexedObject { + /** Id. */ + @QuerySqlField(index = true) + private final int id; + + /** Name. */ + @QuerySqlField + private final String name; + + /** + * @param id Id. + */ + public IndexedObject(int id, String name) { + this.id = id; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + IndexedObject obj = (IndexedObject)o; + + return id == obj.id && Objects.equals(name, obj.name); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(name, id); + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java index 593f5dfab9ea7..69cbb71e83b1c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest; import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest; @@ -66,7 +67,8 @@ CacheGroupReencryptionTest.class, IgnitePdsIndexingDefragmentationTest.class, StopRebuildIndexTest.class, - ForceRebuildIndexTest.class + ForceRebuildIndexTest.class, + IgniteClusterSnapshotRestoreWithIndexingTest.class }) public class IgnitePdsWithIndexingTestSuite { }
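
Below is a minimal usage sketch of the restoreSnapshot API added by this patch (the snapshot and cache names
are illustrative; see IgniteClusterSnapshotRestoreSelfTest above for complete scenarios):

    // A cache group to be restored must not be present in the cluster.
    ignite.cache("orders").destroy();

    // Restore the "orders" group from the "backup" snapshot and wait for completion.
    ignite.snapshot().restoreSnapshot("backup", Collections.singleton("orders")).get();

    // Passing null restores every cache group contained in the snapshot.
    ignite.snapshot().restoreSnapshot("backup", null).get();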