Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion cs3client/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

Authors: Rasmus Welander, Diogo Castro, Giuseppe Lo Presti.
Emails: rasmus.oscar.welander@cern.ch, diogo.castro@cern.ch, giuseppe.lopresti@cern.ch
Last updated: 30/08/2024
Last updated: 16/12/2025
"""

import time
import logging
import http
import requests
from typing import Union, Optional, Generator
from functools import wraps
import cs3.storage.provider.v1beta1.resources_pb2 as cs3spr
import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp
from cs3.gateway.v1beta1.gateway_api_pb2_grpc import GatewayAPIStub
import cs3.types.v1beta1.types_pb2 as types
import cs3.rpc.v1beta1.code_pb2 as cs3code
import cs3.rpc.v1beta1.status_pb2 as cs3status
import grpc


from .config import Config
Expand All @@ -26,6 +29,27 @@
LOCK_ATTR_KEY = 'cs3client.advlock'


_GRPC_TO_CS3 = {
grpc.StatusCode.NOT_FOUND: cs3code.CODE_NOT_FOUND,
grpc.StatusCode.UNAUTHENTICATED: cs3code.CODE_UNAUTHENTICATED,
grpc.StatusCode.PERMISSION_DENIED: cs3code.CODE_PERMISSION_DENIED,
grpc.StatusCode.ALREADY_EXISTS: cs3code.CODE_ALREADY_EXISTS,
grpc.StatusCode.UNIMPLEMENTED: cs3code.CODE_UNIMPLEMENTED,
grpc.StatusCode.FAILED_PRECONDITION: cs3code.CODE_FAILED_PRECONDITION,
grpc.StatusCode.ABORTED: cs3code.CODE_ABORTED,
grpc.StatusCode.INVALID_ARGUMENT: cs3code.CODE_INVALID_ARGUMENT,
grpc.StatusCode.INTERNAL: cs3code.CODE_INTERNAL,
}

def _grpc_exc_to_cs3_status(rpc_error: grpc.RpcError) -> cs3status.Status:
code = getattr(rpc_error, "code", lambda: None)()
details = getattr(rpc_error, "details", lambda: None)()
return cs3status.Status(
code=_GRPC_TO_CS3.get(code, cs3code.CODE_INTERNAL),
message=details or str(rpc_error),
trace="Converted from gRPC exception"
)

class File:
"""
File class to interact with the CS3 API.
Expand All @@ -48,6 +72,30 @@ def __init__(
self._gateway: GatewayAPIStub = gateway
self._status_code_handler: StatusCodeHandler = status_code_handler

def handle_grpc_error(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)

# Transport / gRPC-layer failures (no CS3 rpc.Status came back)
except grpc.RpcError as e:
# Log gRPC-layer info
self._log.error(
f"gRPC-layer error in {func.__name__}: code={getattr(e,'code',lambda:None)()} "
f"details={getattr(e,'details',lambda:None)()}"
)
status = _grpc_exc_to_cs3_status(e)
# Reuse existing mapping
self._status_code_handler.handle_errors(status, operation=func.__name__)
raise # fallback: should not be reached if handle_errors always raises
except Exception as e:
self._log.error(f"Client error in {func.__name__}: {e}")
raise

return wrapper

@handle_grpc_error
def stat(self, auth_token: tuple, resource: Resource) -> cs3spr.ResourceInfo:
"""
Stat a file and return the ResourceInfo object.
Expand All @@ -70,6 +118,7 @@ def stat(self, auth_token: tuple, resource: Resource) -> cs3spr.ResourceInfo:
)
return res.info

@handle_grpc_error
def set_xattr(self, auth_token: tuple, resource: Resource, key: str, value: str, lock_id: Optional[str] = None) -> None:
"""
Set the extended attribute <key> to <value> for a resource.
Expand All @@ -93,6 +142,7 @@ def set_xattr(self, auth_token: tuple, resource: Resource, key: str, value: str,
self._status_code_handler.handle_errors(res.status, "set extended attribute", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked setxattr" trace="{res.status.trace}"')

@handle_grpc_error
def remove_xattr(self, auth_token: tuple, resource: Resource, key: str, lock_id: Optional[str] = None) -> None:
"""
Remove the extended attribute <key>.
Expand All @@ -111,6 +161,7 @@ def remove_xattr(self, auth_token: tuple, resource: Resource, key: str, lock_id:
self._status_code_handler.handle_errors(res.status, "remove extended attribute", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked UnsetArbitraryMetaData" trace="{res.status.trace}"')

@handle_grpc_error
def rename_file(
self, auth_token: tuple, resource: Resource, newresource: Resource, lock_id: Optional[str] = None
) -> None:
Expand All @@ -132,6 +183,7 @@ def rename_file(
self._status_code_handler.handle_errors(res.status, "rename file", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked Move" trace="{res.status.trace}"')

@handle_grpc_error
def remove_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str] = None) -> None:
"""
Remove a resource.
Expand All @@ -149,6 +201,7 @@ def remove_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[s
self._status_code_handler.handle_errors(res.status, "remove file", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked Delete" trace="{res.status.trace}"')

@handle_grpc_error
def touch_file(self, auth_token: tuple, resource: Resource) -> None:
"""
Create a resource.
Expand All @@ -168,6 +221,7 @@ def touch_file(self, auth_token: tuple, resource: Resource) -> None:
self._status_code_handler.handle_errors(res.status, "touch file", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked TouchFile" trace="{res.status.trace}"')

@handle_grpc_error
def write_file(
self, auth_token: tuple, resource: Resource, content: Union[str, bytes], size: int,
app_name: Optional[str] = None, lock_id: Optional[str] = None,
Expand Down Expand Up @@ -276,6 +330,7 @@ def write_file(
f'headers="{headers}"'
)

@handle_grpc_error
def read_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str] = None) -> Generator[bytes, None, None]:
"""
Read a file. Note that the function is a generator, managed by the app server.
Expand Down Expand Up @@ -332,6 +387,7 @@ def read_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str
for chunk in data:
yield chunk

@handle_grpc_error
def make_dir(self, auth_token: tuple, resource: Resource) -> None:
"""
Create a directory.
Expand All @@ -348,6 +404,7 @@ def make_dir(self, auth_token: tuple, resource: Resource) -> None:
self._status_code_handler.handle_errors(res.status, "make directory", resource.get_file_ref_str())
self._log.debug(f'msg="Invoked CreateContainer" trace="{res.status.trace}"')

@handle_grpc_error
def list_dir(
self, auth_token: tuple, resource: Resource
) -> Generator[cs3spr.ResourceInfo, None, None]:
Expand All @@ -368,6 +425,7 @@ def list_dir(
for info in res.infos:
yield info

@handle_grpc_error
def _set_lock_using_xattr(self, auth_token, resource: Resource, app_name: str, lock_id: Union[int, str]) -> None:
""""
Set a lock to a resource with the given value metadata and appname as holder
Expand All @@ -390,6 +448,7 @@ def _set_lock_using_xattr(self, auth_token, resource: Resource, app_name: str, l
self.set_xattr(auth_token, resource, LOCK_ATTR_KEY, f"{app_name}!{lock_id}!{expiration}", None)
return

@handle_grpc_error
def set_lock(self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[int, str]) -> None:
"""
Set a lock to a resource with the given value and appname as holder
Expand Down Expand Up @@ -427,6 +486,7 @@ def set_lock(self, auth_token: tuple, resource: Resource, app_name: str, lock_id
self._log.debug(f'msg="Invoked SetLock" {resource.get_file_ref_str()} '
f'value="{lock_id}" result="{res.status.trace}"')

@handle_grpc_error
def _get_lock_using_xattr(self, auth_token: tuple, resource: Resource) -> dict:
"""
Get the lock metadata for the given filepath
Expand All @@ -453,6 +513,7 @@ def _get_lock_using_xattr(self, auth_token: tuple, resource: Resource) -> dict:
except KeyError:
return None

@handle_grpc_error
def get_lock(self, auth_token: tuple, resource: Resource) -> Union[cs3spr.Lock, dict, None]:
"""
Get the lock for the given filepath
Expand Down Expand Up @@ -493,6 +554,7 @@ def get_lock(self, auth_token: tuple, resource: Resource) -> Union[cs3spr.Lock,
"expiration": {"seconds": res.lock.expiration.seconds},
}

@handle_grpc_error
def _refresh_lock_using_xattr(
self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int],
existing_lock_id: Union[str, int] = None
Expand Down Expand Up @@ -529,6 +591,7 @@ def _refresh_lock_using_xattr(
self.set_xattr(auth_token, resource, LOCK_ATTR_KEY, f"{app_name}!{lock_id}!{expiration}", None)
return

@handle_grpc_error
def refresh_lock(
self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int],
existing_lock_id: Union[str, int] = None
Expand Down Expand Up @@ -570,6 +633,7 @@ def refresh_lock(
self._log.debug(f'msg="Invoked RefreshLock" {resource.get_file_ref_str()} result="{res.status.trace}" '
f'value="{lock_id}" old_value="{existing_lock_id}"')

@handle_grpc_error
def _unlock_using_xattr(
self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int]
) -> None:
Expand Down Expand Up @@ -603,6 +667,7 @@ def _unlock_using_xattr(
self.remove_xattr(auth_token, resource, LOCK_ATTR_KEY, None)
return

@handle_grpc_error
def unlock(self, auth_token: tuple, resource: Resource, app_name, lock_id: Union[str, int]):
"""
Remove the lock for the given filepath
Expand Down
Loading