diff --git a/pkg/model/provider/bedrock/adapter.go b/pkg/model/provider/bedrock/adapter.go index a9c34610a..c4c3e11aa 100644 --- a/pkg/model/provider/bedrock/adapter.go +++ b/pkg/model/provider/bedrock/adapter.go @@ -1,6 +1,7 @@ package bedrock import ( + "fmt" "io" "log/slog" @@ -20,6 +21,12 @@ type streamAdapter struct { // State for accumulating tool call data currentToolID string currentToolName string + + // Buffered state for proper event ordering + // Bedrock sends MessageStop before Metadata, but runtime expects usage before FinishReason + pendingFinishReason chat.FinishReason + pendingUsage *chat.Usage + metadataReceived bool } func newStreamAdapter(stream *bedrockruntime.ConverseStreamEventStream, model string, trackUsage bool) *streamAdapter { @@ -32,12 +39,64 @@ func newStreamAdapter(stream *bedrockruntime.ConverseStreamEventStream, model st // Recv gets the next completion chunk func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) { + // If we have both finish reason and usage buffered, emit the final response + // This handles both event orderings: MessageStop→Metadata and Metadata→MessageStop + if a.pendingFinishReason != "" && a.metadataReceived { + slog.Debug("Bedrock stream: emitting buffered final response", + "finish_reason", a.pendingFinishReason, + "has_usage", a.pendingUsage != nil) + response := chat.MessageStreamResponse{ + Object: "chat.completion.chunk", + Model: a.model, + Choices: []chat.MessageStreamChoice{ + { + Index: 0, + FinishReason: a.pendingFinishReason, + Delta: chat.MessageDelta{ + Role: string(chat.MessageRoleAssistant), + }, + }, + }, + Usage: a.pendingUsage, + } + // Clear pending state + a.pendingFinishReason = "" + a.pendingUsage = nil + a.metadataReceived = false + return response, nil + } + event, ok := <-a.stream.Events() if !ok { // Check for errors if err := a.stream.Err(); err != nil { + slog.Debug("Bedrock stream: error on channel close", "error", err) return chat.MessageStreamResponse{}, err } + // If we have a pending finish reason but never got metadata, emit it now + if a.pendingFinishReason != "" { + slog.Debug("Bedrock stream: channel closed, emitting pending finish reason without metadata", + "finish_reason", a.pendingFinishReason, + "has_usage", a.pendingUsage != nil) + response := chat.MessageStreamResponse{ + Object: "chat.completion.chunk", + Model: a.model, + Choices: []chat.MessageStreamChoice{ + { + Index: 0, + FinishReason: a.pendingFinishReason, + Delta: chat.MessageDelta{ + Role: string(chat.MessageRoleAssistant), + }, + }, + }, + Usage: a.pendingUsage, + } + a.pendingFinishReason = "" + a.pendingUsage = nil + return response, nil + } + slog.Debug("Bedrock stream: channel closed, returning EOF") return chat.MessageStreamResponse{}, io.EOF } @@ -103,24 +162,34 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) { slog.Debug("Bedrock stream: content block stop", "index", ev.Value.ContentBlockIndex) case *types.ConverseStreamOutputMemberMessageStop: - // Message complete - determine finish reason + // Buffer the finish reason - don't emit it yet, wait for metadata with usage + // Bedrock sends MessageStop before Metadata, but runtime returns early on FinishReason stopReason := ev.Value.StopReason switch stopReason { case types.StopReasonToolUse: - response.Choices[0].FinishReason = chat.FinishReasonToolCalls + a.pendingFinishReason = chat.FinishReasonToolCalls case types.StopReasonEndTurn, types.StopReasonStopSequence: - response.Choices[0].FinishReason = chat.FinishReasonStop + a.pendingFinishReason = chat.FinishReasonStop case types.StopReasonMaxTokens: - response.Choices[0].FinishReason = chat.FinishReasonLength + a.pendingFinishReason = chat.FinishReasonLength default: - response.Choices[0].FinishReason = chat.FinishReasonStop + a.pendingFinishReason = chat.FinishReasonStop } + slog.Debug("Bedrock stream: message stop (buffered)", + "stop_reason", stopReason, + "pending_finish_reason", a.pendingFinishReason, + "metadata_already_received", a.metadataReceived) case *types.ConverseStreamOutputMemberMetadata: - // Metadata event with usage info - always capture if available + // Metadata event with usage info - capture and mark received + a.metadataReceived = true + slog.Debug("Bedrock stream: received metadata event", + "has_usage", ev.Value.Usage != nil, + "finish_reason_already_received", a.pendingFinishReason != "") + if ev.Value.Usage != nil { usage := ev.Value.Usage - slog.Debug("Bedrock stream: received usage metadata", + slog.Debug("Bedrock stream: usage metadata details", "input_tokens", derefInt32(usage.InputTokens), "output_tokens", derefInt32(usage.OutputTokens), "cache_read_tokens", derefInt32(usage.CacheReadInputTokens), @@ -128,16 +197,24 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) { "track_usage", a.trackUsage) if a.trackUsage { - response.Usage = &chat.Usage{ + a.pendingUsage = &chat.Usage{ InputTokens: int64(derefInt32(usage.InputTokens)), OutputTokens: int64(derefInt32(usage.OutputTokens)), CachedInputTokens: int64(derefInt32(usage.CacheReadInputTokens)), CacheWriteTokens: int64(derefInt32(usage.CacheWriteInputTokens)), } + slog.Debug("Bedrock stream: usage captured in pendingUsage", + "input", a.pendingUsage.InputTokens, + "output", a.pendingUsage.OutputTokens) + } else { + slog.Debug("Bedrock stream: usage NOT captured (trackUsage is false)") } } else { - slog.Debug("Bedrock stream: metadata event has no usage data") + slog.Debug("Bedrock stream: metadata event has nil Usage field") } + + default: + slog.Debug("Bedrock stream: unknown event type", "type", fmt.Sprintf("%T", event)) } return response, nil diff --git a/pkg/modelsdev/store.go b/pkg/modelsdev/store.go index cfe1ce1e3..b022632d8 100644 --- a/pkg/modelsdev/store.go +++ b/pkg/modelsdev/store.go @@ -177,6 +177,25 @@ func (s *Store) GetModel(ctx context.Context, id string) (*Model, error) { model, exists := provider.Models[modelID] if !exists { + // For amazon-bedrock, try stripping region/inference profile prefixes + // Bedrock uses prefixes like "global.", "us.", "eu.", "apac." etc. for + // cross-region inference profiles, but models.dev stores models without + // these prefixes. Try stripping the first segment if it doesn't match + // a known model provider prefix (anthropic, meta, amazon, etc.) + if providerID == "amazon-bedrock" { + if idx := strings.Index(modelID, "."); idx != -1 { + possibleRegionPrefix := modelID[:idx] + // Only strip if the prefix is NOT a known model provider + // (i.e., it's likely a region prefix like "global", "us", "eu") + if !isBedrockModelProvider(possibleRegionPrefix) { + normalizedModelID := modelID[idx+1:] + model, exists = provider.Models[normalizedModelID] + if exists { + return &model, nil + } + } + } + } return nil, fmt.Errorf("model %q not found in provider %q", modelID, providerID) } @@ -316,3 +335,23 @@ func (s *Store) ResolveModelAlias(ctx context.Context, providerID, modelName str return modelName } + +// bedrockModelProviders contains known model provider prefixes used in Bedrock model IDs. +// These are NOT region prefixes and should not be stripped when normalizing model IDs. +var bedrockModelProviders = map[string]bool{ + "anthropic": true, + "amazon": true, + "meta": true, + "cohere": true, + "ai21": true, + "mistral": true, + "stability": true, + "deepseek": true, + "google": true, + "minimax": true, +} + +// isBedrockModelProvider returns true if the prefix is a known Bedrock model provider. +func isBedrockModelProvider(prefix string) bool { + return bedrockModelProviders[prefix] +}