Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions dcpy/connectors/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
"""
Connector chaining and resource transfer abstractions.

Enables elegant syntax for transferring resources between connectors:
builds_conn.resource(key="pluto", build_note="my-build") >> drafts_conn.resource(key="pluto", version="25v1")

The system automatically chooses the most efficient transfer method:
- If both connectors are PathedStorageConnectors on the same cloud: direct copy
- Otherwise: fallback to pull-temp-push pattern
"""

from abc import ABC, abstractmethod
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, Dict, Optional, Type

from dcpy.utils.logging import logger

if TYPE_CHECKING:
from dcpy.connectors.base import Connector
from dcpy.connectors.pathed_storage import PathedStorageConnector


class Resource:
    """
    A handle on a piece of data living inside a connector, addressable enough
    to be the source or target of a transfer.

    Examples:
        builds_conn.resource(key="pluto", build_note="my-build")
        drafts_conn.resource(key="pluto", version="25v1", acl="public-read")
        local_conn.resource(path="/tmp/data")
    """

    def __init__(self, connector: "Connector", **resource_spec):
        """
        Args:
            connector: The connector that owns this resource.
            **resource_spec: Connector-specific addressing details
                (key, version, acl, ...).
        """
        self.connector = connector
        self.resource_spec = resource_spec

    def __rshift__(self, destination: "Resource") -> "Resource":
        """Transfer this resource into *destination* (``src >> dst`` syntax).

        Returns:
            The destination resource, so transfers can be chained.
        """
        return transfer_resource(self, destination)

    def __or__(self, destination: "Resource") -> "Resource":
        """Transfer this resource into *destination* (``src | dst`` syntax).

        Returns:
            The destination resource, so transfers can be chained.
        """
        return transfer_resource(self, destination)

    def __repr__(self) -> str:
        rendered = [f"{name}={value}" for name, value in self.resource_spec.items()]
        return f"{type(self.connector).__name__}.resource({', '.join(rendered)})"


class TransferStrategy(ABC):
    """Interface for one way of moving a Resource between connectors.

    Concrete strategies advertise which transfers they support via
    can_handle() and carry them out via transfer().
    """

    @abstractmethod
    def can_handle(self, source: Resource, destination: Resource) -> bool:
        """Return True if this strategy can perform the given transfer."""
        ...

    @abstractmethod
    def transfer(self, source: Resource, destination: Resource) -> Resource:
        """Perform the transfer and return the destination resource."""
        ...


class OptimizedCloudTransfer(TransferStrategy):
    """
    Strategy for connectors that can move data between themselves directly.

    All optimization decisions and mechanics are delegated to the source
    connector (can_optimize_transfer_to / optimized_transfer_to), so this
    class makes no assumptions about connector internals.
    """

    def can_handle(self, source: Resource, destination: Resource) -> bool:
        """True when the source connector advertises an optimized path to the destination."""
        probe = getattr(source.connector, "can_optimize_transfer_to", None)
        return probe is not None and probe(destination.connector)

    def transfer(self, source: Resource, destination: Resource) -> Resource:
        """Hand the transfer off to the source connector's optimized implementation."""
        logger.info(f"Using optimized transfer: {source} -> {destination}")

        source.connector.optimized_transfer_to(
            dest_connector=destination.connector,
            source_spec=source.resource_spec,
            dest_spec=destination.resource_spec,
        )

        logger.info(f"Optimized transfer complete: {source} -> {destination}")
        return destination


class FallbackTempTransfer(TransferStrategy):
    """
    Last-resort strategy that stages data on local disk.

    Pulls the source resource into a temporary directory, then pushes that
    directory to the destination. Assumes nothing about the connectors, so
    any source/destination pair is supported.
    """

    def can_handle(self, source: Resource, destination: Resource) -> bool:
        """Always True — this is the universal fallback."""
        return True

    def transfer(self, source: Resource, destination: Resource) -> Resource:
        """Pull to a temp dir, push from it, and return the destination."""
        logger.info(f"Using fallback temp transfer: {source} -> {destination}")

        with TemporaryDirectory() as staging:
            staging_path = Path(staging)

            # Stage the source resource locally.
            source.connector.pull(destination_path=staging_path, **source.resource_spec)

            # Upload the staged copy to the destination connector.
            destination.connector.push(
                source_path=str(staging_path), **destination.resource_spec
            )

        logger.info(f"Fallback transfer complete: {source} -> {destination}")
        return destination


