Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from opentelemetry.trace import Status, StatusCode

from .attributes import LangGraphSpanAttributes
from codon_sdk.instrumentation.schemas.nodespec import NodeSpec, NodeSpecSpanAttributes
from codon_sdk.instrumentation.schemas.nodespec import (
NodeSpec,
NodeSpecSpanAttributes,
_RESOLVED_ORG_ID,
_RESOLVED_ORG_NAMESPACE,
)
from codon_sdk.agents import Workload
from codon_sdk.instrumentation.schemas.telemetry.spans import CodonBaseSpanAttributes
from codon_sdk.instrumentation.telemetry import NodeTelemetryPayload
Expand Down Expand Up @@ -145,25 +150,35 @@ def _apply_workload_attributes(
) -> None:
workload = getattr(runtime, "_workload", None)

organization = (
telemetry.organization_id
or telemetry.org_namespace
span.set_attribute(
CodonBaseSpanAttributes.AgentFramework.value,
__framework__,
)

resource_attrs = getattr(getattr(span, "resource", None), "attributes", {}) or {}
org_id = (
_RESOLVED_ORG_ID
or resource_attrs.get(CodonBaseSpanAttributes.OrganizationId.value)
or telemetry.organization_id
or context.get("organization_id")
or (workload.organization_id if workload else None)
)
org_namespace = (
telemetry.org_namespace
or context.get("org_namespace")
or (workload.organization_id if workload else None)
or resource_attrs.get(CodonBaseSpanAttributes.OrgNamespace.value)
or nodespec.org_namespace
or _RESOLVED_ORG_NAMESPACE
or ORG_NAMESPACE
)

span.set_attribute(
CodonBaseSpanAttributes.AgentFramework.value,
__framework__,
)
if organization:
span.set_attribute(CodonBaseSpanAttributes.OrganizationId.value, organization)
span.set_attribute(CodonBaseSpanAttributes.OrgNamespace.value, organization)
telemetry.organization_id = telemetry.organization_id or organization
telemetry.org_namespace = telemetry.org_namespace or organization
if org_id:
span.set_attribute(CodonBaseSpanAttributes.OrganizationId.value, org_id)
telemetry.organization_id = telemetry.organization_id or org_id
if org_namespace:
span.set_attribute(CodonBaseSpanAttributes.OrgNamespace.value, org_namespace)
telemetry.org_namespace = telemetry.org_namespace or org_namespace

workload_id = telemetry.workload_id or context.get("workload_id") or (
workload.agent_class_id if workload else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "codon-instrumentation-langgraph"
version = "0.1.0a3"
version = "0.1.0a4"
license = {text = "Apache-2.0"}
authors = [
{ name="Codon, Inc.", email="martin@codonops.ai" },
Expand Down
1 change: 1 addition & 0 deletions sdk/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ For a full walkthrough, see `docs/guides/codon-workload-quickstart.md`.
- CodonWorkload can emit spans natively when `enable_tracing=True` (default: False). It uses the global tracer provider configured via `initialize_telemetry` to create one span per node execution with workload/org/deployment IDs, logic/run IDs, and NodeSpec attributes. Leave it disabled if another instrumentation layer (e.g., LangGraph adapter) is already wrapping nodes to avoid duplicate spans.
- Organization metadata: when an API key is present and an org lookup URL is configured, `initialize_telemetry` will resolve the organization and namespace and apply them to telemetry resources and as the default `org_namespace` for NodeSpecs (overriding `ORG_NAMESPACE`). If no org is resolved, NodeSpecs fall back to `ORG_NAMESPACE` or a placeholder with a warning to avoid crashes.
- Auto-instrumentation: a configurator hook exists (`OTEL_PYTHON_CONFIGURATOR=codon_sdk.instrumentation.config:otel_configure`) to run Codon telemetry init during `opentelemetry-instrument`, but this path is not yet stable end-to-end. For reliable results, call `initialize_telemetry()` explicitly; revisit the configurator once validated.
- Span org attributes: `codon.organization.id` now prefers the ID resolved via API-key lookup or the execution context, while `org.namespace` uses the namespace; they are no longer forced to the same value when both are available.

## Extending the SDK
- Capture requirements for new schema fields inside each class docstring and mirror them here.
Expand Down
2 changes: 1 addition & 1 deletion sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
# This is the main SDK package name
name = "codon_sdk"
version = "0.1.0a3"
version = "0.1.0a4"
license = {text = "Apache-2.0"}
authors = [
{ name="Codon, Inc.", email="martin@codonops.ai" },
Expand Down
31 changes: 20 additions & 11 deletions sdk/src/codon_sdk/agents/codon_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from codon_sdk.instrumentation.schemas.nodespec import NodeSpec, NodeSpecSpanAttributes
from codon_sdk.instrumentation.telemetry import NodeTelemetryPayload
from codon_sdk.instrumentation.schemas.telemetry.spans import CodonBaseSpanAttributes
from codon_sdk.instrumentation.schemas.nodespec import nodespec_env, _RESOLVED_ORG_NAMESPACE, _RESOLVED_ORG_ID

from .workload import Workload

Expand Down Expand Up @@ -103,17 +104,23 @@ def _apply_workload_attributes(
span, *, telemetry: NodeTelemetryPayload, nodespec: NodeSpec, context: Dict[str, Any]
) -> None:
span.set_attribute(CodonBaseSpanAttributes.AgentFramework.value, "codon")
organization = (
resource_attrs = getattr(getattr(span, "resource", None), "attributes", {}) or {}
org_id = (
telemetry.organization_id
or telemetry.org_namespace
or context.get("organization_id")
or resource_attrs.get(CodonBaseSpanAttributes.OrganizationId.value)
)
org_namespace = (
telemetry.org_namespace
or context.get("org_namespace")
or resource_attrs.get(CodonBaseSpanAttributes.OrgNamespace.value)
or nodespec.org_namespace
or os.getenv("ORG_NAMESPACE")
)
if organization:
span.set_attribute(CodonBaseSpanAttributes.OrganizationId.value, organization)
span.set_attribute(CodonBaseSpanAttributes.OrgNamespace.value, organization)
if org_id:
span.set_attribute(CodonBaseSpanAttributes.OrganizationId.value, org_id)
if org_namespace:
span.set_attribute(CodonBaseSpanAttributes.OrgNamespace.value, org_namespace)

workload_id = telemetry.workload_id or context.get("workload_id")
logic_id = telemetry.workload_logic_id or context.get("logic_id")
Expand Down Expand Up @@ -399,7 +406,7 @@ def __init__(
self._agent_class_id: Optional[str] = None
self._logic_id: Optional[str] = None
self._entry_nodes: Optional[List[str]] = None
self._organization_id: Optional[str] = os.getenv("ORG_NAMESPACE")
self._organization_id: Optional[str] = _RESOLVED_ORG_ID or os.getenv("ORG_NAMESPACE")
self._enable_tracing = enable_tracing
super().__init__(
name=name,
Expand Down Expand Up @@ -481,7 +488,8 @@ def add_node(
self._node_functions[name] = function
self._predecessors.setdefault(name, set())
self._successors.setdefault(name, set())
self._organization_id = nodespec.org_namespace
if self._organization_id is None:
self._organization_id = nodespec.org_namespace
self._update_logic_identity()
return nodespec

Expand Down Expand Up @@ -551,8 +559,9 @@ async def execute_async(
"workload_run_id": run_id,
"run_id": run_id,
"workload_name": self.metadata.name,
"organization_id": self.organization_id,
"org_namespace": self.organization_id,
# Authoritative org defaults seeded from resolved lookup; namespace kept separate.
"organization_id": _RESOLVED_ORG_ID or self.organization_id,
"org_namespace": _RESOLVED_ORG_NAMESPACE or self.organization_id or os.getenv(nodespec_env.OrgNamespace),
"workload_version": self.metadata.version,
**kwargs,
}
Expand Down Expand Up @@ -653,8 +662,8 @@ def enqueue(
workload_logic_id=self.logic_id,
workload_run_id=run_id,
deployment_id=deployment_id,
organization_id=self.organization_id,
org_namespace=self.organization_id,
organization_id=context.get("organization_id"),
org_namespace=context.get("org_namespace"),
nodespec_id=nodespec.id,
node_name=nodespec.name,
node_role=nodespec.role,
Expand Down
5 changes: 4 additions & 1 deletion sdk/src/codon_sdk/instrumentation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import logging
from codon_sdk.instrumentation.schemas.nodespec import set_default_org_namespace
from codon_sdk.instrumentation.schemas.nodespec import set_default_org_namespace, set_default_org_identity
from codon_sdk.instrumentation.telemetry import NodeTelemetryPayload

# Avoid configuring root logger; module-level logger only.
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -107,6 +108,8 @@ def initialize_telemetry(
)
if org_namespace:
set_default_org_namespace(org_namespace)
if org_id or org_namespace:
set_default_org_identity(org_id, org_namespace)
if not org_namespace or not org_id:
logger.warning(
"Unable to resolve organization metadata from API key via %s; spans and NodeSpecs will omit org unless provided elsewhere",
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/codon_sdk/instrumentation/schemas/nodespec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class NodeSpecEnv(BaseModel):

nodespec_env = NodeSpecEnv()
_RESOLVED_ORG_NAMESPACE: Optional[str] = None
_RESOLVED_ORG_ID: Optional[str] = None


def set_default_org_namespace(namespace: Optional[str]) -> None:
Expand All @@ -62,6 +63,15 @@ def set_default_org_namespace(namespace: Optional[str]) -> None:
_RESOLVED_ORG_NAMESPACE = namespace


def set_default_org_identity(org_id: Optional[str], namespace: Optional[str]) -> None:
"""Set process-wide default org id/namespace, typically from API-key lookup."""

global _RESOLVED_ORG_ID, _RESOLVED_ORG_NAMESPACE
_RESOLVED_ORG_ID = org_id
if namespace is not None:
_RESOLVED_ORG_NAMESPACE = namespace


class NodeSpec(BaseModel):
"""Immutable specification that introspects Python callables and generates stable SHA-256 identifiers.

Expand Down
3 changes: 3 additions & 0 deletions sdk/test/agents/test_codon_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ def second(message, *, runtime, context):
assert span.attributes.get("codon.workload.logic_id") == workload.logic_id
assert span.attributes.get("codon.workload.run_id")
assert span.attributes.get("codon.workload.deployment_id") == "dev-trace"
assert span.attributes.get("org.namespace") == "test-org"
# organization id defaults to namespace in this test setup
assert span.attributes.get("codon.organization.id") == "test-org"
exporter.clear()


Expand Down
53 changes: 53 additions & 0 deletions sdk/test/instrumentation/test_initialize_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,56 @@ def test_attach_flag_argument_overrides_env(monkeypatch):
# attach disabled -> new provider set
assert "provider" in captured
assert captured["provider"] is not existing


def test_org_lookup_success(monkeypatch):
monkeypatch.setenv("CODON_API_KEY", "env-key")
monkeypatch.setenv("CODON_ORG_LOOKUP_URL", "http://lookup")

captured = {}

class DummyExporter:
def __init__(self, *, endpoint=None, headers=None, **kwargs):
captured["endpoint"] = endpoint
captured["headers"] = headers

class DummyResource:
def __init__(self, *, attributes):
self.attributes = attributes

def merge(self, other):
merged = dict(self.attributes)
merged.update(other.attributes)
return DummyResource(attributes=merged)

def fake_lookup(*args, **kwargs):
return ("ORG-1", "ns-1")

_patch_base(monkeypatch, existing_provider=object())
monkeypatch.setattr(
instrumentation_config,
"OTLPSpanExporter",
DummyExporter,
)
monkeypatch.setattr(
instrumentation_config,
"Resource",
DummyResource,
)
monkeypatch.setattr(
instrumentation_config,
"_resolve_org_metadata",
lambda **kwargs: fake_lookup(),
)
provider_holder = {}
monkeypatch.setattr(
instrumentation_config.trace,
"set_tracer_provider",
lambda provider: provider_holder.setdefault("provider", provider),
)

instrumentation_config.initialize_telemetry()
provider = provider_holder["provider"]
attrs = provider.resource.attributes
assert attrs["codon.organization.id"] == "ORG-1"
assert attrs["org.namespace"] == "ns-1"
Loading