Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -859,14 +859,22 @@ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
if (part != null && part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + part);

final Set<ClusterNode> owners =
part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
final Set<ClusterNode> partHolders;

if (part != null) {
if (cctx.config().getCacheMode() == CacheMode.PARTITIONED)
partHolders = Collections.singleton(cctx.affinity().primaryByPartition(part, topVer));
else
partHolders = new HashSet<>(cctx.topology().owners(part, topVer));
}
else
partHolders = Collections.emptySet();

return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
(part == null || owners.contains(n));
(part == null || partHolders.contains(n));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.QueryMetrics;
Expand Down Expand Up @@ -1123,7 +1124,10 @@ public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws Ignite
if (part != null) {
final GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(part);

if (locPart == null || locPart.state() != OWNING) {
boolean nonLocPart = locPart == null || locPart.state() != OWNING ||
(cctx.config().getCacheMode() == CacheMode.PARTITIONED && !locPart.primary(AffinityTopologyVersion.NONE));

if (nonLocPart) {
throw new CacheInvalidStateException("Failed to execute index query because required partition " +
"has not been found on local node [cacheName=" + cctx.name() + ", part=" + part + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static List<Object[]> params() {
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(cacheMode)
.setIndexedTypes(Integer.class, Person.class)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(100));

cfg.setCacheConfiguration(ccfg);
Expand Down Expand Up @@ -136,7 +137,7 @@ public void testSinglePartition() {
}
}

assertEquals(sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
assertEquals("part=" + part, sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
}
}

Expand Down
Loading