diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 243d99d93e46f..6e5dcc861b941 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -137,6 +137,7 @@ public void initializeForCache(CacheGroupDescriptor grpDesc, StoredCacheData cac * @param grpId Cache group ID. * @param pageId Page ID. * @param pageBuf Page buffer to write. + * @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated * @throws IgniteCheckedException If failed to write page. */ public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java deleted file mode 100644 index a3900c3bfa349..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.File; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.DiskPageCompression; -import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.processors.compress.CompressionProcessor; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel; - -/** - * Cache compression manager. - */ -public class CacheCompressionManager extends GridCacheManagerAdapter { - /** */ - private DiskPageCompression diskPageCompression; - - /** */ - private int diskPageCompressLevel; - - /** */ - private CompressionProcessor compressProc; - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - if (cctx.kernalContext().clientNode()) { - diskPageCompression = DiskPageCompression.DISABLED; - - return; - } - - compressProc = cctx.kernalContext().compress(); - - CacheConfiguration cfg = cctx.config(); - - diskPageCompression = cctx.kernalContext().config().isClientMode() ? null : cfg.getDiskPageCompression(); - - if (diskPageCompression != DiskPageCompression.DISABLED) { - if (!cctx.dataRegion().config().isPersistenceEnabled()) - throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence."); - - Integer lvl = cfg.getDiskPageCompressionLevel(); - diskPageCompressLevel = lvl != null ? - checkCompressionLevelBounds(lvl, diskPageCompression) : - getDefaultCompressionLevel(diskPageCompression); - - DataStorageConfiguration dsCfg = cctx.kernalContext().config().getDataStorageConfiguration(); - - File dbPath = cctx.kernalContext().pdsFolderResolver().resolveFolders().persistentStoreRootPath(); - - assert dbPath != null; - - compressProc.checkPageCompressionSupported(dbPath.toPath(), dsCfg.getPageSize()); - - if (log.isInfoEnabled()) { - log.info("Disk page compression is enabled [cache=" + cctx.name() + - ", compression=" + diskPageCompression + ", level=" + diskPageCompressLevel + "]"); - } - } - } - - /** - * @param page Page buffer. - * @param store Page store. - * @return Compressed or the same buffer. - * @throws IgniteCheckedException If failed. - */ - public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException { - if (diskPageCompression == DiskPageCompression.DISABLED) - return page; - - int blockSize = store.getBlockSize(); - - if (blockSize <= 0) - throw new IgniteCheckedException("Failed to detect storage block size on " + U.osString()); - - return compressProc.compressPage(page, store.getPageSize(), blockSize, diskPageCompression, diskPageCompressLevel); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 03f26be0557b2..41da76d23e052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -183,9 +183,6 @@ public class GridCacheContext implements Externalizable { /** Store manager. */ private CacheStoreManager storeMgr; - /** Compression manager. */ - private CacheCompressionManager compressMgr; - /** Replication manager. */ private GridCacheDrManager drMgr; @@ -336,7 +333,6 @@ public GridCacheContext( * =========================== */ - CacheCompressionManager compressMgr, GridCacheEventManager evtMgr, CacheStoreManager storeMgr, CacheEvictionManager evictMgr, @@ -355,7 +351,6 @@ public GridCacheContext( assert cacheCfg != null; assert locStartTopVer != null : cacheCfg.getName(); - assert compressMgr != null; assert grp != null; assert evtMgr != null; assert storeMgr != null; @@ -382,7 +377,6 @@ public GridCacheContext( * Managers in starting order! * =========================== */ - this.compressMgr = add(compressMgr); this.evtMgr = add(evtMgr); this.storeMgr = add(storeMgr); this.evictMgr = add(evictMgr); @@ -1246,13 +1240,6 @@ public GridCacheVersion[] emptyVersion() { return EMPTY_VERSION; } - /** - * @return Compression manager. - */ - public CacheCompressionManager compress() { - return compressMgr; - } - /** * Sets cache object context. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 91d29b00b5125..dea6f7ca7be10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1216,7 +1216,6 @@ private void onKernalStop(GridCacheAdapter cache, boolean cancel) { boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg); - CacheCompressionManager compressMgr = new CacheCompressionManager(); GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); GridCacheEventManager evtMgr = new GridCacheEventManager(); CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled()) @@ -1258,7 +1257,6 @@ private void onKernalStop(GridCacheAdapter cache, boolean cancel) { * Managers in starting order! * =========================== */ - compressMgr, evtMgr, storeMgr, evictMgr, @@ -1397,7 +1395,6 @@ private void onKernalStop(GridCacheAdapter cache, boolean cancel) { * Managers in starting order! * =========================== */ - compressMgr, evtMgr, storeMgr, evictMgr, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 511ab253be756..a6d4bd645d299 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; @@ -3159,7 +3160,17 @@ private void finalizeCheckpointOnRecovery( long pageId = fullPageId.pageId(); // Write buf to page store. - PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true); + int partId = PageIdUtils.partId(pageId); + PageStore store = storeMgr.getStore(groupId, partId); + + try { + store.write(pageId, buf, tag, true); + } + catch (StorageException e) { + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + throw e; + } // Save store for future fsync. updStores.add(store); @@ -4763,7 +4774,18 @@ private PageStoreWriter createPageStoreWriter(Map getCheckpointer().currentProgress().updateWrittenPages(1); - PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true); + int partId = PageIdUtils.partId(pageId); + + PageStore store = storeMgr.getStore(groupId, partId); + + try { + store.write(pageId, buf, tag, true); + } + catch (StorageException e) { + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + throw e; + } updStores.computeIfAbsent(store, k -> new LongAdder()).increment(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 05f4a733234c1..e30383797e606 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -588,67 +588,26 @@ public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) th /** {@inheritDoc} */ @Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException { - writeInternal(grpId, pageId, pageBuf, tag, true); - } - - /** {@inheritDoc} */ - @Override public long pageOffset(int grpId, long pageId) throws IgniteCheckedException { - PageStore store = getStore(grpId, PageIdUtils.partId(pageId)); - - return store.pageOffset(pageId); - } - - /** - * @param cacheId Cache ID to write. - * @param pageId Page ID. - * @param pageBuf Page buffer. - * @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated - * @param calculateCrc if {@code False} crc calculation will be forcibly skipped. - * @return PageStore to which the page has been written. - * @throws IgniteCheckedException If IO error occurred. - */ - public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) - throws IgniteCheckedException { int partId = PageIdUtils.partId(pageId); - PageStore store = getStore(cacheId, partId); + PageStore store = getStore(grpId, partId); try { - int pageSize = store.getPageSize(); - int compressedPageSize = pageSize; - - GridCacheContext cctx0 = cctx.cacheContext(cacheId); - - if (cctx0 != null) { - assert pageBuf.position() == 0 && pageBuf.limit() == pageSize : pageBuf; - - ByteBuffer compressedPageBuf = cctx0.compress().compressPage(pageBuf, store); - - if (compressedPageBuf != pageBuf) { - compressedPageSize = PageIO.getCompressedSize(compressedPageBuf); - - if (!calculateCrc) { - calculateCrc = true; - PageIO.setCrc(compressedPageBuf, 0); // It will be recalculated over compressed data further. - } - - PageIO.setCrc(pageBuf, 0); // It is expected to be reset to 0 after each write. - pageBuf = compressedPageBuf; - } - } - - store.write(pageId, pageBuf, tag, calculateCrc); - - if (pageSize > compressedPageSize) - store.punchHole(pageId, compressedPageSize); // TODO maybe add async punch mode? + // Do we still need to set calculateCrc as true? + store.write(pageId, pageBuf, tag, true); } catch (StorageException e) { cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); throw e; } + } - return store; + /** {@inheritDoc} */ + @Override public long pageOffset(int grpId, long pageId) throws IgniteCheckedException { + PageStore store = getStore(grpId, PageIdUtils.partId(pageId)); + + return store.pageOffset(pageId); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 38da7574f8838..74acc3a363fdc 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; -import org.apache.ignite.internal.processors.cache.CacheCompressionManager; import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager; import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager; import org.apache.ignite.internal.processors.cache.CacheType; @@ -98,7 +97,6 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { true, false, false, - new CacheCompressionManager(), new GridCacheEventManager(), new CacheOsStoreManager(null, new CacheConfiguration()), new GridCacheEvictionManager(),