diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index eb4e4ba602cd6..a2e5e3d7e9baa 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -91,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; @@ -3214,29 +3214,31 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); // Restore single cache group. - assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--start", DEFAULT_CACHE_NAME)); - assertContains(log, testOut.toString(), - "Snapshot cache group restore operation started [snapshot=" + snpName + ", group(s)=" + DEFAULT_CACHE_NAME + ']'); + ig.snapshot().restoreSnapshot(snpName, Collections.singleton(DEFAULT_CACHE_NAME)); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is running [snapshot=" + snpName + ']'); + "Restore operation for snapshot \"" + snpName + "\" is still in progress"); // Check wrong snapshot name. assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is NOT running [snapshot=" + missingSnpName + ']'); + "No information about restoring snapshot \"" + missingSnpName + "\" is available."); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--cancel")); assertContains(log, testOut.toString(), "Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']'); - GridTestUtils.runAsync(() -> { + IgniteInternalFuture fut = runAsync(() -> { // Wait for the process to be interrupted. - AtomicReference errRef = U.field((Object)U.field((Object)U.field( - grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); + boolean canceled = waitForCondition(() -> { + SnapshotRestoreStatusDetails status = + grid(0).context().cache().context().snapshotMgr().localRestoreStatus(snpName); + + return status != null && status.errorMessage() != null; + }, getTestTimeout()); - waitForCondition(() -> errRef.get() != null, getTestTimeout()); + assertTrue(canceled); spi.stopBlock(); @@ -3247,9 +3249,21 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertContains(log, testOut.toString(), "Snapshot cache group restore operation canceled [snapshot=" + snpName + ']'); + fut.get(getTestTimeout()); + assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']'); + "Error: Operation has been canceled by the user."); + + ig.snapshot().restoreSnapshot(snpName, null).get(getTestTimeout()); + + assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); + + String out = testOut.toString(); + + assertContains(log, out, "Restore operation for snapshot \"" + snpName + "\" completed successfully"); + assertContains(log, out, "Cache groups: " + DEFAULT_CACHE_NAME); + assertContains(log, out, "Progress: 100% completed"); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 6c7aa2f7886bb..0f835a5b04a56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -406,6 +406,8 @@ public static String partDeltaFileName(int partId) { "The list of names of all snapshots currently saved on the local node with respect to " + "the configured via IgniteConfiguration snapshot working path."); + restoreCacheGrpProc.registerMetrics(); + storeFactory = storeMgr::getPageStoreFactory; cctx.exchange().registerExchangeAwareComponent(this); @@ -884,16 +886,6 @@ public boolean isRestoring() { return restoreCacheGrpProc.restoringSnapshotName() != null; } - /** - * Check if snapshot restore process is currently running. - * - * @param snpName Snapshot name. - * @return {@code True} if the snapshot restore operation from the specified snapshot is in progress locally. - */ - public boolean isRestoring(String snpName) { - return snpName.equals(restoreCacheGrpProc.restoringSnapshotName()); - } - /** * Check if the cache or group with the specified name is currently being restored from the snapshot. * @@ -906,17 +898,27 @@ public boolean isRestoring(String cacheName, @Nullable String grpName) { } /** - * Status of the restore operation cluster-wide. + * Get the status of a cluster-wide restore operation. * * @param snpName Snapshot name. - * @return Future that will be completed when the status of the restore operation is received from all the server - * nodes. The result of this future will be {@code false} if the restore process with the specified snapshot name is - * not running on all nodes. + * @return Future that will be completed when the status of the restore operation is received from all server nodes. + * The result of this future is the node ids mapping with restore operation state. */ - public IgniteFuture restoreStatus(String snpName) { + public IgniteFuture> clusterRestoreStatus(String snpName) { return executeRestoreManagementTask(SnapshotRestoreStatusTask.class, snpName); } + /** + * Get the status of the last local snapshot restore operation. + * + * @param snpName Snapshot name. + * @return Status of the last local snapshot restore operation, {@code null} if the snapshot name of the last + * started operation differs from the specified one. + */ + public @Nullable SnapshotRestoreStatusDetails localRestoreStatus(String snpName) { + return restoreCacheGrpProc.status(snpName); + } + /** * @param restoreId Restore process ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -1734,9 +1736,11 @@ static void copy(FileIOFactory factory, File from, File to, long length) { /** * @param taskCls Snapshot restore operation management task class. * @param snpName Snapshot name. + * @param Type of the task result returning from {@link SnapshotRestoreManagementTask#reduce(List)} method. + * @return Task future. */ - private IgniteFuture executeRestoreManagementTask( - Class> taskCls, + private IgniteFuture executeRestoreManagementTask( + Class> taskCls, String snpName ) { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java index 9dc13618e234d..60abf07606ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; @@ -47,4 +51,20 @@ public SnapshotMXBeanImpl(GridKernalContext ctx) { @Override public void cancelSnapshot(String snpName) { mgr.cancelSnapshot(snpName).get(); } + + /** {@inheritDoc} */ + @Override public void restoreSnapshot(String name, String grpNames) { + Set grpNamesSet = F.isEmpty(grpNames) ? null : + Arrays.stream(grpNames.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); + + IgniteFuture fut = mgr.restoreSnapshot(name, grpNamesSet); + + if (fut.isDone()) + fut.get(); + } + + /** {@inheritDoc} */ + @Override public void cancelSnapshotRestore(String name) { + mgr.cancelSnapshotRestore(name).get(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java index 10cc8d43b8907..6e2547778ca71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.List; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.resources.IgniteInstanceResource; @@ -28,7 +30,7 @@ * Snapshot restore cancel task. */ @GridInternal -class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { +class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { /** Serial version uid. */ private static final long serialVersionUID = 0L; @@ -44,4 +46,18 @@ class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { } }; } + + /** {@inheritDoc} */ + @Override public Boolean reduce(List results) throws IgniteException { + boolean ret = false; + + for (ComputeJobResult r : results) { + if (r.getException() != null) + throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); + + ret |= Boolean.TRUE.equals(r.getData()); + } + + return ret; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java index 1cc6956c509c7..6f21cead173d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java @@ -31,7 +31,7 @@ /** * Snapshot restore management task. */ -abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { +abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { /** * @param param Compute job argument. * @return Compute job. @@ -51,20 +51,6 @@ abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter results) throws IgniteException { - boolean ret = false; - - for (ComputeJobResult r : results) { - if (r.getException() != null) - throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); - - ret |= Boolean.TRUE.equals(r.getData()); - } - - return ret; - } - /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { // Handle all exceptions during the `reduce` operation. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 85897480b73f8..d08c9c3d4bd0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -32,6 +32,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -83,6 +85,9 @@ public class SnapshotRestoreProcess { /** Temporary cache directory prefix. */ public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_"; + /** Snapshot restore metrics prefix. */ + public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore"; + /** Reject operation message. */ private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. "; @@ -107,12 +112,18 @@ public class SnapshotRestoreProcess { /** Logger. */ private final IgniteLogger log; - /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */ - private volatile ClusterSnapshotFuture fut; + /** + * Future to be completed when the cache restore process is complete. By default, this is a stub. + * When the process is started the future is recreated on the initiator node and passed to the user. + */ + private volatile ClusterSnapshotFuture fut = new ClusterSnapshotFuture(); - /** Snapshot restore operation context. */ + /** Current snapshot restore operation context (will be {@code null} when the operation is not running). */ private volatile SnapshotRestoreContext opCtx; + /** Last snapshot restore operation context (saves the metrics of the last operation). */ + private volatile SnapshotRestoreContext lastOpCtx = new SnapshotRestoreContext(); + /** * @param ctx Kernal context. */ @@ -149,6 +160,30 @@ protected void cleanup() throws IgniteCheckedException { } } + /** + * Register local metrics. + */ + protected void registerMetrics() { + MetricRegistry mreg = ctx.metric().registry(SNAPSHOT_RESTORE_METRICS); + + mreg.register("startTime", () -> lastOpCtx.startTime, + "The system time of the start of the cluster snapshot restore operation on this node."); + mreg.register("endTime", () -> lastOpCtx.endTime, + "The system time when the restore operation of a cluster snapshot on this node ended."); + mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class, + "The snapshot name of the last running cluster snapshot restore operation on this node."); + mreg.register("cacheGroupNames", () -> lastOpCtx.cacheGrpNames, String.class, + "The names of the cache groups that are being restored from the snapshot."); + mreg.register("totalPartitions", () -> lastOpCtx.totalParts, + "The total number of partitions to be restored on this node."); + mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(), + "The number of processed partitions on this node."); + mreg.register("totalPartitionsSize", () -> lastOpCtx.totalBytes, + "The total size of the partitions to be restored on this node."); + mreg.register("processedPartitionsSize", () -> lastOpCtx.processedBytes.get(), + "The total size of processed partitions on this node."); + } + /** * Start cache group restore operation. * @@ -182,9 +217,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac if (restoringSnapshotName() != null) throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed."); - fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); - - fut0 = fut; + fut0 = fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); } } catch (IgniteException e) { @@ -311,7 +344,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac ClusterSnapshotFuture fut0 = fut; - return fut0 != null ? fut0.name : null; + return fut0.isDone() ? null : fut0.name; } /** @@ -353,6 +386,49 @@ else if (CU.cacheId(locGrpName) == cacheId) return false; } + /** + * Get the status of the last local snapshot restore operation. + * + * @param snpName Snapshot name. + * @return Status details. + */ + public @Nullable SnapshotRestoreStatusDetails status(String snpName) { + SnapshotRestoreContext opCtx = lastOpCtx; + ClusterSnapshotFuture fut0 = fut; + + // Future is created only on node initiator, context is created on all nodes, but later. + boolean futValid = snpName.equals(fut0.name); + boolean ctxValid = snpName.equals(opCtx.snpName); + + if (!ctxValid && !futValid) + return null; + + if (ctxValid && futValid && !fut0.rqId.equals(opCtx.reqId)) { + // If the request ID does not match, we must read the metrics of the later operation. + if (fut0.startTime <= opCtx.startTime) + futValid = false; + else + ctxValid = false; + } + + long startTime = futValid ? fut0.startTime : opCtx.startTime; + long endTime = futValid ? fut0.endTime : opCtx.endTime; + UUID reqId = futValid ? fut0.rqId : opCtx.reqId; + Throwable err = ctxValid ? opCtx.err.get() : fut0.interruptEx; + + return new SnapshotRestoreStatusDetails( + reqId, + startTime, + endTime, + err == null ? null : err.getMessage(), + ctxValid ? opCtx.processedParts.get() : 0, + ctxValid ? opCtx.processedBytes.get() : 0, + ctxValid ? opCtx.cacheGrpNames : null, + ctxValid ? opCtx.totalParts : 0, + ctxValid ? opCtx.totalBytes : 0 + ); + } + /** * @param reqId Request ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -367,15 +443,6 @@ public Set cacheStartRequiredAliveNodes(IgniteUuid reqId) { return Collections.unmodifiableSet(opCtx0.nodes); } - /** - * Finish local cache group restore process. - * - * @param reqId Request ID. - */ - private void finishProcess(UUID reqId) { - finishProcess(reqId, null); - } - /** * Finish local cache group restore process. * @@ -390,16 +457,21 @@ else if (log.isInfoEnabled()) SnapshotRestoreContext opCtx0 = opCtx; - if (opCtx0 != null && reqId.equals(opCtx0.reqId)) + if (opCtx0 != null && reqId.equals(opCtx0.reqId)) { opCtx = null; + opCtx0.endTime = U.currentTimeMillis(); + } + synchronized (this) { ClusterSnapshotFuture fut0 = fut; - if (fut0 != null && reqId.equals(fut0.rqId)) { - fut = null; + if (!fut0.isDone() && reqId.equals(fut0.rqId)) { + ctx.pools().getSystemExecutorService().submit(() -> { + fut0.endTime = U.currentTimeMillis(); - ctx.pools().getSystemExecutorService().submit(() -> fut0.onDone(null, err)); + fut0.onDone(null, err); + }); } } } @@ -433,7 +505,7 @@ public IgniteFuture cancel(IgniteCheckedException reason, String snpNam synchronized (this) { opCtx0 = opCtx; - if (fut != null && fut.name.equals(snpName)) { + if (!fut.isDone() && fut.name.equals(snpName)) { fut0 = fut; fut0.interruptEx = reason; @@ -523,11 +595,11 @@ private IgniteInternalFuture> prepare(SnapshotOperati SnapshotRestoreContext opCtx0 = prepareContext(req); synchronized (this) { - opCtx = opCtx0; + lastOpCtx = opCtx = opCtx0; ClusterSnapshotFuture fut0 = fut; - if (fut0 != null && fut0.interruptEx != null) + if (!fut0.isDone() && fut0.interruptEx != null) opCtx0.err.compareAndSet(null, fut0.interruptEx); } @@ -558,7 +630,7 @@ private IgniteInternalFuture> prepare(SnapshotOperati opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null)); - restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd) + restoreAsync(opCtx0, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd) .thenAccept(res -> { try { Throwable err = opCtx.err.get(); @@ -602,16 +674,14 @@ private File formatTmpDirName(File cacheDir) { /** * Copy partition files and update binary metadata. * - * @param snpName Snapshot name. - * @param dirs Cache directories to restore from the snapshot. + * @param opCtx Snapshot restore operation context. * @param updateMeta Update binary metadata flag. * @param stopChecker Process interrupt checker. * @param errHnd Error handler. * @throws IgniteCheckedException If failed. */ private CompletableFuture restoreAsync( - String snpName, - Collection dirs, + SnapshotRestoreContext opCtx, boolean updateMeta, BooleanSupplier stopChecker, Consumer errHnd @@ -622,7 +692,7 @@ private CompletableFuture restoreAsync( List> futs = new ArrayList<>(); if (updateMeta) { - File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName); + File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(opCtx.snpName).getAbsolutePath(), pdsFolderName); futs.add(CompletableFuture.runAsync(() -> { try { @@ -634,14 +704,22 @@ private CompletableFuture restoreAsync( }, snapshotMgr.snapshotExecutorService())); } - for (File cacheDir : dirs) { + long totalParts = 0; + long totalBytes = 0; + + for (File cacheDir : opCtx.dirs) { File tmpCacheDir = formatTmpDirName(cacheDir); - File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName), + File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName), Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString()); assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir; for (File snpFile : snpCacheDir.listFiles()) { + if (snpFile.getName().endsWith(FilePageStoreManager.FILE_SUFFIX)) { + totalBytes += snpFile.length(); + totalParts += 1; + } + futs.add(CompletableFuture.runAsync(() -> { if (stopChecker.getAsBoolean()) return; @@ -654,18 +732,26 @@ private CompletableFuture restoreAsync( if (log.isDebugEnabled()) { log.debug("Copying file from the snapshot " + - "[snapshot=" + snpName + + "[snapshot=" + opCtx.snpName + ", src=" + snpFile + ", target=" + target + "]"); } IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length()); + + if (snpFile.getName().endsWith(FilePageStoreManager.FILE_SUFFIX)) { + opCtx.processedParts.incrementAndGet(); + opCtx.processedBytes.addAndGet(snpFile.length()); + } } catch (Throwable t) { errHnd.accept(t); } }, ctx.cache().context().snapshotMgr().snapshotExecutorService())); } + + opCtx.totalParts = totalParts; + opCtx.totalBytes = totalBytes; } int futsSize = futs.size(); @@ -851,7 +937,7 @@ private void finishCacheStart(UUID reqId, Map res, Map err = new AtomicReference<>(); + /** Operation start time. */ + private final long startTime; + + /** Names of the restored cache groups. */ + private final String cacheGrpNames; + + /** Number of processed (copied) partitions. */ + private final AtomicLong processedParts = new AtomicLong(); + + /** Size of processed (copied) partitions in bytes. */ + private final AtomicLong processedBytes = new AtomicLong(); + + /** Total number of partitions to be restored. */ + private volatile long totalParts; + + /** Total size of the partitions to be restored in bytes. */ + private volatile long totalBytes; + /** Cache ID to configuration mapping. */ private volatile Map cfgs; /** Graceful shutdown future. */ private volatile IgniteFuture stopFut; + /** Operation end time. */ + private volatile long endTime; + /** * @param req Request to prepare cache group restore from the snapshot. - * @param dirs List of cache group names to restore from the snapshot. + * @param dirs List of restored cache group directories. * @param cfgs Cache ID to configuration mapping. */ protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection dirs, Map cfgs) { + this.dirs = dirs; + this.cfgs = cfgs; + reqId = req.requestId(); snpName = req.snapshotName(); nodes = new HashSet<>(req.nodes()); + startTime = U.currentTimeMillis(); + cacheGrpNames = F.concat(F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName), ","); + } - this.dirs = dirs; - this.cfgs = cfgs; + /** + * Default constructor. + */ + protected SnapshotRestoreContext() { + reqId = null; + dirs = null; + nodes = null; + snpName = cacheGrpNames = ""; + startTime = 0; } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java new file mode 100644 index 0000000000000..95d455f77259b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Snapshot restore operation details. + */ +public class SnapshotRestoreStatusDetails implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Message of the exception that led to an interruption of the process. */ + private String errMsg; + + /** Request ID. */ + private UUID reqId; + + /** Operation start time. */ + private long startTime; + + /** Operation end time. */ + private long endTime; + + /** Names of the restored cache groups. */ + private String cacheGrpNames; + + /** Number of processed (copied) partitions. */ + private long processedParts; + + /** Total number of partitions to be restored. */ + private long totalParts; + + /** Size of processed (copied) partitions in bytes. */ + private long processedBytes; + + /** Total size of the partitions to be restored in bytes. */ + private long totalBytes; + + /** Default constructor. */ + public SnapshotRestoreStatusDetails() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param startTime Operation start time. + * @param endTime Operation end time. + * @param errMsg Message of the exception that led to an interruption of the process. + * @param processedParts Number of processed (copied) partitions. + * @param processedBytes Size of processed (copied) partitions in bytes. + * @param cacheGrpNames Names of the restored cache groups. + * @param totalParts Total number of partitions to be restored. + * @param totalBytes Total size of the partitions to be restored in bytes. + */ + public SnapshotRestoreStatusDetails(UUID reqId, long startTime, long endTime, String errMsg, + long processedParts, long processedBytes, String cacheGrpNames, long totalParts, long totalBytes) { + this.reqId = reqId; + this.errMsg = errMsg; + this.startTime = startTime; + this.cacheGrpNames = cacheGrpNames; + this.processedParts = processedParts; + this.processedBytes = processedBytes; + this.totalParts = totalParts; + this.totalBytes = totalBytes; + this.endTime = endTime; + } + + /** @return Message of the exception that led to an interruption of the process. */ + public String errorMessage() { + return errMsg; + } + + /** @return Request ID. */ + public UUID requestId() { + return reqId; + } + + /** @return Operation start time. */ + public long startTime() { + return startTime; + } + + /** @return Operation end time. */ + public long endTime() { + return endTime; + } + + /** @return Names of the restored cache groups. */ + public String cacheGroupNames() { + return cacheGrpNames; + } + + /** @return Number of processed (copied) partitions. */ + public long processedParts() { + return processedParts; + } + + /** @return Size of processed (copied) partitions in bytes. */ + public long processedBytes() { + return processedBytes; + } + + /** @return Total number of partitions to be restored. */ + public long totalParts() { + return totalParts; + } + + /** @return Total size of the partitions to be restored in bytes. */ + public long totalBytes() { + return totalBytes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java index e988042b965d5..e617485cc90c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java @@ -17,31 +17,59 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.resources.IgniteInstanceResource; /** * Snapshot restore status task. */ @GridInternal -class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask { +class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask> { /** Serial version uid. */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ @Override protected ComputeJob makeJob(String snpName) { return new ComputeJobAdapter() { - /** Auto-injected grid instance. */ @IgniteInstanceResource private transient IgniteEx ignite; - @Override public Boolean execute() throws IgniteException { - return ignite.context().cache().context().snapshotMgr().isRestoring(snpName); + @Override public SnapshotRestoreStatusDetails execute() throws IgniteException { + return ignite.context().cache().context().snapshotMgr().localRestoreStatus(snpName); } }; } + + /** {@inheritDoc} */ + @Override public Map reduce(List results) throws IgniteException { + Map> reqMap = new HashMap<>(); + T2 oldestUUID = new T2<>(0L, null); + + for (ComputeJobResult r : results) { + if (r.getException() != null) + throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); + + SnapshotRestoreStatusDetails details = r.getData(); + + if (details == null) + continue; + + if (oldestUUID.get1() < details.startTime()) + oldestUUID.set(details.startTime(), details.requestId()); + + reqMap.computeIfAbsent(details.requestId(), v -> new HashMap<>()).put(r.getNode().id(), details); + } + + return reqMap.isEmpty() ? null : reqMap.get(oldestUUID.get2()); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java index 8f4ab226e189b..b9181cb007715 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java @@ -17,9 +17,16 @@ package org.apache.ignite.internal.visor.snapshot; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.apache.ignite.lang.IgniteFuture; @@ -112,10 +119,99 @@ protected VisorSnapshotRestoreStatusJob(VisorSnapshotRestoreTaskArg arg, boolean /** {@inheritDoc} */ @Override protected String run(VisorSnapshotRestoreTaskArg arg) throws IgniteException { - boolean state = ignite.context().cache().context().snapshotMgr().restoreStatus(arg.snapshotName()).get(); + Map nodesStatus = + ignite.context().cache().context().snapshotMgr().clusterRestoreStatus(arg.snapshotName()).get(); - return "Snapshot cache group restore operation is " + (state ? "" : "NOT ") + - "running [snapshot=" + arg.snapshotName() + ']'; + long clusterProcParts = 0; + long clusterProcBytes = 0; + long globalStartTime = 0; + long globalEndTime = 0; + long clusterParts = 0; + long clusterBytes = 0; + String errMsg = null; + String grps = null; + UUID reqId = null; + + if (nodesStatus == null) + return "No information about restoring snapshot \"" + arg.snapshotName() + "\" is available."; + + SB buf = new SB(); + + for (Map.Entry e : nodesStatus.entrySet()) { + SnapshotRestoreStatusDetails details = e.getValue(); + + if (globalStartTime == 0 || globalStartTime > details.startTime()) { + globalStartTime = details.startTime(); + errMsg = details.errorMessage(); + globalEndTime = details.endTime(); + reqId = details.requestId(); + } + + if (grps == null && !F.isEmpty(details.cacheGroupNames())) + grps = details.cacheGroupNames(); + + long procParts = details.processedParts(); + long totalParts = details.totalParts(); + long procBytes = details.processedBytes(); + long totalBytes = details.totalBytes(); + + clusterParts += totalParts; + clusterBytes += totalBytes; + clusterProcParts += procParts; + clusterProcBytes += procBytes; + + if (totalBytes == 0) + continue; + + buf.a(" Node ").a(e.getKey()).a(": ").a(fprmatProgress(procParts, totalParts, procBytes, totalBytes)); + } + + boolean err = errMsg != null; + String nodesInfo = buf.toString(); + + buf.setLength(0); + + buf.a("Restore operation for snapshot \"").a(arg.snapshotName()).a("\" ") + .a(err ? "failed" : (globalEndTime == 0 ? " is still in progress" : "completed successfully")) + .a(" (requestId=").a(reqId).a(").").a(System.lineSeparator()).a(System.lineSeparator()); + + if (err) + buf.a(" Error: ").a(errMsg).a(System.lineSeparator()); + else if (clusterBytes != 0) + buf.a(" Progress: ").a(fprmatProgress(clusterProcParts, clusterParts, clusterProcBytes, clusterBytes)); + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + buf.a(" Started: ").a(dateFormat.format(new Date(globalStartTime))).a(System.lineSeparator()); + + if (globalEndTime != 0) + buf.a(" Finished: ").a(dateFormat.format(new Date(globalEndTime))).a(System.lineSeparator()); + + if (grps != null) + buf.a(" Cache groups: ").a(grps).a(System.lineSeparator()); + + buf.a(System.lineSeparator()).a(nodesInfo); + + return buf.toString(); + } + + /** + * @param procParts Processed partitions. + * @param totalParts Total partitions. + * @param procBytes Size in bytes of processed partitions. + * @param totalBytes Total partitions size. + */ + private String fprmatProgress(long procParts, long totalParts, long procBytes, long totalBytes) { + long base = 1024L; + + int exponent = Math.max((int)(Math.log(totalBytes) / Math.log(base)), 0); + String unit = String.valueOf(" KMGTPE".charAt(exponent)).trim(); + double baseBound = Math.pow(base, exponent); + + return new SB().a(procBytes * 100 / totalBytes).a("% completed (") + .a(procParts).a('/').a(totalParts).a(" partitions, ") + .a(String.format((Locale)null, "%.1f/%.1f %sB)", procBytes / baseBound, totalBytes / baseBound, unit)) + .a(System.lineSeparator()).toString(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java index e6b425835503d..fcb7a399c9878 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java @@ -17,6 +17,7 @@ package org.apache.ignite.mxbean; +import java.util.Collection; import org.apache.ignite.IgniteSnapshot; /** @@ -40,4 +41,28 @@ public interface SnapshotMXBean { */ @MXBeanDescription("Cancel started cluster-wide snapshot on the node initiator.") public void cancelSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName); + + /** + * Restore cluster-wide snapshot. + * + * @param name Snapshot name. + * @param cacheGroupNames Optional comma-separated list of cache group names. + * @see IgniteSnapshot#restoreSnapshot(String, Collection) + */ + @MXBeanDescription("Restore cluster-wide snapshot.") + public void restoreSnapshot( + @MXBeanParameter(name = "snpName", description = "Snapshot name.") + String name, + @MXBeanParameter(name = "cacheGroupNames", description = "Optional comma-separated list of cache group names.") + String cacheGroupNames + ); + + /** + * Cancel previously started snapshot restore operation. + * + * @param name Snapshot name. + * @see IgniteSnapshot#cancelSnapshotRestore(String) + */ + @MXBeanDescription("Cancel previously started snapshot restore operation.") + public void cancelSnapshotRestore(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String name); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java index c662a5a9ed7d5..b44076ad4d1c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java @@ -17,19 +17,38 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.management.AttributeNotFoundException; import javax.management.DynamicMBean; import javax.management.MBeanException; import javax.management.ReflectionException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Tests {@link SnapshotMXBean}. @@ -56,7 +75,7 @@ public void testCreateSnapshot() throws Exception { mxBean.createSnapshot(SNAPSHOT_NAME); assertTrue("Waiting for snapshot operation failed.", - GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, 10_000)); + GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); stopAllGrids(); @@ -81,8 +100,121 @@ public void testCancelSnapshot() throws Exception { mxBean::cancelSnapshot); } - /** + /** @throws Exception If fails. */ + @Test + public void testRestoreSnapshot() throws Exception { + CacheConfiguration ccfg1 = new CacheConfiguration<>(dfltCacheCfg).setBackups(1).setName("cache1"); + CacheConfiguration ccfg2 = new CacheConfiguration<>(ccfg1).setName("cache2"); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, Integer::new, ccfg1, ccfg2); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCaches(Arrays.asList(ccfg1.getName(), ccfg2.getName())); + + awaitPartitionMapExchange(); + + DynamicMBean restoreMBean = metricRegistry(ignite.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(0, getLongMetric("endTime", restoreMBean)); + assertEquals(0, getLongMetric("totalPartitions", restoreMBean)); + assertEquals(0, getLongMetric("processedPartitions", restoreMBean)); + assertEquals(0, getLongMetric("totalPartitionsSize", restoreMBean)); + assertEquals(0, getLongMetric("processedPartitionsSize", restoreMBean)); + assertTrue(String.valueOf(restoreMBean.getAttribute("snapshotName")).isEmpty()); + assertTrue(String.valueOf(restoreMBean.getAttribute("cacheGroupNames")).isEmpty()); + + Set grpNames = new HashSet<>(F.asList(ccfg1.getName(), ccfg2.getName())); + + mxBean.restoreSnapshot(SNAPSHOT_NAME, F.concat(grpNames, " ,")); + + assertTrue(GridTestUtils.waitForCondition(() -> getLongMetric("endTime", restoreMBean) > 0, TIMEOUT)); + + int expPartCnt = grid(0).cachex(ccfg1.getName()).context().affinity().partitions() + + grid(0).cachex(ccfg2.getName()).context().affinity().partitions(); + + for (Ignite grid : G.allGrids()) { + DynamicMBean mReg = metricRegistry(grid.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(expPartCnt, getLongMetric("totalPartitions", mReg)); + assertEquals(expPartCnt, getLongMetric("processedPartitions", mReg)); + + long totalPartsSize = getLongMetric("totalPartitionsSize", mReg); + + assertTrue(totalPartsSize > 0); + assertEquals(totalPartsSize, getLongMetric("processedPartitionsSize", mReg)); + + assertEquals(SNAPSHOT_NAME, restoreMBean.getAttribute("snapshotName")); + + String cacheGrpNames = (String)restoreMBean.getAttribute("cacheGroupNames"); + + assertNotNull(cacheGrpNames); + + assertEquals(grpNames, Arrays.stream(cacheGrpNames.split(",")).collect(Collectors.toSet())); + } + + assertSnapshotCacheKeys(ignite.cache(ccfg1.getName())); + assertSnapshotCacheKeys(ignite.cache(ccfg2.getName())); + } + + /** @throws Exception If fails. */ + @Test + public void testCancelRestoreSnapshot() throws Exception { + IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCache(dfltCacheCfg.getName()); + + awaitPartitionMapExchange(); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1)); + + spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage && + ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); + + IgniteFuture fut = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + spi.waitForBlocked(); + + IgniteInternalFuture interruptFut = GridTestUtils.runAsync(() -> { + // Wait for the process to be interrupted. + AtomicReference errRef = U.field((Object)U.field((Object)U.field( + grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); + boolean interrupted = waitForCondition(() -> errRef.get() != null, getTestTimeout()); + + spi.stopBlock(); + + return interrupted; + }); + + mxBean.cancelSnapshotRestore(SNAPSHOT_NAME); + + assertTrue(interruptFut.get()); + + assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Operation has been canceled by the user"); + } + + /** * @param mBean Ignite snapshot MBean. * @return Value of snapshot end time. */ @@ -94,4 +226,19 @@ private static long getLastSnapshotEndTime(DynamicMBean mBean) { throw new RuntimeException(e); } } + + /** + * @param mBean Ignite snapshot restore MBean. + * @param name Metric name. + * @return Metric value. + */ + private static long getLongMetric(String name, DynamicMBean mBean) { + try { + return (long)mBean.getAttribute(name); + } + catch (MBeanException | ReflectionException | AttributeNotFoundException e) { + throw new RuntimeException(e); + } + } + }