Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/spdl/io/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,18 +875,17 @@ def decode_image_nvjpeg(


_THREAD_LOCAL = threading.local()
_THREAD_LOCAL._decoder = None # pyre-ignore[16]


def _get_decoder() -> "NvDecDecoder":
if _THREAD_LOCAL._decoder is None: # pyre-ignore[16]
if getattr(_THREAD_LOCAL, "_decoder", None) is None:
_THREAD_LOCAL._decoder = _libspdl_cuda._nvdec_decoder() # pyre-ignore[16]
return _THREAD_LOCAL._decoder


def _del_cached_decoder() -> None:
if _THREAD_LOCAL._decoder is not None: # pyre-ignore[16]
_THREAD_LOCAL._decoder = None
if hasattr(_THREAD_LOCAL, "_decoder"):
delattr(_THREAD_LOCAL, "_decoder")


def nvdec_decoder(
Expand Down
223 changes: 223 additions & 0 deletions tests/cuda/nvdec_video_decoding_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
# LICENSE file in the root directory of this source tree.

import gc
import threading
import unittest
from unittest.mock import MagicMock, patch

import spdl.io
import spdl.io.utils
Expand Down Expand Up @@ -290,3 +292,224 @@ def test_color_conversion_rgba(self) -> None:
self.assertTrue(torch.all(array[:, 0, :, 2 * width :] == 0))
self.assertTrue(torch.all(array[:, 1, :, 2 * width :] == 14)) # TODO: investivate if this is correct.
self.assertTrue(torch.all(array[:, 2, :, 2 * width :] == 255))


def _create_mock_decoder() -> MagicMock:
"""Create a mock decoder with init and _reset methods."""
mock_decoder = MagicMock()
mock_decoder.init = MagicMock()
mock_decoder._reset = MagicMock()
return mock_decoder


def _get_test_data() -> tuple:
"""Get common test data: packets and cuda_config."""
h264 = _get_h264_sample()
packets = spdl.io.demux_video(h264.path)
cuda_config = spdl.io.cuda_config(device_index=DEFAULT_CUDA)
return packets, cuda_config


class TestNvdecThreadLocalCaching(unittest.TestCase):
"""Test the thread-local caching mechanism for NVDEC decoders."""

def setUp(self) -> None:
"""Set up mocks for decoder creation."""
# Clean up any cached decoders before each test
spdl.io._core._del_cached_decoder()

def tearDown(self) -> None:
"""Clean up after each test."""
# Clean up any cached decoders after each test
spdl.io._core._del_cached_decoder()

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_decoder_caching_same_thread(self, mock_nvdec_decoder: MagicMock) -> None:
"""Verify that nvdec_decoder returns the same cached instance within the same thread."""
# Setup: Mock decoder creation to return a mock object
mock_nvdec_decoder.return_value = _create_mock_decoder()
packets, cuda_config = _get_test_data()

# Execute: Create two decoders with use_cache=True
decoder1 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True)
decoder2 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True)

# Assert: Both references should point to the same cached instance
self.assertIs(
decoder1, decoder2, "Decoders should be the same instance when cached"
)
# Verify decoder was created only once
self.assertEqual(
mock_nvdec_decoder.call_count, 1, "Decoder should be created only once"
)

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_decoder_no_caching(self, mock_nvdec_decoder: MagicMock) -> None:
"""Verify that nvdec_decoder creates a new instance when use_cache=False."""
# Setup: Mock decoder creation to return different mock objects each time
mock_nvdec_decoder.side_effect = [
_create_mock_decoder(),
_create_mock_decoder(),
]
packets, cuda_config = _get_test_data()

# Execute: Create two decoders with use_cache=False
decoder1 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=False)
decoder2 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=False)

# Assert: References should be different instances
self.assertIsNot(
decoder1, decoder2, "Decoders should be different instances when not cached"
)
# Verify decoder was created twice
self.assertEqual(
mock_nvdec_decoder.call_count, 2, "Decoder should be created twice"
)

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_decoder_caching_different_threads(
self, mock_nvdec_decoder: MagicMock
) -> None:
"""Verify that different threads get different decoder instances even when `use_cache=True`."""
# Setup: Mock decoder creation to track creation count
creation_count = {"count": 0}
creation_lock = threading.Lock()