# Registry of transfer strategies, consulted in order by transfer_resource():
# the first strategy whose can_handle() returns True wins. FallbackTempTransfer
# accepts every transfer, so it must remain the last entry — anything placed
# after it would be unreachable.
TRANSFER_STRATEGIES = [
    OptimizedCloudTransfer(),
    FallbackTempTransfer(),  # Universal catch-all; keep last.
]


def transfer_resource(source: Resource, destination: Resource) -> Resource:
    """
    Move *source* to *destination* using the first strategy that can handle it.

    Strategies are tried in TRANSFER_STRATEGIES order, so the most efficient
    applicable one wins.

    Args:
        source: Resource to transfer from.
        destination: Resource to transfer to.

    Returns:
        The destination resource, for chaining.

    Raises:
        RuntimeError: If no registered strategy accepts the transfer
            (should be unreachable while the universal fallback is registered).
    """
    chosen = next(
        (s for s in TRANSFER_STRATEGIES if s.can_handle(source, destination)),
        None,
    )
    if chosen is None:
        raise RuntimeError(f"No transfer strategy available for {source} -> {destination}")
    return chosen.transfer(source, destination)


def register_transfer_strategy(strategy: TransferStrategy, priority: int = 0):
    """
    Register a new transfer strategy.

    Strategies are consulted in list order by transfer_resource(), and the
    terminal FallbackTempTransfer accepts every transfer — a strategy inserted
    after it could never be reached. The insertion index is therefore clamped
    so the new strategy always lands before the final (fallback) entry.

    Args:
        strategy: The transfer strategy to register
        priority: Priority (lower = higher priority, 0 = highest)
    """
    # Clamp to [0, last-index]: keeps the catch-all fallback at the end and
    # guards against negative priorities doing surprising tail-relative inserts.
    index = max(0, min(priority, len(TRANSFER_STRATEGIES) - 1))
    TRANSFER_STRATEGIES.insert(index, strategy)


# Mixin for connectors to add .resource() method
class ResourceMixin:
    """Mixin giving connectors a resource() factory for transfer chains."""

    def resource(self, **resource_spec) -> Resource:
        """
        Build a Resource handle pointing at this connector.

        Args:
            **resource_spec: Connector-specific resource specification.

        Returns:
            A Resource usable as either side of a ``>>`` / ``|`` transfer.

        Examples:
            builds_conn.resource(key="pluto", build_note="my-build")
            drafts_conn.resource(key="pluto", version="25v1", acl="public-read")
        """
        return Resource(connector=self, **resource_spec)
111 changes: 104 additions & 7 deletions dcpy/connectors/hybrid_pathed_storage.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from cloudpathlib.azure import AzureBlobClient
from cloudpathlib import S3Client, CloudPath
from dataclasses import dataclass
from enum import Enum
import logging as default_logging
import os
from pathlib import Path
import shutil
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, NotRequired, TypedDict, Unpack

from cloudpathlib import CloudPath, S3Client
from cloudpathlib.azure import AzureBlobClient

from dcpy.configuration import DEFAULT_S3_URL
from dcpy.utils.logging import logger
from dcpy.connectors.chain import ResourceMixin
from dcpy.connectors.registry import Connector
from dcpy.utils.logging import logger

