Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 86 additions & 9 deletions pkg/model/provider/bedrock/adapter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bedrock

import (
"fmt"
"io"
"log/slog"

Expand All @@ -20,6 +21,12 @@ type streamAdapter struct {
// State for accumulating tool call data
currentToolID string
currentToolName string

// Buffered state for proper event ordering
// Bedrock sends MessageStop before Metadata, but runtime expects usage before FinishReason
pendingFinishReason chat.FinishReason
pendingUsage *chat.Usage
metadataReceived bool
}

func newStreamAdapter(stream *bedrockruntime.ConverseStreamEventStream, model string, trackUsage bool) *streamAdapter {
Expand All @@ -32,12 +39,64 @@ func newStreamAdapter(stream *bedrockruntime.ConverseStreamEventStream, model st

// Recv gets the next completion chunk
func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) {
// If we have both finish reason and usage buffered, emit the final response
// This handles both event orderings: MessageStop→Metadata and Metadata→MessageStop
if a.pendingFinishReason != "" && a.metadataReceived {
slog.Debug("Bedrock stream: emitting buffered final response",
"finish_reason", a.pendingFinishReason,
"has_usage", a.pendingUsage != nil)
response := chat.MessageStreamResponse{
Object: "chat.completion.chunk",
Model: a.model,
Choices: []chat.MessageStreamChoice{
{
Index: 0,
FinishReason: a.pendingFinishReason,
Delta: chat.MessageDelta{
Role: string(chat.MessageRoleAssistant),
},
},
},
Usage: a.pendingUsage,
}
// Clear pending state
a.pendingFinishReason = ""
a.pendingUsage = nil
a.metadataReceived = false
return response, nil
}

event, ok := <-a.stream.Events()
if !ok {
// Check for errors
if err := a.stream.Err(); err != nil {
slog.Debug("Bedrock stream: error on channel close", "error", err)
return chat.MessageStreamResponse{}, err
}
// If we have a pending finish reason but never got metadata, emit it now
if a.pendingFinishReason != "" {
slog.Debug("Bedrock stream: channel closed, emitting pending finish reason without metadata",
"finish_reason", a.pendingFinishReason,
"has_usage", a.pendingUsage != nil)
response := chat.MessageStreamResponse{
Object: "chat.completion.chunk",
Model: a.model,
Choices: []chat.MessageStreamChoice{
{
Index: 0,
FinishReason: a.pendingFinishReason,
Delta: chat.MessageDelta{
Role: string(chat.MessageRoleAssistant),
},
},
},
Usage: a.pendingUsage,
}
a.pendingFinishReason = ""
a.pendingUsage = nil
return response, nil
}
slog.Debug("Bedrock stream: channel closed, returning EOF")
return chat.MessageStreamResponse{}, io.EOF
}

Expand Down Expand Up @@ -103,41 +162,59 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) {
slog.Debug("Bedrock stream: content block stop", "index", ev.Value.ContentBlockIndex)

case *types.ConverseStreamOutputMemberMessageStop:
// Message complete - determine finish reason
// Buffer the finish reason - don't emit it yet, wait for metadata with usage
// Bedrock sends MessageStop before Metadata, but runtime returns early on FinishReason
stopReason := ev.Value.StopReason
switch stopReason {
case types.StopReasonToolUse:
response.Choices[0].FinishReason = chat.FinishReasonToolCalls
a.pendingFinishReason = chat.FinishReasonToolCalls
case types.StopReasonEndTurn, types.StopReasonStopSequence:
response.Choices[0].FinishReason = chat.FinishReasonStop
a.pendingFinishReason = chat.FinishReasonStop
case types.StopReasonMaxTokens:
response.Choices[0].FinishReason = chat.FinishReasonLength
a.pendingFinishReason = chat.FinishReasonLength
default:
response.Choices[0].FinishReason = chat.FinishReasonStop
a.pendingFinishReason = chat.FinishReasonStop
}
slog.Debug("Bedrock stream: message stop (buffered)",
"stop_reason", stopReason,
"pending_finish_reason", a.pendingFinishReason,
"metadata_already_received", a.metadataReceived)

case *types.ConverseStreamOutputMemberMetadata:
// Metadata event with usage info - always capture if available
// Metadata event with usage info - capture and mark received
a.metadataReceived = true
slog.Debug("Bedrock stream: received metadata event",
"has_usage", ev.Value.Usage != nil,
"finish_reason_already_received", a.pendingFinishReason != "")

if ev.Value.Usage != nil {
usage := ev.Value.Usage
slog.Debug("Bedrock stream: received usage metadata",
slog.Debug("Bedrock stream: usage metadata details",
"input_tokens", derefInt32(usage.InputTokens),
"output_tokens", derefInt32(usage.OutputTokens),
"cache_read_tokens", derefInt32(usage.CacheReadInputTokens),
"cache_write_tokens", derefInt32(usage.CacheWriteInputTokens),
"track_usage", a.trackUsage)

if a.trackUsage {
response.Usage = &chat.Usage{
a.pendingUsage = &chat.Usage{
InputTokens: int64(derefInt32(usage.InputTokens)),
OutputTokens: int64(derefInt32(usage.OutputTokens)),
CachedInputTokens: int64(derefInt32(usage.CacheReadInputTokens)),
CacheWriteTokens: int64(derefInt32(usage.CacheWriteInputTokens)),
}
slog.Debug("Bedrock stream: usage captured in pendingUsage",
"input", a.pendingUsage.InputTokens,
"output", a.pendingUsage.OutputTokens)
} else {
slog.Debug("Bedrock stream: usage NOT captured (trackUsage is false)")
}
} else {
slog.Debug("Bedrock stream: metadata event has no usage data")
slog.Debug("Bedrock stream: metadata event has nil Usage field")
}

default:
slog.Debug("Bedrock stream: unknown event type", "type", fmt.Sprintf("%T", event))
}

return response, nil
Expand Down
39 changes: 39 additions & 0 deletions pkg/modelsdev/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,25 @@ func (s *Store) GetModel(ctx context.Context, id string) (*Model, error) {

model, exists := provider.Models[modelID]
if !exists {
// For amazon-bedrock, try stripping region/inference profile prefixes
// Bedrock uses prefixes like "global.", "us.", "eu.", "apac." etc. for
// cross-region inference profiles, but models.dev stores models without
// these prefixes. Try stripping the first segment if it doesn't match
// a known model provider prefix (anthropic, meta, amazon, etc.)
if providerID == "amazon-bedrock" {
if idx := strings.Index(modelID, "."); idx != -1 {
possibleRegionPrefix := modelID[:idx]
// Only strip if the prefix is NOT a known model provider
// (i.e., it's likely a region prefix like "global", "us", "eu")
if !isBedrockModelProvider(possibleRegionPrefix) {
normalizedModelID := modelID[idx+1:]
model, exists = provider.Models[normalizedModelID]
if exists {
return &model, nil
}
}
}
}
return nil, fmt.Errorf("model %q not found in provider %q", modelID, providerID)
}

Expand Down Expand Up @@ -316,3 +335,23 @@ func (s *Store) ResolveModelAlias(ctx context.Context, providerID, modelName str

return modelName
}

// bedrockModelProviders is the set of model-provider prefixes that can appear
// as the first dot-separated segment of a Bedrock model ID (for example
// "anthropic" in "anthropic.claude-..."). These are distinct from
// cross-region inference-profile prefixes such as "global", "us", or "eu",
// and must never be stripped when normalizing a model ID.
var bedrockModelProviders = map[string]bool{
	"anthropic": true,
	"amazon":    true,
	"meta":      true,
	"cohere":    true,
	"ai21":      true,
	"mistral":   true,
	"stability": true,
	"deepseek":  true,
	"google":    true,
	"minimax":   true,
}

// isBedrockModelProvider reports whether prefix names a known Bedrock model
// provider (as opposed to a region/inference-profile prefix like "us").
func isBedrockModelProvider(prefix string) bool {
	_, known := bedrockModelProviders[prefix]
	return known
}