def create_mock_decoder_with_count():
with creation_lock:
creation_count["count"] += 1
return _create_mock_decoder()

mock_nvdec_decoder.side_effect = create_mock_decoder_with_count
packets, cuda_config = _get_test_data()

decoder_refs = []

def get_decoder_in_thread():
decoder = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True)
decoder_refs.append(id(decoder))

# Execute: Create decoders in two different threads
thread1 = threading.Thread(target=get_decoder_in_thread)
thread2 = threading.Thread(target=get_decoder_in_thread)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

# Assert: Different threads should get different decoder instances
self.assertEqual(
len(decoder_refs), 2, "Should have captured two decoder references"
)
self.assertNotEqual(
decoder_refs[0],
decoder_refs[1],
"Different threads should have different decoder instances",
)
# Verify decoder was created twice (once per thread)
self.assertEqual(
creation_count["count"], 2, "Decoder should be created once per thread"
)

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_decoder_cache_cleared_on_crop(self, mock_nvdec_decoder: MagicMock) -> None:
"""Verify that providing crop parameters forces recreation of the decoder.

# Note this is not a spec. It's just the way the implementation is.
# We prefer to keep cache even when cropping is provided, but we have not figured
# out the way yet.
# Feel free to delete this test if you figured.
"""

# Setup: Mock decoder creation to return different mock objects each time
mock_nvdec_decoder.side_effect = [
_create_mock_decoder(),
_create_mock_decoder(),
]
packets, cuda_config = _get_test_data()

# Execute: Create a decoder without crop, then with crop (use_cache is True but should be ignored)
decoder1 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True)
decoder2 = spdl.io.nvdec_decoder( # noqa: F841
cuda_config,
packets.codec,
use_cache=True,
crop_left=10,
)

# Assert: Should be different instances because crop forces recreation
self.assertIsNot(
decoder1, decoder2, "Crop parameters should force decoder recreation"
)
# Verify decoder was created twice (crop forces recreation)
self.assertEqual(
mock_nvdec_decoder.call_count,
2,
"Decoder should be recreated when crop parameters change",
)

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_cache_cleanup_with_hasattr_delattr(
self, mock_nvdec_decoder: MagicMock
) -> None:
"""Verify that cache cleanup works correctly with hasattr/delattr pattern."""
# Setup: Mock decoder creation
mock_nvdec_decoder.return_value = _create_mock_decoder()
packets, cuda_config = _get_test_data()

# Execute: Create decoder, delete cache, create again
decoder1 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True) # noqa: F841
spdl.io._core._del_cached_decoder()
decoder2 = spdl.io.nvdec_decoder(cuda_config, packets.codec, use_cache=True) # noqa: F841

# Assert: Second decoder should be a different instance after cache clear
# Note: Due to mocking, they'll have the same mock object, but verify creation count
self.assertEqual(
mock_nvdec_decoder.call_count,
2,
"Decoder should be created twice after cache clear",
)

@patch("spdl.io._core._libspdl_cuda._nvdec_decoder")
def test_thread_local_isolation_with_getattr(
self, mock_nvdec_decoder: MagicMock
) -> None:
"""Verify getattr properly handles thread-local storage without AttributeError."""
# Setup: Mock decoder creation to return unique mocks
mock_nvdec_decoder.side_effect = lambda: _create_mock_decoder()
packets, cuda_config = _get_test_data()

results = []
errors = []

def get_decoder_in_thread():
try:
# This tests that getattr() works correctly even when _decoder doesn't exist yet
decoder = spdl.io.nvdec_decoder(
cuda_config, packets.codec, use_cache=True
)
results.append(id(decoder))
except AttributeError as e:
errors.append(f"AttributeError: {e}")
except Exception as e:
errors.append(f"Unexpected error: {e}")

# Execute: Create decoders in multiple threads to test getattr behavior
threads = [threading.Thread(target=get_decoder_in_thread) for _ in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

# Assert: All threads should complete without AttributeError
self.assertEqual(
len(errors),
0,
f"getattr() should handle missing _decoder attribute gracefully: {errors}",
)
self.assertEqual(len(results), 3, "All threads should create decoders")
# Verify each thread got its own decoder instance
self.assertEqual(
len(set(results)), 3, "Each thread should have a unique decoder instance"
)
Loading