default_logging.getLogger("azure").setLevel(
"ERROR"
Expand Down Expand Up @@ -187,7 +189,7 @@ def __str__(self):
return f"{self.storage_type.value}://{self.root_path}"


class PathedStorageConnector(Connector, arbitrary_types_allowed=True):
class PathedStorageConnector(Connector, ResourceMixin, arbitrary_types_allowed=True):
"""Connector where all the keys are expected to be stringified relative paths.

In theory they all work the same across all providers (s3, azure, localstorage, etc.) provided
Expand Down Expand Up @@ -312,3 +314,98 @@ def get_subfolders(self, prefix: str) -> list[str]:
if not folder.exists() or not folder.is_dir():
return []
return [f.name for f in folder.iterdir() if f.is_dir()]

def can_optimize_transfer_to(self, other_connector: "Connector") -> bool:
    """Return True when a native same-backend copy to *other_connector* is possible.

    Optimized copies are supported only between two PathedStorageConnectors
    sharing a backend: both local, both in the same S3 bucket, or both in the
    same Azure container.
    """
    if not isinstance(other_connector, PathedStorageConnector):
        return False

    mine = self.storage
    theirs = other_connector.storage

    # Different backends can never use a native copy.
    if mine.storage_type != theirs.storage_type:
        return False

    # Local-to-local copies are always direct.
    if mine.storage_type == StorageType.LOCAL:
        return True

    # S3 / Azure: only within the same bucket/container, which is the first
    # path component of the storage root.
    if mine.storage_type in (StorageType.S3, StorageType.AZURE):

        def first_component(storage):
            return (
                str(storage.root_path).split("/")[0] if storage.root_path else None
            )

        return first_component(mine) == first_component(theirs)

    return False

def optimized_transfer_to(
    self, dest_connector: "Connector", source_spec: dict, dest_spec: dict
) -> None:
    """Execute an optimized transfer using the storage backend's native copy.

    Args:
        dest_connector: Destination connector; must be a PathedStorageConnector.
        source_spec: Source resource spec (must include 'key').
        dest_spec: Destination resource spec (must include 'key'; may include
            'acl' for connectors that support ACLs).

    Raises:
        ValueError: If dest_connector is not a PathedStorageConnector.
    """
    if not isinstance(dest_connector, PathedStorageConnector):
        raise ValueError(
            "Can only optimize transfers to other PathedStorageConnectors"
        )

    # Resolve storage-relative paths from the resource specs.
    source_path = self._build_path(**source_spec)
    dest_path = dest_connector._build_path(**dest_spec)

    logger.info(
        f"Optimized transfer: {self.storage.root_path / source_path} -> {dest_connector.storage.root_path / dest_path}"
    )

    full_source_path = self.storage.root_path / source_path
    full_dest_path = dest_connector.storage.root_path / dest_path

    if isinstance(full_source_path, CloudPath):
        # Cloud storage: cloudpathlib provides copy()/copytree() directly.
        if full_source_path.is_dir():
            full_source_path.copytree(full_dest_path)
        else:
            full_dest_path.parent.mkdir(parents=True, exist_ok=True)
            full_source_path.copy(full_dest_path)
    else:
        # Local storage: plain pathlib.Path has no copy()/copytree() before
        # Python 3.14, so fall back to shutil. dirs_exist_ok=True mirrors the
        # overwrite-into-existing-prefix behavior of the cloud copy.
        if full_source_path.is_dir():
            shutil.copytree(full_source_path, full_dest_path, dirs_exist_ok=True)
        else:
            full_dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(full_source_path, full_dest_path)

    # Apply any additional metadata/ACL settings.
    if "acl" in dest_spec and hasattr(dest_connector, "_set_acl"):
        dest_connector._set_acl(dest_path, dest_spec["acl"])

def _build_path(self, **resource_spec) -> str:
    """Resolve the storage-relative path for a resource spec.

    Default implementation returns the 'key' entry as-is; subclasses should
    override for connector-specific path construction.

    Raises:
        ValueError: If the spec contains no 'key'.
    """
    if "key" not in resource_spec:
        raise ValueError("Resource specification must include 'key'")
    return resource_spec["key"]
20 changes: 19 additions & 1 deletion dcpy/connectors/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Callable, Generic, TypeVar, overload

from pydantic import BaseModel
from typing import Any, TypeVar, Generic, overload, Callable

from dcpy.utils.logging import logger

Expand Down Expand Up @@ -55,6 +57,22 @@ def get_pull_local_sub_path(self, key: str, **kwargs) -> Path:
class Connector(Push, Pull, ABC):
"""A connector that does not version datasets but only stores the "current" or "latest" versions"""

def can_optimize_transfer_to(self, other_connector: "Connector") -> bool:
    """Report whether an optimized transfer path to *other_connector* exists.

    Base connectors advertise no optimization; subclasses capable of native
    copies override this to return True for compatible destinations.
    """
    return False

def optimized_transfer_to(
    self, dest_connector: "Connector", source_spec: dict, dest_spec: dict
) -> None:
    """Perform an optimized transfer to *dest_connector*.

    Invoked only after can_optimize_transfer_to() returned True; the base
    class never advertises optimization, so reaching this default is a
    programming error in the calling strategy or subclass.
    """
    raise NotImplementedError("Connector does not support optimized transfers")


class VersionedConnector(Connector, VersionSearch, ABC):
"""A connector that implements the most standard connector behavior (pull, push, version)"""
Expand Down
Loading
Loading