Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.NotNull;

/**
 * The task for checking the consistency of snapshots in the cluster.
 * <p>
 * This base class implements the map step: it validates the snapshot metadata passed in the task argument
 * and creates one job per unique metadata. The {@link ComputeTask#reduce(List)} step is not implemented here.
 */
public abstract class AbstractSnapshotVerificationTask extends
    ComputeTaskAdapter<SnapshotPartitionsVerifyTaskArg, SnapshotPartitionsVerifyTaskResult> {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /** Snapshot metadata per cluster node, copied from the task argument during the map step. */
    protected final Map<ClusterNode, List<SnapshotMetadata>> metas = new HashMap<>();

    /** Ignite instance. */
    @IgniteInstanceResource
    protected IgniteEx ignite;

    /** Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. */
    protected Collection<String> grps;

    /**
     * {@inheritDoc}
     *
     * @throws IgniteSnapshotVerifyException If a node that provided metadata is not in the subgrid or if the
     *      metadata of some baseline node is missing from the snapshot.
     */
    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
        SnapshotPartitionsVerifyTaskArg arg) throws IgniteException {
        grps = arg.cacheGroupNames();

        Map<ClusterNode, List<SnapshotMetadata>> clusterMetas = arg.clusterMetadata();

        // Every node that supplied snapshot metadata must be available for job mapping.
        if (!subgrid.containsAll(clusterMetas.keySet())) {
            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
                    "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
                    ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
        }

        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
        Set<SnapshotMetadata> allMetas = new HashSet<>();
        clusterMetas.values().forEach(allMetas::addAll);

        // Baseline nodes (taken from the first metadata seen) for which no metadata has been found yet.
        Set<String> missed = null;

        for (SnapshotMetadata meta : allMetas) {
            if (missed == null)
                missed = new HashSet<>(meta.baselineNodes());

            missed.remove(meta.consistentId());

            if (missed.isEmpty())
                break;
        }

        // 'missed' stays null when the argument carries no metadata at all, so guard against NPE here.
        if (missed != null && !missed.isEmpty()) {
            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
        }

        metas.putAll(clusterMetas);

        // Spread the unique metadata over the nodes: on each pass every node takes at most one metadata
        // that has not been assigned yet ('allMetas::remove' both tests and claims it).
        while (!allMetas.isEmpty()) {
            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
                SnapshotMetadata meta = F.find(e.getValue(), null, allMetas::remove);

                if (meta == null)
                    continue;

                jobs.put(makeJob(meta.snapshotName(), meta.consistentId(), arg.cacheGroupNames()),
                    e.getKey());

                if (allMetas.isEmpty())
                    break;
            }
        }

        return jobs;
    }

    /** {@inheritDoc} */
    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
        // Handle all exceptions during the `reduce` operation.
        return ComputeJobResultPolicy.WAIT;
    }

    /**
     * @param name Snapshot name.
     * @param constId Consistent id taken from the snapshot metadata the job is created for.
     * @param groups Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored.
     * @return Compute job.
     */
    protected abstract ComputeJob makeJob(String name, String constId, Collection<String> groups);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -100,6 +101,11 @@
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandler;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerContext;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerResult;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotHandlerType;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotRestoreHandleTask;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
Expand Down Expand Up @@ -268,6 +274,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Distributed process to restore cache group from the snapshot. */
private final SnapshotRestoreProcess restoreCacheGrpProc;

/** Snapshot operation handlers. */
private final Map<SnapshotHandlerType, List<SnapshotHandler<?>>> handlers = new EnumMap<>(SnapshotHandlerType.class);

/** Resolved persistent data storage settings. */
private volatile PdsFolderSettings pdsSettings;

Expand Down Expand Up @@ -310,6 +319,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Last seen cluster snapshot operation. */
private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();

// private SnapshotConsistencyValidator validator;

/**
* @param ctx Kernal context.
*/
Expand Down Expand Up @@ -381,6 +392,24 @@ public static String partDeltaFileName(int partId) {
U.ensureDirectory(locSnpDir, "snapshot work directory", log);
U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);

SnapshotPartitionsVerifyRestoreHandler integrityCheck = new SnapshotPartitionsVerifyRestoreHandler(ctx);

