diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java new file mode 100644 index 0000000000000..8302f6bd36726 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.benchmarks.jmh.pagemem; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.metric.IoStatisticsHolder; +import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.Storable; +import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker; +import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeList; +import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.CAX; +import org.apache.ignite.logger.java.JavaLogger; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import 
org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Performance comparison between FreeList.insertDataRow(..) and FreeList.insertDataRows(..). + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, jvmArgsAppend = {"-Xms1g", "-server", "-XX:+AggressiveOpts", "-XX:MaxMetaspaceSize=256m", "-ea"}) +@OutputTimeUnit(MICROSECONDS) +@State(Scope.Benchmark) +@Threads(1) +@Warmup(iterations = 10, time = 200, timeUnit = MILLISECONDS) +@Measurement(iterations = 11, time = 200, timeUnit = MILLISECONDS) +public class JmhCacheFreelistBenchmark { + /** Data region size in bytes, used as both the initial and the maximum size. */ + private static final long MEMORY_REGION_SIZE = 10 * 1024 * 1024 * 1024L; // 10 GB + + /** Page size passed to page memory. */ + private static final int PAGE_SIZE = 4096; + + /** Number of rows generated per benchmark invocation. */ + private static final int ROWS_COUNT = 200; + + /** Size ranges used to generate random key and value sizes. */ + public enum DATA_ROW_SIZE { + /** */ + r4_64(4, 64), + + /** */ + r100_300(100, 300), + + /** */ + r300_700(300, 700), + + /** */ + r700_1200(700, 1200), + + /** */ + r1200_3000(1_200, 3_000), + + /** */ + r1000_8000(1_000, 8_000), + + /** Large objects only. */ + r4000_16000(4_000, 16_000), + + /** Mixed objects, mostly large objects.
*/ + r100_32000(100, 32_000); + + /** Lower bound used for random key and value size generation. */ + private final int min; + + /** Upper bound used for random key and value size generation. */ + private final int max; + + /** Creates a size range with the given bounds. */ + DATA_ROW_SIZE(int min, int max) { + this.min = min; + this.max = max; + } + } + + /** + * Check {@link FreeList#insertDataRow(Storable, IoStatisticsHolder)} performance. + */ + @Benchmark + public void insertRow(FreeListProvider provider, Data rows) throws IgniteCheckedException { + for (CacheDataRow row : rows) + provider.freeList.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + } + + /** + * Check {@link FreeList#insertDataRows(Collection, IoStatisticsHolder, org.apache.ignite.internal.util.typedef.CAX)} performance. + */ + @Benchmark + public void insertRows(FreeListProvider provider, Data rows) throws IgniteCheckedException { + provider.freeList.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE, new CAX() { + @Override public void applyx() { + // No-op. + } + }); + } + + /** Rows to insert: generated before and cleared after every benchmark invocation. */ + @State(Scope.Thread) + public static class Data extends AbstractCollection { + /** Size range of the generated rows (JMH parameter). */ + @Param + private DATA_ROW_SIZE range; + + /** Generated rows. */ + private Collection rows = new ArrayList<>(ROWS_COUNT); + + /** Generates {@code ROWS_COUNT} rows with random key and value sizes taken from the selected range. */ + @Setup(Level.Invocation) + public void prepare() { + Random rnd = ThreadLocalRandom.current(); + + int randomRange = range.max - range.min; + + for (int i = 0; i < ROWS_COUNT; i++) { + int keySize = (range.min + rnd.nextInt(randomRange)) / 2; + int valSize = (range.min + rnd.nextInt(randomRange)) / 2; + + CacheDataRow row = new TestDataRow(keySize, valSize); + + rows.add(row); + } + } + + /** Drops the rows generated for the finished invocation. */ + @TearDown(Level.Invocation) + public void cleanup() { + rows.clear(); + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + return rows.iterator(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return rows.size(); + } + } + + /** Holds the page memory and the free list under test; both are created once per trial. */ + @State(Scope.Thread) + public static class FreeListProvider { + /** Data region configuration. */ + private final DataRegionConfiguration plcCfg = + new
DataRegionConfiguration().setInitialSize(MEMORY_REGION_SIZE).setMaxSize(MEMORY_REGION_SIZE); + + /** Logger. */ + private final JavaLogger log = new JavaLogger(); + + /** Page memory backing the free list. */ + private PageMemory pageMem; + + /** Free list under test. */ + private FreeList freeList; + + /** Starts page memory and creates the free list on top of it. */ + @Setup(Level.Trial) + public void setup() throws IgniteCheckedException { + pageMem = createPageMemory(log, PAGE_SIZE, plcCfg); + + freeList = createFreeList(pageMem, plcCfg); + } + + /** Stops page memory. */ + @TearDown(Level.Trial) + public void tearDown() { + pageMem.stop(true); + } + + /** + * @return Started page memory. + */ + protected PageMemory createPageMemory(IgniteLogger log, int pageSize, DataRegionConfiguration plcCfg) { + PageMemory pageMem = new PageMemoryNoStoreImpl(log, + new UnsafeMemoryProvider(log), + null, + pageSize, + plcCfg, + new DataRegionMetricsImpl(plcCfg), + true); + + pageMem.start(); + + return pageMem; + } + + /** + * @param pageMem Page memory. + * @return Free list. + * @throws IgniteCheckedException If failed. + */ + private FreeList createFreeList( + PageMemory pageMem, + DataRegionConfiguration plcCfg + ) throws IgniteCheckedException { + long metaPageId = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + DataRegionMetricsImpl regionMetrics = new DataRegionMetricsImpl(plcCfg); + + DataRegion dataRegion = new DataRegion(pageMem, plcCfg, regionMetrics, new NoOpPageEvictionTracker()); + + return new CacheFreeList( + 1, + "freelist", + regionMetrics, + dataRegion, + null, + null, + metaPageId, + true, + null + ); + } + } + + /** Data row whose key and value are backed by byte arrays of the requested sizes. */ + private static class TestDataRow extends CacheDataRowAdapter { + /** Row link. */ + private long link; + + /** + * @param keySize Key size. + * @param valSize Value size.
+ */ + private TestDataRow(int keySize, int valSize) { + super( + new KeyCacheObjectImpl(0, new byte[keySize], 0), + new CacheObjectImpl(0, new byte[valSize]), + new GridCacheVersion(keySize, valSize, 1), + 0 + ); + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } + + /** {@inheritDoc} */ + @Override public void link(long link) { + this.link = link; + } + } + + /** + * Run benchmark. + * + * @param args Args. + */ + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder() + .include(JmhCacheFreelistBenchmark.class.getSimpleName()) + .build(); + + new Runner(options).run(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 9aec3996c3204..71702edf87ca2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -792,6 +792,42 @@ default boolean initialValue(CacheObject val, * @throws IgniteCheckedException In case of error. * @throws GridCacheEntryRemovedException If entry was removed. */ + default boolean initialValue(CacheObject val, + GridCacheVersion ver, + @Nullable MvccVersion mvccVer, + @Nullable MvccVersion newMvccVer, + byte mvccTxState, + byte newMvccTxState, + long ttl, + long expireTime, + boolean preload, + AffinityTopologyVersion topVer, + GridDrType drType, + boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException { + return initialValue(val, ver, null, null, TxState.NA, TxState.NA, + ttl, expireTime, preload, topVer, drType, fromStore, null); + } + + /** + * Sets new value if current version is 0 + * + * @param val New value. + * @param ver Version to use. + * @param mvccVer Mvcc version. + * @param newMvccVer New mvcc version. 
+ * @param mvccTxState Tx state hint for mvcc version. + * @param newMvccTxState Tx state hint for new mvcc version. + * @param ttl Time to live. + * @param expireTime Expiration time. + * @param preload Flag indicating whether entry is being preloaded. + * @param topVer Topology version. + * @param drType DR type. + * @param fromStore {@code True} if value was loaded from store. + * @param row Pre-created data row, associated with this cache entry. + * @return {@code True} if initial value was set. + * @throws IgniteCheckedException In case of error. + * @throws GridCacheEntryRemovedException If entry was removed. + */ public boolean initialValue(CacheObject val, GridCacheVersion ver, @Nullable MvccVersion mvccVer, @@ -803,7 +839,8 @@ public boolean initialValue(CacheObject val, boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean fromStore, + @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * Create versioned entry for this cache entry. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f1b7ec76b2152..02cc42b0dfe05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3310,7 +3310,8 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { ensureFreeSpace(); @@ -3386,7 +3387,7 @@ else if (val == null) cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer); } else - storeValue(val, expTime, ver); + storeValue(val, expTime, ver, null, row); } } else { @@ -3417,7 +3418,7 @@ else if (val == null) } else // Optimization to access storage only once. - update = storeValue(val, expTime, ver, p); + update = storeValue(val, expTime, ver, p, row); } if (update) { @@ -4257,7 +4258,7 @@ private IgniteTxLocalAdapter currentTx() { protected boolean storeValue(@Nullable CacheObject val, long expireTime, GridCacheVersion ver) throws IgniteCheckedException { - return storeValue(val, expireTime, ver, null); + return storeValue(val, expireTime, ver, null, null); } /** @@ -4267,6 +4268,7 @@ protected boolean storeValue(@Nullable CacheObject val, * @param expireTime Expire time. * @param ver New entry version. * @param predicate Optional predicate. + * @param row Pre-created data row, associated with this cache entry. * @return {@code True} if storage was modified. * @throws IgniteCheckedException If update failed. 
*/ @@ -4274,10 +4276,12 @@ protected boolean storeValue( @Nullable CacheObject val, long expireTime, GridCacheVersion ver, - @Nullable IgnitePredicate predicate) throws IgniteCheckedException { + @Nullable IgnitePredicate predicate, + @Nullable CacheDataRow row + ) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); - UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row); cctx.offheap().invoke(cctx, key, localPartition(), closure); @@ -5716,16 +5720,19 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI * @param predicate Optional predicate. */ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime, - @Nullable IgnitePredicate predicate) { + @Nullable IgnitePredicate predicate, @Nullable CacheDataRow newRow) { this.entry = entry; this.val = val; this.ver = ver; this.expireTime = expireTime; this.predicate = predicate; + this.newRow = newRow; } /** {@inheritDoc} */ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert newRow == null || val != null; + if (oldRow != null) { oldRow.key(entry.key); @@ -5741,6 +5748,14 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI } if (val != null) { + // If there is a pre created row, we cannot update the old one. + // The old row will be removed after the operation is completed, as usual. 
+ if (newRow != null) { + treeOp = IgniteTree.OperationType.PUT; + + return; + } + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( entry.cctx, entry.key, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index ab8d338e46b05..45a14aa94242d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; import java.util.List; import java.util.Map; import javax.cache.Cache; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicate2X; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -694,6 +696,10 @@ CacheDataRow createRow( long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; + /** */ + public void createRows(Collection infos, IgnitePredicate2X pred + ) throws IgniteCheckedException; + /** * @param cctx Cache context. * @param cleanupRows Rows to cleanup. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index acd9ca04cad93..7b40b38730675 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -40,6 +42,7 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord; @@ -93,7 +96,6 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccTreeClosure; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; -import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.transactions.IgniteTxUnexpectedStateCheckedException; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -106,6 +108,8 @@ import 
org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicate2X; +import org.apache.ignite.internal.util.typedef.CAX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -1724,6 +1728,58 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo return dataRow; } + /** {@inheritDoc} */ + @Override public void createRows(Collection infos, IgnitePredicate2X pred) throws IgniteCheckedException { + Collection rows = new ArrayList<>(infos.size()); + + for (GridCacheEntryInfo info : infos) { + rows.add(info.value() == null ? null : + makeDataRow(info.key(), + info.value(), + info.version(), + info.expireTime(), + grp.storeCacheIdInDataPage() ? info.cacheId() : CU.UNDEFINED_CACHE_ID)); + } + + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + rowStore.addRows(F.view(rows, Objects::nonNull), grp.statisticsHolderData(), new CAX() { + @Override public void applyx() throws IgniteCheckedException { + grp.shared().database().ensureFreeSpace(grp.dataRegion()); + } + }); + } + finally { + busyLock.leaveBusy(); + } + + Iterator iter = rows.iterator(); + + try { + for (GridCacheEntryInfo info : infos) { + DataRow row = iter.next(); + + if (row != null && grp.sharedGroup() && row.cacheId() == CU.UNDEFINED_CACHE_ID) + row.cacheId(info.cacheId()); + + if (!pred.apply(info, row) && row != null) + rowStore.removeRow(row.link(), grp.statisticsHolderData()); + } + } + finally { + // Clean up unprocessed rows (entries without a value have a null placeholder and nothing to remove). + while (iter.hasNext()) { + DataRow rest = iter.next(); + + if (rest != null) + rowStore.removeRow(rest.link(), grp.statisticsHolderData()); + } + } + } + /** * @param key Cache key. * @param val Cache value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 0cf5abac01220..d22f1858ac685 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -59,12 +59,14 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.lang.IgnitePredicate2X; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -89,6 +91,9 @@ * Thread pool for requesting partitions from other nodes and populating local cache. */ public class GridDhtPartitionDemander { + /** The maximum number of entries that can be preloaded between checkpoints. 
*/ + private static final int CHECKPOINT_THRESHOLD = 100; + /** */ private final GridCacheSharedContext ctx; @@ -780,21 +785,28 @@ public void handleSupplyMessage( part.beforeApplyBatch(last); try { - Iterator infos = e.getValue().infos().iterator(); - - if (grp.mvccEnabled()) - mvccPreloadEntries(topVer, node, p, infos); - else - preloadEntries(topVer, node, p, infos); - - // If message was last for this partition, - // then we take ownership. - if (last) { - fut.partitionDone(nodeId, p, true); + List infos = e.getValue().infos(); + try { + if (grp.mvccEnabled()) + mvccPreloadEntries(topVer, node, p, infos); + else + preloadEntries(topVer, node, p, infos); + } + catch (GridDhtInvalidPartitionException ignored) { if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + - "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); + log.debug("Partition became invalid during rebalancing (will ignore): " + p); + } + finally { + // If message was last for this partition, + // then we take ownership. + if (last) { + fut.partitionDone(nodeId, p, true); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + + "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); + } } } finally { @@ -879,21 +891,23 @@ public void handleSupplyMessage( * @throws IgniteInterruptedCheckedException If interrupted. */ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, - Iterator infos) throws IgniteCheckedException { - if (!infos.hasNext()) + List infos) throws IgniteCheckedException { + if (infos.isEmpty()) return; List entryHist = new ArrayList<>(); GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); + Iterator iter = infos.iterator(); + // Loop through all received entries and try to preload them. 
- while (infos.hasNext() || !entryHist.isEmpty()) { + while (iter.hasNext() || !entryHist.isEmpty()) { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { - boolean hasMore = infos.hasNext(); + for (int i = 0; i < CHECKPOINT_THRESHOLD; i++) { + boolean hasMore = iter.hasNext(); assert hasMore || !entryHist.isEmpty(); @@ -902,7 +916,7 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node boolean flushHistory; if (hasMore) { - entry = (GridCacheMvccEntryInfo)infos.next(); + entry = (GridCacheMvccEntryInfo)iter.next(); GridCacheMvccEntryInfo prev = entryHist.isEmpty() ? null : entryHist.get(0); @@ -924,20 +938,9 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node } if (cctx != null) { - if (!mvccPreloadEntry(cctx, node, entryHist, topVer, p)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + - ", entry=" + entryHist.get(entryHist.size() - 1) + ']'); - - return; // Skip current partition. - } + mvccPreloadEntry(cctx, node, entryHist, topVer, p); - //TODO: IGNITE-11330: Update metrics for touched cache only. - for (GridCacheContext ctx : grp.caches()) { - if (ctx.statisticsEnabled()) - ctx.cache().metrics0().onRebalanceKeyReceived(); - } + updateCacheMetrics(); } if (!hasMore) @@ -956,52 +959,36 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node } /** - * Adds entries with theirs history to partition p. - * - * @param node Node which sent entry. - * @param p Partition id. - * @param infos Entries info for preload. * @param topVer Topology version. - * @throws IgniteInterruptedCheckedException If interrupted. + * @param from Node which sent entry. + * @param p Partition id. + * @param infos Preloaded entries. + * @throws IgniteCheckedException If failed. 
+ */ - private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, - Iterator infos) throws IgniteCheckedException { - GridCacheContext cctx = null; + private void preloadEntries( + AffinityTopologyVersion topVer, + ClusterNode from, + int p, + List infos + ) throws IgniteCheckedException { + int cnt = infos.size(); + + // Preload in batches: the checkpoint read lock is acquired and released for every batch. + for (int off = 0; off < cnt; off += CHECKPOINT_THRESHOLD) { + Collection batch = infos.subList(off, Math.min(cnt, off + CHECKPOINT_THRESHOLD)); - // Loop through all received entries and try to preload them. - while (infos.hasNext()) { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { - if (!infos.hasNext()) - break; - - GridCacheEntryInfo entry = infos.next(); + GridDhtLocalPartition part = grp.topology().localPartition(p); - if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) { - cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext(); - - if (cctx == null) - continue; - else if (cctx.isNear()) - cctx = cctx.dhtCache().context(); - } - - if (!preloadEntry(node, p, entry, topVer, cctx)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + part.dataStore().createRows(batch, new IgnitePredicate2X() { + @Override public boolean applyx(GridCacheEntryInfo info, CacheDataRow row) throws IgniteCheckedException { + GridCacheContext cctx = resolveCacheContext(info); - return; + return cctx != null && preloadEntry(from, p, info, topVer, cctx, row); } - - //TODO: IGNITE-11330: Update metrics for touched cache only. - for (GridCacheContext ctx : grp.caches()) { - if (ctx.statisticsEnabled()) - ctx.cache().metrics0().onRebalanceKeyReceived(); - } - } + }); } finally { ctx.database().checkpointReadUnlock(); @@ -1017,7 +1004,8 @@ else if (cctx.isNear()) * @param entry Preloaded entry. * @param topVer Topology version. * @param cctx Cache context.
- * @return {@code False} if partition has become invalid during preloading. + * @param row Pre-created data row, associated with this cache entry. + * @return {@code True} if the initial value was set for the specified cache entry. * @throws IgniteInterruptedCheckedException If interrupted. */ private boolean preloadEntry( @@ -1025,7 +1013,8 @@ private boolean preloadEntry( int p, GridCacheEntryInfo entry, AffinityTopologyVersion topVer, - GridCacheContext cctx + GridCacheContext cctx, + @Nullable CacheDataRow row ) throws IgniteCheckedException { assert ctx.database().checkpointLockIsHeldByThread(); @@ -1053,7 +1042,8 @@ private boolean preloadEntry( true, topVer, cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, - false + false, + row )) { cached.touch(); // Start tracking. @@ -1061,6 +1051,8 @@ private boolean preloadEntry( cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, false, null, null, null, true); + + return true; } else { cached.touch(); // Start tracking. 
@@ -1069,21 +1061,15 @@ private boolean preloadEntry( log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } + + updateCacheMetrics(); } - else if (log.isTraceEnabled()) - log.trace("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } catch (GridCacheEntryRemovedException ignored) { if (log.isTraceEnabled()) log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } } catch (IgniteInterruptedCheckedException e) { throw e; @@ -1093,7 +1079,7 @@ else if (log.isTraceEnabled()) ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } - return true; + return false; } /** @@ -1104,7 +1090,7 @@ else if (log.isTraceEnabled()) * @param history Mvcc entry history. * @param topVer Topology version. * @param p Partition id. - * @return {@code False} if partition has become invalid during preloading. + * @return {@code True} if the initial value was set for the specified cache entry. * @throws IgniteInterruptedCheckedException If interrupted. */ private boolean mvccPreloadEntry( @@ -1137,6 +1123,8 @@ private boolean mvccPreloadEntry( cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, null, true, null, false, null, null, null, true); + + return true; } else { cached.touch(); // Start tracking. 
@@ -1151,12 +1139,6 @@ private boolean mvccPreloadEntry( log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } } catch (IgniteInterruptedCheckedException | ClusterTopologyCheckedException e) { throw e; @@ -1166,7 +1148,7 @@ private boolean mvccPreloadEntry( ctx.localNode() + ", node=" + from.id() + ", key=" + info.key() + ", part=" + p + ']', e); } - return true; + return false; } /** @@ -1180,6 +1162,34 @@ private String demandRoutineInfo(int topicId, UUID supplier, GridDhtPartitionSup return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier + ", topic=" + topicId; } + /** + * Get cache context. + * + * @param info Preloading entry. + * @return Cache context or {@code null} if context does not exist. + */ + private GridCacheContext resolveCacheContext(GridCacheEntryInfo info) { + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(info.cacheId()) : grp.singleCacheContext(); + + if (cctx == null) + return null; + + return cctx.isNear() ? cctx.dhtCache().context() : cctx; + } + + /** + * Update rebalancing metrics. + */ + private void updateCacheMetrics() { + // TODO: IGNITE-11330: Update metrics for touched cache only. + // Due to historical rebalancing "EstimatedRebalancingKeys" metric is currently calculated for the whole cache + // group (by partition counters), so "RebalancedKeys" and "RebalancingKeysRate" are calculated in the same way.
+ for (GridCacheContext cctx0 : grp.caches()) { + if (cctx0.statisticsEnabled()) + cctx0.cache().metrics0().onRebalanceKeyReceived(); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionDemander.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 8207b2bc2dc28..880157248fbd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; @@ -100,6 +102,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicate2X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -2399,6 +2402,17 @@ private Metas 
getOrAllocatePartitionMetas() throws IgniteCheckedException { delegate.invoke(cctx, key, c); } + /** {@inheritDoc} */ + @Override public void createRows( + Collection infos, IgnitePredicate2X pred + ) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + CacheDataStore delegate = init0(false); + + delegate.createRows(infos, pred); + } + /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index bdd1c2db3782c..aa3f98e388730 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -26,7 +28,7 @@ import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; -import org.apache.ignite.internal.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.typedef.CAX; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -117,6 +119,18 @@ public void addRow(CacheDataRow row, IoStatisticsHolder statHolder) throws Ignit ", link=" + U.hexLong(row.link()) + ']'; } + /** + * @param rows Rows. 
+ * @param statHolder Statistics holder to track IO operations. + * @param clo Closure passed through to {@code FreeList#insertDataRows}. + * @throws IgniteCheckedException If failed. + */ + public void addRows(Collection rows, IoStatisticsHolder statHolder, CAX clo) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + freeList.insertDataRows(rows, statHolder, clo); + } + /** * @param link Row link. * @param row New row data. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index b677116353a38..65403b46cadd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -42,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener; import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.util.typedef.CAX; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -130,13 +133,25 @@ private final class UpdateRowHandler extends PageHandler { } } - /** */ - private final PageHandler writeRow = new WriteRowHandler(); + /** Write handler which puts memory page into the free list after an update. */ + private final PageHandler writeRow = new WriteRowHandler(true); - /** - * - */ + /** Write handler which doesn't put memory page into the free list after an update.
*/ + private final PageHandler writeRowKeepPage = new WriteRowHandler(false); + + /** */ private final class WriteRowHandler extends PageHandler { + /** */ + private final boolean putPageIntoFreeList; + + /** + * @param putPageIntoFreeList Put page into the free list after an update. + */ + WriteRowHandler(boolean putPageIntoFreeList) { + this.putPageIntoFreeList = putPageIntoFreeList; + } + + /** {@inheritDoc} */ @Override public Integer run( int cacheId, long pageId, @@ -159,13 +174,15 @@ private final class WriteRowHandler extends PageHandler { written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) : addRowFragment(pageId, page, pageAddr, io, row, written, rowSize); - // Reread free space after update. - int newFreeSpace = io.getFreeSpace(pageAddr); + if (putPageIntoFreeList) { + // Reread free space after update. + int newFreeSpace = io.getFreeSpace(pageAddr); - if (newFreeSpace > MIN_PAGE_FREE_SPACE) { - int bucket = bucket(newFreeSpace, false); + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(newFreeSpace, false); - put(null, pageId, page, pageAddr, bucket, statHolder); + put(null, pageId, page, pageAddr, bucket, statHolder); + } } if (written == rowSize) @@ -487,7 +504,7 @@ private long allocateDataPage(int part) throws IgniteCheckedException { long pageId = 0L; if (remaining < MIN_SIZE_FOR_DATA_PAGE) { - for (int b = bucket(remaining, false) + 1; b < BUCKETS - 1; b++) { + for (int b = bucket(remaining, false) + 1; b < REUSE_BUCKET; b++) { pageId = takeEmptyPage(b, row.ioVersions(), statHolder); if (pageId != 0L) @@ -510,7 +527,7 @@ private long allocateDataPage(int part) throws IgniteCheckedException { initIo = row.ioVersions().latest(); } else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. 
- pageId = initReusedPage(row, pageId, row.partition(), statHolder); + pageId = initReusedPage(row, pageId, statHolder); else // Page is taken from free space bucket. For in-memory mode partition must be changed. pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); @@ -529,15 +546,185 @@ else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken } /** + * Reduces the workload on the free list by writing multiple rows into a single memory page at once.
+ *
+ * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then + * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the + * current one. + * + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @param checkFreeSpace Closure invoked at the start of every iteration of the outer insert loop, before a page is taken. + * @throws IgniteCheckedException If failed. + */ + @Override public void insertDataRows(Collection rows, + IoStatisticsHolder statHolder, CAX checkFreeSpace) throws IgniteCheckedException { + try { + Iterator iter = rows.iterator(); + + T row = null; + + int written = COMPLETE; + + while (iter.hasNext() || written != COMPLETE) { + checkFreeSpace.applyx(); + + if (written == COMPLETE) { + row = iter.next(); + + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) + continue; + } + + int remaining = row.size() - written; + + long pageId = 0L; + + if (remaining < MIN_SIZE_FOR_DATA_PAGE) { + for (int b = bucket(remaining, false) + 1; b < REUSE_BUCKET; b++) { + pageId = takeEmptyPage(b, row.ioVersions(), statHolder); + + if (pageId != 0L) + break; + } + } + + if (pageId == 0L) { // Handle reuse bucket. + if (reuseList == this) + pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder); + else + pageId = reuseList.takeRecycledPage(); + } + + AbstractDataPageIO initIo = null; + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = row.ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. + pageId = initReusedPage(row, pageId, statHolder); + else // Page is taken from free space bucket. For in-memory mode partition must be changed. + pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); + + // Acquire and lock page.
+ long page = acquirePage(pageId, statHolder); + + try { + long pageAddr = writeLock(pageId, page); + + assert pageAddr != 0; + + boolean dirty = false; + + try { + AbstractDataPageIO io = initIo == null ? PageIO.getPageIO(pageAddr) : initIo; + + // Fill the page up to the end. + while (iter.hasNext() || written != COMPLETE) { + if (written == COMPLETE) { + row = iter.next(); + + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) + continue; + + if (io.getFreeSpace(pageAddr) < row.size() - written) + break; + } + + written = PageHandler.writePage(pageMem, grpId, pageId, page, pageAddr, lockLsnr, + writeRowKeepPage, initIo, wal, null, row, written, statHolder); + + initIo = null; + + dirty = true; + + assert written == COMPLETE; + } + + int freeSpace = io.getFreeSpace(pageAddr); + + // Put page into the free list if needed. + if (freeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(freeSpace, false); + + put(null, pageId, page, pageAddr, bucket, statHolder); + } + + assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 + } + finally { + // Should always unlock data page after an update. + assert writeRowKeepPage.releaseAfterWrite(grpId, pageId, page, pageAddr, row, 0); + + writeUnlock(pageId, page, pageAddr, dirty); + } + } + finally { + releasePage(pageId, page); + } + } + } + catch (RuntimeException e) { + throw new CorruptedFreeListException("Failed to insert data rows", e); + } + } + + /** + * Write fragments of the row, which occupy the whole memory page. + * + * @param row Row to process. + * @param statHolder Statistics holder to track IO operations. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written. + * @throws IgniteCheckedException If failed. 
+ */ + private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException { + if (row.size() < MIN_SIZE_FOR_DATA_PAGE) + return 0; + + assert row.link() == 0 : row.link(); + + int written = 0; + + do { + long pageId = reuseList == this ? takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder) : + reuseList.takeRecycledPage(); + + AbstractDataPageIO initIo = null; + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = row.ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. + pageId = initReusedPage(row, pageId, statHolder); + else // Page is taken from free space bucket. For in-memory mode partition must be changed. + pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); + + written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder); + + assert written != FAIL_I; // We can't fail here. + + memMetrics.incrementLargeEntriesPages(); + } + while (row.size() - written >= MIN_SIZE_FOR_DATA_PAGE); + + return written; + } + + /** + * @param row Row. * @param reusedPageId Reused page id. - * @param partId Partition id. * @param statHolder Statistics holder to track IO operations. * @return Prepared page id. 
* * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO) */ - private long initReusedPage(T row, long reusedPageId, int partId, - IoStatisticsHolder statHolder) throws IgniteCheckedException { + private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException { long reusedPage = acquirePage(reusedPageId, statHolder); try { long reusedPageAddr = writeLock(reusedPageId, reusedPage); @@ -546,7 +733,7 @@ private long initReusedPage(T row, long reusedPageId, int partId, try { return initReusedPage(reusedPageId, reusedPage, reusedPageAddr, - partId, PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); + row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); } finally { writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java index 28f5a50964c93..5fd28a7ae235d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java @@ -17,24 +17,37 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.Storable; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.typedef.CAX; /** */ public interface FreeList { /** * @param row Row. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. 
*/ public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException; + /** + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @param checkFreeSpace + * @throws IgniteCheckedException If failed. + */ + public void insertDataRows(Collection rows, IoStatisticsHolder statHolder, + CAX checkFreeSpace) throws IgniteCheckedException; + /** * @param link Row link. * @param row New row data. + * @param statHolder Statistics holder to track IO operations. * @return {@code True} if was able to update row. * @throws IgniteCheckedException If failed. */ @@ -46,6 +59,7 @@ public interface FreeList { * @param arg Handler argument. * @param Argument type. * @param Result type. + * @param statHolder Statistics holder to track IO operations. * @return Result. * @throws IgniteCheckedException If failed. */ @@ -54,6 +68,7 @@ public R updateDataRow(long link, PageHandler pageHnd, S arg, IoSta /** * @param link Row link. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. */ public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index 283b3bae1aa86..79041d90f463e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -282,15 +282,8 @@ public static R writePage( boolean ok = false; try { - if (init != null) { - // It is a new page and we have to initialize it. 
- doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); - walPlc = FALSE; - } - else - init = PageIO.getPageIO(pageAddr); - - R res = h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg, statHolder); + R res = writePage( + pageMem, grpId, pageId, page, pageAddr, lsnr, h, init, wal, walPlc, arg, intArg, statHolder); ok = true; @@ -309,6 +302,48 @@ public static R writePage( } } + /** + * @param pageMem Page memory. + * @param grpId Group ID. + * @param pageId Page ID. + * @param pageAddr Page address. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param wal Write ahead log. + * @param walPlc Full page WAL record policy. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteCheckedException If failed. + */ + public static R writePage( + PageMemory pageMem, + int grpId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr, + PageHandler h, + PageIO init, + IgniteWriteAheadLogManager wal, + Boolean walPlc, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + if (init != null) { + // It is a new page and we have to initialize it. + doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); + walPlc = FALSE; + } + else + init = PageIO.getPageIO(pageAddr); + + return h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg, statHolder); + } + /** * @param pageMem Page memory. * @param grpId Group ID. 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 8d1ab878f2970..f6900b8401ad6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -711,7 +711,8 @@ void recheckLock() { boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java new file mode 100644 index 0000000000000..19809d674d46e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Check page memory consistency after preloading. 
+ */ +@RunWith(Parameterized.class) +public class MemoryLeakAfterRebalanceSelfTest extends GridCommonAbstractTest { + /** */ + @Parameterized.Parameter + public CacheAtomicityMode cacheAtomicityMode; + + /** */ + @Parameterized.Parameters(name = " [atomicity={0}]") + public static Iterable parameters() { + return Arrays.asList(new Object[][] {{CacheAtomicityMode.ATOMIC}, {CacheAtomicityMode.TRANSACTIONAL}}); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration(). + setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(200 * 1024 * 1024) + .setPersistenceEnabled(true))); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + cfg.setCacheConfiguration(ccfg); + + cfg.setRebalanceThreadPoolSize(4); + + return cfg; + } + + /** Initialization. */ + @Before + public void before() throws Exception { + cleanPersistenceDir(); + } + + /** Clean up. */ + @After + public void after() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPreloadingWithConcurrentUpdates() throws Exception { + int size = GridTestUtils.SF.applyLB(500_000, 5_000); + + // Prepare data. + Map data = new HashMap<>(U.capacity(size)); + + for (int i = 0; i < size; i++) + data.put(i, i + " v.1"); + + // Start 1 node. + Ignite node0 = startGrid(0); + + node0.cluster().active(true); + + node0.cluster().baselineAutoAdjustTimeout(0); + + IgniteCache cache0 = node0.cache(DEFAULT_CACHE_NAME); + + // Load data. 
+ cache0.putAll(data); + + TestRecordingCommunicationSpi.spi(node0) + .blockMessages((node, msg) -> + msg instanceof GridDhtPartitionSupplyMessage + && ((GridCacheGroupIdMessage)msg).groupId() == groupIdForCache(node0, DEFAULT_CACHE_NAME) + ); + + // Start 2 node. + IgniteEx node1 = startGrid(1); + + TestRecordingCommunicationSpi.spi(node0).waitForBlocked(); + + // Simulate concurrent updates when preloading. + for (int i = 0; i < size; i += 10) { + String val = i + " v.2"; + + cache0.put(i, val); + data.put(i, val); + + // Start preloading. + if (i == size / 2) + TestRecordingCommunicationSpi.spi(node0).stopBlock(); + } + + awaitPartitionMapExchange(); + + // Stop node 1. + stopGrid(0); + + awaitPartitionMapExchange(); + + IgniteInternalCache cache1 = node1.cachex(DEFAULT_CACHE_NAME); + + assertEquals(data.size(), cache1.size()); + + GridCacheContext cctx = cache1.context(); + + // Make sure that there are no duplicate entries on the data pages in the pages memory. + try (GridCloseableIterator> iter = cctx.offheap().cacheEntriesIterator( + cctx, + true, + true, + cctx.topology().readyTopologyVersion(), + false, + null, + true)) { + + while (iter.hasNext()) { + Cache.Entry entry = iter.next(); + + Integer key = entry.getKey(); + + String exp = data.remove(key); + + assertEquals(exp, entry.getValue()); + } + } + + assertTrue(data.isEmpty()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java index 12b804f93a86c..bc6d708599745 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.database; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; 
import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -48,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.util.typedef.CAX; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.testframework.GridTestUtils; @@ -65,6 +68,9 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { /** */ private static final long MB = 1024L * 1024L; + /** */ + private static final int BATCH_SIZE = 100; + /** */ private PageMemory pageMem; @@ -78,6 +84,46 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { pageMem = null; } + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_1024() throws Exception { + checkInsertDeleteSingleThreaded(1024, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_2048() throws Exception { + checkInsertDeleteSingleThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_4096() throws Exception { + checkInsertDeleteSingleThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_8192() throws Exception { + checkInsertDeleteSingleThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_16384() throws Exception { + checkInsertDeleteSingleThreaded(16384, true); + } + /** * @throws Exception if failed. 
*/ @@ -158,11 +204,60 @@ public void testInsertDeleteMultiThreaded_16384() throws Exception { checkInsertDeleteMultiThreaded(16384); } + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_1024() throws Exception { + checkInsertDeleteMultiThreaded(1024, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_2048() throws Exception { + checkInsertDeleteMultiThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_4096() throws Exception { + checkInsertDeleteMultiThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_8192() throws Exception { + checkInsertDeleteMultiThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_16384() throws Exception { + checkInsertDeleteMultiThreaded(16384, true); + } + /** * @param pageSize Page size. * @throws Exception If failed. */ - protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception { + protected void checkInsertDeleteMultiThreaded(int pageSize) throws Exception { + checkInsertDeleteMultiThreaded(pageSize, false); + } + + /** + * @param pageSize Page size. + * @param batched Batch mode flag. + * @throws Exception If failed. 
+ */ + protected void checkInsertDeleteMultiThreaded(final int pageSize, final boolean batched) throws Exception { final FreeList list = createFreeList(pageSize); Random rnd = new Random(); @@ -188,6 +283,8 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti GridTestUtils.runMultiThreaded(new Callable() { @Override public Object call() throws Exception { + List rows = new ArrayList<>(BATCH_SIZE); + Random rnd = ThreadLocalRandom.current(); for (int i = 0; i < 200_000; i++) { @@ -218,16 +315,38 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti TestDataRow row = new TestDataRow(keySize, valSize); - list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + if (batched) + rows.add(row); + else { + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); - assertTrue(row.link() != 0L); + assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + TestDataRow old = stored.put(row.link(), row); - assertNull(old); + assertNull(old); + } + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE, new CAX() { + @Override public void applyx() { + // No-op. + } + }); + + for (TestDataRow row0 : rows) { + assertTrue(row0.link() != 0L); + + TestDataRow old = stored.put(row0.link(), row0); + + assertNull(old); + } + + rows.clear(); + } } else { - while (true) { + while (!stored.isEmpty()) { Iterator it = stored.values().iterator(); if (it.hasNext()) { @@ -251,9 +370,19 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti } /** - * @throws Exception if failed. + * @param pageSize Page size. + * @throws Exception If failed. */ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { + checkInsertDeleteSingleThreaded(pageSize, false); + } + + /** + * @param pageSize Page size. + * @param batched Batch mode flag. + * @throws Exception if failed. 
+ */ + protected void checkInsertDeleteSingleThreaded(int pageSize, boolean batched) throws Exception { FreeList list = createFreeList(pageSize); Random rnd = new Random(); @@ -277,6 +406,8 @@ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { boolean grow = true; + List rows = new ArrayList<>(BATCH_SIZE); + for (int i = 0; i < 1_000_000; i++) { if (grow) { if (stored.size() > 20_000) { @@ -301,13 +432,35 @@ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { TestDataRow row = new TestDataRow(keySize, valSize); - list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + if (batched) + rows.add(row); + else { + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE, new CAX() { + @Override public void applyx() throws IgniteCheckedException { + // No-op. 
+ } + }); + + for (TestDataRow row0 : rows) { + assertTrue(row0.link() != 0L); - assertTrue(row.link() != 0L); + TestDataRow old = stored.put(row0.link(), row0); - TestDataRow old = stored.put(row.link(), row); + assertNull(old); + } - assertNull(old); + rows.clear(); + } } else { Iterator it = stored.values().iterator(); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java index 220c41ff9d620..12efe5302489c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IndexStoragePageMemoryImplTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.MemoryLeakAfterRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryNoStoreLeakTest; @@ -83,6 +84,8 @@ public static List> suite() { ignoredTests.add(IgnitePdsDestroyCacheTest.class); ignoredTests.add(IgnitePdsDestroyCacheWithoutCheckpointsTest.class); + ignoredTests.add(MemoryLeakAfterRebalanceSelfTest.class); + return new ArrayList<>(IgnitePdsTestSuite.suite(ignoredTests)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index f865bb8363967..74e763b82c378 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IndexStoragePageMemoryImplTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.MemoryLeakAfterRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryLazyAllocationTest; @@ -98,6 +99,8 @@ public static List> suite(Collection ignoredTests) { //GridTestUtils.addTestIfNeeded(suite, PageIdDistributionTest.class, ignoredTests); //GridTestUtils.addTestIfNeeded(suite, TrackingPageIOTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, MemoryLeakAfterRebalanceSelfTest.class, ignoredTests); + // BTree tests with store page memory. GridTestUtils.addTestIfNeeded(suite, BPlusTreePageMemoryImplTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, BPlusTreeReuseListPageMemoryImplTest.class, ignoredTests);