Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.eclipse.mat.SnapshotException;
import org.eclipse.mat.collect.BitField;
import org.eclipse.mat.collect.ConcurrentBitField;
import org.eclipse.mat.parser.index.IIndexReader;
import org.eclipse.mat.parser.internal.Messages;
import org.eclipse.mat.snapshot.ExcludedReferencesDescriptor;
Expand All @@ -39,7 +43,6 @@
public class ObjectMarker implements IObjectMarker
{
int[] roots;
// TODO we can create a new BitField called ConcurrentBitField that would be 1/8th footprint
boolean[] bits;
IIndexReader.IOne2ManyIndex outbound;
IProgressListener progressListener;
Expand All @@ -61,7 +64,7 @@ public class ObjectMarker implements IObjectMarker
// inline = 10: 2.0 sec
// inline = 50: 3.0 sec
// (previous impl, for reference, 5.1 sec)

final int LEVELS_RUN_INLINE = 4;

public ObjectMarker(int[] roots, boolean[] bits, IIndexReader.IOne2ManyIndex outbound,
Expand All @@ -79,107 +82,171 @@ public ObjectMarker(int[] roots, boolean[] bits, IIndexReader.IOne2ManyIndex out
this.progressListener = progressListener;
}

public class FjObjectMarker extends RecursiveAction
{
final int position;
final boolean[] visited;
final boolean topLevel;

private FjObjectMarker(final int position, final boolean[] visited, final boolean topLevel)
{
// mark as soon as we are created and about to be queued
visited[position] = true;
this.position = position;
this.visited = visited;
this.topLevel = topLevel;
}

public void compute()
{
if (progressListener.isCanceled())
{ return; }

compute(position, LEVELS_RUN_INLINE);

// only mark progress from the top level tasks; as each root level element
// is completed, a progress marker is updated
if (topLevel)
{
synchronized (progressListener) {
progressListener.worked(1);
}
}
}

void compute(final int outboundPosition, final int levelsLeft)
{
// TODO can this be an IteratorInt to avoid allocating arrays?
// not yet supported by IndexReader but could be in future;
// esp. for very large outbounds (ie: arrays)
final int[] process = outbound.get(outboundPosition);

for (int r : process)
{
if (!visited[r])
{
visited[r] = true;

if (levelsLeft <= 0) {
new FjObjectMarker(r, visited, false).fork();
} else {
compute(r, levelsLeft - 1);
}
}
}
}
}

@Override
public int markSingleThreaded()
{
int before = countMarked();
try
{
markMultiThreaded(1);
return (int) markMultiThreadedInner(1);
Copy link
Contributor Author

@jasonk000 jasonk000 Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be convertible to multi-thread-able if this works.

}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
int after = countMarked();
return after - before;
}

@Override
public void markMultiThreaded(int threads) throws InterruptedException
{
List<FjObjectMarker> rootTasks = IntStream.of(roots)
markMultiThreadedInner(threads);
}

public long markMultiThreadedInner(int threads) throws InterruptedException
{
ConcurrentBitField bitField = new ConcurrentBitField(bits);

int[] claimedRoots = IntStream.of(roots)
.filter(r -> !bits[r])
.mapToObj(r -> new FjObjectMarker(r, bits, true))
.collect(Collectors.toList());
.peek(bitField::set)
.toArray();

progressListener.beginTask(Messages.ObjectMarker_MarkingObjects, rootTasks.size());
progressListener.beginTask(Messages.ObjectMarker_MarkingObjects, claimedRoots.length);

ForkJoinPool pool = new ForkJoinPool(threads);
rootTasks.forEach(r -> pool.execute(r));
rootTasks.forEach(FjObjectMarker::join);
LongAdder count = new LongAdder();
count.add(claimedRoots.length);

pool.shutdown();
while (!pool.awaitTermination(1000, TimeUnit.MILLISECONDS))
{
// wait until completion
ForkJoinPool pool = new ForkJoinPool(threads);
try {
ObjectMarkerRootCC root = new ObjectMarkerRootCC(null, claimedRoots, bitField, LEVELS_RUN_INLINE, count);
// blocks until all work is done
pool.invoke(root);
}
finally {
pool.shutdown();
while (!pool.awaitTermination(1000, TimeUnit.MILLISECONDS))
{
// wait until completion
}
}

bitField.intoBooleanArrayNonAtomic(bits);
progressListener.done();

return count.sum();
}

int countMarked()
{
int marked = 0;
for (boolean b : bits)
if (b)
marked++;
return marked;
final class ObjectMarkerRootCC extends CountedCompleter<Void> {
final int[] roots;
final ConcurrentBitField visited;
final int levelsInline;
final LongAdder count;

ObjectMarkerRootCC(CountedCompleter<?> parent, int[] roots, ConcurrentBitField visited, int levelsInline, LongAdder count) {
super(parent);
this.roots = roots;
this.visited = visited;
this.levelsInline = levelsInline;
this.count = count;
}

@Override
public void compute() {
if (progressListener.isCanceled()) {
tryComplete();
return;
}

// fork one CC per root
addToPendingCount(roots.length);
for (int r : roots) {
new ObjectMarkerCC(this, r, visited, levelsInline, true, count).fork();
}

// completes when children complete
tryComplete();
}
}

final class ObjectMarkerCC extends CountedCompleter<Void> {
final int position;
final ConcurrentBitField visited;
final int levelsLeft;
final boolean topLevel;
final LongAdder count;

ObjectMarkerCC(
CountedCompleter<?> parent,
int position,
ConcurrentBitField visited,
int levelsLeft,
boolean topLevel,
LongAdder count
) {
super(parent);
this.position = position;
this.visited = visited;
this.levelsLeft = levelsLeft;
this.topLevel = topLevel;
this.count = count;
}

@Override
public void compute() {
if (progressListener.isCanceled()) {
tryComplete();
return;
}

final int[] process = outbound.get(position);

if (levelsLeft > 0) {
// inline traversal
for (int r : process) {
if (!visited.compareAndSet(r, false, true)) continue;
count.increment();

new ObjectMarkerCC(
this, r, visited, levelsLeft - 1, false, count
).compute();
}
onDone();
tryComplete();
return;
}

// fork boundary
int forks = 0;
for (int r : process) {
if (!visited.compareAndSet(r, false, true)) continue;
count.increment();

forks++;
new ObjectMarkerCC(
this, r, visited, 0, false, count
).fork();
}

if (forks == 0) {
onDone();
tryComplete();
} else {
addToPendingCount(forks);
// children will call tryComplete()
}
}

@Override
public void onCompletion(CountedCompleter<?> caller) {
onDone();
}

private void onDone() {
if (topLevel) {
synchronized (progressListener) {
progressListener.worked(1);
}
}
}
}

@Override
Expand Down Expand Up @@ -280,7 +347,7 @@ public int markSingleThreaded(ExcludedReferencesDescriptor[] excludeSets, ISnaps
}

private boolean refersOnlyThroughExcluded(int referrerId, int referentId,
ExcludedReferencesDescriptor[] excludeSets, BitField excludeObjectsBF,
ExcludedReferencesDescriptor[] excludeSets, BitField excludeObjectsBF,
List<NamedReference> refCache, ISnapshot snapshot)
throws SnapshotException
{
Expand Down
2 changes: 1 addition & 1 deletion plugins/org.eclipse.mat.report/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Bundle-Name: %Bundle-Name
Bundle-Vendor: %Bundle-Vendor
Bundle-SymbolicName: org.eclipse.mat.report;singleton:=true
Bundle-Version: 1.17.0.qualifier
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Bundle-RequiredExecutionEnvironment: JavaSE-17
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was required to support compareAndExchange. If we cannot do xchg, it can be done with compareAndSet, but involves additional CAS operations.

Export-Package: org.eclipse.mat,
org.eclipse.mat.collect,
org.eclipse.mat.query,
Expand Down
Loading