handlers.put(integrityCheck.type(), new ArrayList<>(Collections.singleton(integrityCheck)));

SnapshotHandler<?>[] handlers0 = ctx.plugins().extensions(SnapshotHandler.class);

if (handlers0 != null) {
for (SnapshotHandler<?> hnd : handlers0)
handlers.computeIfAbsent(hnd.type(), v -> new ArrayList<>()).add(hnd);
}

for (SnapshotHandlerType type : SnapshotHandlerType.values()) {
List<SnapshotHandler<?>> hndList = handlers.putIfAbsent(type, Collections.emptyList());

if (hndList != null)
handlers.put(type, Collections.unmodifiableList(hndList));
}

MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);

mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
Expand Down Expand Up @@ -538,6 +567,10 @@ public void deleteSnapshot(File snpDir, String folderName) {
}
}

/**
 * @param type Type of the snapshot operation handlers.
 * @return Handlers registered for the given type, or an empty collection if none are registered.
 */
public Collection<SnapshotHandler<?>> handlers(SnapshotHandlerType type) {
    // Callers iterate over the result directly, so never hand out null.
    return handlers.getOrDefault(type, Collections.emptyList());
}

/**
* @param snpName Snapshot name.
* @return Local snapshot directory for snapshot with given name.
Expand Down Expand Up @@ -654,22 +687,37 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt

smf.getParentFile().mkdirs();

SnapshotMetadata meta = new SnapshotMetadata(req.requestId(),
req.snapshotName(),
cctx.localNode().consistentId().toString(),
pdsSettings.folderName(),
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
blts,
fut.result());

try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
U.marshal(marsh,
new SnapshotMetadata(req.requestId(),
req.snapshotName(),
cctx.localNode().consistentId().toString(),
pdsSettings.folderName(),
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
blts,
fut.result()),
out);
U.marshal(marsh, meta, out);

log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
}

return new SnapshotOperationResponse();
Map<String, SnapshotHandlerResult<Object>> results = new HashMap<>();
SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, null);

for (SnapshotHandler<?> hnd : handlers(SnapshotHandlerType.CREATE)) {
SnapshotHandlerResult<Object> res;

try {
res = new SnapshotHandlerResult<>(hnd.handle(ctx), null, cctx.localNode());
} catch (Exception e) {
res = new SnapshotHandlerResult<>(null, e, cctx.localNode());
}

results.put(hnd.getClass().getName(), res);
}

return new SnapshotOperationResponse(results);
}
catch (IOException | IgniteCheckedException e) {
throw F.wrap(e);
Expand Down Expand Up @@ -722,11 +770,46 @@ else if (!F.isEmpty(err) || !missed.isEmpty()) {
"due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
"[err=" + err + ", missed=" + missed + ']'));
}
else
snpReq.error(handleClusterSnapshotCreation(snpReq.snapshotName(), res.values()));

endSnpProc.start(UUID.randomUUID(), snpReq);
}
}

/**
 * Groups the handler results received from the nodes by handler class name and passes each group to the
 * corresponding {@link SnapshotHandlerType#CREATE} handler.
 *
 * @param snpName Snapshot name.
 * @param responses Snapshot operation responses.
 * @return The first exception thrown by a handler while processing the results, or {@code null} if there
 *      were no results to process or no handler threw an exception.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private @Nullable Exception handleClusterSnapshotCreation(String snpName, Collection<SnapshotOperationResponse> responses) {
    Map<String, List<SnapshotHandlerResult>> resByHnd = new HashMap<>();

    for (SnapshotOperationResponse rsp : responses) {
        Map<String, SnapshotHandlerResult<Object>> nodeRes = rsp.operationHandlerResults();

        // A response carries no map when the node produced no handler results.
        if (nodeRes != null) {
            nodeRes.forEach((hndName, hndRes) ->
                resByHnd.computeIfAbsent(hndName, k -> new ArrayList<>()).add(hndRes));
        }
    }

    if (!resByHnd.isEmpty()) {
        for (SnapshotHandler hnd : handlers(SnapshotHandlerType.CREATE)) {
            String hndName = hnd.getClass().getName();

            try {
                hnd.reduce(snpName, resByHnd.get(hndName));
            }
            catch (Exception e) {
                // Stop at the first failure and report it to the caller instead of throwing.
                return e;
            }
        }
    }

    return null;
}

/**
* @param req Request on snapshot creation.
* @return Future which will be completed when the snapshot will be finalized.
Expand Down Expand Up @@ -950,7 +1033,7 @@ public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {

cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);

return checkSnapshot(name, null).chain(f -> {
return checkSnapshot(name, null, false).chain(f -> {
try {
return f.get().idleVerifyResult();
}
Expand All @@ -967,9 +1050,15 @@ public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
*
* @param name Snapshot name.
* @param grps Collection of cache group names to check.
* @param includeCustomHandlers {@code True} to invoke all user-defined {@link SnapshotHandlerType#RESTORE}
* handlers, otherwise only system consistency check will be performed.
* @return {@code true} if snapshot is OK.
*/
public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(String name, @Nullable Collection<String> grps) {
public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
String name,
@Nullable Collection<String> grps,
boolean includeCustomHandlers
) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
A.ensure(grps == null || grps.stream().filter(Objects::isNull).collect(Collectors.toSet()).isEmpty(),
Expand Down Expand Up @@ -1009,16 +1098,18 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(St
kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));

