Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
src/resolver_athena_client/generated/* linguist-generated=true
tests/functional/e2e/testcases/**/*.jpg filter=lfs diff=lfs merge=lfs -text
tests/functional/e2e/testcases/**/*.png filter=lfs diff=lfs merge=lfs -text
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
2 changes: 1 addition & 1 deletion examples/classify_single_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def main() -> int:
resize_images=True,
compress_images=True,
timeout=30.0, # Shorter timeout for single requests
affiliate="Crisp",
affiliate=os.getenv("ATHENA_AFFILIATE", "athena-test"),
deployment_id="single-example-deployment", # Not used
)

Expand Down
2 changes: 1 addition & 1 deletion src/resolver_athena_client/client/transformers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def process_image() -> tuple[bytes, bool]:
# Resize if needed
if rgb_image.size != _target_size:
resized_image = rgb_image.resize(
_target_size, Image.Resampling.LANCZOS
_target_size, Image.Resampling.BILINEAR
)
else:
resized_image = rgb_image
Expand Down
8 changes: 5 additions & 3 deletions tests/functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ def valid_formatted_image(
tmp_path_factory: pytest.TempPathFactory,
) -> bytes:
image_format = request.param
if (magick_path := shutil.which("magick")) is None:
# Try 'magick' first (ImageMagick 7.x), fall back to 'convert' (6.x)
if (magick_path := shutil.which("magick")) is None and (
magick_path := shutil.which("convert")
) is None:
pytest.fail(
"ImageMagick 'magick' command not found - cannot run "
"multi-format test"
"ImageMagick command not found - cannot run multi-format test"
)

image_dir = tmp_path_factory.mktemp("images")
Expand Down
Empty file.
68 changes: 68 additions & 0 deletions tests/functional/e2e/test_classify_single.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from pathlib import Path

import pytest

from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions
from resolver_athena_client.client.channel import (
CredentialHelper,
create_channel_with_credentials,
)
from resolver_athena_client.client.models import ImageData
from tests.functional.e2e.testcases.parser import (
AthenaTestCase,
load_test_cases,
)

TEST_CASES = load_test_cases("benign_model")

FP_ERROR_TOLERANCE = 1e-4


@pytest.mark.asyncio
@pytest.mark.functional
@pytest.mark.parametrize("test_case", TEST_CASES, ids=lambda tc: tc.id)
async def test_classify_single(
athena_options: AthenaOptions,
credential_helper: CredentialHelper,
test_case: AthenaTestCase,
) -> None:
"""Functional test for ClassifySingle endpoint and API methods.

This test creates a unique test image for each iteration and classifies it.

"""

# Create gRPC channel with credentials
channel = await create_channel_with_credentials(
athena_options.host, credential_helper
)
with Path.open(Path(test_case.filepath), "rb") as f:
image_bytes = f.read()

async with AthenaClient(channel, athena_options) as client:
image_data = ImageData(image_bytes)

# Classify with auto-generated correlation ID
result = await client.classify_single(image_data)

if result.error.code:
msg = f"Image Result Error: {result.error.message}"
pytest.fail(msg)

actual_output = {c.label: c.weight for c in result.classifications}
assert set(test_case.expected_output.keys()).issubset(
set(actual_output.keys())
), (
"Expected output to contain labels: ",
f"{test_case.expected_output.keys() - actual_output.keys()}",
)
actual_output = {k: actual_output[k] for k in test_case.expected_output}

for label in test_case.expected_output:
expected = test_case.expected_output[label]
actual = actual_output[label]
assert abs(expected - actual) < FP_ERROR_TOLERANCE, (
f"Weight for label '{label}' differs by more than {FP_ERROR_TOLERANCE}: "
f"expected={expected}, actual={actual}, diff={abs(expected - actual)}"
)
Empty file.
Loading
Loading