diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java new file mode 100644 index 0000000000000..36915c988824b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.jetbrains.annotations.NotNull; + +/** + * The task for checking the consistency of snapshots in the cluster. + * + * @param Type of the task result returning from {@link ComputeTask#reduce(List)} method. + */ +public abstract class AbstractSnapshotVerificationTask extends + ComputeTaskAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Task argument. */ + protected final Map> metas = new HashMap<>(); + + /** Ignite instance. */ + @IgniteInstanceResource + protected IgniteEx ignite; + + /** Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. */ + protected Collection grps; + + /** {@inheritDoc} */ + @Override public @NotNull Map map(List subgrid, + SnapshotPartitionsVerifyTaskArg arg) throws IgniteException { + grps = arg.cacheGroupNames(); + + Map> clusterMetas = arg.clusterMetadata(); + + if (!subgrid.containsAll(clusterMetas.keySet())) { + throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), + new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " + + "[curr=" + F.viewReadOnly(subgrid, F.node2id()) + + ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']'))); + } + + Map jobs = new HashMap<>(); + Set allMetas = new HashSet<>(); + clusterMetas.values().forEach(allMetas::addAll); + + Set missed = null; + + for (SnapshotMetadata meta : allMetas) { + if (missed == null) + missed = new HashSet<>(meta.baselineNodes()); + + missed.remove(meta.consistentId()); + + if (missed.isEmpty()) + break; + } + + if (!missed.isEmpty()) { + throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), + new IgniteException("Some metadata is missing from the snapshot: " + missed))); + } + + metas.putAll(clusterMetas); + + while (!allMetas.isEmpty()) { + for (Map.Entry> e : clusterMetas.entrySet()) { + SnapshotMetadata meta = F.find(e.getValue(), null, allMetas::remove); + + if (meta == null) + continue; + + jobs.put(makeJob(meta.snapshotName(), meta.consistentId(), arg.cacheGroupNames()), + e.getKey()); + + if (allMetas.isEmpty()) + break; + } + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { + // Handle all exceptions during the `reduce` operation. + return ComputeJobResultPolicy.WAIT; + } + + /** + * @param name Snapshot name. + * @param constId Snapshot metadata file name. + * @param groups Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. + * @return Compute job. + */ + protected abstract ComputeJob makeJob(String name, String constId, Collection groups); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index f9ea6a5734d9a..eac8144509b76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -42,6 +42,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -100,6 +101,11 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandler; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerContext; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerResult; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerType; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotRestoreHandleTask; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -268,6 +274,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Distributed process to restore cache group from the snapshot. */ private final SnapshotRestoreProcess restoreCacheGrpProc; + /** Snapshot operation handlers. */ + private final Map>> handlers = new EnumMap<>(SnapshotHandlerType.class); + /** Resolved persistent data storage settings. */ private volatile PdsFolderSettings pdsSettings; @@ -310,6 +319,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Last seen cluster snapshot operation. */ private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture(); +// private SnapshotConsistencyValidator validator; + /** * @param ctx Kernal context. */ @@ -381,6 +392,24 @@ public static String partDeltaFileName(int partId) { U.ensureDirectory(locSnpDir, "snapshot work directory", log); U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log); + SnapshotPartitionsVerifyRestoreHandler integrityCheck = new SnapshotPartitionsVerifyRestoreHandler(ctx); + + handlers.put(integrityCheck.type(), new ArrayList<>(Collections.singleton(integrityCheck))); + + SnapshotHandler[] handlers0 = ctx.plugins().extensions(SnapshotHandler.class); + + if (handlers0 != null) { + for (SnapshotHandler hnd : handlers0) + handlers.computeIfAbsent(hnd.type(), v -> new ArrayList<>()).add(hnd); + } + + for (SnapshotHandlerType type : SnapshotHandlerType.values()) { + List> hndList = handlers.putIfAbsent(type, Collections.emptyList()); + + if (hndList != null) + handlers.put(type, Collections.unmodifiableList(hndList)); + } + MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS); mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime, @@ -538,6 +567,10 @@ public void deleteSnapshot(File snpDir, String folderName) { } } + public Collection> handlers(SnapshotHandlerType type) { + return handlers.get(type); + } + /** * @param snpName Snapshot name. * @return Local snapshot directory for snapshot with given name. @@ -654,22 +687,37 @@ private IgniteInternalFuture initLocalSnapshotStartSt smf.getParentFile().mkdirs(); + SnapshotMetadata meta = new SnapshotMetadata(req.requestId(), + req.snapshotName(), + cctx.localNode().consistentId().toString(), + pdsSettings.folderName(), + cctx.gridConfig().getDataStorageConfiguration().getPageSize(), + grpIds, + blts, + fut.result()); + try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) { - U.marshal(marsh, - new SnapshotMetadata(req.requestId(), - req.snapshotName(), - cctx.localNode().consistentId().toString(), - pdsSettings.folderName(), - cctx.gridConfig().getDataStorageConfiguration().getPageSize(), - grpIds, - blts, - fut.result()), - out); + U.marshal(marsh, meta, out); log.info("Snapshot metafile has been created: " + smf.getAbsolutePath()); } - return new SnapshotOperationResponse(); + Map> results = new HashMap<>(); + SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, null); + + for (SnapshotHandler hnd : handlers(SnapshotHandlerType.CREATE)) { + SnapshotHandlerResult res; + + try { + res = new SnapshotHandlerResult<>(hnd.handle(ctx), null, cctx.localNode()); + } catch (Exception e) { + res = new SnapshotHandlerResult<>(null, e, cctx.localNode()); + } + + results.put(hnd.getClass().getName(), res); + } + + return new SnapshotOperationResponse(results); } catch (IOException | IgniteCheckedException e) { throw F.wrap(e); @@ -722,11 +770,46 @@ else if (!F.isEmpty(err) || !missed.isEmpty()) { "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " + "[err=" + err + ", missed=" + missed + ']')); } + else + snpReq.error(handleClusterSnapshotCreation(snpReq.snapshotName(), res.values())); endSnpProc.start(UUID.randomUUID(), snpReq); } } + /** + * @param snpName Snapshot name. + * @param responses Snapshot operation responses. + * @throws IgniteCheckedException If any of the handlers were unable to process the local results of the nodes. + * @return Exception If any of the handlers were unable to process the local results of the nodes. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private @Nullable Exception handleClusterSnapshotCreation(String snpName, Collection responses) { + Map> hndResults = new HashMap<>(); + + for (SnapshotOperationResponse response : responses) { + if (response.operationHandlerResults() == null) + continue; + + for (Map.Entry> entry : response.operationHandlerResults().entrySet()) + hndResults.computeIfAbsent(entry.getKey(), v -> new ArrayList<>()).add(entry.getValue()); + } + + if (hndResults.isEmpty()) + return null; + + for (SnapshotHandler hnd : handlers(SnapshotHandlerType.CREATE)) { + try { + hnd.reduce(snpName, hndResults.get(hnd.getClass().getName())); + } + catch (Exception err) { + return err; + } + } + + return null; + } + /** * @param req Request on snapshot creation. * @return Future which will be completed when the snapshot will be finalized. @@ -950,7 +1033,7 @@ public IgniteInternalFuture checkSnapshot(String name) { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); - return checkSnapshot(name, null).chain(f -> { + return checkSnapshot(name, null, false).chain(f -> { try { return f.get().idleVerifyResult(); } @@ -967,9 +1050,15 @@ public IgniteInternalFuture checkSnapshot(String name) { * * @param name Snapshot name. * @param grps Collection of cache group names to check. + * @param includeCustomHandlers {@code True} to invoke all user-defined {@link SnapshotHandlerType#RESTORE} + * handlers, otherwise only system consistency check will be performed. * @return {@code true} if snapshot is OK. */ - public IgniteInternalFuture checkSnapshot(String name, @Nullable Collection grps) { + public IgniteInternalFuture checkSnapshot( + String name, + @Nullable Collection grps, + boolean includeCustomHandlers + ) { A.notNullOrEmpty(name, "Snapshot name cannot be null or empty."); A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_"); A.ensure(grps == null || grps.stream().filter(Objects::isNull).collect(Collectors.toSet()).isEmpty(), @@ -1009,16 +1098,18 @@ public IgniteInternalFuture checkSnapshot(St kctx0.task().setThreadContext(TC_SKIP_AUTH, true); kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet())); - kctx0.task().execute(SnapshotPartitionsVerifyTask.class, new SnapshotPartitionsVerifyTaskArg(grps, metas)) - .listen(f1 -> { - if (f1.error() == null) - res.onDone(f1.result()); - else if (f1.error() instanceof IgniteSnapshotVerifyException) - res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, - new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()))); - else - res.onDone(f1.error()); - }); + Class cls = + includeCustomHandlers ? SnapshotPartitionsVerifyTask.class : SnapshotRestoreHandleTask.class; + + kctx0.task().execute(cls, new SnapshotPartitionsVerifyTaskArg(grps, metas)).listen(f1 -> { + if (f1.error() == null) + res.onDone(f1.result()); + else if (f1.error() instanceof IgniteSnapshotVerifyException) + res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, + new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()))); + else + res.onDone(f1.error()); + }); } else { if (f0.error() instanceof IgniteSnapshotVerifyException) @@ -1069,7 +1160,7 @@ public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) { * @param smf File denoting to snapshot metafile. * @return Snapshot metadata instance. */ - private SnapshotMetadata readSnapshotMetadata(File smf) { + public SnapshotMetadata readSnapshotMetadata(File smf) { if (!smf.exists()) throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf); @@ -1358,7 +1449,7 @@ public void onCacheGroupsStopped(List grps) { * @param consId Consistent node id. * @return Snapshot metadata file name. */ - private static String snapshotMetaFileName(String consId) { + public static String snapshotMetaFileName(String consId) { return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } @@ -2067,6 +2158,20 @@ public LocalSnapshotSender(String snpName) { private static class SnapshotOperationResponse implements Serializable { /** Serial version uid. */ private static final long serialVersionUID = 0L; + + private final Map> hndResults; + + public SnapshotOperationResponse() { + this(null); + } + + public SnapshotOperationResponse(Map> hndResults) { + this.hndResults = F.isEmpty(hndResults) ? null : hndResults; + } + + public @Nullable Map> operationHandlerResults() { + return hndResults; + } } /** Snapshot operation start message. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyRestoreHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyRestoreHandler.java new file mode 100644 index 0000000000000..0941b8dee422b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyRestoreHandler.java @@ -0,0 +1,78 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerResult; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerType; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerContext; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandler; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.util.typedef.F; + +public class SnapshotPartitionsVerifyRestoreHandler implements SnapshotHandler> { + /** */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + public SnapshotPartitionsVerifyRestoreHandler(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override public SnapshotHandlerType type() { + return SnapshotHandlerType.RESTORE; + } + + /** {@inheritDoc} */ + @Override public Map handle( + SnapshotHandlerContext opCtx + ) throws IgniteCheckedException { + return VisorVerifySnapshotPartitionsJob.checkPartitions(opCtx.metadata(), opCtx.groups(), ctx.cache().context(), log); + } + + /** {@inheritDoc} */ + @Override public void reduce( + String name, + Collection>> results + ) throws IgniteCheckedException { + Map> clusterHashes = new HashMap<>(); + Map errs = new HashMap<>(); + + for (SnapshotHandlerResult> res : results) { + if (res.exception() != null) { + errs.put(res.node(), res.exception()); + + continue; + } + + for (Map.Entry e : res.data().entrySet()) { + List records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + + records.add(e.getValue()); + } + } + + IdleVerifyResultV2 verifyResult = new IdleVerifyResultV2(clusterHashes, errs); + + if (!F.isEmpty(errs) || verifyResult.hasConflicts()) { + StringBuilder sb = new StringBuilder(); + + verifyResult.print(sb::append, true); + + throw new IgniteCheckedException(sb.toString()); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java index 0e99e8cfc094c..6311ee45cfbef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java @@ -17,62 +17,17 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.compute.ComputeJobResultPolicy; -import org.apache.ignite.compute.ComputeTaskAdapter; -import org.apache.ignite.internal.GridComponent; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; -import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; -import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; -import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; -import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; -import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; -import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; import static org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2.reduce0; /** @@ -81,276 +36,25 @@ * to be hold on. */ @GridInternal -public class SnapshotPartitionsVerifyTask - extends ComputeTaskAdapter { +public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTask { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Task argument. */ - private final Map> metas = new HashMap<>(); - - /** Ignite instance. */ + /** + * @param ignite Ignite instance. + */ @IgniteInstanceResource - private IgniteEx ignite; + public void ignite(IgniteEx ignite) { + this.ignite = ignite; + } /** {@inheritDoc} */ - @Override public @NotNull Map map( - List subgrid, - @Nullable SnapshotPartitionsVerifyTaskArg arg - ) throws IgniteException { - Map> clusterMetas = arg.clusterMetadata(); - - if (!subgrid.containsAll(clusterMetas.keySet())) { - throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), - new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " + - "[curr=" + F.viewReadOnly(subgrid, F.node2id()) + - ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']'))); - } - - Map jobs = new HashMap<>(); - Set allMetas = new HashSet<>(); - clusterMetas.values().forEach(allMetas::addAll); - - Set missed = null; - - for (SnapshotMetadata meta : allMetas) { - if (missed == null) - missed = new HashSet<>(meta.baselineNodes()); - - missed.remove(meta.consistentId()); - - if (missed.isEmpty()) - break; - } - - if (!missed.isEmpty()) { - throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), - new IgniteException("Some metadata is missing from the snapshot: " + missed))); - } - - metas.putAll(clusterMetas); - - while (!allMetas.isEmpty()) { - for (Map.Entry> e : clusterMetas.entrySet()) { - SnapshotMetadata meta = F.find(e.getValue(), null, allMetas::remove); - - if (meta == null) - continue; - - jobs.put(new VisorVerifySnapshotPartitionsJob(meta.snapshotName(), meta.consistentId(), arg.cacheGroupNames()), - e.getKey()); - - if (allMetas.isEmpty()) - break; - } - } - - return jobs; + @Override protected ComputeJob makeJob(String name, String constId, Collection groups) { + return new VisorVerifySnapshotPartitionsJob(name, constId, groups); } /** {@inheritDoc} */ @Override public @Nullable SnapshotPartitionsVerifyTaskResult reduce(List results) throws IgniteException { return new SnapshotPartitionsVerifyTaskResult(metas, reduce0(results)); } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { - // Handle all exceptions during the `reduce` operation. - return ComputeJobResultPolicy.WAIT; - } - - /** Job that collects update counters of snapshot partitions on the node it executes. */ - private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Snapshot name to validate. */ - private final String snpName; - - /** Consistent snapshot metadata file name. */ - private final String consId; - - /** Set of cache groups to be checked in the snapshot or {@code empty} to check everything. */ - private final Set rqGrps; - - /** - * @param snpName Snapshot name to validate. - * @param consId Consistent snapshot metadata file name. - * @param rqGrps Set of cache groups to be checked in the snapshot or {@code empty} to check everything. - */ - public VisorVerifySnapshotPartitionsJob(String snpName, String consId, Collection rqGrps) { - this.snpName = snpName; - this.consId = consId; - - this.rqGrps = rqGrps == null ? Collections.emptySet() : new HashSet<>(rqGrps); - } - - @Override public Map execute() throws IgniteException { - IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); - - if (log.isInfoEnabled()) { - log.info("Verify snapshot partitions procedure has been initiated " + - "[snpName=" + snpName + ", consId=" + consId + ']'); - } - - SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId); - Set grps = rqGrps.isEmpty() ? new HashSet<>(meta.partitions().keySet()) : - rqGrps.stream().map(CU::cacheId).collect(Collectors.toSet()); - Set partFiles = new HashSet<>(); - - for (File dir : snpMgr.snapshotCacheDirectories(snpName, meta.folderName())) { - int grpId = CU.cacheId(cacheGroupName(dir)); - - if (!grps.remove(grpId)) - continue; - - Set parts = new HashSet<>(meta.partitions().get(grpId)); - - for (File part : cachePartitionFiles(dir)) { - int partId = partId(part.getName()); - - if (!parts.remove(partId)) - continue; - - partFiles.add(part); - } - - if (!parts.isEmpty()) { - throw new IgniteException("Snapshot data doesn't contain required cache group partition " + - "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId + - ", missed=" + parts + ", meta=" + meta + ']'); - } - } - - if (!grps.isEmpty()) { - throw new IgniteException("Snapshot data doesn't contain required cache groups " + - "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId + - ", meta=" + meta + ']'); - } - - Map res = new ConcurrentHashMap<>(); - ThreadLocal buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) - .order(ByteOrder.nativeOrder())); - - try { - GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(snpName, meta.folderName()); - - for (GridComponent comp : snpCtx) - comp.start(); - - try { - U.doInParallel( - snpMgr.snapshotExecutorService(), - partFiles, - part -> { - String grpName = cacheGroupName(part.getParentFile()); - int grpId = CU.cacheId(grpName); - int partId = partId(part.getName()); - - FilePageStoreManager storeMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore(); - - try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false) - .createPageStore(getTypeByPartId(partId), - part::toPath, - val -> { - }) - ) { - if (partId == INDEX_PARTITION) { - checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); - - return null; - } - - if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { - checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); - - return null; - } - - ByteBuffer pageBuff = buff.get(); - pageBuff.clear(); - pageStore.read(0, pageBuff, true); - - long pageAddr = GridUnsafe.bufferAddress(pageBuff); - - PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); - GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); - - if (partState != OWNING) { - throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " + - "state only: " + partState); - } - - long updateCntr = io.getUpdateCounter(pageAddr); - long size = io.getSize(pageAddr); - - if (log.isDebugEnabled()) { - log.debug("Partition [grpId=" + grpId - + ", id=" + partId - + ", counter=" + updateCntr - + ", size=" + size + "]"); - } - - // Snapshot partitions must always be in OWNING state. - // There is no `primary` partitions for snapshot. - PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName); - - PartitionHashRecordV2 hash = calculatePartitionHash(key, - updateCntr, - consId, - GridDhtPartitionState.OWNING, - false, - size, - snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore)); - - assert hash != null : "OWNING must have hash: " + key; - - res.put(key, hash); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return null; - } - ); - } - finally { - for (GridComponent comp : snpCtx) - comp.stop(true); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - VisorVerifySnapshotPartitionsJob job = (VisorVerifySnapshotPartitionsJob)o; - - return snpName.equals(job.snpName) && consId.equals(job.consId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(snpName, consId); - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 5061532d9e65d..e3a64cf0e33b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; -import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -184,7 +183,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac return new IgniteFinishedFutureImpl<>(e); } - ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames).listen(f -> { + ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames, true).listen(f -> { if (f.error() != null) { finishProcess(fut0.rqId, f.error()); @@ -251,18 +250,6 @@ public IgniteFuture start(String snpName, @Nullable Collection cac return; } - IdleVerifyResultV2 res = f.result().idleVerifyResult(); - - if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) { - StringBuilder sb = new StringBuilder(); - - res.print(sb::append, true); - - finishProcess(fut0.rqId, new IgniteException(sb.toString())); - - return; - } - SnapshotOperationRequest req = new SnapshotOperationRequest( fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java new file mode 100644 index 0000000000000..a6f8efd621b07 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java @@ -0,0 +1,255 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; +import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; + +/** Job that collects update counters of snapshot partitions on the node it executes. */ +class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance. */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + /** Snapshot name to validate. */ + private final String snpName; + + /** Consistent snapshot metadata file name. */ + private final String consId; + + /** Set of cache groups to be checked in the snapshot or {@code empty} to check everything. */ + private final Set rqGrps; + + /** + * @param snpName Snapshot name to validate. + * @param consId Consistent snapshot metadata file name. + * @param rqGrps Set of cache groups to be checked in the snapshot or {@code empty} to check everything. + */ + public VisorVerifySnapshotPartitionsJob(String snpName, String consId, Collection rqGrps) { + this.snpName = snpName; + this.consId = consId; + + this.rqGrps = rqGrps == null ? Collections.emptySet() : new HashSet<>(rqGrps); + } + + @Override public Map execute() throws IgniteException { + IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); + + if (log.isInfoEnabled()) { + log.info("Verify snapshot partitions procedure has been initiated " + + "[snpName=" + snpName + ", consId=" + consId + ']'); + } + + SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId); + + try { + return checkPartitions(meta, rqGrps, ignite.context().cache().context(), log); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + public static Map checkPartitions( + SnapshotMetadata meta, + Collection groups, + GridCacheSharedContext cctx, + IgniteLogger log + ) throws IgniteCheckedException { + IgniteSnapshotManager snpMgr = cctx.snapshotMgr(); + + Set grps = F.isEmpty(groups) ? new HashSet<>(meta.partitions().keySet()) : + groups.stream().map(CU::cacheId).collect(Collectors.toSet()); + Set partFiles = new HashSet<>(); + + for (File dir : snpMgr.snapshotCacheDirectories(meta.snapshotName(), meta.folderName())) { + int grpId = CU.cacheId(cacheGroupName(dir)); + + if (!grps.remove(grpId)) + continue; + + Set parts = new HashSet<>(meta.partitions().get(grpId)); + + for (File part : cachePartitionFiles(dir)) { + int partId = partId(part.getName()); + + if (!parts.remove(partId)) + continue; + + partFiles.add(part); + } + + if (!parts.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache group partition " + + "[grpId=" + grpId + ", snpName=" + meta.snapshotName() + ", consId=" + meta.consistentId() + + ", missed=" + parts + ", meta=" + meta + ']'); + } + } + + if (!grps.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache groups " + + "[grps=" + grps + ", snpName=" + meta.snapshotName() + ", consId=" + meta.consistentId() + + ", meta=" + meta + ']'); + } + + Map res = new ConcurrentHashMap<>(); + ThreadLocal buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) + .order(ByteOrder.nativeOrder())); + + GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(meta.snapshotName(), meta.folderName()); + + for (GridComponent comp : snpCtx) + comp.start(); + + try { + U.doInParallel( + snpMgr.snapshotExecutorService(), + partFiles, + part -> { + String grpName = cacheGroupName(part.getParentFile()); + int grpId = CU.cacheId(grpName); + int partId = partId(part.getName()); + + FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore(); + + try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false) + .createPageStore(getTypeByPartId(partId), + part::toPath, + val -> { + }) + ) { + if (partId == INDEX_PARTITION) { + checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); + + return null; + } + + if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { + checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); + + return null; + } + + ByteBuffer pageBuff = buff.get(); + pageBuff.clear(); + pageStore.read(0, pageBuff, true); + + long pageAddr = GridUnsafe.bufferAddress(pageBuff); + + PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); + GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); + + if (partState != OWNING) { + throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " + + "state only: " + partState); + } + + long updateCntr = io.getUpdateCounter(pageAddr); + long size = io.getSize(pageAddr); + + if (log.isDebugEnabled()) { + log.debug("Partition [grpId=" + grpId + + ", id=" + partId + + ", counter=" + updateCntr + + ", size=" + size + "]"); + } + + // Snapshot partitions must always be in OWNING state. + // There is no `primary` partitions for snapshot. + PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName); + + PartitionHashRecordV2 hash = calculatePartitionHash(key, + updateCntr, + meta.consistentId(), + GridDhtPartitionState.OWNING, + false, + size, + snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore)); + + assert hash != null : "OWNING must have hash: " + key; + + res.put(key, hash); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return null; + } + ); + } + finally { + for (GridComponent comp : snpCtx) + comp.stop(true); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + VisorVerifySnapshotPartitionsJob job = (VisorVerifySnapshotPartitionsJob)o; + + return snpName.equals(job.snpName) && consId.equals(job.consId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(snpName, consId); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandler.java new file mode 100644 index 0000000000000..70d14bdf95e98 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandler.java @@ -0,0 +1,31 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.plugin.Extension; + +/** + * Snapshot operation handler. + * + * @param type of the local handling result. + */ +public interface SnapshotHandler extends Extension { + /** Snapshot handler type. */ + public SnapshotHandlerType type(); + + /** */ + public T handle(SnapshotHandlerContext ctx) throws IgniteCheckedException; + + /** */ + public default void reduce(String name, Collection> results) throws IgniteCheckedException { + for (SnapshotHandlerResult res : results) { + if (res.exception() == null) + continue;; + + throw new IgniteCheckedException("Snapshot handler has failed " + + "[snapshot=" + name + + ", handler=" + getClass().getName() + + ", nodeId=" + res.node().id() + "].", res.exception()); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerContext.java new file mode 100644 index 0000000000000..d6c6a618a7e9b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerContext.java @@ -0,0 +1,26 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.util.Collection; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata; +import org.jetbrains.annotations.Nullable; + +public class SnapshotHandlerContext { + private final SnapshotMetadata metadata; + + private final Collection grps; + + public SnapshotHandlerContext(SnapshotMetadata metadata, Collection grps) { + this.metadata = metadata; + this.grps = grps; + } + + public SnapshotMetadata metadata() { + return metadata; + } + + public @Nullable Collection groups() { + return grps; + } + + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerResult.java new file mode 100644 index 0000000000000..edc4cd499efb3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerResult.java @@ -0,0 +1,37 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.io.Serializable; +import org.apache.ignite.cluster.ClusterNode; +import org.jetbrains.annotations.Nullable; + +public class SnapshotHandlerResult implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + private final T data; + private final Exception err; + private final ClusterNode node; + + public SnapshotHandlerResult(T data, Exception err, ClusterNode node) { + this.data = data; + this.err = err; + this.node = node; + } + +// protected SnapshotHandlerResult() { +// // No-op. +// } + + public @Nullable T data() { + return data; + } + + public @Nullable Exception exception() { + return err; + } + + public ClusterNode node() { + return node; + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerType.java new file mode 100644 index 0000000000000..7f9d3b2b258d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotHandlerType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +/** + * Type of snapshot operation handlers. + */ +public enum SnapshotHandlerType { + /** Handler is called immediately after the snapshot is taken. */ + CREATE, + + /** Handler is called just before restore operation is started. */ + RESTORE +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotRestoreHandleTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotRestoreHandleTask.java new file mode 100644 index 0000000000000..32f133d044d73 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotRestoreHandleTask.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotVerificationTask; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot restore operation handling task. + */ +public class SnapshotRestoreHandleTask extends AbstractSnapshotVerificationTask { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected ComputeJob makeJob(String snpName, String constId, Collection groups) { + return new RestoreHandleJob(snpName, constId, groups); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Nullable @Override public SnapshotPartitionsVerifyTaskResult reduce(List results) throws IgniteException { + Map> clusterResults = new HashMap<>(); + + for (ComputeJobResult res : results) { + // Unhandled exception. + if (res.getException() != null) + throw res.getException(); + + Map nodeDataMap = res.getData(); + + for (Map.Entry entry : nodeDataMap.entrySet()) { + String lsnrName = entry.getKey(); + + clusterResults.computeIfAbsent(lsnrName, v -> new ArrayList<>()).add(entry.getValue()); + } + } + + String snpName = F.first(F.first(metas.values())).snapshotName(); + + for (SnapshotHandler hnd : ignite.context().cache().context().snapshotMgr().handlers(SnapshotHandlerType.RESTORE)) { + List res = clusterResults.get(hnd.getClass().getName()); + + if (res == null) + continue; + + try { + hnd.reduce(snpName, res); + } catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + return new SnapshotPartitionsVerifyTaskResult(metas, null); + } + + static class RestoreHandleJob extends ComputeJobAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance. */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + /** Snapshot name. */ + private final String snpName; + + /** String representation of the consistent node ID. */ + private final String consistentId; + + /** Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. */ + Collection grps; + + /** + * @param snpName Snapshot name. + * @param consistentId String representation of the consistent node ID. + * @param grps Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. + */ + public RestoreHandleJob(String snpName, String consistentId, Collection grps) { + this.snpName = snpName; + this.consistentId = consistentId; + this.grps = grps; + } + + /** {@inheritDoc} */ + @Override public Map> execute() { + Map> resMap = new HashMap<>(); + IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); + SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consistentId); + SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, grps); + + for (SnapshotHandler hnd : snpMgr.handlers(SnapshotHandlerType.RESTORE)) { + SnapshotHandlerResult res; + + try { + res = new SnapshotHandlerResult<>(hnd.handle(ctx), null, ignite.localNode()); + } + catch (Exception e) { + res = new SnapshotHandlerResult<>(null, e, ignite.localNode()); + } + + resMap.put(hnd.getClass().getName(), res); + } + + return resMap; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java index c57ce1ae5f540..0198b5dc88c48 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java @@ -593,7 +593,7 @@ private SnapshotPartitionsVerifyTaskResult checkSnapshotWithTwoCachesWhenOneIsCo corruptPartitionFile(ignite, SNAPSHOT_NAME, ccfg1, PART_ID); - return snp(ignite).checkSnapshot(SNAPSHOT_NAME, cachesToCheck).get(TIMEOUT); + return snp(ignite).checkSnapshot(SNAPSHOT_NAME, cachesToCheck, false).get(TIMEOUT); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java new file mode 100644 index 0000000000000..15f4999a164f3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerType; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerContext; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandler; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +public class IgniteClusterSnapshotHandlerTest extends IgniteClusterSnapshotRestoreBaseTest { + /** */ + private List> extensions = new ArrayList<>(); + + private SnapshotLifecyclePluginProvider testPluginProvider = new SnapshotLifecyclePluginProvider(extensions); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setPluginProviders(testPluginProvider); + } + + + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotHandlerFailure() throws Exception { + String expMsg = "Test verification exception message."; + + AtomicBoolean failCreateFlag = new AtomicBoolean(true); + AtomicBoolean failRestoreFlag = new AtomicBoolean(true); + + extensions.add(new SnapshotHandler() { + @Override public SnapshotHandlerType type() { + return SnapshotHandlerType.CREATE; + } + + @Override public Void handle(SnapshotHandlerContext ctx) throws IgniteCheckedException { + if (failCreateFlag.get()) + throw new IgniteCheckedException(expMsg); + + return null; + } + }); + + extensions.add(new SnapshotHandler() { + @Override public SnapshotHandlerType type() { + return SnapshotHandlerType.RESTORE; + } + + @Override public Void handle(SnapshotHandlerContext ctx) throws IgniteCheckedException { + if (failRestoreFlag.get()) + throw new IgniteCheckedException(expMsg); + + return null; + } + }); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg); + + IgniteFuture fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class, expMsg); + + failCreateFlag.set(false); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT); + + ignite.cache(DEFAULT_CACHE_NAME).destroy(); + + awaitPartitionMapExchange(); + + IgniteFuture fut0 = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), IgniteCheckedException.class, expMsg); + + failRestoreFlag.set(false); + + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT); + + assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE); + } + + private static class SnapshotLifecyclePluginProvider extends AbstractTestPluginProvider { + private final List> extensions; + + public SnapshotLifecyclePluginProvider(List> extensions) { + this.extensions = extensions; + } + + @Override public String name() { + return "SnapshotVerifier"; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + for (SnapshotHandler hnd : extensions) + registry.registerExtension(SnapshotHandler.class, hnd); + } + }; + + /** {@inheritDoc} */ + @Override protected Function valueBuilder() { + return Integer::new; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java index 18d02f5242b15..95970554e6e78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java @@ -17,10 +17,22 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.io.File; +import java.util.Arrays; import java.util.function.Function; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.testframework.GridTestUtils; /** * Snapshot restore test base. @@ -29,6 +41,13 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps /** Cache value builder. */ protected abstract Function valueBuilder(); + protected final int PARTS_NUMBER = GridTestUtils.SF.apply(512); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration txCacheConfig(CacheConfiguration ccfg) { + return super.txCacheConfig(ccfg).setAffinity(new RendezvousAffinityFunction(false, PARTS_NUMBER)); + } + /** * @param nodesCnt Nodes count. * @param keysCnt Number of keys to create. @@ -72,6 +91,38 @@ protected void assertCacheKeys(IgniteCache cache, int keysCnt) { assertEquals(valueBuilder().apply(i), cache.get(i)); } + /** + * @param ccfg Cache configuration. + * @throws IgniteCheckedException if failed. + */ + protected void ensureCacheAbsent(CacheConfiguration ccfg) throws IgniteCheckedException { + String cacheName = ccfg.getName(); + + for (Ignite ignite : G.allGrids()) { + GridKernalContext kctx = ((IgniteEx)ignite).context(); + + if (kctx.clientNode()) + continue; + + CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName)); + + assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc); + + boolean success = GridTestUtils.waitForCondition( + () -> !kctx.cache().context().snapshotMgr().isRestoring(), + TIMEOUT); + + assertTrue("The process has not finished on the node " + kctx.localNodeId(), success); + + File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg); + + String errMsg = String.format("%s, dir=%s, exists=%b, files=%s", + ignite.name(), dir, dir.exists(), Arrays.toString(dir.list())); + + assertTrue(errMsg, !dir.exists() || dir.list().length == 0); + } + } + /** */ protected class BinaryValueBuilder implements Function { /** Binary type name. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java index c4d6a7ee6f614..f02da07b680c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.OpenOption; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; @@ -46,22 +47,20 @@ import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerType; import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.IgniteSpiException; @@ -72,9 +71,11 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** @@ -114,6 +115,28 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR return valBuilder; } + /** @throws Exception If fails. */ + @Test + public void testRestoreWithMissedPart() throws Exception { + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), + getPartitionFileName(0)); + + assertNotNull(part0); + assertTrue(part0.toString(), part0.toFile().exists()); + assertTrue(part0.toFile().delete()); + + assertThrowsAnyCause( + log, + () -> ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(), + IgniteException.class, + "Snapshot data doesn't contain required cache group partition" + ); + + ensureCacheAbsent(dfltCacheCfg); + } + /** @throws Exception If failed. */ @Test public void testRestoreAllGroups() throws Exception { @@ -712,36 +735,6 @@ private void checkClusterStateChange( assertCacheKeys(ignite.cache(cacheName), CACHE_KEYS_RANGE); } - /** - * @param ccfg Cache configuration. - * @throws IgniteCheckedException if failed. - */ - private void ensureCacheAbsent(CacheConfiguration ccfg) throws IgniteCheckedException { - String cacheName = ccfg.getName(); - - for (Ignite ignite : G.allGrids()) { - GridKernalContext kctx = ((IgniteEx)ignite).context(); - - if (kctx.clientNode()) - continue; - - CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName)); - - assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc); - - GridTestUtils.waitForCondition( - () -> !kctx.cache().context().snapshotMgr().isRestoring(), - TIMEOUT); - - File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg); - - String errMsg = String.format("%s, dir=%s, exists=%b, files=%s", - ignite.name(), dir, dir.exists(), Arrays.toString(dir.list())); - - assertTrue(errMsg, !dir.exists() || dir.list().length == 0); - } - } - /** * @param spi Test communication spi. * @param restorePhase The type of distributed process on which communication is blocked.