kctx0.task().execute(SnapshotPartitionsVerifyTask.class, new SnapshotPartitionsVerifyTaskArg(grps, metas))
.listen(f1 -> {
if (f1.error() == null)
res.onDone(f1.result());
else if (f1.error() instanceof IgniteSnapshotVerifyException)
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions())));
else
res.onDone(f1.error());
});
// NOTE(review): per the documented contract of 'includeCustomHandlers', 'true' must select the task that
// invokes the RESTORE handlers (presumably SnapshotRestoreHandleTask - confirm) and 'false' the plain
// partitions verification task; the original ternary had the two branches swapped.
Class<? extends AbstractSnapshotVerificationTask> cls =
    includeCustomHandlers ? SnapshotRestoreHandleTask.class : SnapshotPartitionsVerifyTask.class;

kctx0.task().execute(cls, new SnapshotPartitionsVerifyTaskArg(grps, metas)).listen(f1 -> {
if (f1.error() == null)
res.onDone(f1.result());
else if (f1.error() instanceof IgniteSnapshotVerifyException)
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions())));
else
res.onDone(f1.error());
});
}
else {
if (f0.error() instanceof IgniteSnapshotVerifyException)
Expand Down Expand Up @@ -1069,7 +1160,7 @@ public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
* @param smf File denoting to snapshot metafile.
* @return Snapshot metadata instance.
*/
private SnapshotMetadata readSnapshotMetadata(File smf) {
public SnapshotMetadata readSnapshotMetadata(File smf) {
if (!smf.exists())
throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);

Expand Down Expand Up @@ -1358,7 +1449,7 @@ public void onCacheGroupsStopped(List<Integer> grps) {
* @param consId Consistent node id.
* @return Snapshot metadata file name.
*/
private static String snapshotMetaFileName(String consId) {
public static String snapshotMetaFileName(String consId) {
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}

Expand Down Expand Up @@ -2067,6 +2158,20 @@ public LocalSnapshotSender(String snpName) {
private static class SnapshotOperationResponse implements Serializable {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /** Snapshot handler results mapped by key, or {@code null} if there are none. */
    private final Map<String, SnapshotHandlerResult<Object>> hndResults;

    /** Creates a response that carries no handler results. */
    public SnapshotOperationResponse() {
        hndResults = null;
    }

    /**
     * @param hndResults Snapshot handler results; a {@code null} or empty map is stored as {@code null}.
     */
    public SnapshotOperationResponse(Map<String, SnapshotHandlerResult<Object>> hndResults) {
        if (F.isEmpty(hndResults))
            this.hndResults = null;
        else
            this.hndResults = hndResults;
    }

    /**
     * @return Snapshot handler results, or {@code null} if the response carries none.
     */
    public @Nullable Map<String, SnapshotHandlerResult<Object>> operationHandlerResults() {
        return hndResults;
    }
}

/** Snapshot operation start message. */
Expand Down
Loading