From 7823fcd8ea5cc70b84ccfb23f91c53d1ec7fd02a Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Wed, 14 Jan 2026 14:07:24 -0500 Subject: [PATCH] Proto Connector Piping --- dcpy/connectors/chain.py | 208 +++++++++++++++ dcpy/connectors/hybrid_pathed_storage.py | 111 +++++++- dcpy/connectors/registry.py | 20 +- dcpy/test/connectors/test_chaining.py | 310 +++++++++++++++++++++++ 4 files changed, 641 insertions(+), 8 deletions(-) create mode 100644 dcpy/connectors/chain.py create mode 100644 dcpy/test/connectors/test_chaining.py diff --git a/dcpy/connectors/chain.py b/dcpy/connectors/chain.py new file mode 100644 index 0000000000..8b198eab43 --- /dev/null +++ b/dcpy/connectors/chain.py @@ -0,0 +1,208 @@ +""" +Connector chaining and resource transfer abstractions. + +Enables elegant syntax for transferring resources between connectors: + builds_conn.resource(key="pluto", build_note="my-build") >> drafts_conn.resource(key="pluto", version="25v1") + +The system automatically chooses the most efficient transfer method: +- If both connectors are PathedStorageConnectors on the same cloud: direct copy +- Otherwise: fallback to pull-temp-push pattern +""" + +from abc import ABC, abstractmethod +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING, Any, Dict, Optional, Type + +from dcpy.utils.logging import logger + +if TYPE_CHECKING: + from dcpy.connectors.base import Connector + from dcpy.connectors.pathed_storage import PathedStorageConnector + + +class Resource: + """ + Represents a resource within a connector that can be transferred to another connector. + + Examples: + builds_conn.resource(key="pluto", build_note="my-build") + drafts_conn.resource(key="pluto", version="25v1", acl="public-read") + local_conn.resource(path="/tmp/data") + """ + + def __init__(self, connector: "Connector", **resource_spec): + """ + Initialize a Resource. + + Args: + connector: The connector containing this resource + **resource_spec: Connector-specific resource specification (key, version, etc.) + """ + self.connector = connector + self.resource_spec = resource_spec + + def __rshift__(self, destination: "Resource") -> "Resource": + """ + Transfer this resource to another resource using >> operator. + + Args: + destination: Target resource to transfer to + + Returns: + The destination resource for chaining + """ + return transfer_resource(self, destination) + + def __or__(self, destination: "Resource") -> "Resource": + """ + Transfer this resource to another resource using | operator. + + Args: + destination: Target resource to transfer to + + Returns: + The destination resource for chaining + """ + return transfer_resource(self, destination) + + def __repr__(self) -> str: + spec_str = ", ".join(f"{k}={v}" for k, v in self.resource_spec.items()) + return f"{self.connector.__class__.__name__}.resource({spec_str})" + + +class TransferStrategy(ABC): + """Abstract base class for resource transfer strategies.""" + + @abstractmethod + def can_handle(self, source: Resource, destination: Resource) -> bool: + """Check if this strategy can handle the transfer.""" + pass + + @abstractmethod + def transfer(self, source: Resource, destination: Resource) -> Resource: + """Execute the transfer.""" + pass + + +class OptimizedCloudTransfer(TransferStrategy): + """ + Optimized transfer strategy for connectors that support direct transfers. + + Delegates to connector-specific optimization logic rather than making assumptions + about connector internals. + """ + + def can_handle(self, source: Resource, destination: Resource) -> bool: + """Check if source connector can optimize transfer to destination connector.""" + source_conn = source.connector + dest_conn = destination.connector + + # Delegate to connector to determine if it can optimize to the destination + return hasattr( + source_conn, "can_optimize_transfer_to" + ) and source_conn.can_optimize_transfer_to(dest_conn) + + def transfer(self, source: Resource, destination: Resource) -> Resource: + """Execute optimized transfer via source connector.""" + logger.info(f"Using optimized transfer: {source} -> {destination}") + + # Delegate the actual transfer to the source connector + source.connector.optimized_transfer_to( + dest_connector=destination.connector, + source_spec=source.resource_spec, + dest_spec=destination.resource_spec, + ) + + logger.info(f"Optimized transfer complete: {source} -> {destination}") + return destination + + +class FallbackTempTransfer(TransferStrategy): + """ + Fallback transfer strategy using temporary directory. + + Downloads from source to temp dir, then uploads to destination. + Works with any connector types. + """ + + def can_handle(self, source: Resource, destination: Resource) -> bool: + """This strategy can handle any transfer.""" + return True + + def transfer(self, source: Resource, destination: Resource) -> Resource: + """Execute fallback temp directory transfer.""" + logger.info(f"Using fallback temp transfer: {source} -> {destination}") + + with TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Pull from source to temp + source.connector.pull(destination_path=temp_path, **source.resource_spec) + + # Push from temp to destination + destination.connector.push( + source_path=str(temp_path), **destination.resource_spec + ) + + logger.info(f"Fallback transfer complete: {source} -> {destination}") + return destination + + +# Registry of transfer strategies, ordered by preference +TRANSFER_STRATEGIES = [ + OptimizedCloudTransfer(), + FallbackTempTransfer(), # Always handles as fallback +] + + +def transfer_resource(source: Resource, destination: Resource) -> Resource: + """ + Transfer a resource from source to destination using the best available strategy. + + Args: + source: Source resource to transfer from + destination: Destination resource to transfer to + + Returns: + The destination resource for chaining + """ + # Find the first strategy that can handle this transfer + for strategy in TRANSFER_STRATEGIES: + if strategy.can_handle(source, destination): + return strategy.transfer(source, destination) + + # This should never happen since FallbackTempTransfer always handles + raise RuntimeError(f"No transfer strategy available for {source} -> {destination}") + + +def register_transfer_strategy(strategy: TransferStrategy, priority: int = 0): + """ + Register a new transfer strategy. + + Args: + strategy: The transfer strategy to register + priority: Priority (lower = higher priority, 0 = highest) + """ + TRANSFER_STRATEGIES.insert(priority, strategy) + + +# Mixin for connectors to add .resource() method +class ResourceMixin: + """Mixin to add resource() method to connectors.""" + + def resource(self, **resource_spec) -> Resource: + """ + Create a Resource object for this connector. + + Args: + **resource_spec: Connector-specific resource specification + + Returns: + Resource object that can be used in transfer chains + + Examples: + builds_conn.resource(key="pluto", build_note="my-build") + drafts_conn.resource(key="pluto", version="25v1", acl="public-read") + """ + return Resource(self, **resource_spec) diff --git a/dcpy/connectors/hybrid_pathed_storage.py b/dcpy/connectors/hybrid_pathed_storage.py index 682723b101..12449a55d7 100644 --- a/dcpy/connectors/hybrid_pathed_storage.py +++ b/dcpy/connectors/hybrid_pathed_storage.py @@ -1,16 +1,18 @@ -from cloudpathlib.azure import AzureBlobClient -from cloudpathlib import S3Client, CloudPath -from dataclasses import dataclass -from enum import Enum import logging as default_logging import os -from pathlib import Path import shutil +from dataclasses import dataclass +from enum import Enum +from pathlib import Path from typing import Any, NotRequired, TypedDict, Unpack +from cloudpathlib import CloudPath, S3Client +from cloudpathlib.azure import AzureBlobClient + from dcpy.configuration import DEFAULT_S3_URL -from dcpy.utils.logging import logger +from dcpy.connectors.chain import ResourceMixin from dcpy.connectors.registry import Connector +from dcpy.utils.logging import logger default_logging.getLogger("azure").setLevel( "ERROR" @@ -187,7 +189,7 @@ def __str__(self): return f"{self.storage_type.value}://{self.root_path}" -class PathedStorageConnector(Connector, arbitrary_types_allowed=True): +class PathedStorageConnector(Connector, ResourceMixin, arbitrary_types_allowed=True): """Connector where all the keys are expected to be stringified relative paths. In theory they all work the same across all providers (s3, azure, localstorage, etc.) provided @@ -312,3 +314,98 @@ def get_subfolders(self, prefix: str) -> list[str]: if not folder.exists() or not folder.is_dir(): return [] return [f.name for f in folder.iterdir() if f.is_dir()] + + def can_optimize_transfer_to(self, other_connector: "Connector") -> bool: + """Check if we can optimize transfer to another PathedStorageConnector on same cloud.""" + if not isinstance(other_connector, PathedStorageConnector): + return False + + # Check if both use the same cloud storage backend + self_storage = self.storage + other_storage = other_connector.storage + + # Same local storage (both local paths) + if ( + self_storage.storage_type == StorageType.LOCAL + and other_storage.storage_type == StorageType.LOCAL + ): + return True + + # Same S3 bucket + if ( + self_storage.storage_type == StorageType.S3 + and other_storage.storage_type == StorageType.S3 + ): + # Extract bucket names from root paths for comparison + self_bucket = ( + str(self_storage.root_path).split("/")[0] + if self_storage.root_path + else None + ) + other_bucket = ( + str(other_storage.root_path).split("/")[0] + if other_storage.root_path + else None + ) + return self_bucket == other_bucket + + # Same Azure container + if ( + self_storage.storage_type == StorageType.AZURE + and other_storage.storage_type == StorageType.AZURE + ): + # Extract container names from root paths for comparison + self_container = ( + str(self_storage.root_path).split("/")[0] + if self_storage.root_path + else None + ) + other_container = ( + str(other_storage.root_path).split("/")[0] + if other_storage.root_path + else None + ) + return self_container == other_container + + return False + + def optimized_transfer_to( + self, dest_connector: "Connector", source_spec: dict, dest_spec: dict + ) -> None: + """Execute optimized transfer using cloud provider's native copy operations.""" + if not isinstance(dest_connector, PathedStorageConnector): + raise ValueError( + "Can only optimize transfers to other PathedStorageConnectors" + ) + + # Build source and destination paths + source_path = self._build_path(**source_spec) + dest_path = dest_connector._build_path(**dest_spec) + + logger.info( + f"Optimized transfer: {self.storage.root_path / source_path} -> {dest_connector.storage.root_path / dest_path}" + ) + + # Use cloud storage's native copy method + full_source_path = self.storage.root_path / source_path + full_dest_path = dest_connector.storage.root_path / dest_path + + # For directories, use copytree; for files, use copy + if full_source_path.is_dir(): + full_source_path.copytree(full_dest_path) + else: + # Ensure destination parent directory exists + full_dest_path.parent.mkdir(parents=True, exist_ok=True) + full_source_path.copy(full_dest_path) + + # Apply any additional metadata/ACL settings + if "acl" in dest_spec and hasattr(dest_connector, "_set_acl"): + dest_connector._set_acl(dest_path, dest_spec["acl"]) + + def _build_path(self, **resource_spec) -> str: + """Build the storage path from resource specification.""" + # Default implementation - subclasses should override for specific logic + if "key" in resource_spec: + return resource_spec["key"] + else: + raise ValueError("Resource specification must include 'key'") diff --git a/dcpy/connectors/registry.py b/dcpy/connectors/registry.py index 24375d2f9f..3396308df6 100644 --- a/dcpy/connectors/registry.py +++ b/dcpy/connectors/registry.py @@ -1,8 +1,10 @@ from __future__ import annotations + from abc import ABC, abstractmethod from pathlib import Path +from typing import Any, Callable, Generic, TypeVar, overload + from pydantic import BaseModel -from typing import Any, TypeVar, Generic, overload, Callable from dcpy.utils.logging import logger @@ -55,6 +57,22 @@ def get_pull_local_sub_path(self, key: str, **kwargs) -> Path: class Connector(Push, Pull, ABC): """A connector that does not version datasets but only stores the "current" or "latest" versions""" + def can_optimize_transfer_to(self, other_connector: "Connector") -> bool: + """ + Check if this connector can optimize transfers to another connector. + Default implementation returns False - subclasses override for optimization. + """ + return False + + def optimized_transfer_to( + self, dest_connector: "Connector", source_spec: dict, dest_spec: dict + ) -> None: + """ + Execute optimized transfer to another connector. + Only called if can_optimize_transfer_to returned True. + """ + raise NotImplementedError("Connector does not support optimized transfers") + class VersionedConnector(Connector, VersionSearch, ABC): """A connector that implements the most standard connector behavior (pull, push, version)""" diff --git a/dcpy/test/connectors/test_chaining.py b/dcpy/test/connectors/test_chaining.py new file mode 100644 index 0000000000..053b5e5911 --- /dev/null +++ b/dcpy/test/connectors/test_chaining.py @@ -0,0 +1,310 @@ +""" +Unit tests for connector chaining and resource transfer optimization. + +Tests the automatic optimization of transfers between connectors when they +share the same cloud provider or storage backend. +""" + +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import Mock, call, patch + +import pytest + +from dcpy.connectors.chain import ( + FallbackTempTransfer, + OptimizedCloudTransfer, + Resource, + ResourceMixin, + transfer_resource, +) +from dcpy.connectors.registry import Connector + + +class MockOptimizedConnector(Connector, ResourceMixin): + """Mock connector that supports optimized transfers.""" + + def __init__(self, name: str, can_optimize: bool = True): + super().__init__(conn_type="mock") + self.name = name + self._can_optimize = can_optimize + self.optimized_transfer_called = False + self.pull_called = False + self.push_called = False + + def can_optimize_transfer_to(self, other_connector: "Connector") -> bool: + """Mock optimization capability.""" + return ( + self._can_optimize + and isinstance(other_connector, MockOptimizedConnector) + and other_connector._can_optimize + ) + + def optimized_transfer_to( + self, dest_connector: "Connector", source_spec: dict, dest_spec: dict + ) -> None: + """Mock optimized transfer.""" + self.optimized_transfer_called = True + self.last_optimized_call = { + "dest_connector": dest_connector, + "source_spec": source_spec, + "dest_spec": dest_spec, + } + + def pull(self, destination_path: Path, **kwargs) -> dict: + """Mock pull method.""" + self.pull_called = True + self.last_pull_call = {"destination_path": destination_path, "kwargs": kwargs} + return {"path": destination_path / "mock_file"} + + def push(self, source_path: str, **kwargs) -> dict: + """Mock push method.""" + self.push_called = True + self.last_push_call = {"source_path": source_path, "kwargs": kwargs} + return {"success": True} + + def __repr__(self): + return f"MockOptimizedConnector({self.name})" + + +class MockBasicConnector(Connector, ResourceMixin): + """Mock connector that does not support optimization.""" + + def __init__(self, name: str): + super().__init__(conn_type="mock_basic") + self.name = name + self.pull_called = False + self.push_called = False + + def pull(self, destination_path: Path, **kwargs) -> dict: + """Mock pull method.""" + self.pull_called = True + self.last_pull_call = {"destination_path": destination_path, "kwargs": kwargs} + return {"path": destination_path / "mock_file"} + + def push(self, source_path: str, **kwargs) -> dict: + """Mock push method.""" + self.push_called = True + self.last_push_call = {"source_path": source_path, "kwargs": kwargs} + return {"success": True} + + def __repr__(self): + return f"MockBasicConnector({self.name})" + + +class TestResourceTransfer: + """Test resource transfer between connectors.""" + + def test_optimized_transfer_is_used(self): + """Test that optimized transfer is used when both connectors support it.""" + # Setup + source_conn = MockOptimizedConnector("source", can_optimize=True) + dest_conn = MockOptimizedConnector("dest", can_optimize=True) + + source_resource = source_conn.resource( + key="test_product", build_note="my_build" + ) + dest_resource = dest_conn.resource( + key="test_product", version="1.0", acl="public-read" + ) + + # Execute transfer + result = source_resource >> dest_resource + + # Verify optimized transfer was used + assert source_conn.optimized_transfer_called, ( + "Optimized transfer should have been called" + ) + assert not source_conn.pull_called, ( + "Pull should not have been called for optimized transfer" + ) + assert not dest_conn.push_called, ( + "Push should not have been called for optimized transfer" + ) + + # Verify correct parameters were passed + assert source_conn.last_optimized_call["dest_connector"] is dest_conn + assert source_conn.last_optimized_call["source_spec"] == { + "key": "test_product", + "build_note": "my_build", + } + assert source_conn.last_optimized_call["dest_spec"] == { + "key": "test_product", + "version": "1.0", + "acl": "public-read", + } + + # Verify return value for chaining + assert result is dest_resource + + def test_fallback_transfer_is_used(self): + """Test that fallback transfer is used when optimization is not available.""" + # Setup + source_conn = MockBasicConnector("source") + dest_conn = MockBasicConnector("dest") + + source_resource = source_conn.resource( + key="test_product", build_note="my_build" + ) + dest_resource = dest_conn.resource(key="test_product", version="1.0") + + # Execute transfer with mocked temp directory + with patch("dcpy.connectors.chain.TemporaryDirectory") as mock_temp_dir: + mock_temp_dir.return_value.__enter__.return_value = "/tmp/mock_temp" + mock_temp_dir.return_value.__exit__.return_value = None + + result = source_resource >> dest_resource + + # Verify fallback transfer was used + assert source_conn.pull_called, ( + "Pull should have been called for fallback transfer" + ) + assert dest_conn.push_called, ( + "Push should have been called for fallback transfer" + ) + + # Verify correct parameters were passed + assert source_conn.last_pull_call["kwargs"] == { + "key": "test_product", + "build_note": "my_build", + } + assert dest_conn.last_push_call["kwargs"] == { + "key": "test_product", + "version": "1.0", + } + + # Verify return value for chaining + assert result is dest_resource + + def test_mixed_connectors_use_fallback(self): + """Test that fallback is used when only one connector supports optimization.""" + # Setup + source_conn = MockOptimizedConnector("source", can_optimize=True) + dest_conn = MockBasicConnector("dest") # No optimization + + source_resource = source_conn.resource(key="test_product") + dest_resource = dest_conn.resource(key="test_product", version="1.0") + + # Execute transfer + with patch("dcpy.connectors.chain.TemporaryDirectory") as mock_temp_dir: + mock_temp_dir.return_value.__enter__.return_value = "/tmp/mock_temp" + mock_temp_dir.return_value.__exit__.return_value = None + + result = source_resource >> dest_resource + + # Verify fallback was used (not optimization) + assert not source_conn.optimized_transfer_called, ( + "Optimized transfer should not be called" + ) + assert source_conn.pull_called, "Pull should have been called for fallback" + assert dest_conn.push_called, "Push should have been called for fallback" + + def test_chaining_multiple_transfers(self): + """Test chaining multiple transfers together.""" + # Setup + conn_a = MockOptimizedConnector("a", can_optimize=True) + conn_b = MockOptimizedConnector("b", can_optimize=True) + conn_c = MockOptimizedConnector("c", can_optimize=True) + + resource_a = conn_a.resource(key="test", version="1.0") + resource_b = conn_b.resource(key="test", version="2.0") + resource_c = conn_c.resource(key="test", version="3.0") + + # Execute chained transfers + result = resource_a >> resource_b >> resource_c + + # Verify both transfers used optimization + assert conn_a.optimized_transfer_called, ( + "First transfer should use optimization" + ) + assert conn_b.optimized_transfer_called, ( + "Second transfer should use optimization" + ) + + # Verify final result + assert result is resource_c + + def test_pipe_operator_works(self): + """Test that | operator works the same as >>.""" + # Setup + source_conn = MockOptimizedConnector("source", can_optimize=True) + dest_conn = MockOptimizedConnector("dest", can_optimize=True) + + source_resource = source_conn.resource(key="test") + dest_resource = dest_conn.resource(key="test", version="1.0") + + # Execute transfer with | operator + result = source_resource | dest_resource + + # Verify optimized transfer was used + assert source_conn.optimized_transfer_called, ( + "Optimized transfer should work with | operator" + ) + assert result is dest_resource + + def test_resource_representation(self): + """Test that Resource objects have useful string representations.""" + conn = MockOptimizedConnector("test") + resource = conn.resource( + key="my_product", version="1.0", build_note="test_build" + ) + + repr_str = repr(resource) + + # Should include connector class name and resource spec + assert "MockOptimizedConnector" in repr_str + assert "key=my_product" in repr_str + assert "version=1.0" in repr_str + assert "build_note=test_build" in repr_str + + +class TestTransferStrategies: + """Test individual transfer strategies.""" + + def test_optimized_strategy_can_handle(self): + """Test OptimizedCloudTransfer strategy detection.""" + strategy = OptimizedCloudTransfer() + + # Setup connectors + optimized_a = MockOptimizedConnector("a", can_optimize=True) + optimized_b = MockOptimizedConnector("b", can_optimize=True) + basic_c = MockBasicConnector("c") + + resource_a = optimized_a.resource(key="test") + resource_b = optimized_b.resource(key="test") + resource_c = basic_c.resource(key="test") + + # Test strategy detection + assert strategy.can_handle(resource_a, resource_b), ( + "Should handle optimized-to-optimized" + ) + assert not strategy.can_handle(resource_a, resource_c), ( + "Should not handle optimized-to-basic" + ) + assert not strategy.can_handle(resource_c, resource_a), ( + "Should not handle basic-to-optimized" + ) + assert not strategy.can_handle(resource_c, resource_c), ( + "Should not handle basic-to-basic" + ) + + def test_fallback_strategy_handles_everything(self): + """Test that FallbackTempTransfer handles any connector combination.""" + strategy = FallbackTempTransfer() + + # Setup various connector combinations + optimized_a = MockOptimizedConnector("a") + basic_b = MockBasicConnector("b") + + resource_a = optimized_a.resource(key="test") + resource_b = basic_b.resource(key="test") + + # Test that fallback handles everything + assert strategy.can_handle(resource_a, resource_b) + assert strategy.can_handle(resource_b, resource_a) + assert strategy.can_handle(resource_a, resource_a) + assert strategy.can_handle(resource_b, resource_b) + + +if __name__ == "__main__": + pytest.main([__file__])