diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java index daa81e0bfc2..4875a6a3b9b 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java @@ -1,6 +1,7 @@ package datadog.opentelemetry.shim.metrics.data; import datadog.opentelemetry.shim.metrics.OtelInstrumentDescriptor; +import datadog.opentelemetry.shim.metrics.OtelInstrumentType; import datadog.opentelemetry.shim.metrics.export.OtelInstrumentVisitor; import datadog.trace.api.Config; import datadog.trace.api.config.OtlpConfig; @@ -23,15 +24,16 @@ public final class OtelMetricStorage { private static final RatelimitedLogger RATELIMITED_LOGGER = new RatelimitedLogger(LOGGER, 5, TimeUnit.MINUTES); + private static final OtlpConfig.Temporality TEMPORALITY_PREFERENCE = + Config.get().getOtlpMetricsTemporalityPreference(); + private static final int CARDINALITY_LIMIT = Config.get().getMetricsOtelCardinalityLimit(); private static final Attributes CARDINALITY_OVERFLOW = Attributes.builder().put("otel.metric.overflow", true).build(); - private static final boolean RESET_ON_COLLECT = - Config.get().getOtlpMetricsTemporalityPreference() == OtlpConfig.Temporality.DELTA; - private final OtelInstrumentDescriptor descriptor; + private final boolean resetOnCollect; private final Function aggregatorSupplier; private volatile Recording currentRecording; @@ -41,13 +43,31 @@ public final class OtelMetricStorage { private OtelMetricStorage( OtelInstrumentDescriptor descriptor, Supplier aggregatorSupplier) { this.descriptor = descriptor; + this.resetOnCollect = shouldResetOnCollect(descriptor.getType()); this.aggregatorSupplier = unused -> aggregatorSupplier.get(); this.currentRecording = new Recording(); - if (RESET_ON_COLLECT) { + if (resetOnCollect) { this.previousRecording = new Recording(); } } + /** Should storage reset on collect? (Depends on instrument type and temporality preference.) */ + private static boolean shouldResetOnCollect(OtelInstrumentType type) { + switch (TEMPORALITY_PREFERENCE) { + case DELTA: + // gauges and up/down counters stay as cumulative + return type == OtelInstrumentType.HISTOGRAM + || type == OtelInstrumentType.COUNTER + || type == OtelInstrumentType.OBSERVABLE_COUNTER; + case LOWMEMORY: + // observable counters, gauges, and up/down counters stay as cumulative + return type == OtelInstrumentType.HISTOGRAM || type == OtelInstrumentType.COUNTER; + case CUMULATIVE: + default: + return false; + } + } + public static OtelMetricStorage newDoubleSumStorage(OtelInstrumentDescriptor descriptor) { return new OtelMetricStorage(descriptor, OtelDoubleSum::new); } @@ -78,20 +98,30 @@ public OtelInstrumentDescriptor getDescriptor() { } public void recordLong(long value, Attributes attributes) { - Recording recording = acquireRecordingForWrite(); - try { - aggregator(recording.aggregators, attributes).recordLong(value); - } finally { - releaseRecordingAfterWrite(recording); + if (resetOnCollect) { + Recording recording = acquireRecordingForWrite(); + try { + aggregator(recording.aggregators, attributes).recordLong(value); + } finally { + releaseRecordingAfterWrite(recording); + } + } else { + // no need to hold writers back if we are not resetting metrics on collect + aggregator(currentRecording.aggregators, attributes).recordLong(value); } } public void recordDouble(double value, Attributes attributes) { - Recording recording = acquireRecordingForWrite(); - try { - aggregator(recording.aggregators, attributes).recordDouble(value); - } finally { - releaseRecordingAfterWrite(recording); + if (resetOnCollect) { + Recording recording = acquireRecordingForWrite(); + try { + aggregator(recording.aggregators, attributes).recordDouble(value); + } finally { + releaseRecordingAfterWrite(recording); + } + } else { + // no need to hold writers back if we are not resetting metrics on collect + aggregator(currentRecording.aggregators, attributes).recordDouble(value); } } @@ -113,7 +143,7 @@ private OtelAggregator aggregator( } public void collect(OtelInstrumentVisitor visitor) { - if (RESET_ON_COLLECT) { + if (resetOnCollect) { doCollectAndReset(visitor); } else { doCollect(visitor); @@ -168,27 +198,21 @@ private void doCollectAndReset(OtelInstrumentVisitor visitor) { } private Recording acquireRecordingForWrite() { - if (RESET_ON_COLLECT) { - // busy loop to limit impact on caller - while (true) { - final Recording recording = currentRecording; - // atomically notify collector of write activity and check state - if ((ACTIVITY.addAndGet(recording, WRITER) & RESET_PENDING) == 0) { - return recording; - } else { - // reset pending: rollback and check again for a fresh recording - ACTIVITY.addAndGet(recording, -WRITER); - } + // busy loop to limit impact on caller + while (true) { + final Recording recording = currentRecording; + // atomically notify collector of write activity and check state + if ((ACTIVITY.addAndGet(recording, WRITER) & RESET_PENDING) == 0) { + return recording; + } else { + // reset pending: rollback and check again for a fresh recording + ACTIVITY.addAndGet(recording, -WRITER); } - } else { - return currentRecording; } } private void releaseRecordingAfterWrite(Recording recording) { - if (RESET_ON_COLLECT) { - ACTIVITY.addAndGet(recording, -WRITER); - } + ACTIVITY.addAndGet(recording, -WRITER); } static final AtomicIntegerFieldUpdater ACTIVITY =