From 0530fa649213ef843ba82b52bdefb7e3dee272e8 Mon Sep 17 00:00:00 2001
From: Martin
Date: Sun, 21 Dec 2025 19:44:16 -0500
Subject: [PATCH] feat: Migrated to support LangGraph + LangChain v1.0

---
 .gitignore                                    |   2 +-
 codex-instructions/sdk-production-refactor.md | 174 -----
 .../codon-instrumentation-langgraph/AGENTS.md |  54 +-
 .../codon-instrumentation-langgraph/README.md |  51 +-
 .../instrumentation/langgraph/__init__.py     |  16 +-
 .../instrumentation/langgraph/adapter.py      | 506 +++++++++++-
 .../instrumentation/langgraph/callbacks.py    | 739 +++++++++++++++++-
 .../instrumentation/langgraph/context.py      |  58 ++
 .../pyproject.toml                            |   5 +-
 sdk/AGENTS.md                                 |   2 +-
 sdk/pyproject.toml                            |   2 +-
 .../schemas/telemetry/spans/__init__.py       |   8 +
 sdk/src/codon_sdk/llm/__init__.py             |  31 +-
 .../langgraph/test_adapter_result.py          |  35 +-
 .../langgraph/test_graph_span.py              |  64 ++
 15 files changed, 1464 insertions(+), 283 deletions(-)
 delete mode 100644 codex-instructions/sdk-production-refactor.md
 create mode 100644 instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/context.py
 create mode 100644 sdk/test/instrumentation/langgraph/test_graph_span.py

diff --git a/.gitignore b/.gitignore
index 4e659c5..c1de85c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -219,5 +219,5 @@ __marimo__/
 /.pre-commit-cache/
 
 # Agents
-# codex-instructions
+codex-instructions
 local
diff --git a/codex-instructions/sdk-production-refactor.md b/codex-instructions/sdk-production-refactor.md
deleted file mode 100644
index b5256ba..0000000
--- a/codex-instructions/sdk-production-refactor.md
+++ /dev/null
@@ -1,174 +0,0 @@
-### Phase 1: The "Front Door" (API Gateway)
-
-**Objective:** Secure the ingestion pipeline immediately with minimal complexity.
-**Timeline:** Immediate / MVP.
-
-#### 1\. SDK Changes (Core Refactoring)
-
-  * **Move Logic:** Move `initialize_telemetry` from `codon.instrumentation.langgraph` to `codon_sdk.instrumentation`.
-  * **Update Signature:** Accept `api_key` and `endpoint` arguments.
-  * **Hardcode Default:** Set the default endpoint to your production load balancer (e.g., `https://ingest.codonops.ai:4317`).
-  * **Header Injection:** Inject `x-codon-api-key` into the exporter.
-
-**Action:** Create `sdk/src/codon_sdk/instrumentation/config.py` (or similar) and migrate the logic there. Update the LangGraph package to import it.
-
-#### 2\. Infrastructure Changes (Server-Side)
-
-  * **DNS:** Point `ingest.codonops.ai` to your Cloud Load Balancer.
-  * **Gateway:** Deploy Nginx (or Kong) behind the LB.
-      * **Config:** Add a rule that checks `x-codon-api-key` against your DB/Cache.
-      * **Success:** If valid, inject the header `X-Codon-Org-ID: <org_id>` and forward to the Collector.
-      * **Failure:** Return `401 Unauthorized`.
-  * **OTel Collector:**
-      * Enable `include_metadata: true` on OTLP receivers.
-      * Add an `attributes/insert_tenancy` processor to extract `metadata.x-codon-org-id`.
-
------
-
-### Phase 2: The "Infinite Scale" (JWT)
-
-**Objective:** Decouple ingestion authorization from the database to handle high-volume telemetry.
-**Timeline:** Post-Launch / Scale-up.
-
-#### 1\. SDK Changes
-
-  * **Token Manager:** Implement a `TokenManager` class in the Core SDK (see the sketch below).
-      * **Exchange:** On init, call `https://api.codon.ai/auth/token` with the API Key to get a JWT.
-      * **Refresh:** Automatically refresh the JWT 5 minutes before expiry.
-  * **Exporter Update:** Update `initialize_telemetry` to use the dynamic JWT in the `Authorization: Bearer <JWT>` header instead of the static API key.
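-
-A minimal sketch of this Phase 2 client flow (the endpoint path and response fields are assumptions, not the shipped API):
-
-```python
-import time
-from typing import Optional
-
-import requests
-
-
-class TokenManager:
-    """Exchange a Codon API key for a short-lived JWT and keep it fresh."""
-
-    REFRESH_MARGIN_S = 300  # refresh 5 minutes before expiry
-
-    def __init__(self, api_key: str, auth_url: str = "https://api.codon.ai/auth/token") -> None:
-        self._api_key = api_key
-        self._auth_url = auth_url
-        self._jwt: Optional[str] = None
-        self._expires_at: float = 0.0
-
-    def get_token(self) -> str:
-        now = time.time()
-        if self._jwt is None or now >= self._expires_at - self.REFRESH_MARGIN_S:
-            # Assumed response shape: {"access_token": "...", "expires_in": 3600}
-            resp = requests.post(self._auth_url, headers={"x-codon-api-key": self._api_key})
-            resp.raise_for_status()
-            payload = resp.json()
-            self._jwt = payload["access_token"]
-            self._expires_at = now + float(payload["expires_in"])
-        return self._jwt
-```
-
-The exporter would then send `Authorization: Bearer <token>` headers, with the token refreshed via `get_token()` on each export cycle.
-
-#### 2\. 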
Infrastructure Changes - - * **Auth Service:** Ensure your backend has an endpoint to mint signed JWTs (RS256) containing the `org_id` claim. - * **Collector/Gateway:** Reconfigure to use the **OIDC Authenticator**. - * It will now validate the **signature** of the JWT locally (CPU operation) instead of calling the DB (I/O operation). - * The `attributes` processor will now extract tenancy from `auth.claims.org_id`. - ------ - -### Architecture Decision Records (ADRs) - -Here are the formal records you should commit to your documentation (e.g., `docs/adr/`). - -#### ADR 001: Telemetry Authentication Strategy - -**Status:** Accepted -**Date:** 2025-11-24 - -**Context:** -We need to secure the OpenTelemetry ingestion endpoint (`ingest.codonops.ai`) to prevent unauthorized data submission and ensure accurate billing/tenancy. We have two options: checking API keys against a database (Gateway pattern) or using signed tokens (JWT pattern). - -**Decision:** -We will implement a **Two-Phase Strategy**: - -1. **Phase 1 (API Gateway):** Validate API Keys at the ingress gateway (Nginx) via a database/cache lookup. -2. **Phase 2 (JWT):** Migrate to client-side JWT exchange for stateless authentication at scale. - -**Consequences:** - - * **Positive:** Phase 1 allows for rapid implementation using existing API keys without complex SDK logic. Phase 2 provides a clear path to handling massive concurrency without database bottlenecks. - * **Negative:** Phase 1 introduces a database dependency on the hot path of telemetry ingestion. If the DB slows down, telemetry ingestion slows down. This is acceptable for MVP volumes but must be replaced (Phase 2) before high-scale adoption. - -#### ADR 002: Centralization of Telemetry Configuration - -**Status:** Accepted -**Date:** 2025-11-24 - -**Context:** -Currently, `initialize_telemetry` is defined inside the `codon-instrumentation-langgraph` package. As we add support for OpenAI, CrewAI, and other frameworks, duplicating this initialization logic will lead to drift in configuration defaults (endpoints, auth headers, resource attributes). - -**Decision:** -We will move the telemetry initialization logic to the **SDK Core** (`codon_sdk`). - - * **New Location:** `codon_sdk.instrumentation.initialize` (or similar). - * **Instrumentation Packages:** Will import and wrap this core function or instruct users to call it directly. - -**Consequences:** - - * **Positive:** Single source of truth for endpoints and authentication. Updates to the ingestion architecture (e.g., changing the endpoint URL or auth header format) only need to happen in one place (`codon_sdk`). - * **Negative:** Instrumentation packages take a hard dependency on the specific version of `codon_sdk` that contains this utility. - ------ - -### Recommended Code Structure for Core SDK - -Here is how you should implement the **Core SDK** change right now to support Phase 1. 
- -**File:** `sdk/src/codon_sdk/instrumentation/__init__.py` (or a new `config.py` inside it) - -```python -import os -from typing import Optional, Dict -from opentelemetry import trace -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter - -# Hardcoded Production Endpoint - The "Front Door" -DEFAULT_INGEST_ENDPOINT = "https://ingest.codonops.ai:4317" - -def initialize_telemetry( - api_key: Optional[str] = None, - service_name: Optional[str] = None, - endpoint: Optional[str] = None, -) -> None: - """ - Global initialization for Codon telemetry. - - This should be called once at the start of the application. It configures - OpenTelemetry to send traces to the Codon Cloud (or a custom collector). - """ - - # 1. Resolve Identity - final_api_key = api_key or os.getenv("CODON_API_KEY") - if not final_api_key: - # Fallback for local dev without auth (optional, or raise Error) - # For Phase 1 production, we might want to enforce this. - pass - - # 2. Resolve Context - # We prefer the user-provided service name, then env var, then default. - final_service_name = ( - service_name - or os.getenv("OTEL_SERVICE_NAME") - or "unknown_codon_service" - ) - - # 3. Resolve Destination - # Default to Prod, allow env var override, allow explicit arg override. - final_endpoint = ( - endpoint - or os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") - or DEFAULT_INGEST_ENDPOINT - ) - - # 4. Configure Headers - headers: Dict[str, str] = {} - if final_api_key: - headers["x-codon-api-key"] = final_api_key - - # 5. Setup OpenTelemetry - resource = Resource(attributes={"service.name": final_service_name}) - - exporter = OTLPSpanExporter( - endpoint=final_endpoint, - headers=headers - ) - - provider = TracerProvider(resource=resource) - provider.add_span_processor(BatchSpanProcessor(exporter)) - - # Set the global tracer provider so all instrumentation packages pick it up - trace.set_tracer_provider(provider) -``` - -**Usage in `codon-instrumentation-langgraph`:** -You would delete the local `initialize_telemetry` definition and re-export or document the usage of the core one. - -```python -# instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/__init__.py - -# Re-export for convenience, or deprecate and point users to core -from codon_sdk.instrumentation import initialize_telemetry -``` diff --git a/instrumentation-packages/codon-instrumentation-langgraph/AGENTS.md b/instrumentation-packages/codon-instrumentation-langgraph/AGENTS.md index 76581b0..9292080 100644 --- a/instrumentation-packages/codon-instrumentation-langgraph/AGENTS.md +++ b/instrumentation-packages/codon-instrumentation-langgraph/AGENTS.md @@ -14,7 +14,7 @@ This document explains how to convert an existing LangGraph `StateGraph` into a ## Why Wrap a LangGraph Graph? - **Zero instrumentation boilerplate:** every LangGraph node is auto-wrapped with `track_node`, producing OpenTelemetry spans without manual decorators. - **Stable identifiers:** nodes become `NodeSpec`s with deterministic SHA-256 IDs, and the overall graph gets a logic ID for caching, retries, and provenance. -- **Audit-first runtime:** executions use Codon’s token scheduler, producing a detailed ledger (token enqueue/dequeue, node completions, custom events) for compliance. 
+- **Telemetry-first runtime:** executions use native LangGraph semantics while Codon emits spans for each node invocation and workload run metadata for downstream analysis. - **Drop-in ergonomics:** call `LangGraphWorkloadAdapter.from_langgraph(graph, ...)` and keep your existing LangGraph code unchanged. --- @@ -33,7 +33,7 @@ from myproject.langgraph import build_graph langgraph = build_graph() # returns StateGraph or compiled graph -workload = LangGraphWorkloadAdapter.from_langgraph( +graph = LangGraphWorkloadAdapter.from_langgraph( langgraph, name="ResearchAgent", version="1.0.0", @@ -42,16 +42,14 @@ workload = LangGraphWorkloadAdapter.from_langgraph( ) initial_state = {"topic": "Sustainable cities"} -report = workload.execute({"state": initial_state}, deployment_id="dev") -print(report.node_results("writer")[-1]) -print(f"Ledger entries: {len(report.ledger)}") +result = graph.invoke({"topic": "Sustainable cities"}) +print(result) ``` ### What Happened? -1. Every LangGraph node was registered as a Codon node via `add_node`, producing a `NodeSpec`. -2. Edges in the LangGraph became workload edges, so `runtime.emit` drives execution. -3. `execute` seeded tokens with the provided state, ran the graph in token order, and captured telemetry & audit logs. -4. You can inspect `report.ledger` for compliance, or `report.node_results(...)` for business outputs. +1. Every LangGraph node was registered as a Codon `NodeSpec` for deterministic IDs. +2. The adapter returned an instrumented graph that preserves native LangGraph execution semantics. +3. Telemetry spans are emitted via callbacks during normal LangGraph invocation (no `execute` call required). --- @@ -63,7 +61,7 @@ The adapter inspects your graph to extract: Need finer control? Provide a `node_overrides` mapping where each entry is either a plain dict or `NodeOverride` object. You can specify the role, callable used for `NodeSpec` introspection, model metadata, and explicit schemas: ```python -workload = LangGraphWorkloadAdapter.from_langgraph( +graph = LangGraphWorkloadAdapter.from_langgraph( langgraph, name="SupportBot", version="2.3.0", @@ -83,9 +81,7 @@ Any fields you omit fall back to the adapter defaults. Overrides propagate to te --- ## Handling State -- The adapter expects your token payload to contain a dictionary under the `"state"` key. -- Each LangGraph node receives that state, invokes the original runnable, and emits updated state to successors. -- Shared run-level data lives in `runtime.state`; you can read it from within nodes for cross-node coordination. +- The adapter does not alter LangGraph state semantics. Nodes receive the same state and return the same updates they would without Codon instrumentation. Example node signature inside your LangGraph graph: ```python @@ -95,21 +91,21 @@ async def researcher(state): insights = await fetch_insights(plan) return {"insights": insights} ``` -When wrapped by the adapter, the Codon node sees `message["state"]` and merges the returned dict with the existing state. +When wrapped by the adapter, the original LangGraph node callable is preserved and invoked as usual. --- ## Entry Nodes By default the adapter infers entry nodes as those with no incoming edges. 
You can override this by supplying `entry_nodes`: ```python -workload = LangGraphWorkloadAdapter.from_langgraph( +graph = LangGraphWorkloadAdapter.from_langgraph( langgraph, name="OpsAgent", version="0.4.1", entry_nodes=["bootstrap"], ) ``` -At execution time you can still override entry nodes via `workload.execute(..., entry_nodes=[...])` if needed. +Entry nodes are still inferred from the LangGraph structure; override them by changing the graph itself before compilation. --- @@ -146,33 +142,34 @@ langgraph.add_edge("writer", "critic") langgraph.add_edge("critic", "writer") # feedback loop langgraph.add_edge("critic", "finalize") -workload = LangGraphWorkloadAdapter.from_langgraph( +graph = LangGraphWorkloadAdapter.from_langgraph( langgraph, name="ReflectiveAgent", version="0.1.0", ) -result = workload.execute({"state": {"topic": "urban gardens"}}, deployment_id="demo") -print(result.node_results("finalize")[-1]) +result = graph.invoke({"topic": "urban gardens"}) +print(result) ``` -The ledger records each iteration through the loop, and `runtime.state` tracks iteration counts for auditing. +Each iteration is reflected in node spans and the graph snapshot span ties the run to the full graph definition. --- ## Adapter Options & Artifacts - Use `compile_kwargs={...}` when calling `LangGraphWorkloadAdapter.from_langgraph(...)` to compile your graph with checkpointers, memory stores, or any other LangGraph runtime extras. The adapter still inspects the pre-compiled graph for node metadata while compiling with the provided extras so the runtime is ready to go. -- Set `return_artifacts=True` to receive a `LangGraphAdapterResult` containing the `CodonWorkload`, the original state graph, and the compiled graph. This makes it easy to hand both artifacts to downstream systems (e.g., background runners) without re-compiling. -- Provide `runtime_config={...}` during adaptation to establish default invocation options (e.g., base callbacks, tracing settings). At execution time, pass `langgraph_config={...}` to `workload.execute(...)` to layer per-run overrides; both configs are merged and supplied alongside Codon’s telemetry callback. -- Regardless of the return value, the resulting workload exposes `langgraph_state_graph`, `langgraph_compiled_graph`, `langgraph_compile_kwargs`, and `langgraph_runtime_config` for quick access to the underlying LangGraph objects. +- Set `return_artifacts=True` to receive a `LangGraphAdapterResult` containing the `CodonWorkload`, the original state graph, the compiled graph, and the instrumented graph wrapper. This makes it easy to hand both artifacts to downstream systems (e.g., background runners) without re-compiling. +- Provide `runtime_config={...}` during adaptation to establish default invocation options (e.g., base callbacks, tracing settings). At invocation time, pass `config={...}` to `graph.invoke(...)` (or `graph.ainvoke(...)`) to layer per-run overrides; both configs are merged and supplied alongside Codon’s telemetry callbacks. +- The returned graph exposes `workload`, `langgraph_state_graph`, `langgraph_compiled_graph`, `langgraph_compile_kwargs`, and `langgraph_runtime_config` for quick access to the underlying LangGraph objects and metadata. --- ## Telemetry & Audit Integration - Call `initialize_telemetry(service_name=...)` once during process startup to export spans via OTLP. The initializer now lives in the core SDK (`codon_sdk.instrumentation.initialize_telemetry`) and is re-exported here. 
It defaults the endpoint to `https://ingest.codonops.ai:4317`, injects `x-codon-api-key` from the argument or `CODON_API_KEY` env, and respects `OTEL_EXPORTER_OTLP_ENDPOINT`/`OTEL_SERVICE_NAME` overrides. If you already have an OTEL tracer provider (e.g., via auto-instrumentation), set `CODON_ATTACH_TO_EXISTING_OTEL_PROVIDER=true` or pass `attach_to_existing=True` to add Codon’s exporter to the existing provider instead of replacing it. - Each node span now carries workload metadata (`codon.workload.id`, `codon.workload.run_id`, `codon.workload.logic_id`, `codon.workload.deployment_id`, `codon.organization.id`) so traces can be rolled up by workload, deployment, or organization without joins. +- Each graph invocation emits a single graph snapshot span (`agent.graph`) with node/edge structure serialized in `codon.graph.definition_json` for full topology visibility. - `LangGraphTelemetryCallback` is attached automatically when invoking LangChain runnables; it captures model vendor/identifier, token usage (prompt, completion, total), and response metadata, all of which is emitted as span attributes (`codon.tokens.*`, `codon.model.*`, `codon.node.raw_attributes_json`). - Instrumentation writes into the shared `NodeTelemetryPayload` (`runtime.telemetry`) defined by the SDK so future mixins collect the same schema-aligned fields without reimplementing bookkeeping. - Node inputs/outputs and latency are recorded alongside status codes, enabling the `trace_events` schema to be populated directly from exported span data. -- The audit ledger still covers token enqueue/dequeue, node completions, custom events (`runtime.record_event`), and stop requests for replay and compliance workflows. +- Telemetry spans cover node inputs/outputs, latency, model usage, and workload/run identifiers without altering LangGraph execution. ### Analytics Alignment - The span attribute set is designed to satisfy the MVP telemetry tables in `docs/design/Codon Telemetry Data Schema - MVP Version.txt`. You can aggregate by `nodespec_id` or `logic_id` to compute token totals, error rates, or latency buckets per node. @@ -181,9 +178,12 @@ The ledger records each iteration through the loop, and `runtime.state` tracks i --- ## Limitations & Roadmap -- Conditional edges: currently you emit along every registered edge; to mimic conditionals, have your node wrapper decide which edges receive tokens. Future versions aim to map LangGraph’s conditional constructs directly. -- Streaming tokens / concurrency: not yet supported; the adapter processes tokens sequentially (though you can extend it for concurrency). -- Persistence: the workload runtime is in-memory today. Roadmap includes pluggable stores for tokens/state/audit (see `docs/vision/codon-workload-design-philosophy.md`). +- Conditional edges: telemetry spans are emitted only for nodes actually executed; the graph snapshot span provides the full topology for downstream analysis. +- Streaming tokens: relies on LangGraph/LangChain streaming support; Codon captures model usage when providers expose usage metadata. +- Persistence: execution remains native to LangGraph; persistence is governed by your graph checkpointers and stores. +- Direct SDK calls: if a node bypasses LangChain runnables and calls provider SDKs directly, token usage callbacks are not emitted. +- Custom runnables: objects that do not implement `invoke/ainvoke` cannot be auto-wrapped for config injection. 
+- Async context boundaries: background tasks may lose ContextVar state, preventing automatic config propagation. --- diff --git a/instrumentation-packages/codon-instrumentation-langgraph/README.md b/instrumentation-packages/codon-instrumentation-langgraph/README.md index 0f259e4..25a4a0e 100644 --- a/instrumentation-packages/codon-instrumentation-langgraph/README.md +++ b/instrumentation-packages/codon-instrumentation-langgraph/README.md @@ -37,7 +37,7 @@ db_agent_graph.add_node("query_resolver_node", self.query_resolver_node) db_agent_graph.add_node("query_executor_node", self.query_executor_node) # ... add more nodes and edges -# Wrap with Codon adapter +# Wrap with Codon adapter (returns an instrumented graph) self._graph = LangGraphWorkloadAdapter.from_langgraph( db_agent_graph, name="LangGraphSQLAgentDemo", @@ -46,8 +46,46 @@ self._graph = LangGraphWorkloadAdapter.from_langgraph( tags=["langgraph", "demo", "sql"], compile_kwargs={"checkpointer": MemorySaver()} ) + +# Invoke the graph as usual +result = self._graph.invoke({"question": "Which locations pay data engineers the most?"}) + +# Access workload metadata if needed +workload = self._graph.workload +``` + +## Instrumenting Prebuilt Graphs (create_agent) + +LangChain v1's `create_agent` returns a compiled LangGraph graph, which means you can wrap it directly without rebuilding a `StateGraph`. (See the LangChain Studio docs: https://docs.langchain.com/oss/python/langchain/studio.) + +```python +from langchain.agents import create_agent +from codon.instrumentation.langgraph import LangGraphWorkloadAdapter + +agent_graph = create_agent( + model=model, + tools=tools, + system_prompt="You are a helpful assistant.", +) + +graph = LangGraphWorkloadAdapter.from_langgraph( + agent_graph, + name="PrebuiltAgent", + version="1.0.0", + node_overrides={ + # Optional: restore NodeSpec fidelity when wrapping compiled graphs + "planner": {"role": "planner", "callable": planner_fn}, + "agent": {"role": "agent", "model_name": "gpt-4o"}, + }, +) + +result = graph.invoke({"input": "Summarize the latest updates."}) ``` +Notes: +- Compiled graphs can obscure callable signatures and schemas, so `node_overrides` is the easiest way to restore full NodeSpec metadata. +- If you only have the compiled graph, you can still list available node names via `graph.nodes.keys()` and use those keys in `node_overrides`. + ### Automatic Node Inference The adapter automatically infers nodes from your StateGraph, eliminating the need to manually instrument each node with decorators. This provides comprehensive telemetry out of the box. @@ -61,6 +99,17 @@ You can pass any LangGraph compile arguments through `compile_kwargs`: - Memory configurations - Custom compilation options +## Graph Snapshot Span + +Each graph invocation emits a single graph snapshot span (one per run) that captures the full node/edge structure. This lets downstream analysis understand the full graph shape even when only a subset of nodes executed. + +## Edge Cases & Limitations + +- **Direct SDK calls:** If a node calls a provider SDK directly (not via a LangChain runnable), callbacks will not fire and token usage metadata will be missing. +- **Custom runnables:** Runnables that do not expose `invoke/ainvoke` cannot be auto-wrapped with config injection. +- **Async context boundaries:** Background tasks can drop ContextVar state, which may prevent config propagation into LLM calls. 
+- **Provider usage metadata:** Some providers only return token usage when explicitly enabled (especially in streaming). + ## Best Practices 1. **Use the adapter**: `LangGraphWorkloadAdapter.from_langgraph()` provides comprehensive instrumentation with just a few lines of code diff --git a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/__init__.py b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/__init__.py index b9eefee..1437aa2 100644 --- a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/__init__.py +++ b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/__init__.py @@ -34,7 +34,7 @@ _RESOLVED_ORG_ID, _RESOLVED_ORG_NAMESPACE, ) -from codon_sdk.agents import Workload +from .context import GraphInvocationContext, current_graph_context from codon_sdk.instrumentation.schemas.telemetry.spans import CodonBaseSpanAttributes from codon_sdk.instrumentation.telemetry import NodeTelemetryPayload from codon_sdk.instrumentation import initialize_telemetry @@ -47,7 +47,9 @@ "LangGraphAdapterResult", "NodeOverride", "current_invocation", + "current_graph_context", "LangGraphTelemetryCallback", + "LangGraphNodeSpanCallback", ] ORG_NAMESPACE: str = os.getenv("ORG_NAMESPACE") @@ -61,6 +63,8 @@ ) + + class LangGraphWorkloadMixin(ABC): """Mixin contract for workloads built from LangGraph graphs. @@ -78,8 +82,8 @@ def from_langgraph( version: str, description: Optional[str] = None, tags: Optional[Sequence[str]] = None, - ) -> Workload: - """Translate a LangGraph graph into a concrete Codon workload.""" + ) -> Any: + """Wrap a LangGraph graph and return an instrumented graph.""" def current_invocation() -> Optional[NodeTelemetryPayload]: @@ -88,6 +92,7 @@ def current_invocation() -> Optional[NodeTelemetryPayload]: return _ACTIVE_INVOCATION.get() + def _is_truthy(value: Optional[str]) -> bool: return str(value or "").strip().lower() in {"1", "true", "yes", "y", "on"} @@ -520,6 +525,9 @@ def wrapper(*args, **kwargs): LangGraphWorkloadAdapter, NodeOverride, ) -from .callbacks import LangGraphTelemetryCallback # noqa: E402 # isort: skip +from .callbacks import ( # noqa: E402 # isort: skip + LangGraphNodeSpanCallback, + LangGraphTelemetryCallback, +) _maybe_warn_deprecated_langgraph() diff --git a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py index 031baca..b5fc1dd 100644 --- a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py +++ b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py @@ -15,16 +15,43 @@ """LangGraph integration helpers for Codon Workloads.""" from __future__ import annotations +import hashlib +import logging import inspect import json +import os +import uuid from collections import defaultdict from dataclasses import dataclass from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union from codon_sdk.agents import CodonWorkload +from codon_sdk.instrumentation.schemas.telemetry.spans import ( + CodonBaseSpanAttributes, + CodonGraphSpanAttributes, + CodonSpanNames, +) +from codon_sdk.instrumentation.schemas.nodespec import ( + _RESOLVED_ORG_ID, + _RESOLVED_ORG_NAMESPACE, +) +from opentelemetry import trace + +from .context import ( + 
GraphInvocationContext, + _ACTIVE_CONFIG, + _ACTIVE_GRAPH_CONTEXT, + current_langgraph_config, +) from codon_sdk.agents.codon_workload import WorkloadRuntimeError -from .callbacks import LangGraphTelemetryCallback +from .callbacks import ( + BoundInvocationTelemetryCallback, + LangGraphNodeSpanCallback, + LangGraphTelemetryCallback, + _lookup_invocation_metadata, +) +from . import current_invocation try: # pragma: no cover - we do not require langgraph at install time from langgraph.graph import StateGraph # type: ignore @@ -36,6 +63,110 @@ RawEdgeIterable = Iterable[Tuple[str, str]] +def _ensure_callback_list(value: Any) -> List[Any]: + if value is None: + return [] + handlers = getattr(value, "handlers", None) + if handlers is not None: + return list(handlers) + if isinstance(value, (list, tuple)): + return list(value) + return [value] + + +def _wrap_runnable(value: Any) -> Any: + if isinstance(value, _RunnableConfigWrapper): + return value + if not (hasattr(value, "invoke") or hasattr(value, "ainvoke")): + return value + return _RunnableConfigWrapper(value) + + +def _extract_invocation_from_config(config: Mapping[str, Any]) -> Optional[Any]: + metadata = config.get("metadata") + if isinstance(metadata, Mapping): + invocation = metadata.get("codon_invocation") + if invocation is not None: + return invocation + invocation = _lookup_invocation_metadata(config) + if invocation is not None: + return invocation + return None + + +def _inject_bound_callback(config: Mapping[str, Any]) -> Mapping[str, Any]: + if not isinstance(config, Mapping): + return config + invocation = _extract_invocation_from_config(config) or current_invocation() + if os.getenv("CODON_LANGGRAPH_DEBUG_USAGE") == "1": + logger = logging.getLogger(__name__) + logger.info( + "codon.langgraph usage debug: inject_bound_callback invocation_present=%s metadata_keys=%s", + bool(invocation), + sorted(config.get("metadata", {}).keys()) + if isinstance(config.get("metadata"), Mapping) + else None, + ) + if not invocation: + return config + callbacks = _ensure_callback_list(config.get("callbacks")) + callbacks = [ + cb for cb in callbacks if not isinstance(cb, BoundInvocationTelemetryCallback) + ] + invocation.extra_attributes.setdefault("codon_span_defer", True) + updated = dict(config) + updated["callbacks"] = callbacks + [BoundInvocationTelemetryCallback(invocation)] + return updated + + +def _ensure_codon_run_metadata(config: Mapping[str, Any], run_id: str) -> None: + if not isinstance(config, dict): + return + metadata = config.get("metadata") + if not isinstance(metadata, dict): + metadata = dict(metadata) if isinstance(metadata, Mapping) else {} + config["metadata"] = metadata + metadata.setdefault("codon_run_id", run_id) + + +def _normalize_org_id(value: Optional[str]) -> Optional[str]: + if not value: + return None + return value if value.startswith("ORG_") else None + + +def _resource_org_id() -> Optional[str]: + provider = trace.get_tracer_provider() + resource = getattr(provider, "resource", None) + if not resource: + return None + attributes = getattr(resource, "attributes", None) + if not isinstance(attributes, Mapping): + return None + candidate = attributes.get(CodonBaseSpanAttributes.OrganizationId.value) or attributes.get( + "codon.organization.id" + ) + if isinstance(candidate, str) and candidate.startswith("ORG_"): + return candidate + return None + + +def _wrap_config_values(value: Any) -> Any: + if isinstance(value, Mapping): + wrapped: Dict[str, Any] = {} + for key, item in value.items(): + if key == 
"callbacks": + wrapped[key] = item + else: + wrapped[key] = _wrap_config_values(item) + return wrapped + if isinstance(value, list): + return [_wrap_config_values(item) for item in value] + if isinstance(value, tuple): + return tuple(_wrap_config_values(item) for item in value) + return _wrap_runnable(value) + + def _merge_runtime_configs( base: Optional[Mapping[str, Any]], override: Optional[Mapping[str, Any]], @@ -48,16 +179,347 @@ def _merge_runtime_configs( continue for key, value in cfg.items(): if key == "callbacks": - if isinstance(value, (list, tuple)): - callbacks.extend(value) - else: - callbacks.append(value) + callbacks.extend(_ensure_callback_list(value)) else: merged[key] = value - callbacks.append(LangGraphTelemetryCallback()) + if not any(isinstance(cb, LangGraphNodeSpanCallback) for cb in callbacks): + callbacks.append(LangGraphNodeSpanCallback()) + if not any(isinstance(cb, LangGraphTelemetryCallback) for cb in callbacks): + callbacks.append(LangGraphTelemetryCallback()) merged["callbacks"] = callbacks - return merged + return _wrap_config_values(merged) + + +class _RunnableConfigWrapper: + def __init__(self, runnable: Any) -> None: + self._runnable = runnable + + def __getattr__(self, name: str) -> Any: + return getattr(self._runnable, name) + + def invoke(self, *args: Any, **kwargs: Any) -> Any: + if kwargs.get("config") is None: + config = current_langgraph_config() + if config is not None: + kwargs = dict(kwargs) + kwargs["config"] = _inject_bound_callback(config) + try: + if kwargs.get("config") is not None: + kwargs = dict(kwargs) + kwargs["config"] = _inject_bound_callback(kwargs["config"]) + return self._runnable.invoke(*args, **kwargs) + except TypeError: + kwargs.pop("config", None) + return self._runnable.invoke(*args, **kwargs) + + async def ainvoke(self, *args: Any, **kwargs: Any) -> Any: + if kwargs.get("config") is None: + config = current_langgraph_config() + if config is not None: + kwargs = dict(kwargs) + kwargs["config"] = _inject_bound_callback(config) + try: + if kwargs.get("config") is not None: + kwargs = dict(kwargs) + kwargs["config"] = _inject_bound_callback(kwargs["config"]) + return await self._runnable.ainvoke(*args, **kwargs) + except TypeError: + kwargs.pop("config", None) + return await self._runnable.ainvoke(*args, **kwargs) + + +def _resolve_deployment_id(config: Optional[Mapping[str, Any]]) -> Optional[str]: + if not config: + return None + for key in ("deployment_id", "codon_deployment_id"): + value = config.get(key) + if value: + return str(value) + configurable = config.get("configurable") + if isinstance(configurable, Mapping): + value = configurable.get("deployment_id") or configurable.get("codon_deployment_id") + if value: + return str(value) + metadata = config.get("metadata") + if isinstance(metadata, Mapping): + value = metadata.get("deployment_id") or metadata.get("codon_deployment_id") + if value: + return str(value) + return None + + +def _build_graph_definition( + node_specs: Mapping[str, Any], edges: Sequence[Tuple[str, str]] +) -> Dict[str, Any]: + nodes = [] + for name, spec in node_specs.items(): + nodes.append( + { + "name": name, + "role": getattr(spec, "role", None), + "nodespec_id": getattr(spec, "id", None), + } + ) + return { + "nodes": nodes, + "edges": [{"source": src, "target": dst} for src, dst in edges], + } + + +def _hash_graph_definition(definition: Mapping[str, Any]) -> str: + payload = json.dumps(definition, sort_keys=True, default=str).encode("utf-8") + return hashlib.sha256(payload).hexdigest() + + 
+class _WrappedLangGraph: + def __init__( + self, + graph: Any, + *, + workload: CodonWorkload, + node_specs: Mapping[str, Any], + graph_definition: Optional[Dict[str, Any]], + state_graph: Optional[Any] = None, + compiled_graph: Optional[Any] = None, + compile_kwargs: Optional[Mapping[str, Any]] = None, + runtime_config: Optional[Mapping[str, Any]] = None, + ) -> None: + self._graph = graph + self.workload = workload + self.node_specs = dict(node_specs) + self.graph_definition = graph_definition + self.runtime_config = dict(runtime_config or {}) + self.langgraph_state_graph = state_graph + self.langgraph_compiled_graph = compiled_graph or graph + self.langgraph_compile_kwargs = dict(compile_kwargs or {}) + self.langgraph_runtime_config = dict(runtime_config or {}) + + def __getattr__(self, item: str) -> Any: + return getattr(self._graph, item) + + def _emit_graph_span(self, run_context: GraphInvocationContext) -> None: + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span(CodonSpanNames.AgentGraph.value) as span: + span.set_attribute( + CodonBaseSpanAttributes.AgentFramework.value, + "langgraph", + ) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadId.value, + run_context.workload.agent_class_id, + ) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadLogicId.value, + run_context.workload.logic_id, + ) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadRunId.value, + run_context.run_id, + ) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadName.value, + run_context.workload.metadata.name, + ) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadVersion.value, + run_context.workload.metadata.version, + ) + if run_context.deployment_id: + span.set_attribute( + CodonBaseSpanAttributes.DeploymentId.value, + run_context.deployment_id, + ) + org_id = run_context.organization_id or _resource_org_id() + if org_id: + span.set_attribute( + CodonBaseSpanAttributes.OrganizationId.value, + org_id, + ) + if run_context.org_namespace: + span.set_attribute( + CodonBaseSpanAttributes.OrgNamespace.value, + run_context.org_namespace, + ) + + if self.graph_definition: + definition_hash = _hash_graph_definition(self.graph_definition) + span.set_attribute( + CodonGraphSpanAttributes.DefinitionHash.value, + definition_hash, + ) + span.set_attribute( + CodonGraphSpanAttributes.NodeCount.value, + len(self.graph_definition.get("nodes", [])), + ) + span.set_attribute( + CodonGraphSpanAttributes.EdgeCount.value, + len(self.graph_definition.get("edges", [])), + ) + span.set_attribute( + CodonGraphSpanAttributes.DefinitionJson.value, + json.dumps(self.graph_definition, default=str), + ) + + def _invoke(self, *args: Any, config: Optional[Mapping[str, Any]] = None, **kwargs: Any): + merged_config = _merge_runtime_configs(self.runtime_config, config) + org_id = _normalize_org_id(_RESOLVED_ORG_ID) or _normalize_org_id( + self.workload.organization_id + ) + if org_id is None: + org_id = _resource_org_id() + org_namespace = ( + _RESOLVED_ORG_NAMESPACE + or os.getenv("ORG_NAMESPACE") + or self.workload.organization_id + ) + run_context = GraphInvocationContext( + workload=self.workload, + node_specs=self.node_specs, + run_id=str(uuid.uuid4()), + deployment_id=_resolve_deployment_id(merged_config), + organization_id=org_id, + org_namespace=org_namespace, + graph_definition=self.graph_definition, + ) + _ensure_codon_run_metadata(merged_config, run_context.run_id) + graph_token = _ACTIVE_GRAPH_CONTEXT.set(run_context) + config_token = _ACTIVE_CONFIG.set(merged_config) + try: + 
self._emit_graph_span(run_context) + try: + return self._graph.invoke(*args, config=merged_config, **kwargs) + except TypeError: + return self._graph.invoke(*args, **kwargs) + finally: + _ACTIVE_GRAPH_CONTEXT.reset(graph_token) + _ACTIVE_CONFIG.reset(config_token) + + async def _ainvoke( + self, *args: Any, config: Optional[Mapping[str, Any]] = None, **kwargs: Any + ): + merged_config = _merge_runtime_configs(self.runtime_config, config) + org_id = _normalize_org_id(_RESOLVED_ORG_ID) or _normalize_org_id( + self.workload.organization_id + ) + if org_id is None: + org_id = _resource_org_id() + org_namespace = ( + _RESOLVED_ORG_NAMESPACE + or os.getenv("ORG_NAMESPACE") + or self.workload.organization_id + ) + run_context = GraphInvocationContext( + workload=self.workload, + node_specs=self.node_specs, + run_id=str(uuid.uuid4()), + deployment_id=_resolve_deployment_id(merged_config), + organization_id=org_id, + org_namespace=org_namespace, + graph_definition=self.graph_definition, + ) + _ensure_codon_run_metadata(merged_config, run_context.run_id) + graph_token = _ACTIVE_GRAPH_CONTEXT.set(run_context) + config_token = _ACTIVE_CONFIG.set(merged_config) + try: + self._emit_graph_span(run_context) + try: + return await self._graph.ainvoke(*args, config=merged_config, **kwargs) + except TypeError: + return await self._graph.ainvoke(*args, **kwargs) + finally: + _ACTIVE_GRAPH_CONTEXT.reset(graph_token) + _ACTIVE_CONFIG.reset(config_token) + + def invoke(self, *args: Any, **kwargs: Any): + return self._invoke(*args, **kwargs) + + async def ainvoke(self, *args: Any, **kwargs: Any): + return await self._ainvoke(*args, **kwargs) + + def stream(self, *args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + merged_config = _merge_runtime_configs(self.runtime_config, config) + org_id = _normalize_org_id(_RESOLVED_ORG_ID) or _normalize_org_id( + self.workload.organization_id + ) + if org_id is None: + org_id = _resource_org_id() + org_namespace = ( + _RESOLVED_ORG_NAMESPACE + or os.getenv("ORG_NAMESPACE") + or self.workload.organization_id + ) + run_context = GraphInvocationContext( + workload=self.workload, + node_specs=self.node_specs, + run_id=str(uuid.uuid4()), + deployment_id=_resolve_deployment_id(merged_config), + organization_id=org_id, + org_namespace=org_namespace, + graph_definition=self.graph_definition, + ) + _ensure_codon_run_metadata(merged_config, run_context.run_id) + graph_token = _ACTIVE_GRAPH_CONTEXT.set(run_context) + config_token = _ACTIVE_CONFIG.set(merged_config) + self._emit_graph_span(run_context) + + def _iterator(): + try: + try: + iterator = self._graph.stream(*args, config=merged_config, **kwargs) + except TypeError: + iterator = self._graph.stream(*args, **kwargs) + for item in iterator: + yield item + finally: + _ACTIVE_GRAPH_CONTEXT.reset(graph_token) + _ACTIVE_CONFIG.reset(config_token) + + return _iterator() + + async def astream(self, *args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + merged_config = _merge_runtime_configs(self.runtime_config, config) + org_id = _normalize_org_id(_RESOLVED_ORG_ID) or _normalize_org_id( + self.workload.organization_id + ) + if org_id is None: + org_id = _resource_org_id() + org_namespace = ( + _RESOLVED_ORG_NAMESPACE + or os.getenv("ORG_NAMESPACE") + or self.workload.organization_id + ) + run_context = GraphInvocationContext( + workload=self.workload, + node_specs=self.node_specs, + run_id=str(uuid.uuid4()), + deployment_id=_resolve_deployment_id(merged_config), + organization_id=org_id, + 
org_namespace=org_namespace, + graph_definition=self.graph_definition, + ) + _ensure_codon_run_metadata(merged_config, run_context.run_id) + graph_token = _ACTIVE_GRAPH_CONTEXT.set(run_context) + config_token = _ACTIVE_CONFIG.set(merged_config) + self._emit_graph_span(run_context) + + async def _aiterator(): + try: + try: + iterator = self._graph.astream(*args, config=merged_config, **kwargs) + except TypeError: + iterator = self._graph.astream(*args, **kwargs) + if inspect.isawaitable(iterator): + iterator = await iterator + async for item in iterator: + yield item + finally: + _ACTIVE_GRAPH_CONTEXT.reset(graph_token) + _ACTIVE_CONFIG.reset(config_token) + + return _aiterator() @dataclass(frozen=True) @@ -67,6 +529,7 @@ class LangGraphAdapterResult: workload: CodonWorkload state_graph: Any compiled_graph: Any + wrapped_graph: Any @dataclass(frozen=True) @@ -100,8 +563,8 @@ def from_langgraph( compile_kwargs: Optional[Mapping[str, Any]] = None, runtime_config: Optional[Mapping[str, Any]] = None, return_artifacts: bool = False, - ) -> Union[CodonWorkload, LangGraphAdapterResult]: - """Create a :class:`CodonWorkload` from a LangGraph ``StateGraph``. + ) -> Union[Any, LangGraphAdapterResult]: + """Wrap a LangGraph graph and return an instrumented graph. Parameters ---------- @@ -113,7 +576,8 @@ def from_langgraph( you can attach checkpointers, memory stores, or other runtime extras. return_artifacts When ``True`` return a :class:`LangGraphAdapterResult` containing the - workload, the original state graph, and the compiled graph. + workload, the original state graph, the compiled graph, and the + instrumented graph wrapper. """ compiled, raw_nodes, raw_edges = cls._normalise_graph( @@ -189,14 +653,28 @@ def from_langgraph( setattr(workload, "langgraph_compile_kwargs", dict(compile_kwargs or {})) setattr(workload, "langgraph_runtime_config", dict(runtime_config or {})) + node_specs = {spec.name: spec for spec in workload.nodes} + graph_definition = _build_graph_definition(node_specs, valid_edges) + wrapped_graph = _WrappedLangGraph( + compiled, + workload=workload, + node_specs=node_specs, + graph_definition=graph_definition, + state_graph=graph, + compiled_graph=compiled, + compile_kwargs=compile_kwargs, + runtime_config=runtime_config, + ) + if return_artifacts: return LangGraphAdapterResult( workload=workload, state_graph=graph, compiled_graph=compiled, + wrapped_graph=wrapped_graph, ) - return workload + return wrapped_graph # ------------------------------------------------------------------ # Internal helpers @@ -217,10 +695,12 @@ def _normalise_graph( nodes = raw_nodes or comp_nodes edges = raw_edges or comp_edges - if nodes is None or edges is None: + if nodes is None: raise ValueError( - "Unable to extract nodes/edges from LangGraph graph. Pass the original StateGraph or ensure the compiled graph exposes config.nodes/config.edges." + "Unable to extract nodes from LangGraph graph. Pass the original StateGraph or ensure the compiled graph exposes config.nodes/config.edges." 
             )
+        if edges is None:
+            edges = []
 
         return compiled, nodes, edges
 
diff --git a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/callbacks.py b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/callbacks.py
index c8c8367..cd1ff73 100644
--- a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/callbacks.py
+++ b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/callbacks.py
@@ -16,10 +16,17 @@
 from __future__ import annotations
 
 import dataclasses
+import json
+import time
 from typing import Any, Mapping, Optional
 
+import logging
+import os
+import threading
+from dataclasses import dataclass
+
 try:  # pragma: no cover - optional dependency
-    from langchain.callbacks.base import BaseCallbackHandler
+    from langchain_core.callbacks.base import BaseCallbackHandler
 except Exception:  # pragma: no cover - fallback when LangChain is absent
 
     class BaseCallbackHandler:  # type: ignore
@@ -27,7 +34,15 @@ class BaseCallbackHandler:  # type: ignore
         pass
 
 
-from . import current_invocation
+from opentelemetry import trace
+from opentelemetry.trace import Status, StatusCode
+
+from codon_sdk.instrumentation.schemas.nodespec import NodeSpecSpanAttributes
+from codon_sdk.instrumentation.schemas.telemetry.spans import CodonBaseSpanAttributes
+from codon_sdk.instrumentation.telemetry import NodeTelemetryPayload
+
+from .context import current_graph_context, current_langgraph_config
+from . import current_invocation, _ACTIVE_INVOCATION
 
 
 def _coerce_mapping(value: Any) -> Optional[Mapping[str, Any]]:
@@ -108,59 +123,514 @@ def _normalise_usage(payload: Mapping[str, Any]) -> tuple[dict[str, Any], Optional[int], Optional[int], Optional[int]]:
     return usage, prompt_tokens, completion_tokens, total_tokens
 
 
-class LangGraphTelemetryCallback(BaseCallbackHandler):
-    """Captures model metadata and token usage from LangChain callbacks."""
+def _safe_repr(value: Any, *, max_length: int = 2048) -> str:
+    try:
+        rendered = repr(value)
+    except Exception as exc:  # pragma: no cover - defensive path
+        rendered = f"<unrepresentable {type(value).__name__}: {exc}>"
+    if len(rendered) > max_length:
+        return rendered[: max_length - 3] + "..."
+ return rendered + + +def _ensure_callback_list(callbacks: Any) -> list[Any]: + if callbacks is None: + return [] + handlers = getattr(callbacks, "handlers", None) + if handlers is not None: + return list(handlers) + if isinstance(callbacks, (list, tuple)): + return list(callbacks) + return [callbacks] + + +def _attach_bound_callback(config: Mapping[str, Any], invocation: "NodeTelemetryPayload") -> None: + if not isinstance(config, dict): + return + callbacks = _ensure_callback_list(config.get("callbacks")) + callbacks = [ + cb for cb in callbacks if not isinstance(cb, BoundInvocationTelemetryCallback) + ] + invocation.extra_attributes.setdefault("codon_span_defer", True) + callbacks.append(BoundInvocationTelemetryCallback(invocation)) + config["callbacks"] = callbacks + + +def _extract_node_name(serialized: Mapping[str, Any], kwargs: Mapping[str, Any]) -> Optional[str]: + for key in ("name", "run_name"): + value = kwargs.get(key) + if isinstance(value, str): + return value + name = serialized.get("name") + if isinstance(name, str): + return name + identifier = serialized.get("id") + if isinstance(identifier, (list, tuple)) and identifier: + candidate = identifier[-1] + if isinstance(candidate, str): + return candidate + return None - def on_llm_start(self, serialized: Mapping[str, Any], prompts: list[str], **kwargs: Any) -> None: - invocation = current_invocation() - if not invocation: + +class _ActiveSpan: + def __init__( + self, + span, + telemetry, + started_at, + run_id: str, + ): + self.span = span + self.telemetry = telemetry + self.started_at = started_at + self.run_id = run_id + + +_ACTIVE_BY_TELEMETRY: dict[int, _ActiveSpan] = {} + + +def _resource_org_id() -> Optional[str]: + provider = trace.get_tracer_provider() + resource = getattr(provider, "resource", None) + if not resource: + return None + attributes = getattr(resource, "attributes", None) + if not isinstance(attributes, Mapping): + return None + candidate = attributes.get(CodonBaseSpanAttributes.OrganizationId.value) or attributes.get( + "codon.organization.id" + ) + if isinstance(candidate, str) and candidate.startswith("ORG_"): + return candidate + return None + + +def _ensure_org_id(telemetry: NodeTelemetryPayload) -> Optional[str]: + candidate = telemetry.organization_id + if isinstance(candidate, str) and candidate.startswith("ORG_"): + return candidate + fallback = _resource_org_id() + if fallback: + telemetry.organization_id = fallback + return fallback + return None + + +def _infer_tokens_from_usage(usage: Mapping[str, Any]) -> tuple[Optional[int], Optional[int], Optional[int]]: + prompt = usage.get("prompt_tokens") + if prompt is None: + prompt = usage.get("input_tokens") + completion = usage.get("completion_tokens") + if completion is None: + completion = usage.get("output_tokens") + total = usage.get("total_tokens") + if total is None and prompt is not None and completion is not None: + total = prompt + completion + return prompt, completion, total + + +def _finalize_node_span(active: _ActiveSpan) -> None: + telemetry = active.telemetry + span = active.span + telemetry.extra_attributes["codon_span_finalized"] = True + + _ensure_org_id(telemetry) + + if telemetry.node_output: + span.set_attribute( + CodonBaseSpanAttributes.NodeOutput.value, + telemetry.node_output, + ) + if telemetry.duration_ms is not None: + span.set_attribute( + CodonBaseSpanAttributes.NodeLatencyMs.value, + telemetry.duration_ms, + ) + span.set_attribute( + CodonBaseSpanAttributes.NodeStatusCode.value, + telemetry.status_code, + ) + if 
telemetry.error_message: + span.set_attribute( + CodonBaseSpanAttributes.NodeErrorMessage.value, + telemetry.error_message, + ) + + if telemetry.model_vendor: + span.set_attribute( + CodonBaseSpanAttributes.ModelVendor.value, + telemetry.model_vendor, + ) + if telemetry.model_identifier: + span.set_attribute( + CodonBaseSpanAttributes.ModelIdentifier.value, + telemetry.model_identifier, + ) + if telemetry.token_usage and ( + telemetry.input_tokens is None + or telemetry.output_tokens is None + or telemetry.total_tokens is None + ): + prompt, completion, total = _infer_tokens_from_usage(telemetry.token_usage) + if telemetry.input_tokens is None: + telemetry.input_tokens = prompt + if telemetry.output_tokens is None: + telemetry.output_tokens = completion + if telemetry.total_tokens is None: + telemetry.total_tokens = total + if telemetry.input_tokens is not None: + span.set_attribute( + CodonBaseSpanAttributes.TokenInput.value, + telemetry.input_tokens, + ) + if telemetry.output_tokens is not None: + span.set_attribute( + CodonBaseSpanAttributes.TokenOutput.value, + telemetry.output_tokens, + ) + if telemetry.total_tokens is not None: + span.set_attribute( + CodonBaseSpanAttributes.TokenTotal.value, + telemetry.total_tokens, + ) + if telemetry.token_usage: + span.set_attribute( + CodonBaseSpanAttributes.TokenUsageJson.value, + json.dumps(telemetry.token_usage, default=str), + ) + if telemetry.network_calls: + span.set_attribute( + CodonBaseSpanAttributes.NetworkCallsJson.value, + json.dumps(telemetry.network_calls, default=str), + ) + if telemetry.organization_id: + span.set_attribute( + CodonBaseSpanAttributes.OrganizationId.value, + telemetry.organization_id, + ) + raw_json = telemetry.to_raw_attributes_json() + if raw_json: + span.set_attribute( + CodonBaseSpanAttributes.NodeRawAttributes.value, + raw_json, + ) + + if telemetry.status_code != "OK": + span.set_status(Status(StatusCode.ERROR, telemetry.error_message or "error")) + + span.end() + _ACTIVE_INVOCATION.set(None) + + +def _resolve_config(kwargs: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: + config = kwargs.get("config") + if isinstance(config, Mapping): + return config + config = current_langgraph_config() + if isinstance(config, Mapping): + return config + return None + + +def _resolve_metadata(kwargs: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: + metadata = kwargs.get("metadata") + if isinstance(metadata, Mapping): + return metadata + return None + + +def _resolve_invocation(kwargs: Mapping[str, Any]) -> Optional["NodeTelemetryPayload"]: + invocation = current_invocation() + if invocation is not None: + return invocation + config = _resolve_config(kwargs) + if not isinstance(config, Mapping): + return None + metadata = config.get("metadata") + if isinstance(metadata, Mapping): + candidate = metadata.get("codon_invocation") + if isinstance(candidate, NodeTelemetryPayload): + return candidate + return _lookup_invocation_metadata(config) + + +def _attach_invocation_to_config(config: Mapping[str, Any], invocation: NodeTelemetryPayload) -> bool: + if not isinstance(config, dict): + return False + metadata = config.get("metadata") + if not isinstance(metadata, dict): + metadata = dict(metadata) if isinstance(metadata, Mapping) else {} + config["metadata"] = metadata + metadata["codon_invocation"] = invocation + return True + + +class LangGraphNodeSpanCallback(BaseCallbackHandler): + """Emit node spans from LangChain callback events.""" + + run_inline = False + + def __init__(self) -> None: + self._active: dict[str, 
_ActiveSpan] = {} + self._logger = logging.getLogger(__name__) + self._debug_usage_enabled = os.getenv("CODON_LANGGRAPH_DEBUG_USAGE") == "1" + + def on_chain_start(self, serialized: Mapping[str, Any], inputs: Mapping[str, Any], **kwargs: Any) -> None: + graph_context = current_graph_context() + if not graph_context: + return + node_name = _extract_node_name(serialized, kwargs) + if not node_name: + return + nodespec = graph_context.node_specs.get(node_name) + if nodespec is None: return - params = _coerce_mapping(kwargs.get("invocation_params")) or _coerce_mapping( - serialized.get("kwargs") if isinstance(serialized, Mapping) else None + run_id = kwargs.get("run_id") + if run_id is None: + run_id = f"node-{id(nodespec)}-{time.time_ns()}" + run_id = str(run_id) + + telemetry = current_invocation() or NodeTelemetryPayload() + telemetry.node_name = telemetry.node_name or nodespec.name + telemetry.node_role = telemetry.node_role or nodespec.role + telemetry.nodespec_id = telemetry.nodespec_id or nodespec.id + telemetry.node_input = telemetry.node_input or _safe_repr(inputs) + + telemetry.workload_id = telemetry.workload_id or graph_context.workload.agent_class_id + telemetry.workload_logic_id = telemetry.workload_logic_id or graph_context.workload.logic_id + telemetry.workload_run_id = telemetry.workload_run_id or graph_context.run_id + telemetry.workload_name = telemetry.workload_name or graph_context.workload.metadata.name + telemetry.workload_version = ( + telemetry.workload_version or graph_context.workload.metadata.version ) + telemetry.deployment_id = telemetry.deployment_id or graph_context.deployment_id + if graph_context.organization_id and graph_context.organization_id.startswith("ORG_"): + if not isinstance(telemetry.organization_id, str) or not telemetry.organization_id.startswith("ORG_"): + telemetry.organization_id = graph_context.organization_id + if not telemetry.organization_id or not telemetry.organization_id.startswith("ORG_"): + _ensure_org_id(telemetry) + telemetry.org_namespace = telemetry.org_namespace or graph_context.org_namespace + + tracer = trace.get_tracer(__name__) + span = tracer.start_span(nodespec.name) + _ACTIVE_INVOCATION.set(telemetry) + config = _resolve_config(kwargs) + if isinstance(config, dict): + metadata = config.get("metadata") + if not isinstance(metadata, dict): + metadata = dict(metadata) if isinstance(metadata, Mapping) else {} + config["metadata"] = metadata + metadata.setdefault("codon_run_id", graph_context.run_id) + registry_config = config + kwargs_metadata = _resolve_metadata(kwargs) + if ( + (registry_config is None or not isinstance(registry_config.get("metadata"), Mapping)) + and kwargs_metadata is not None + ): + registry_config = {"metadata": dict(kwargs_metadata)} + if self._debug_usage_enabled: + self._logger.info( + "codon.langgraph usage debug: using kwargs metadata for registry keys=%s", + sorted(kwargs_metadata.keys()), + ) + if config is not None: + attached = _attach_invocation_to_config(config, telemetry) + _attach_bound_callback(config, telemetry) + registered = _register_invocation_metadata(registry_config or config, nodespec.name, telemetry) + if self._debug_usage_enabled: + self._logger.info( + "codon.langgraph usage debug: attached codon_invocation=%s registered=%s", + attached, + registered, + ) + elif self._debug_usage_enabled: + self._logger.info( + "codon.langgraph usage debug: missing config for codon_invocation", + ) - identifier, vendor = _extract_model_info(params or {}) + 
span.set_attribute(NodeSpecSpanAttributes.ID.value, nodespec.id) + span.set_attribute(NodeSpecSpanAttributes.Version.value, nodespec.spec_version) + span.set_attribute(NodeSpecSpanAttributes.Name.value, nodespec.name) + span.set_attribute(NodeSpecSpanAttributes.Role.value, nodespec.role) + span.set_attribute( + NodeSpecSpanAttributes.CallableSignature.value, + nodespec.callable_signature, + ) + span.set_attribute(NodeSpecSpanAttributes.InputSchema.value, nodespec.input_schema) + if nodespec.output_schema is not None: + span.set_attribute( + NodeSpecSpanAttributes.OutputSchema.value, + nodespec.output_schema, + ) + if nodespec.model_name: + span.set_attribute(NodeSpecSpanAttributes.ModelName.value, nodespec.model_name) + if nodespec.model_version: + span.set_attribute( + NodeSpecSpanAttributes.ModelVersion.value, + nodespec.model_version, + ) - if isinstance(serialized, Mapping): - meta = _coerce_mapping(serialized.get("id")) - serial_identifier, serial_vendor = _extract_model_info(serialized) - identifier = identifier or serial_identifier - vendor = vendor or _first(serial_vendor, meta.get("provider") if meta else None, serialized.get("name")) + span.set_attribute(CodonBaseSpanAttributes.AgentFramework.value, "langgraph") + if telemetry.organization_id: + span.set_attribute( + CodonBaseSpanAttributes.OrganizationId.value, + telemetry.organization_id, + ) + if telemetry.org_namespace: + span.set_attribute( + CodonBaseSpanAttributes.OrgNamespace.value, + telemetry.org_namespace, + ) + span.set_attribute(CodonBaseSpanAttributes.WorkloadId.value, telemetry.workload_id) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadLogicId.value, + telemetry.workload_logic_id, + ) + span.set_attribute(CodonBaseSpanAttributes.WorkloadRunId.value, telemetry.workload_run_id) + span.set_attribute(CodonBaseSpanAttributes.WorkloadName.value, telemetry.workload_name) + span.set_attribute( + CodonBaseSpanAttributes.WorkloadVersion.value, + telemetry.workload_version, + ) + if telemetry.deployment_id: + span.set_attribute( + CodonBaseSpanAttributes.DeploymentId.value, + telemetry.deployment_id, + ) + span.set_attribute( + CodonBaseSpanAttributes.NodeInput.value, + telemetry.node_input, + ) - invocation.set_model_info( - vendor=str(vendor) if vendor else None, - identifier=str(identifier) if identifier else None, + self._active[run_id] = _ActiveSpan( + span=span, + telemetry=telemetry, + started_at=time.perf_counter(), + run_id=run_id, ) + _ACTIVE_BY_TELEMETRY[id(telemetry)] = self._active[run_id] + + def on_chain_end(self, outputs: Mapping[str, Any], **kwargs: Any) -> None: + run_id = kwargs.get("run_id") + active = None + if run_id is not None: + active = self._active.pop(str(run_id), None) + if active is None and len(self._active) == 1: + active = self._active.pop(next(iter(self._active.keys())), None) + if not active: + return - def on_llm_end(self, response: Any, **kwargs: Any) -> None: - invocation = current_invocation() - if not invocation: + telemetry = active.telemetry + telemetry.node_output = telemetry.node_output or _safe_repr(outputs) + telemetry.duration_ms = int((time.perf_counter() - active.started_at) * 1000) + if self._debug_usage_enabled: + self._logger.info( + "codon.langgraph usage debug: on_chain_end tokens input=%s output=%s total=%s usage_present=%s", + telemetry.input_tokens, + telemetry.output_tokens, + telemetry.total_tokens, + telemetry.token_usage is not None, + ) + if telemetry.extra_attributes.get("codon_span_defer") and not telemetry.extra_attributes.get( + "codon_span_finalized" 
+ ): + if ( + telemetry.input_tokens is None + and telemetry.output_tokens is None + and telemetry.total_tokens is None + ): + telemetry.extra_attributes["codon_span_deferred"] = True + if self._debug_usage_enabled: + self._logger.info("codon.langgraph usage debug: deferring span finalization") + return + + _ACTIVE_BY_TELEMETRY.pop(id(telemetry), None) + _finalize_node_span(active) + config = _resolve_config(kwargs) + if config is not None: + _unregister_invocation_metadata(config, telemetry.node_name or "") + + def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: + run_id = kwargs.get("run_id") + active = None + if run_id is not None: + active = self._active.pop(str(run_id), None) + if active is None and len(self._active) == 1: + active = self._active.pop(next(iter(self._active.keys())), None) + if not active: return + telemetry = active.telemetry + telemetry.status_code = "ERROR" + telemetry.error_message = repr(error) + active.span.record_exception(error) + active.span.set_status(Status(StatusCode.ERROR, str(error))) + + telemetry.duration_ms = int((time.perf_counter() - active.started_at) * 1000) + active.span.set_attribute( + CodonBaseSpanAttributes.NodeLatencyMs.value, + telemetry.duration_ms, + ) + active.span.set_attribute( + CodonBaseSpanAttributes.NodeStatusCode.value, + telemetry.status_code, + ) + if telemetry.error_message: + active.span.set_attribute( + CodonBaseSpanAttributes.NodeErrorMessage.value, + telemetry.error_message, + ) - llm_output = _coerce_mapping(getattr(response, "llm_output", None)) - if llm_output: - self._capture_payload(invocation, llm_output) + raw_json = telemetry.to_raw_attributes_json() + if raw_json: + active.span.set_attribute( + CodonBaseSpanAttributes.NodeRawAttributes.value, + raw_json, + ) - response_metadata = _coerce_mapping(getattr(response, "response_metadata", None)) - if response_metadata: - self._capture_payload(invocation, response_metadata) + active.span.end() + _ACTIVE_INVOCATION.set(None) + _ACTIVE_BY_TELEMETRY.pop(id(telemetry), None) + config = _resolve_config(kwargs) + if config is not None: + _unregister_invocation_metadata(config, telemetry.node_name or "") - usage_metadata = _coerce_mapping(getattr(response, "usage_metadata", None)) - if usage_metadata: - self._capture_payload(invocation, usage_metadata) - generations = getattr(response, "generations", None) - if generations: - for generation_list in generations: - for generation in generation_list: - metadata = getattr(generation, "generation_info", None) - if isinstance(metadata, Mapping): - self._capture_payload(invocation, metadata) +class LangGraphTelemetryCallback(BaseCallbackHandler): + """Captures model metadata and token usage from LangChain callbacks.""" - message = getattr(generation, "message", None) - if message is not None: - self._capture_message(invocation, message) + run_inline = False + _logger = logging.getLogger(__name__) + + def __init__(self) -> None: + if self._debug_usage_enabled(): + self._logger.info("codon.langgraph usage debug: LangGraphTelemetryCallback initialized") + + @staticmethod + def _debug_usage_enabled() -> bool: + return os.getenv("CODON_LANGGRAPH_DEBUG_USAGE") == "1" + + def on_llm_start(self, serialized: Mapping[str, Any], prompts: list[str], **kwargs: Any) -> None: + invocation = _resolve_invocation(kwargs) + if self._debug_usage_enabled(): + self._logger.info( + "codon.langgraph usage debug: on_llm_start fired invocation_present=%s", + bool(invocation), + ) + if not invocation: + return + 
self._capture_llm_start(invocation, serialized, kwargs) + + def on_llm_end(self, response: Any, **kwargs: Any) -> None: + invocation = _resolve_invocation(kwargs) + if self._debug_usage_enabled(): + self._logger.info( + "codon.langgraph usage debug: on_llm_end fired invocation_present=%s", + bool(invocation), + ) + if not invocation: + return + self._capture_llm_end(invocation, response) def _capture_message(self, invocation, message: Any) -> None: for attr in ("usage_metadata", "response_metadata", "metadata"): @@ -174,6 +644,15 @@ def _capture_message(self, invocation, message: Any) -> None: data = additional.get(key) if isinstance(data, Mapping): self._capture_payload(invocation, data) + if self._debug_usage_enabled(): + usage = getattr(message, "usage_metadata", None) + response_meta = getattr(message, "response_metadata", None) + self._logger.info( + "codon.langgraph usage debug: message usage_metadata=%s response_metadata=%s additional_keys=%s", + isinstance(usage, Mapping), + isinstance(response_meta, Mapping), + sorted(additional.keys()) if isinstance(additional, Mapping) else None, + ) def _capture_payload( self, @@ -188,6 +667,15 @@ def _capture_payload( total_tokens=total_tokens, token_usage=usage, ) + if self._debug_usage_enabled(): + self._logger.info( + "codon.langgraph usage debug: payload_keys=%s usage_keys=%s prompt=%s completion=%s total=%s", + sorted(payload.keys()), + sorted(usage.keys()) if usage else None, + prompt_tokens, + completion_tokens, + total_tokens, + ) model_identifier, model_vendor = _extract_model_info(payload) invocation.set_model_info( @@ -201,6 +689,99 @@ def _capture_payload( if response_metadata: invocation.add_network_call(dict(response_metadata)) + def _capture_llm_start( + self, + invocation: NodeTelemetryPayload, + serialized: Mapping[str, Any], + kwargs: Mapping[str, Any], + ) -> None: + params = _coerce_mapping(kwargs.get("invocation_params")) or _coerce_mapping( + serialized.get("kwargs") if isinstance(serialized, Mapping) else None + ) + + identifier, vendor = _extract_model_info(params or {}) + + if isinstance(serialized, Mapping): + meta = _coerce_mapping(serialized.get("id")) + serial_identifier, serial_vendor = _extract_model_info(serialized) + identifier = identifier or serial_identifier + vendor = vendor or _first(serial_vendor, meta.get("provider") if meta else None, serialized.get("name")) + + invocation.set_model_info( + vendor=str(vendor) if vendor else None, + identifier=str(identifier) if identifier else None, + ) + + if self._debug_usage_enabled(): + self._logger.info( + "codon.langgraph usage debug: on_llm_start model=%s vendor=%s invocation_params_keys=%s", + identifier, + vendor, + sorted(params.keys()) if isinstance(params, Mapping) else None, + ) + + def _capture_llm_end(self, invocation: NodeTelemetryPayload, response: Any) -> None: + llm_output = _coerce_mapping(getattr(response, "llm_output", None)) + if llm_output: + self._capture_payload(invocation, llm_output) + if self._debug_usage_enabled(): + self._logger.info( + "codon.langgraph usage debug: on_llm_end has_llm_output=%s has_response_metadata=%s has_usage_metadata=%s", + bool(llm_output), + bool(getattr(response, "response_metadata", None)), + bool(getattr(response, "usage_metadata", None)), + ) + + response_metadata = _coerce_mapping(getattr(response, "response_metadata", None)) + if response_metadata: + self._capture_payload(invocation, response_metadata) + + usage_metadata = _coerce_mapping(getattr(response, "usage_metadata", None)) + if usage_metadata: + 
self._capture_payload(invocation, usage_metadata) + + generations = getattr(response, "generations", None) + if generations: + for generation_list in generations: + for generation in generation_list: + metadata = getattr(generation, "generation_info", None) + if isinstance(metadata, Mapping): + self._capture_payload(invocation, metadata) + + message = getattr(generation, "message", None) + if message is not None: + self._capture_message(invocation, message) + if self._debug_usage_enabled() and generations: + self._logger.info( + "codon.langgraph usage debug: on_llm_end generations=%s", + sum(len(g) for g in generations if g is not None), + ) + + +class BoundInvocationTelemetryCallback(LangGraphTelemetryCallback): + """Telemetry callback bound to a specific node invocation.""" + + def __init__(self, invocation: NodeTelemetryPayload) -> None: + self._invocation = invocation + super().__init__() + + def on_llm_start(self, serialized: Mapping[str, Any], prompts: list[str], **kwargs: Any) -> None: + if self._debug_usage_enabled(): + self._logger.info("codon.langgraph usage debug: bound on_llm_start fired") + self._capture_llm_start(self._invocation, serialized, kwargs) + + def on_llm_end(self, response: Any, **kwargs: Any) -> None: + if self._debug_usage_enabled(): + self._logger.info("codon.langgraph usage debug: bound on_llm_end fired") + self._capture_llm_end(self._invocation, response) + if self._invocation.extra_attributes.get("codon_span_finalized"): + return + if self._invocation.extra_attributes.get("codon_span_deferred"): + self._invocation.extra_attributes["codon_span_finalized"] = True + active = _ACTIVE_BY_TELEMETRY.pop(id(self._invocation), None) + if active is not None: + _finalize_node_span(active) + def _extract_model_info(payload: Mapping[str, Any]) -> tuple[Optional[Any], Optional[Any]]: identifiers = ( @@ -229,4 +810,80 @@ def _extract_model_info(payload: Mapping[str, Any]) -> tuple[Optional[Any], Opti return identifier, vendor -__all__ = ["LangGraphTelemetryCallback"] +__all__ = ["LangGraphTelemetryCallback", "BoundInvocationTelemetryCallback"] +@dataclass(frozen=True) +class _MetadataKey: + thread_id: str + node_name: str + + +_METADATA_REGISTRY: dict[_MetadataKey, NodeTelemetryPayload] = {} +_METADATA_REGISTRY_LOCK = threading.Lock() + + +def _metadata_key_from_config(config: Mapping[str, Any], node_name: str) -> Optional[_MetadataKey]: + metadata = config.get("metadata") + if not isinstance(metadata, Mapping): + return None + codon_run_id = metadata.get("codon_run_id") + if codon_run_id and node_name: + return _MetadataKey(f"run:{codon_run_id}", str(node_name)) + thread_id = metadata.get("thread_id") + langgraph_node = metadata.get("langgraph_node") or metadata.get("langgraph_node_name") + node = langgraph_node or node_name + scope_id = str(thread_id) if thread_id else None + if not scope_id: + checkpoint_ns = metadata.get("langgraph_checkpoint_ns") + path = metadata.get("langgraph_path") + step = metadata.get("langgraph_step") + if checkpoint_ns is not None or path is not None or step is not None: + scope_id = f"fallback:{checkpoint_ns}:{path}:{step}" + if not scope_id or not node: + if os.getenv("CODON_LANGGRAPH_DEBUG_USAGE") == "1": + logger = logging.getLogger(__name__) + logger.info( + "codon.langgraph usage debug: metadata key missing scope_id=%s node=%s thread_id=%s checkpoint_ns=%s path=%s step=%s", + scope_id, + node, + thread_id, + metadata.get("langgraph_checkpoint_ns"), + metadata.get("langgraph_path"), + metadata.get("langgraph_step"), + ) + return None + 
return _MetadataKey(str(scope_id), str(node)) + + +def _register_invocation_metadata(config: Mapping[str, Any], node_name: str, telemetry: NodeTelemetryPayload) -> bool: + key = _metadata_key_from_config(config, node_name) + if not key: + return False + with _METADATA_REGISTRY_LOCK: + _METADATA_REGISTRY[key] = telemetry + return True + + +def _unregister_invocation_metadata(config: Mapping[str, Any], node_name: str) -> None: + key = _metadata_key_from_config(config, node_name) + if not key: + return + with _METADATA_REGISTRY_LOCK: + _METADATA_REGISTRY.pop(key, None) + + +def _lookup_invocation_metadata(config: Mapping[str, Any]) -> Optional[NodeTelemetryPayload]: + metadata = config.get("metadata") + if not isinstance(metadata, Mapping): + return None + codon_run_id = metadata.get("codon_run_id") + node = metadata.get("langgraph_node") or metadata.get("langgraph_node_name") + if codon_run_id and node: + key = _MetadataKey(f"run:{codon_run_id}", str(node)) + with _METADATA_REGISTRY_LOCK: + return _METADATA_REGISTRY.get(key) + thread_id = metadata.get("thread_id") + if not thread_id or not node: + return None + key = _MetadataKey(str(thread_id), str(node)) + with _METADATA_REGISTRY_LOCK: + return _METADATA_REGISTRY.get(key) diff --git a/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/context.py b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/context.py new file mode 100644 index 0000000..6d2712b --- /dev/null +++ b/instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/context.py @@ -0,0 +1,58 @@ +# Copyright 2025 Codon, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
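+#
+# These ContextVars are the hand-off point between the adapter, which is
+# expected to set them around each graph invocation (hence their export in
+# __all__ below), and the callback handlers, which read them back through
+# current_graph_context() and current_langgraph_config().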
+ +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional + +from codon_sdk.agents import Workload +from codon_sdk.instrumentation.schemas.nodespec import NodeSpec + + +@dataclass(frozen=True) +class GraphInvocationContext: + workload: Workload + node_specs: Mapping[str, NodeSpec] + run_id: str + deployment_id: Optional[str] + organization_id: Optional[str] + org_namespace: Optional[str] + graph_definition: Optional[Dict[str, Any]] + + +_ACTIVE_GRAPH_CONTEXT: ContextVar[Optional[GraphInvocationContext]] = ContextVar( + "codon_langgraph_active_graph_context", default=None +) + +_ACTIVE_CONFIG: ContextVar[Optional[Mapping[str, Any]]] = ContextVar( + "codon_langgraph_active_config", default=None +) + + +def current_graph_context() -> Optional[GraphInvocationContext]: + return _ACTIVE_GRAPH_CONTEXT.get() + + +def current_langgraph_config() -> Optional[Mapping[str, Any]]: + return _ACTIVE_CONFIG.get() + +__all__ = [ + "GraphInvocationContext", + "_ACTIVE_GRAPH_CONTEXT", + "_ACTIVE_CONFIG", + "current_graph_context", + "current_langgraph_config", +] diff --git a/instrumentation-packages/codon-instrumentation-langgraph/pyproject.toml b/instrumentation-packages/codon-instrumentation-langgraph/pyproject.toml index 0b5fe05..2e4b5c1 100644 --- a/instrumentation-packages/codon-instrumentation-langgraph/pyproject.toml +++ b/instrumentation-packages/codon-instrumentation-langgraph/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "codon-instrumentation-langgraph" -version = "0.1.0a5" +version = "0.2.0a0" license = {text = "Apache-2.0"} authors = [ { name="Codon, Inc.", email="martin@codonops.ai" }, @@ -26,6 +26,9 @@ classifiers = [ ] dependencies = [ "codon-sdk", + "langchain>=1.0.0", + "langchain-core>=1.0.0", + "langgraph>=1.0.0", "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-grpc", diff --git a/sdk/AGENTS.md b/sdk/AGENTS.md index f136696..e05e4ad 100644 --- a/sdk/AGENTS.md +++ b/sdk/AGENTS.md @@ -9,7 +9,7 @@ This scaffold explains the agent-facing contracts provided by `codon_sdk`. Flesh - **Expectations:** Subclasses auto-register logic in `_register_logic_group`, wrap callables in `add_node`, manage topology via `add_edge`, and bind a run to `deployment_id` inside `execute` while emitting telemetry. - **Instrumentation mixins:** Framework packages ship their own mixins (see `docs/guides/workload-mixin-guidelines.md`) to expose `from_*` constructors while keeping the core SDK agnostic. - **Reference implementation:** Each instrumentation package should define mixins inside its own namespace (e.g., `codon.instrumentation.langgraph.LangGraphWorkloadMixin`). -- **Adapters:** `LangGraphWorkloadAdapter.from_langgraph(...)` demonstrates how to wrap existing LangGraph graphs with `CodonWorkload` automatically; use it as a model for future adapters. +- **Adapters:** `LangGraphWorkloadAdapter.from_langgraph(...)` wraps existing LangGraph graphs and returns an instrumented graph that preserves native invocation semantics while emitting Codon telemetry; use it as a model for future adapters. - **LangGraph compatibility:** `codon-instrumentation-langgraph` `0.1.0a5` is the final release that supports LangGraph 0.3.x. Starting in `0.2.0a0`, the adapter targets LangChain/LangGraph v1.x only and will emit a `DeprecationWarning` on older versions. 
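+- **Example:** a minimal sketch of the adapter flow, limited to the surface the tests in this patch exercise (`graph` is an existing LangGraph `StateGraph`; `my_handler` is a placeholder user callback, not part of the SDK):
+
+  ```python
+  from codon.instrumentation.langgraph import LangGraphWorkloadAdapter
+
+  result = LangGraphWorkloadAdapter.from_langgraph(
+      graph,  # an existing LangGraph StateGraph
+      name="GraphAgent",
+      version="0.1.0",
+  )
+  wrapped = result.wrapped_graph
+  # Native semantics are preserved; Codon's node-span and telemetry callbacks
+  # are appended after any user-supplied callbacks in the merged config.
+  wrapped.invoke({"value": 0}, config={"callbacks": [my_handler]})
+  ```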
## CodonWorkload (Opinionated Implementation) diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index c42acd8..15803e4 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] # This is the main SDK package name name = "codon_sdk" -version = "0.1.0a4" +version = "0.1.0a5" license = {text = "Apache-2.0"} authors = [ { name="Codon, Inc.", email="martin@codonops.ai" }, diff --git a/sdk/src/codon_sdk/instrumentation/schemas/telemetry/spans/__init__.py b/sdk/src/codon_sdk/instrumentation/schemas/telemetry/spans/__init__.py index 19d561d..bcdcb4d 100644 --- a/sdk/src/codon_sdk/instrumentation/schemas/telemetry/spans/__init__.py +++ b/sdk/src/codon_sdk/instrumentation/schemas/telemetry/spans/__init__.py @@ -40,10 +40,18 @@ class CodonBaseSpanAttributes(Enum): NetworkCallsJson: str = "codon.network.calls_json" +class CodonGraphSpanAttributes(Enum): + DefinitionJson: str = "codon.graph.definition_json" + DefinitionHash: str = "codon.graph.definition_hash" + NodeCount: str = "codon.graph.node_count" + EdgeCount: str = "codon.graph.edge_count" + + class CodonSpanNames(Enum): AgentRun: str = "agent.run" AgentTool: str = "agent.tool" AgentWorkflow: str = "agent.workflow" + AgentGraph: str = "agent.graph" AgentTask: str = "agent.task" AgentLLM: str = "agent.llm" AgentLLMCompletion: str = "agent.llm.completion" diff --git a/sdk/src/codon_sdk/llm/__init__.py b/sdk/src/codon_sdk/llm/__init__.py index 9b642da..6f81bfe 100644 --- a/sdk/src/codon_sdk/llm/__init__.py +++ b/sdk/src/codon_sdk/llm/__init__.py @@ -27,11 +27,15 @@ try: # pragma: no cover - make dependency optional from codon.instrumentation.langgraph import current_invocation + from codon.instrumentation.langgraph.context import current_langgraph_config except Exception: # pragma: no cover - defensive fallback def current_invocation() -> Optional[NodeTelemetryPayload]: # type: ignore return None + def current_langgraph_config() -> Optional[Mapping[str, Any]]: # type: ignore + return None + __all__ = [ "track_llm_async", @@ -72,7 +76,7 @@ async def track_llm_async( if value is not None: span.set_attribute(key, value) - merged_config = _merge_config(config) + merged_config = _merge_config(config or current_langgraph_config()) try: if merged_config is not None: @@ -139,19 +143,34 @@ def _merge_config(override: Optional[Mapping[str, Any]]) -> Optional[Mapping[str for key, value in override.items(): if key == "callbacks": - if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): - callbacks_list.extend(value) - else: - callbacks_list.append(value) + callbacks_list.extend(_normalize_callbacks(value)) else: merged[key] = value if callbacks_list: - merged["callbacks"] = callbacks_list + merged["callbacks"] = _ensure_callback_list(callbacks_list) return merged if merged else None +def _normalize_callbacks(value: Any) -> list[Any]: + handlers = getattr(value, "handlers", None) + if handlers is not None: + return list(handlers) + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): + return list(value) + return [value] + + +def _ensure_callback_list(value: Any) -> list[Any]: + handlers = getattr(value, "handlers", None) + if handlers is not None: + return list(handlers) + if isinstance(value, (list, tuple)): + return list(value) + return [value] + + def _populate_telemetry( payload: Optional[NodeTelemetryPayload], response: Any, diff --git a/sdk/test/instrumentation/langgraph/test_adapter_result.py b/sdk/test/instrumentation/langgraph/test_adapter_result.py 
index a4cc58c..5d9ed71 100644 --- a/sdk/test/instrumentation/langgraph/test_adapter_result.py +++ b/sdk/test/instrumentation/langgraph/test_adapter_result.py @@ -18,6 +18,7 @@ from codon.instrumentation.langgraph import ( # type: ignore LangGraphAdapterResult, + LangGraphNodeSpanCallback, LangGraphTelemetryCallback, LangGraphWorkloadAdapter, ) @@ -42,11 +43,17 @@ def __init__(self, nodes=None, edges=None): self.edges = edges or [("start", "end")] self.compile_called = False self.compile_kwargs = None + self.seen_configs: list[Mapping[str, Any]] = [] def compile(self, **kwargs): self.compile_called = True self.compile_kwargs = kwargs - return {"compiled": True, "kwargs": kwargs} + return self + + def invoke(self, state, *, config=None): + if config is not None: + self.seen_configs.append(config) + return state def test_adapter_returns_artifacts_with_compile_kwargs(): @@ -66,11 +73,12 @@ def test_adapter_returns_artifacts_with_compile_kwargs(): assert result.state_graph is graph assert graph.compile_called is True assert graph.compile_kwargs == {"checkpointer": "memory"} - assert result.compiled_graph["kwargs"]["checkpointer"] == "memory" + assert result.compiled_graph is graph assert getattr(workload, "langgraph_state_graph") is graph assert getattr(workload, "langgraph_compiled_graph") == result.compiled_graph assert getattr(workload, "langgraph_compile_kwargs") == {"checkpointer": "memory"} + assert result.wrapped_graph.workload is workload node_names = {spec.name for spec in workload.nodes} assert node_names == {"start", "end"} @@ -90,6 +98,10 @@ def invoke(self, state, *, config=None): self.seen_configs.append(config or {}) return {"value": state.get("value", 0) + 1} + def ainvoke(self, state, *, config=None): + self.seen_configs.append(config or {}) + return {"value": state.get("value", 0) + 1} + def passthrough(state): return state @@ -104,9 +116,7 @@ def invoke(self, state): def test_runtime_config_merges_callbacks(): - recording = RecordingRunnable() - nodes = {"start": recording, "end": passthrough} - graph = FakeGraph(nodes=nodes) + graph = FakeGraph() base_config = {"callbacks": ["base"], "metadata": "base"} result = LangGraphWorkloadAdapter.from_langgraph( @@ -121,20 +131,19 @@ def test_runtime_config_merges_callbacks(): assert workload.langgraph_runtime_config == base_config invocation_config = {"metadata": "override", "callbacks": ["call"]} - workload.execute( - {"state": {"value": 0}}, - deployment_id="dev", - langgraph_config=invocation_config, - ) + wrapped = result.wrapped_graph + wrapped.invoke({"value": 0}, config=invocation_config) - assert recording.seen_configs - config = recording.seen_configs[0] + assert graph.seen_configs + config = graph.seen_configs[0] assert config["metadata"] == "override" callbacks = config["callbacks"] + assert len(callbacks) >= 4 assert callbacks[0] == "base" assert callbacks[1] == "call" - assert isinstance(callbacks[2], LangGraphTelemetryCallback) + assert isinstance(callbacks[2], LangGraphNodeSpanCallback) + assert isinstance(callbacks[3], LangGraphTelemetryCallback) start_spec = next(spec for spec in workload.nodes if spec.name == "start") assert start_spec.callable_signature.startswith("node_callable(") diff --git a/sdk/test/instrumentation/langgraph/test_graph_span.py b/sdk/test/instrumentation/langgraph/test_graph_span.py new file mode 100644 index 0000000..f1a979d --- /dev/null +++ b/sdk/test/instrumentation/langgraph/test_graph_span.py @@ -0,0 +1,64 @@ +import json + +import pytest + +opentelemetry = 
pytest.importorskip("opentelemetry") +trace = opentelemetry.trace +sdk_trace = pytest.importorskip("opentelemetry.sdk.trace") +trace_export = pytest.importorskip("opentelemetry.sdk.trace.export") + +TracerProvider = sdk_trace.TracerProvider +InMemorySpanExporter = trace_export.InMemorySpanExporter +SimpleSpanProcessor = trace_export.SimpleSpanProcessor + +from codon.instrumentation.langgraph import LangGraphWorkloadAdapter +from codon_sdk.instrumentation.schemas.telemetry.spans import ( + CodonGraphSpanAttributes, + CodonSpanNames, +) + + +class FakeGraph: + def __init__(self): + self.nodes = {"start": lambda state: state} + self.edges = [("start", "start")] + + def compile(self, **kwargs): + return self + + def invoke(self, state, *, config=None): + return state + + +def test_graph_span_emitted(monkeypatch): + monkeypatch.setenv("ORG_NAMESPACE", "test-org") + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + try: + trace.set_tracer_provider(provider) + except Exception: # pragma: no cover - reconfigure in existing test runs + trace._TRACER_PROVIDER = provider # type: ignore[attr-defined] + + graph = FakeGraph() + wrapped = LangGraphWorkloadAdapter.from_langgraph( + graph, + name="GraphAgent", + version="0.1.0", + ) + + wrapped.invoke({"value": 1}) + + spans = exporter.get_finished_spans() + graph_spans = [span for span in spans if span.name == CodonSpanNames.AgentGraph.value] + assert graph_spans + graph_span = graph_spans[0] + attrs = graph_span.attributes + + assert CodonGraphSpanAttributes.NodeCount.value in attrs + assert CodonGraphSpanAttributes.EdgeCount.value in attrs + assert CodonGraphSpanAttributes.DefinitionJson.value in attrs + + definition = json.loads(attrs[CodonGraphSpanAttributes.DefinitionJson.value]) + assert definition["nodes"] + assert definition["edges"]