Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.opentelemetry.shim.metrics.data;

import datadog.opentelemetry.shim.metrics.OtelInstrumentDescriptor;
import datadog.opentelemetry.shim.metrics.OtelInstrumentType;
import datadog.opentelemetry.shim.metrics.export.OtelInstrumentVisitor;
import datadog.trace.api.Config;
import datadog.trace.api.config.OtlpConfig;
Expand All @@ -23,15 +24,16 @@ public final class OtelMetricStorage {
private static final RatelimitedLogger RATELIMITED_LOGGER =
new RatelimitedLogger(LOGGER, 5, TimeUnit.MINUTES);

private static final OtlpConfig.Temporality TEMPORALITY_PREFERENCE =
Config.get().getOtlpMetricsTemporalityPreference();

private static final int CARDINALITY_LIMIT = Config.get().getMetricsOtelCardinalityLimit();

private static final Attributes CARDINALITY_OVERFLOW =
Attributes.builder().put("otel.metric.overflow", true).build();

private static final boolean RESET_ON_COLLECT =
Config.get().getOtlpMetricsTemporalityPreference() == OtlpConfig.Temporality.DELTA;

private final OtelInstrumentDescriptor descriptor;
private final boolean resetOnCollect;
private final Function<Attributes, OtelAggregator> aggregatorSupplier;
private volatile Recording currentRecording;

Expand All @@ -41,13 +43,31 @@ public final class OtelMetricStorage {
private OtelMetricStorage(
OtelInstrumentDescriptor descriptor, Supplier<OtelAggregator> aggregatorSupplier) {
this.descriptor = descriptor;
this.resetOnCollect = shouldResetOnCollect(descriptor.getType());
this.aggregatorSupplier = unused -> aggregatorSupplier.get();
this.currentRecording = new Recording();
if (RESET_ON_COLLECT) {
if (resetOnCollect) {
this.previousRecording = new Recording();
}
}

/** Should storage reset on collect? (Depends on instrument type and temporality preference.) */
private static boolean shouldResetOnCollect(OtelInstrumentType type) {
switch (TEMPORALITY_PREFERENCE) {
case DELTA:
// gauges and up/down counters stay as cumulative
return type == OtelInstrumentType.HISTOGRAM
|| type == OtelInstrumentType.COUNTER
|| type == OtelInstrumentType.OBSERVABLE_COUNTER;
case LOWMEMORY:
// observable counters, gauges, and up/down counters stay as cumulative
return type == OtelInstrumentType.HISTOGRAM || type == OtelInstrumentType.COUNTER;
case CUMULATIVE:
default:
return false;
}
}

public static OtelMetricStorage newDoubleSumStorage(OtelInstrumentDescriptor descriptor) {
return new OtelMetricStorage(descriptor, OtelDoubleSum::new);
}
Expand Down Expand Up @@ -78,20 +98,30 @@ public OtelInstrumentDescriptor getDescriptor() {
}

public void recordLong(long value, Attributes attributes) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordLong(value);
} finally {
releaseRecordingAfterWrite(recording);
if (resetOnCollect) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordLong(value);
} finally {
releaseRecordingAfterWrite(recording);
}
} else {
// no need to hold writers back if we are not resetting metrics on collect
aggregator(currentRecording.aggregators, attributes).recordLong(value);
}
}

public void recordDouble(double value, Attributes attributes) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordDouble(value);
} finally {
releaseRecordingAfterWrite(recording);
if (resetOnCollect) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordDouble(value);
} finally {
releaseRecordingAfterWrite(recording);
}
} else {
// no need to hold writers back if we are not resetting metrics on collect
aggregator(currentRecording.aggregators, attributes).recordDouble(value);
}
}

Expand All @@ -113,7 +143,7 @@ private OtelAggregator aggregator(
}

public void collect(OtelInstrumentVisitor visitor) {
if (RESET_ON_COLLECT) {
if (resetOnCollect) {
doCollectAndReset(visitor);
} else {
doCollect(visitor);
Expand Down Expand Up @@ -168,27 +198,21 @@ private void doCollectAndReset(OtelInstrumentVisitor visitor) {
}

private Recording acquireRecordingForWrite() {
if (RESET_ON_COLLECT) {
// busy loop to limit impact on caller
while (true) {
final Recording recording = currentRecording;
// atomically notify collector of write activity and check state
if ((ACTIVITY.addAndGet(recording, WRITER) & RESET_PENDING) == 0) {
return recording;
} else {
// reset pending: rollback and check again for a fresh recording
ACTIVITY.addAndGet(recording, -WRITER);
}
// busy loop to limit impact on caller
while (true) {
final Recording recording = currentRecording;
// atomically notify collector of write activity and check state
if ((ACTIVITY.addAndGet(recording, WRITER) & RESET_PENDING) == 0) {
return recording;
} else {
// reset pending: rollback and check again for a fresh recording
ACTIVITY.addAndGet(recording, -WRITER);
}
} else {
return currentRecording;
}
}

private void releaseRecordingAfterWrite(Recording recording) {
if (RESET_ON_COLLECT) {
ACTIVITY.addAndGet(recording, -WRITER);
}
ACTIVITY.addAndGet(recording, -WRITER);
}

static final AtomicIntegerFieldUpdater<Recording> ACTIVITY =
Expand Down