From 26582201931337c8a44270bdf192f0f078fdab1a Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Fri, 30 Nov 2018 21:06:33 +0300 Subject: [PATCH 01/10] Created diagnostic info processor. Handle rebalance states --- .../ignite/internal/GridKernalContext.java | 4 + .../internal/GridKernalContextImpl.java | 12 +- .../apache/ignite/internal/IgniteKernal.java | 3 + .../processors/cache/GridCacheMapEntry.java | 14 ++ .../GridCachePartitionExchangeManager.java | 3 + .../preloader/GridDhtPartitionDemander.java | 36 +++++ .../reader/StandaloneGridKernalContext.java | 6 + .../processors/diag/DiagnosticProcessor.java | 130 ++++++++++++++++++ .../processors/diag/DiagnosticTopics.java | 35 +++++ .../processors/diag/package-info.java | 22 +++ 10 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index e19450e254179..228ee100b97ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.processors.diag.DiagnosticProcessor; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -466,6 +467,9 @@ public interface GridKernalContext extends Iterable { */ public FailureProcessor failure(); + /** */ + public DiagnosticProcessor diagnostic(); + /** * Print grid kernal memory stats (sizes of internal structures, etc.). * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index ef69167b9b598..d2ad55a009efc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.processors.diag.DiagnosticProcessor; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -420,6 +421,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** Failure processor. */ private FailureProcessor failureProc; + /** */ + private DiagnosticProcessor diagProc; + /** Recovery mode flag. Flag is set to {@code false} when discovery manager started. */ private boolean recoveryMode = true; @@ -577,7 +581,8 @@ else if (comp instanceof GridEncryptionManager) * Processors. * ========== */ - + else if (comp instanceof DiagnosticProcessor) + diagProc = (DiagnosticProcessor)comp; else if (comp instanceof FailureProcessor) failureProc = (FailureProcessor)comp; else if (comp instanceof GridTaskProcessor) @@ -1185,6 +1190,11 @@ void disconnected(boolean disconnected) { return failureProc; } + /** {@inheritDoc} */ + @Override public DiagnosticProcessor diagnostic() { + return diagProc; + } + /** {@inheritDoc} */ @Override public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { return hnd; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 284a4cb7dafca..52d0818035f8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -141,6 +141,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; +import org.apache.ignite.internal.processors.diag.DiagnosticProcessor; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.hadoop.Hadoop; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; @@ -962,6 +963,8 @@ public void start( startProcessor(new FailureProcessor(ctx)); + startProcessor(new DiagnosticProcessor(ctx)); + startProcessor(new PoolProcessor(ctx)); // Closure processor should be started before all others diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index bbdff352711ff..92e94d2dadf82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.diag.DiagnosticTopics; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; @@ -121,6 +122,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY; import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY; import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.TRANSACTION_SERIALIZATION_ERROR; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_UPDATED; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** @@ -2979,6 +2981,8 @@ protected final void update(@Nullable CacheObject val, long expireTime, long ttl assert lock.isHeldByCurrentThread(); assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; + cctx.kernalContext().diagnostic().beginTrack(PRELOAD_UPDATED); + boolean trackNear = addTracked && isNear() && cctx.config().isEagerTtl(); long oldExpireTime = expireTimeExtras(); @@ -2995,6 +2999,8 @@ protected final void update(@Nullable CacheObject val, long expireTime, long ttl if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion())) cctx.ttl().addTrackedEntry((GridNearCacheEntry)this); + + cctx.kernalContext().diagnostic().endTrack(PRELOAD_UPDATED); } /** @@ -3481,6 +3487,7 @@ else if (deletedUnlocked()) mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer ))); } else { + cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_ON_WAL_LOG); cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), key, @@ -3492,12 +3499,14 @@ else if (deletedUnlocked()) partition(), updateCntr ))); + cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_ON_WAL_LOG); } } drReplicate(drType, val, ver, topVer); if (!skipQryNtf) { + cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_ON_ENTRY_UPDATED); cctx.continuousQueries().onEntryUpdated( key, val, @@ -3509,6 +3518,7 @@ else if (deletedUnlocked()) updateCntr, null, topVer); + cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_ON_ENTRY_UPDATED); } onUpdateFinished(updateCntr); @@ -4299,10 +4309,14 @@ protected boolean storeValue( @Nullable IgnitePredicate predicate) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); + cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_STORE_ENTRY); + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); cctx.offheap().invoke(cctx, key, localPartition(), closure); + cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_STORE_ENTRY); + return closure.treeOp != IgniteTree.OperationType.NOOP; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6dad367d698e3..77f9121d843e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -138,6 +138,7 @@ import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL; /** * Partition exchange manager. @@ -3027,6 +3028,8 @@ else if (task instanceof ForceRebalanceExchangeTask) { if (task instanceof ForceRebalanceExchangeTask) forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture(); + cctx.kernalContext().diagnostic().beginTrack(TOTAL); + for (Integer order : orderMap.descendingKeySet()) { for (Integer grpId : orderMap.get(order)) { CacheGroupContext grp = cctx.cache().cacheGroup(grpId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a75fae7d204aa..a6108c9c34cc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -78,6 +78,11 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.CLEAR_FUTS; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ENTRY; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_DEMAND; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_RECEIVE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @@ -372,6 +377,12 @@ Runnable addAssignments( log.debug(e.getMessage()); } }); + else + fut.listen(f -> { + ctx.kernalContext().diagnostic().endTrack(TOTAL); + + ctx.kernalContext().diagnostic().printStats(); + }); requestPartitions(fut, assignments); }; @@ -510,9 +521,13 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign return; try { + ctx.kernalContext().diagnostic().beginTrack(SEND_DEMAND); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout()); + ctx.kernalContext().diagnostic().beginTrack(SEND_RECEIVE); + // Cleanup required in case partitions demanded in parallel with cancellation. synchronized (fut) { if (fut.isDone()) @@ -540,6 +555,9 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign fut.cancel(); } + + ctx.kernalContext().diagnostic().endTrack(SEND_DEMAND); + }, true)); } } @@ -562,6 +580,10 @@ private IgniteInternalFuture clearFullPartitions(RebalanceFuture fut, Set ctx.kernalContext().diagnostic().endTrack(CLEAR_FUTS)); + for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { final CacheMetricsImpl metrics = cctx.cache().metrics0(); @@ -661,6 +683,8 @@ public void handleSupplyMessage( final UUID nodeId, final GridDhtPartitionSupplyMessage supplyMsg ) { + ctx.kernalContext().diagnostic().endTrack(SEND_RECEIVE); + AffinityTopologyVersion topVer = supplyMsg.topologyVersion(); final RebalanceFuture fut = rebalanceFut; @@ -858,9 +882,13 @@ public void handleSupplyMessage( if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. try { + ctx.kernalContext().diagnostic().beginTrack(SEND_DEMAND); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout()); + ctx.kernalContext().diagnostic().beginTrack(SEND_RECEIVE); + if (log.isDebugEnabled()) log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]"); } @@ -869,6 +897,9 @@ public void handleSupplyMessage( log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", errMsg=" + e.getMessage() + ']'); } + + ctx.kernalContext().diagnostic().endTrack(SEND_DEMAND); + } else { if (log.isDebugEnabled()) @@ -901,6 +932,8 @@ private boolean preloadEntry( assert ctx.database().checkpointLockIsHeldByThread(); try { + ctx.kernalContext().diagnostic().beginTrack(PRELOAD_ENTRY); + GridCacheEntryEx cached = null; try { @@ -966,6 +999,9 @@ else if (log.isTraceEnabled()) throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } + finally { + ctx.kernalContext().diagnostic().endTrack(PRELOAD_ENTRY); + } return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 03bac2baccf93..e6f683b691875 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; +import org.apache.ignite.internal.processors.diag.DiagnosticProcessor; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; @@ -494,6 +495,11 @@ protected IgniteConfiguration prepareIgniteConfiguration() { return null; } + /** {@inheritDoc} */ + @Override public DiagnosticProcessor diagnostic() { + return null; + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java new file mode 100644 index 0000000000000..ee8c63eb82c26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.diag; + +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL; + +/** + * General rebalance diagnostic processing API + */ +public class DiagnosticProcessor extends GridProcessorAdapter { + /** */ + private final ConcurrentMap counts = new ConcurrentHashMap<>(); + + /** */ + private final ConcurrentMap tracks = new ConcurrentHashMap<>(); + + /** */ + private volatile boolean enabled = false; + + /** + * @param ctx Context. + */ + public DiagnosticProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + + U.quietAndInfo(log, "DiagnosticProcessor started"); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + counts.clear(); + } + + /** */ + public void beginTrack(DiagnosticTopics topic) { + beginTrack(topic.getName()); + } + + /** */ + public void endTrack(DiagnosticTopics topic) { + endTrack(topic.getName()); + } + + /** */ + public synchronized void beginTrack(String topic) { + if (TOTAL.getName().equals(topic)) + enabled = true; + + if (!enabled) + return; + + tracks.putIfAbsent(topic, U.currentTimeMillis()); + } + + /** */ + public synchronized void endTrack(String topic) { + if (!enabled) + return; + + if (TOTAL.getName().equals(topic)) + enabled = false; + + Long point = tracks.remove(topic); + + if (point == null) + return; + + counts.merge(topic, U.currentTimeMillis() - point, (a, b) -> a + b); + } + + /** */ + public synchronized void printStats() { + System.out.println("### Diagnostic processor info:"); + + Long total = counts.get(TOTAL.getName()); + + counts.entrySet() + .stream() + .sorted(new Comparator>() { + @Override public int compare(Map.Entry o1, Map.Entry o2) { + return o1.getValue().compareTo(o2.getValue()); + } + }) + .peek(e -> System.out.println("#### " + e.getKey() + " : " + e.getValue() + " ms")) + .count(); + + counts.clear(); + + if (!tracks.isEmpty()) { + System.out.println("### Unfinished tracks:"); + + tracks.entrySet() + .stream() + .peek(e -> System.out.println("#### " + e.getKey() + " : " + e.getValue())) + .count(); + } + + tracks.clear(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java new file mode 100644 index 0000000000000..fc92165647dce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -0,0 +1,35 @@ +package org.apache.ignite.internal.processors.diag; + +/** + * + */ +public enum DiagnosticTopics { + SEND_DEMAND("cache send demand message"), + SEND_RECEIVE("network delay"), + CLEAR_FUTS("cache wait clearAllFutures"), + TOTAL("total"), + PRELOAD_ENTRY("preload entries total"), + PRELOAD_ON_ENTRY_UPDATED("preload entry onEntryUpdated"), + PRELOAD_ON_WAL_LOG("preload entry wal log"), + PRELOAD_STORE_ENTRY("preload entry tree invoke"), + PRELOAD_UPDATED("preload entry updated"); + + /** */ + private String name; + + /** */ + DiagnosticTopics(String name) { + this.name = name; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public DiagnosticTopics setName(String name) { + this.name = name; + return this; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java new file mode 100644 index 0000000000000..0c90411c18416 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + * Failure processor. + */ +package org.apache.ignite.internal.processors.diag; \ No newline at end of file From f39896f1416ce68750f07d74ba0028c72432df0f Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Mon, 3 Dec 2018 14:47:37 +0300 Subject: [PATCH 02/10] IGNITE-10422: add percentage for rebalancing supplier --- .../dht/preloader/GridDhtPartitionSupplier.java | 13 +++++++++++++ .../processors/diag/DiagnosticProcessor.java | 5 ++++- .../internal/processors/diag/DiagnosticTopics.java | 7 ++++--- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 92547667ca39e..df2dc28c79fd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -53,6 +53,8 @@ import org.apache.ignite.spi.IgniteSpiException; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SUPPLIER_PROCESS_MSG; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL; /** * Class for supplying partitions to demanding nodes. @@ -254,6 +256,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount(); if (sctx == null) { + grp.singleCacheContext().kernalContext().diagnostic().beginTrack(TOTAL); + if (log.isDebugEnabled()) log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", fullPartitions=" + S.compact(demandMsg.partitions().fullSet()) + @@ -315,6 +319,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand long batchesCnt = 0; + grp.singleCacheContext().kernalContext().diagnostic().beginTrack(SUPPLIER_PROCESS_MSG); + while (iter.hasNext()) { if (supplyMsg.messageSize() >= msgMaxSize) { if (++batchesCnt >= maxBatchesCnt) { @@ -443,6 +449,10 @@ else if (iter.isPartitionMissing(p)) { reply(topicId, demanderNode, demandMsg, supplyMsg, contextId); + // Print statistics for the Supplier. + grp.singleCacheContext().kernalContext().diagnostic().endTrack(TOTAL); + grp.singleCacheContext().kernalContext().diagnostic().printStats(); + if (log.isInfoEnabled()) log.info("Finished supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]"); } @@ -513,6 +523,9 @@ private boolean reply( GridDhtPartitionSupplyMessage supplyMsg, T3 contextId ) throws IgniteCheckedException { + + grp.singleCacheContext().kernalContext().diagnostic().endTrack(SUPPLIER_PROCESS_MSG); + try { if (log.isDebugEnabled()) log.debug("Send next supply message [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java index ee8c63eb82c26..2faa85fcbd19d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -111,7 +111,10 @@ public synchronized void printStats() { return o1.getValue().compareTo(o2.getValue()); } }) - .peek(e -> System.out.println("#### " + e.getKey() + " : " + e.getValue() + " ms")) + .peek(e -> System.out.printf("#### %s : %s ms : %.2f \r\n", + e.getKey(), + e.getValue(), + ((float)e.getValue() / total * 100))) .count(); counts.clear(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java index fc92165647dce..0a0232a8820df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -4,15 +4,16 @@ * */ public enum DiagnosticTopics { - SEND_DEMAND("cache send demand message"), - SEND_RECEIVE("network delay"), + SEND_DEMAND("demand message serialization"), + SEND_RECEIVE("network delay between nodes"), CLEAR_FUTS("cache wait clearAllFutures"), TOTAL("total"), PRELOAD_ENTRY("preload entries total"), PRELOAD_ON_ENTRY_UPDATED("preload entry onEntryUpdated"), PRELOAD_ON_WAL_LOG("preload entry wal log"), PRELOAD_STORE_ENTRY("preload entry tree invoke"), - PRELOAD_UPDATED("preload entry updated"); + PRELOAD_UPDATED("preload entry updated"), + SUPPLIER_PROCESS_MSG("prepare message by supplier"); /** */ private String name; From be0753bd7ff6fb46bfb0f801906f653a53448ff3 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Mon, 3 Dec 2018 18:30:16 +0300 Subject: [PATCH 03/10] IGNITE-10422: remove redundant logging --- .../wal/FileWriteAheadLogManager.java | 8 +++---- .../processors/diag/DiagnosticProcessor.java | 22 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index fad1ec1039d5a..6abb59399220b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -1760,8 +1760,8 @@ private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException File dstFile = new File(walArchiveDir, name); - if (log.isInfoEnabled()) - log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + + if (log.isDebugEnabled()) + log.debug("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); try { @@ -1783,8 +1783,8 @@ private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); } - if (log.isInfoEnabled()) - log.info("Copied file [src=" + origFile.getAbsolutePath() + + if (log.isDebugEnabled()) + log.debug("Copied file [src=" + origFile.getAbsolutePath() + ", dst=" + dstFile.getAbsolutePath() + ']'); return new SegmentArchiveResult(absIdx, origFile, dstFile); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java index 2faa85fcbd19d..7a8f48f6d1308 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -100,32 +101,33 @@ public synchronized void endTrack(String topic) { /** */ public synchronized void printStats() { - System.out.println("### Diagnostic processor info:"); - Long total = counts.get(TOTAL.getName()); - counts.entrySet() + String out = counts.entrySet() .stream() .sorted(new Comparator>() { @Override public int compare(Map.Entry o1, Map.Entry o2) { return o1.getValue().compareTo(o2.getValue()); } }) - .peek(e -> System.out.printf("#### %s : %s ms : %.2f \r\n", + .map(e -> String.format("#### %s : %s ms : %.2f", e.getKey(), e.getValue(), ((float)e.getValue() / total * 100))) - .count(); + .collect(Collectors.joining("\n")); + + log.info("\n### Diagnostic processor info: \n" + out); + counts.clear(); if (!tracks.isEmpty()) { - System.out.println("### Unfinished tracks:"); - - tracks.entrySet() + String str = tracks.entrySet() .stream() - .peek(e -> System.out.println("#### " + e.getKey() + " : " + e.getValue())) - .count(); + .map(e -> "#### " + e.getKey() + " : " + e.getValue()) + .collect(Collectors.joining("\n")); + + log.info("\n### Unfinished tracks: \n" + str); } tracks.clear(); From 2d7f80916611fd579df1b883296f8cc31e510bb6 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 10:11:05 +0300 Subject: [PATCH 04/10] IGNITE-10422: improve logging details --- .../processors/cache/GridCacheMapEntry.java | 4 +- .../cache/IgniteCacheOffheapManagerImpl.java | 15 ++++- .../preloader/GridDhtPartitionDemander.java | 5 -- .../cache/persistence/RowStore.java | 5 +- .../wal/filehandle/FileHandleManagerImpl.java | 2 + .../wal/filehandle/FileWriteHandleImpl.java | 7 +++ .../cache/query/GridCacheQueryManager.java | 10 ++++ .../processors/diag/DiagnosticProcessor.java | 56 ++++++++++++------- .../processors/diag/DiagnosticTopics.java | 56 +++++++++++++++---- 9 files changed, 120 insertions(+), 40 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 92e94d2dadf82..6fbbb987b68f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4309,13 +4309,13 @@ protected boolean storeValue( @Nullable IgnitePredicate predicate) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); - cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_STORE_ENTRY); + cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_OFFHEAP_INVOKE); UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); cctx.offheap().invoke(cctx, key, localPartition(), closure); - cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_STORE_ENTRY); + cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_OFFHEAP_INVOKE); return closure.treeOp != IgniteTree.OperationType.NOOP; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index e15009ec6b9a1..ccfb54845b386 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -127,6 +127,9 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.EMPTY_CURSOR; import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_PUT; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_REMOVE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_INVOKE; import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP; import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; @@ -1664,8 +1667,12 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo try { assert cctx.shared().database().checkpointLockIsHeldByThread(); + ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_INVOKE); + dataTree.invoke(row, CacheDataRowAdapter.RowData.NO_KEY, c); + ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_INVOKE); + switch (c.operationType()) { case PUT: { assert c.newRow() != null : c; @@ -2689,13 +2696,17 @@ private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @N if (oldRow != null) { assert oldRow.link() != 0 : oldRow; - if (pendingTree() != null && oldRow.expireTime() != 0) + if (pendingTree() != null && oldRow.expireTime() != 0) { + cctx.kernalContext().diagnostic().beginTrack(PRELOAD_PENDING_TREE_REMOVE); pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + cctx.kernalContext().diagnostic().endTrack(PRELOAD_PENDING_TREE_REMOVE); + } } if (pendingTree() != null && expireTime != 0) { + cctx.kernalContext().diagnostic().beginTrack(PRELOAD_PENDING_TREE_PUT); pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link())); - + cctx.kernalContext().diagnostic().endTrack(PRELOAD_PENDING_TREE_PUT); hasPendingEntries = true; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a6108c9c34cc3..de116725e0ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -78,7 +78,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; -import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.CLEAR_FUTS; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ENTRY; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_DEMAND; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_RECEIVE; @@ -580,10 +579,6 @@ private IgniteInternalFuture clearFullPartitions(RebalanceFuture fut, Set ctx.kernalContext().diagnostic().endTrack(CLEAR_FUTS)); - for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { final CacheMetricsImpl metrics = cctx.cache().metrics0(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index 6900b7ee6a0e7..b3b05fd7699a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -26,6 +26,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_FREELIST_REMOVE; + /** * Data store for H2 rows. */ @@ -79,11 +81,12 @@ public void removeRow(long link) throws IgniteCheckedException { freeList.removeDataRowByLink(link); else { ctx.database().checkpointReadLock(); - + ctx.kernalContext().diagnostic().beginTrack(PRELOAD_FREELIST_REMOVE); try { freeList.removeDataRowByLink(link); } finally { + ctx.kernalContext().diagnostic().endTrack(PRELOAD_FREELIST_REMOVE); ctx.database().checkpointReadUnlock(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java index 1daac31cafc60..c37d66029745d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; @@ -49,6 +50,7 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ON_WAL_FLUSH; import static org.apache.ignite.internal.util.IgniteUtils.sleep; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java index e40ada2de29a5..810c2e7a9b54b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java @@ -57,6 +57,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ON_WAL_FLUSH; import static org.apache.ignite.internal.util.IgniteUtils.findField; import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod; @@ -309,15 +310,21 @@ public void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { * @param ptr Pointer. */ public void flush(FileWALPointer ptr) throws IgniteCheckedException { + Long st = U.currentTimeMillis(); + if (ptr == null) { // Unconditional flush. walWriter.flushAll(); + cctx.kernalContext().diagnostic().mergeSafe(PRELOAD_ON_WAL_FLUSH, U.currentTimeMillis() - st); + return; } assert ptr.index() == getSegmentId(); walWriter.flushBuffer(ptr.fileOffset()); + + cctx.kernalContext().diagnostic().mergeSafe(PRELOAD_ON_WAL_FLUSH, U.currentTimeMillis() - st); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index f42164e51bdcc..76becc48650ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -137,6 +137,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_INDEXING_REMOVE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_INDEXING_STORE; /** * Query and index manager. @@ -388,6 +390,8 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, if (!enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + cctx.kernalContext().diagnostic().beginTrack(PRELOAD_INDEXING_STORE); + try { if (isIndexingSpiEnabled()) { CacheObjectContext coctx = cctx.cacheObjectContext(); @@ -403,6 +407,8 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, qryProc.store(cctx, newRow, prevRow, prevRowAvailable); } finally { + cctx.kernalContext().diagnostic().endTrack(PRELOAD_INDEXING_STORE); + invalidateResultCache(); leaveBusy(); @@ -422,6 +428,8 @@ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) if (!enterBusy()) return; // Ignore index update when node is stopping. + cctx.kernalContext().diagnostic().beginTrack(PRELOAD_INDEXING_REMOVE); + try { if (isIndexingSpiEnabled()) { Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext()); @@ -434,6 +442,8 @@ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) qryProc.remove(cctx, prevRow); } finally { + cctx.kernalContext().diagnostic().endTrack(PRELOAD_INDEXING_REMOVE); + invalidateResultCache(); leaveBusy(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java index 7a8f48f6d1308..28d210cb028e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; @@ -34,13 +35,13 @@ */ public class DiagnosticProcessor extends GridProcessorAdapter { /** */ - private final ConcurrentMap counts = new ConcurrentHashMap<>(); + private final ConcurrentMap counts = new ConcurrentHashMap<>(); /** */ private final ConcurrentMap tracks = new ConcurrentHashMap<>(); /** */ - private volatile boolean enabled = false; + private volatile boolean enabled; /** * @param ctx Context. @@ -51,6 +52,8 @@ public DiagnosticProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + for (DiagnosticTopics topics : DiagnosticTopics.values()) + counts.put(topics.getName(), new LongAdder()); U.quietAndInfo(log, "DiagnosticProcessor started"); } @@ -59,7 +62,7 @@ public DiagnosticProcessor(GridKernalContext ctx) { @Override public void stop(boolean cancel) throws IgniteCheckedException { super.stop(cancel); - counts.clear(); + resetCounts(); } /** */ @@ -91,45 +94,58 @@ public synchronized void endTrack(String topic) { if (TOTAL.getName().equals(topic)) enabled = false; - Long point = tracks.remove(topic); + Long value = tracks.remove(topic); - if (point == null) + if (value == null) return; - counts.merge(topic, U.currentTimeMillis() - point, (a, b) -> a + b); + counts.get(topic).add(U.currentTimeMillis() - value); + } + + /** */ + public void mergeSafe(DiagnosticTopics topic, Long value) { + mergeSafe(topic.getName(), U.currentTimeMillis() - value); + } + + /** */ + public void mergeSafe(String topic, Long value) { + if (enabled) + counts.get(topic).add(value); } /** */ public synchronized void printStats() { - Long total = counts.get(TOTAL.getName()); + Long total = counts.get(TOTAL.getName()).longValue(); String out = counts.entrySet() .stream() - .sorted(new Comparator>() { - @Override public int compare(Map.Entry o1, Map.Entry o2) { - return o1.getValue().compareTo(o2.getValue()); - } - }) - .map(e -> String.format("#### %s : %s ms : %.2f", + .filter(e -> e.getValue().longValue() != 0) + .sorted(Comparator.comparingInt(o -> DiagnosticTopics.get(o.getKey()).ordinal())) + .map(e -> String.format("# %s : %.3f s : %.2f", e.getKey(), - e.getValue(), - ((float)e.getValue() / total * 100))) + (float)(e.getValue().longValue() / 1000), + ((float)e.getValue().longValue() / total * 100))) .collect(Collectors.joining("\n")); - log.info("\n### Diagnostic processor info: \n" + out); - + log.info("\n# Diagnostic processor info: \n" + out); - counts.clear(); + resetCounts(); if (!tracks.isEmpty()) { String str = tracks.entrySet() .stream() - .map(e -> "#### " + e.getKey() + " : " + e.getValue()) + .map(e -> "# " + e.getKey() + " : " + e.getValue()) .collect(Collectors.joining("\n")); - log.info("\n### Unfinished tracks: \n" + str); + log.info("\n# Unfinished tracks: \n" + str); } tracks.clear(); } + + /** */ + public synchronized void resetCounts() { + for (Map.Entry e : counts.entrySet()) + e.getValue().reset(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java index 0a0232a8820df..11a3005899b24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -1,19 +1,50 @@ package org.apache.ignite.internal.processors.diag; +import java.util.HashMap; +import java.util.Map; + /** * */ public enum DiagnosticTopics { - SEND_DEMAND("demand message serialization"), - SEND_RECEIVE("network delay between nodes"), - CLEAR_FUTS("cache wait clearAllFutures"), - TOTAL("total"), - PRELOAD_ENTRY("preload entries total"), - PRELOAD_ON_ENTRY_UPDATED("preload entry onEntryUpdated"), - PRELOAD_ON_WAL_LOG("preload entry wal log"), - PRELOAD_STORE_ENTRY("preload entry tree invoke"), - PRELOAD_UPDATED("preload entry updated"), - SUPPLIER_PROCESS_MSG("prepare message by supplier"); + /** Root. */ + TOTAL("# rebalance total"), + /** GridDhtPartitionDemander#preloadEntry(..) */ + PRELOAD_ENTRY("## preload entry total"), + /** GridCacheMapEntry#storeValue(..) */ + PRELOAD_OFFHEAP_INVOKE("### initialValue(..) -> offheap().invoke(..)"), + /** CacheDataStoreImpl#invoke0(..) */ + PRELOAD_TREE_INVOKE("#### tree.invoke(..)"), + /** CacheDataStoreImpl.finishUpdate(..) */ + PRELOAD_INDEXING_STORE("#### finishUpdate -> indexing().store(..)"), + /** CacheDataStoreImpl.finishUpdate(..) */ + PRELOAD_PENDING_TREE_REMOVE("#### finishUpdate -> pendingTree().removex(..)"), + /** CacheDataStoreImpl.finishUpdate(..) */ + PRELOAD_PENDING_TREE_PUT("#### finishUpdate -> pendingTree().putx(..)"), + /** CacheDataStoreImpl#finishRemove(..) */ + PRELOAD_INDEXING_REMOVE("#### finishRemove -> indexing().remove(..)"), + /** CacheDataStoreImpl#finishRemove(..) */ + PRELOAD_FREELIST_REMOVE("#### finishRemove -> freeList.removeDataRowByLink(..)"), + /** */ + PRELOAD_UPDATED("### initialValue(..) -> GridCacheMapEntry.updated(..)"), + /** */ + PRELOAD_ON_WAL_LOG("### initialValue(..) -> wal.log(..)"), + /** */ + PRELOAD_ON_WAL_FLUSH("#### wal.log(..) -> flushBuffer(..)"), + /** */ + PRELOAD_ON_ENTRY_UPDATED("### initialValue(..) -> cq().onEntryUpdated(..)"), + + SEND_DEMAND("# demand message serialization"), + SEND_RECEIVE("# network delay between nodes"), + SUPPLIER_PROCESS_MSG("# prepare message supplier"); + + /** Reverse-lookup map for getting a day from an abbreviation */ + private static final Map lookup = new HashMap(); + + static { + for (DiagnosticTopics t : DiagnosticTopics.values()) + lookup.put(t.getName(), t); + } /** */ private String name; @@ -23,6 +54,11 @@ public enum DiagnosticTopics { this.name = name; } + /** */ + public static DiagnosticTopics get(String topic) { + return lookup.get(topic); + } + /** */ public String getName() { return name; From 8fcc7bdb6ae6e6675be90793d029c57de2f199f9 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 11:06:34 +0300 Subject: [PATCH 05/10] add row topic --- .../processors/cache/IgniteCacheOffheapManagerImpl.java | 3 +++ .../persistence/wal/filehandle/FileWriteHandleImpl.java | 7 ------- .../ignite/internal/processors/diag/DiagnosticTopics.java | 5 +++-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index ccfb54845b386..18aed14ff71df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -129,6 +129,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_PUT; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_REMOVE; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_ADD_ROW; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_INVOKE; import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP; import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; @@ -1716,6 +1717,7 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId); + ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_ADD_ROW); if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) dataRow.link(oldRow.link()); else { @@ -1726,6 +1728,7 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo rowStore.addRow(dataRow); } + ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_ADD_ROW); assert dataRow.link() != 0 : dataRow; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java index 810c2e7a9b54b..e40ada2de29a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java @@ -57,7 +57,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; -import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ON_WAL_FLUSH; import static org.apache.ignite.internal.util.IgniteUtils.findField; import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod; @@ -310,21 +309,15 @@ public void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { * @param ptr Pointer. */ public void flush(FileWALPointer ptr) throws IgniteCheckedException { - Long st = U.currentTimeMillis(); - if (ptr == null) { // Unconditional flush. walWriter.flushAll(); - cctx.kernalContext().diagnostic().mergeSafe(PRELOAD_ON_WAL_FLUSH, U.currentTimeMillis() - st); - return; } assert ptr.index() == getSegmentId(); walWriter.flushBuffer(ptr.fileOffset()); - - cctx.kernalContext().diagnostic().mergeSafe(PRELOAD_ON_WAL_FLUSH, U.currentTimeMillis() - st); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java index 11a3005899b24..83ce2f3e77aab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -15,6 +15,9 @@ public enum DiagnosticTopics { PRELOAD_OFFHEAP_INVOKE("### initialValue(..) -> offheap().invoke(..)"), /** CacheDataStoreImpl#invoke0(..) */ PRELOAD_TREE_INVOKE("#### tree.invoke(..)"), + /** */ + PRELOAD_TREE_ADD_ROW("#### rowStore.addRow(..)"), + /** CacheDataStoreImpl.finishUpdate(..) */ PRELOAD_INDEXING_STORE("#### finishUpdate -> indexing().store(..)"), /** CacheDataStoreImpl.finishUpdate(..) */ @@ -30,8 +33,6 @@ public enum DiagnosticTopics { /** */ PRELOAD_ON_WAL_LOG("### initialValue(..) -> wal.log(..)"), /** */ - PRELOAD_ON_WAL_FLUSH("#### wal.log(..) -> flushBuffer(..)"), - /** */ PRELOAD_ON_ENTRY_UPDATED("### initialValue(..) -> cq().onEntryUpdated(..)"), SEND_DEMAND("# demand message serialization"), From 298e8df5062e8ec196f531c1c0317a369caf3718 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 11:09:47 +0300 Subject: [PATCH 06/10] add row topic 2 --- .../cache/persistence/wal/filehandle/FileHandleManagerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java index c37d66029745d..1daac31cafc60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; @@ -50,7 +49,6 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; -import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ON_WAL_FLUSH; import static org.apache.ignite.internal.util.IgniteUtils.sleep; /** From dc3d6d230f8433a4899e6948b2fc3fcfc6aada81 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 13:24:12 +0300 Subject: [PATCH 07/10] add invoke counters --- .../cache/IgniteCacheOffheapManagerImpl.java | 4 +- .../preloader/GridDhtPartitionDemander.java | 3 ++ .../processors/diag/DiagnosticProcessor.java | 47 ++++++++++++------- .../processors/diag/DiagnosticTopics.java | 29 ++++++------ 4 files changed, 51 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 18aed14ff71df..486c85e60ace8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -130,6 +130,7 @@ import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_PUT; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_REMOVE; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_ADD_ROW; +import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_FINISH_UPDATE; import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_INVOKE; import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP; import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; @@ -1679,8 +1680,9 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo assert c.newRow() != null : c; CacheDataRow oldRow = c.oldRow(); - + ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_FINISH_UPDATE); finishUpdate(cctx, c.newRow(), oldRow); + ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_FINISH_UPDATE); break; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index de116725e0ef7..85822e0929fe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -726,6 +726,8 @@ public void handleSupplyMessage( final GridDhtPartitionTopology top = grp.topology(); + ctx.kernalContext().diagnostic().countMessage("demand messages total"); + if (grp.sharedGroup()) { for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { @@ -811,6 +813,7 @@ public void handleSupplyMessage( break; } + ctx.kernalContext().diagnostic().countMessage("cache key received"); for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java index 28d210cb028e6..c001c4f04624c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -34,9 +34,15 @@ * General rebalance diagnostic processing API */ public class DiagnosticProcessor extends GridProcessorAdapter { + /** */ + private final ConcurrentMap timings = new ConcurrentHashMap<>(); + /** */ private final ConcurrentMap counts = new ConcurrentHashMap<>(); + /** */ + private final ConcurrentMap msgs = new ConcurrentHashMap<>(); + /** */ private final ConcurrentMap tracks = new ConcurrentHashMap<>(); @@ -52,8 +58,11 @@ public DiagnosticProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - for (DiagnosticTopics topics : DiagnosticTopics.values()) + for (DiagnosticTopics topics : DiagnosticTopics.values()) { + timings.put(topics.getName(), new LongAdder()); + counts.put(topics.getName(), new LongAdder()); + } U.quietAndInfo(log, "DiagnosticProcessor started"); } @@ -99,35 +108,36 @@ public synchronized void endTrack(String topic) { if (value == null) return; - counts.get(topic).add(U.currentTimeMillis() - value); + timings.get(topic).add(U.currentTimeMillis() - value); + counts.get(topic).increment(); } /** */ - public void mergeSafe(DiagnosticTopics topic, Long value) { - mergeSafe(topic.getName(), U.currentTimeMillis() - value); - } - - /** */ - public void mergeSafe(String topic, Long value) { - if (enabled) - counts.get(topic).add(value); + public synchronized void countMessage(String topic) { + msgs.getOrDefault(topic, new LongAdder()).increment(); } /** */ public synchronized void printStats() { - Long total = counts.get(TOTAL.getName()).longValue(); + Long total = timings.get(TOTAL.getName()).longValue(); - String out = counts.entrySet() + String out = timings.entrySet() .stream() .filter(e -> e.getValue().longValue() != 0) .sorted(Comparator.comparingInt(o -> DiagnosticTopics.get(o.getKey()).ordinal())) - .map(e -> String.format("# %s : %.3f s : %.2f", + .map(e -> String.format("# %s : %s ms : %.2f : %s", e.getKey(), - (float)(e.getValue().longValue() / 1000), - ((float)e.getValue().longValue() / total * 100))) + e.getValue().longValue(), + ((float)e.getValue().longValue() / total * 100), + counts.get(e.getKey()).longValue())) + .collect(Collectors.joining("\n")); + + String msgsCount = msgs.entrySet() + .stream() + .map(e -> String.format("# %s : %s", e.getKey(), e.getValue().longValue())) .collect(Collectors.joining("\n")); - log.info("\n# Diagnostic processor info: \n" + out); + log.info("\n# Diagnostic processor info: \n" + out + "\n" + msgsCount); resetCounts(); @@ -145,7 +155,10 @@ public synchronized void printStats() { /** */ public synchronized void resetCounts() { - for (Map.Entry e : counts.entrySet()) + for (Map.Entry e : timings.entrySet()) e.getValue().reset(); + + for (Map.Entry c : counts.entrySet()) + c.getValue().reset(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java index 83ce2f3e77aab..f54848058ba39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -8,32 +8,33 @@ */ public enum DiagnosticTopics { /** Root. */ - TOTAL("# rebalance total"), + TOTAL("# cache rebalance total"), /** GridDhtPartitionDemander#preloadEntry(..) */ - PRELOAD_ENTRY("## preload entry total"), + PRELOAD_ENTRY("# # preload entry total"), /** GridCacheMapEntry#storeValue(..) */ - PRELOAD_OFFHEAP_INVOKE("### initialValue(..) -> offheap().invoke(..)"), + PRELOAD_OFFHEAP_INVOKE("# # # offheap().invoke(..)"), /** CacheDataStoreImpl#invoke0(..) */ - PRELOAD_TREE_INVOKE("#### tree.invoke(..)"), + PRELOAD_TREE_INVOKE("# # # # dataTree.invoke(..)"), + /** rowStore.addRow(..) */ + PRELOAD_TREE_ADD_ROW("# # # # # FreeList.insertDataRow(..)"), /** */ - PRELOAD_TREE_ADD_ROW("#### rowStore.addRow(..)"), - + PRELOAD_TREE_FINISH_UPDATE("# # # # CacheDataStoreImpl.finishUpdate(..)"), /** CacheDataStoreImpl.finishUpdate(..) */ - PRELOAD_INDEXING_STORE("#### finishUpdate -> indexing().store(..)"), + PRELOAD_INDEXING_STORE("# # # # # indexing().store(..)"), /** CacheDataStoreImpl.finishUpdate(..) */ - PRELOAD_PENDING_TREE_REMOVE("#### finishUpdate -> pendingTree().removex(..)"), + PRELOAD_PENDING_TREE_REMOVE("# # # # # pendingTree().removex(..)"), /** CacheDataStoreImpl.finishUpdate(..) */ - PRELOAD_PENDING_TREE_PUT("#### finishUpdate -> pendingTree().putx(..)"), + PRELOAD_PENDING_TREE_PUT("# # # # # pendingTree().putx(..)"), /** CacheDataStoreImpl#finishRemove(..) */ - PRELOAD_INDEXING_REMOVE("#### finishRemove -> indexing().remove(..)"), + PRELOAD_INDEXING_REMOVE("# # # # finishRemove -> indexing().remove(..)"), /** CacheDataStoreImpl#finishRemove(..) */ - PRELOAD_FREELIST_REMOVE("#### finishRemove -> freeList.removeDataRowByLink(..)"), + PRELOAD_FREELIST_REMOVE("# # # # finishRemove -> freeList.removeDataRowByLink(..)"), /** */ - PRELOAD_UPDATED("### initialValue(..) -> GridCacheMapEntry.updated(..)"), + PRELOAD_UPDATED("# # # initialValue(..) -> GridCacheMapEntry.updated(..)"), /** */ - PRELOAD_ON_WAL_LOG("### initialValue(..) -> wal.log(..)"), + PRELOAD_ON_WAL_LOG("# # # initialValue(..) -> wal.log(..)"), /** */ - PRELOAD_ON_ENTRY_UPDATED("### initialValue(..) -> cq().onEntryUpdated(..)"), + PRELOAD_ON_ENTRY_UPDATED("# # # initialValue(..) -> cq().onEntryUpdated(..)"), SEND_DEMAND("# demand message serialization"), SEND_RECEIVE("# network delay between nodes"), From f8550557c00c31c75023fd9342df3c5001aef171 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 14:21:47 +0300 Subject: [PATCH 08/10] remove message counts --- .../dht/preloader/GridDhtPartitionDemander.java | 3 --- .../processors/diag/DiagnosticProcessor.java | 15 +-------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 85822e0929fe8..de116725e0ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -726,8 +726,6 @@ public void handleSupplyMessage( final GridDhtPartitionTopology top = grp.topology(); - ctx.kernalContext().diagnostic().countMessage("demand messages total"); - if (grp.sharedGroup()) { for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { @@ -813,7 +811,6 @@ public void handleSupplyMessage( break; } - ctx.kernalContext().diagnostic().countMessage("cache key received"); for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java index c001c4f04624c..5394c0fe20290 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java @@ -40,9 +40,6 @@ public class DiagnosticProcessor extends GridProcessorAdapter { /** */ private final ConcurrentMap counts = new ConcurrentHashMap<>(); - /** */ - private final ConcurrentMap msgs = new ConcurrentHashMap<>(); - /** */ private final ConcurrentMap tracks = new ConcurrentHashMap<>(); @@ -112,11 +109,6 @@ public synchronized void endTrack(String topic) { counts.get(topic).increment(); } - /** */ - public synchronized void countMessage(String topic) { - msgs.getOrDefault(topic, new LongAdder()).increment(); - } - /** */ public synchronized void printStats() { Long total = timings.get(TOTAL.getName()).longValue(); @@ -132,12 +124,7 @@ public synchronized void printStats() { counts.get(e.getKey()).longValue())) .collect(Collectors.joining("\n")); - String msgsCount = msgs.entrySet() - .stream() - .map(e -> String.format("# %s : %s", e.getKey(), e.getValue().longValue())) - .collect(Collectors.joining("\n")); - - log.info("\n# Diagnostic processor info: \n" + out + "\n" + msgsCount); + log.info("\n# Diagnostic processor info: \n" + out); resetCounts(); From d89ccaf94b2ac69c63085c57e1d43b397c028e82 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 4 Dec 2018 15:21:15 +0300 Subject: [PATCH 09/10] rename topics --- .../internal/processors/diag/DiagnosticTopics.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java index f54848058ba39..c650239b8e4d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java @@ -10,7 +10,7 @@ public enum DiagnosticTopics { /** Root. */ TOTAL("# cache rebalance total"), /** GridDhtPartitionDemander#preloadEntry(..) */ - PRELOAD_ENTRY("# # preload entry total"), + PRELOAD_ENTRY("# # preload on demander"), /** GridCacheMapEntry#storeValue(..) */ PRELOAD_OFFHEAP_INVOKE("# # # offheap().invoke(..)"), /** CacheDataStoreImpl#invoke0(..) */ @@ -30,15 +30,15 @@ public enum DiagnosticTopics { /** CacheDataStoreImpl#finishRemove(..) */ PRELOAD_FREELIST_REMOVE("# # # # finishRemove -> freeList.removeDataRowByLink(..)"), /** */ - PRELOAD_UPDATED("# # # initialValue(..) -> GridCacheMapEntry.updated(..)"), + PRELOAD_UPDATED("# # # ttl().addTrackedEntry(..)"), /** */ - PRELOAD_ON_WAL_LOG("# # # initialValue(..) -> wal.log(..)"), + PRELOAD_ON_WAL_LOG("# # # wal.log(..)"), /** */ - PRELOAD_ON_ENTRY_UPDATED("# # # initialValue(..) -> cq().onEntryUpdated(..)"), + PRELOAD_ON_ENTRY_UPDATED("# # # continuousQueries().onEntryUpdated(..)"), - SEND_DEMAND("# demand message serialization"), + SEND_DEMAND("# message serialization"), SEND_RECEIVE("# network delay between nodes"), - SUPPLIER_PROCESS_MSG("# prepare message supplier"); + SUPPLIER_PROCESS_MSG("# make batch on supplier handleDemandMessage(..)"); /** Reverse-lookup map for getting a day from an abbreviation */ private static final Map lookup = new HashMap(); From 18927c4264344586b6a9fe10fc7b483e296d3dab Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Fri, 7 Dec 2018 11:41:21 +0300 Subject: [PATCH 10/10] add example configuration --- examples/config/example-rebalance.xml | 147 ++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 examples/config/example-rebalance.xml diff --git a/examples/config/example-rebalance.xml b/examples/config/example-rebalance.xml new file mode 100644 index 0000000000000..10f3367d4c6e1 --- /dev/null +++ b/examples/config/example-rebalance.xml @@ -0,0 +1,147 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 10.116.172.13:48601..48609 + 10.116.172.15:48601..48609 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +