Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/strands/models/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ def _format_request(
)
system_blocks.append({"cachePoint": {"type": cache_prompt}})

# Format system blocks (returns empty list if no system blocks provided)
formatted_system_blocks = self._format_bedrock_system_blocks(system_blocks)

return {
"modelId": self.config["model_id"],
"messages": self._format_bedrock_messages(messages),
"system": system_blocks,
"system": formatted_system_blocks,
**(
{
"toolConfig": {
Expand Down Expand Up @@ -295,6 +298,31 @@ def _format_request(
),
}

def _format_bedrock_system_blocks(self, system_blocks: list[SystemContentBlock] | None) -> list[dict[str, Any]]:
"""Format system content blocks for Bedrock API compatibility.

Similar to _format_bedrock_messages, this ensures system blocks conform to
Bedrock's strict validation requirements by eagerly filtering content blocks
to only include Bedrock-supported fields.

Args:
system_blocks: List of system content blocks to format

Returns:
Formatted system blocks for Bedrock API compatibility (empty list if no blocks)
"""
if not system_blocks:
return []

cleaned_blocks: list[dict[str, Any]] = []

for block in system_blocks:
# Format the system block using the same logic as message content blocks
formatted_block = self._format_request_message_content(cast(ContentBlock, block))
cleaned_blocks.append(formatted_block)

return cleaned_blocks

def _format_bedrock_messages(self, messages: Messages) -> list[dict[str, Any]]:
"""Format messages for Bedrock API compatibility.

Expand Down
139 changes: 139 additions & 0 deletions tests_integ/models/test_model_bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,142 @@ def test_multi_prompt_system_content():
agent = Agent(system_prompt=system_prompt_content, load_tools_from_directory=False)
# just verifying there is no failure
agent("Hello!")


def test_message_cache_point_formatting(streaming_model):
"""Test that cachePoint blocks in messages are formatted as separate content blocks.

This test verifies the fix for issue #1219 where cachePoint blocks were not being
handled correctly, causing Bedrock API validation errors.
"""
# Create a message with cachePoint as a separate block
messages = [
{
"role": "user",
"content": [
{"text": "Some content for caching " * 100}, # Long text to meet cache requirements
{"cachePoint": {"type": "default"}},
],
},
]

# Format the messages using the model's internal method
formatted = streaming_model._format_bedrock_messages(messages)

# Verify cachePoint remains as a separate block
assert len(formatted[0]["content"]) == 2
assert "text" in formatted[0]["content"][0]
assert "cachePoint" in formatted[0]["content"][1]
# Verify they are not merged into a single block
assert len(formatted[0]["content"][0]) == 1 # Only text key
assert len(formatted[0]["content"][1]) == 1 # Only cachePoint key


def test_system_prompt_cache_point_formatting(streaming_model):
"""Test that cachePoint blocks in system prompts are formatted as separate content blocks.

This test verifies the fix for issue #1219 where system content blocks with cachePoint
were not being formatted correctly.
"""
# Create system content with cachePoint as a separate block
system_content = [
{"text": "You are a helpful assistant. " * 100}, # Long text to meet cache requirements
{"cachePoint": {"type": "default"}},
{"text": "Always be respectful."},
]

# Format the system blocks using the model's internal method
formatted = streaming_model._format_bedrock_system_blocks(system_content)

# Verify we have 3 separate blocks
assert len(formatted) == 3
assert "text" in formatted[0]
assert "cachePoint" in formatted[1]
assert "text" in formatted[2]
# Verify they are not merged
assert len(formatted[0]) == 1 # Only text key
assert len(formatted[1]) == 1 # Only cachePoint key
assert len(formatted[2]) == 1 # Only text key


def test_system_prompt_cache_point_agent(non_streaming_model):
"""Test agent with system prompt containing cachePoint blocks.

This is an integration test that verifies the entire flow works correctly
when using cachePoint in system prompts.
"""
system_prompt_content = [
{"text": "You are a helpful assistant. " * 100},
{"cachePoint": {"type": "default"}},
{"text": "Always respond concisely."},
]

agent = Agent(
model=non_streaming_model,
system_prompt=system_prompt_content,
load_tools_from_directory=False,
)

# Just verify there is no failure
result = agent("Hello!")
assert len(str(result)) > 0


def test_message_with_cache_point_agent(streaming_model):
"""Test agent with messages containing cachePoint blocks.

This is an integration test that verifies the entire flow works correctly
when using cachePoint in messages.
"""
messages = [
{
"role": "user",
"content": [
{"text": "Previous conversation context. " * 100},
{"cachePoint": {"type": "default"}},
],
},
]

agent = Agent(
model=streaming_model,
messages=messages,
load_tools_from_directory=False,
)

# Just verify there is no failure
result = agent("Hello!")
assert len(str(result)) > 0


def test_cache_point_with_system_and_messages(non_streaming_model):
"""Test combining cachePoint in both system prompts and messages.

This test verifies that cachePoint works correctly when used in both
system content blocks and message content blocks simultaneously.
"""
system_prompt_content = [
{"text": "System context. " * 100},
{"cachePoint": {"type": "default"}},
]

messages = [
{
"role": "user",
"content": [
{"text": "Message context. " * 100},
{"cachePoint": {"type": "default"}},
],
},
]

agent = Agent(
model=non_streaming_model,
system_prompt=system_prompt_content,
messages=messages,
load_tools_from_directory=False,
)

# Just verify there is no failure
result = agent("Hello!")
assert len(str(result)) > 0