From 6e88948b4aa9df6def4b5ffb21f28c4ec0631dde Mon Sep 17 00:00:00 2001 From: Rasmus Welander Date: Tue, 16 Dec 2025 11:42:00 +0100 Subject: [PATCH] Add handling of GRPC errors --- cs3client/file.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/cs3client/file.py b/cs3client/file.py index a88e221..d27c59e 100644 --- a/cs3client/file.py +++ b/cs3client/file.py @@ -3,7 +3,7 @@ Authors: Rasmus Welander, Diogo Castro, Giuseppe Lo Presti. Emails: rasmus.oscar.welander@cern.ch, diogo.castro@cern.ch, giuseppe.lopresti@cern.ch -Last updated: 30/08/2024 +Last updated: 16/12/2025 """ import time @@ -11,11 +11,14 @@ import http import requests from typing import Union, Optional, Generator +from functools import wraps import cs3.storage.provider.v1beta1.resources_pb2 as cs3spr import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp from cs3.gateway.v1beta1.gateway_api_pb2_grpc import GatewayAPIStub import cs3.types.v1beta1.types_pb2 as types import cs3.rpc.v1beta1.code_pb2 as cs3code +import cs3.rpc.v1beta1.status_pb2 as cs3status +import grpc from .config import Config @@ -26,6 +29,27 @@ LOCK_ATTR_KEY = 'cs3client.advlock' +_GRPC_TO_CS3 = { + grpc.StatusCode.NOT_FOUND: cs3code.CODE_NOT_FOUND, + grpc.StatusCode.UNAUTHENTICATED: cs3code.CODE_UNAUTHENTICATED, + grpc.StatusCode.PERMISSION_DENIED: cs3code.CODE_PERMISSION_DENIED, + grpc.StatusCode.ALREADY_EXISTS: cs3code.CODE_ALREADY_EXISTS, + grpc.StatusCode.UNIMPLEMENTED: cs3code.CODE_UNIMPLEMENTED, + grpc.StatusCode.FAILED_PRECONDITION: cs3code.CODE_FAILED_PRECONDITION, + grpc.StatusCode.ABORTED: cs3code.CODE_ABORTED, + grpc.StatusCode.INVALID_ARGUMENT: cs3code.CODE_INVALID_ARGUMENT, + grpc.StatusCode.INTERNAL: cs3code.CODE_INTERNAL, +} + +def _grpc_exc_to_cs3_status(rpc_error: grpc.RpcError) -> cs3status.Status: + code = getattr(rpc_error, "code", lambda: None)() + details = getattr(rpc_error, "details", lambda: None)() + return cs3status.Status( + code=_GRPC_TO_CS3.get(code, cs3code.CODE_INTERNAL), + message=details or str(rpc_error), + trace="Converted from gRPC exception" + ) + class File: """ File class to interact with the CS3 API. @@ -48,6 +72,30 @@ def __init__( self._gateway: GatewayAPIStub = gateway self._status_code_handler: StatusCodeHandler = status_code_handler + def handle_grpc_error(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + + # Transport / gRPC-layer failures (no CS3 rpc.Status came back) + except grpc.RpcError as e: + # Log gRPC-layer info + self._log.error( + f"gRPC-layer error in {func.__name__}: code={getattr(e,'code',lambda:None)()} " + f"details={getattr(e,'details',lambda:None)()}" + ) + status = _grpc_exc_to_cs3_status(e) + # Reuse existing mapping + self._status_code_handler.handle_errors(status, operation=func.__name__) + raise # fallback: should not be reached if handle_errors always raises + except Exception as e: + self._log.error(f"Client error in {func.__name__}: {e}") + raise + + return wrapper + + @handle_grpc_error def stat(self, auth_token: tuple, resource: Resource) -> cs3spr.ResourceInfo: """ Stat a file and return the ResourceInfo object. @@ -70,6 +118,7 @@ def stat(self, auth_token: tuple, resource: Resource) -> cs3spr.ResourceInfo: ) return res.info + @handle_grpc_error def set_xattr(self, auth_token: tuple, resource: Resource, key: str, value: str, lock_id: Optional[str] = None) -> None: """ Set the extended attribute to for a resource. @@ -93,6 +142,7 @@ def set_xattr(self, auth_token: tuple, resource: Resource, key: str, value: str, self._status_code_handler.handle_errors(res.status, "set extended attribute", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked setxattr" trace="{res.status.trace}"') + @handle_grpc_error def remove_xattr(self, auth_token: tuple, resource: Resource, key: str, lock_id: Optional[str] = None) -> None: """ Remove the extended attribute . @@ -111,6 +161,7 @@ def remove_xattr(self, auth_token: tuple, resource: Resource, key: str, lock_id: self._status_code_handler.handle_errors(res.status, "remove extended attribute", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked UnsetArbitraryMetaData" trace="{res.status.trace}"') + @handle_grpc_error def rename_file( self, auth_token: tuple, resource: Resource, newresource: Resource, lock_id: Optional[str] = None ) -> None: @@ -132,6 +183,7 @@ def rename_file( self._status_code_handler.handle_errors(res.status, "rename file", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked Move" trace="{res.status.trace}"') + @handle_grpc_error def remove_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str] = None) -> None: """ Remove a resource. @@ -149,6 +201,7 @@ def remove_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[s self._status_code_handler.handle_errors(res.status, "remove file", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked Delete" trace="{res.status.trace}"') + @handle_grpc_error def touch_file(self, auth_token: tuple, resource: Resource) -> None: """ Create a resource. @@ -168,6 +221,7 @@ def touch_file(self, auth_token: tuple, resource: Resource) -> None: self._status_code_handler.handle_errors(res.status, "touch file", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked TouchFile" trace="{res.status.trace}"') + @handle_grpc_error def write_file( self, auth_token: tuple, resource: Resource, content: Union[str, bytes], size: int, app_name: Optional[str] = None, lock_id: Optional[str] = None, @@ -276,6 +330,7 @@ def write_file( f'headers="{headers}"' ) + @handle_grpc_error def read_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str] = None) -> Generator[bytes, None, None]: """ Read a file. Note that the function is a generator, managed by the app server. @@ -332,6 +387,7 @@ def read_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str for chunk in data: yield chunk + @handle_grpc_error def make_dir(self, auth_token: tuple, resource: Resource) -> None: """ Create a directory. @@ -348,6 +404,7 @@ def make_dir(self, auth_token: tuple, resource: Resource) -> None: self._status_code_handler.handle_errors(res.status, "make directory", resource.get_file_ref_str()) self._log.debug(f'msg="Invoked CreateContainer" trace="{res.status.trace}"') + @handle_grpc_error def list_dir( self, auth_token: tuple, resource: Resource ) -> Generator[cs3spr.ResourceInfo, None, None]: @@ -368,6 +425,7 @@ def list_dir( for info in res.infos: yield info + @handle_grpc_error def _set_lock_using_xattr(self, auth_token, resource: Resource, app_name: str, lock_id: Union[int, str]) -> None: """" Set a lock to a resource with the given value metadata and appname as holder @@ -390,6 +448,7 @@ def _set_lock_using_xattr(self, auth_token, resource: Resource, app_name: str, l self.set_xattr(auth_token, resource, LOCK_ATTR_KEY, f"{app_name}!{lock_id}!{expiration}", None) return + @handle_grpc_error def set_lock(self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[int, str]) -> None: """ Set a lock to a resource with the given value and appname as holder @@ -427,6 +486,7 @@ def set_lock(self, auth_token: tuple, resource: Resource, app_name: str, lock_id self._log.debug(f'msg="Invoked SetLock" {resource.get_file_ref_str()} ' f'value="{lock_id}" result="{res.status.trace}"') + @handle_grpc_error def _get_lock_using_xattr(self, auth_token: tuple, resource: Resource) -> dict: """ Get the lock metadata for the given filepath @@ -453,6 +513,7 @@ def _get_lock_using_xattr(self, auth_token: tuple, resource: Resource) -> dict: except KeyError: return None + @handle_grpc_error def get_lock(self, auth_token: tuple, resource: Resource) -> Union[cs3spr.Lock, dict, None]: """ Get the lock for the given filepath @@ -493,6 +554,7 @@ def get_lock(self, auth_token: tuple, resource: Resource) -> Union[cs3spr.Lock, "expiration": {"seconds": res.lock.expiration.seconds}, } + @handle_grpc_error def _refresh_lock_using_xattr( self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int], existing_lock_id: Union[str, int] = None @@ -529,6 +591,7 @@ def _refresh_lock_using_xattr( self.set_xattr(auth_token, resource, LOCK_ATTR_KEY, f"{app_name}!{lock_id}!{expiration}", None) return + @handle_grpc_error def refresh_lock( self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int], existing_lock_id: Union[str, int] = None @@ -570,6 +633,7 @@ def refresh_lock( self._log.debug(f'msg="Invoked RefreshLock" {resource.get_file_ref_str()} result="{res.status.trace}" ' f'value="{lock_id}" old_value="{existing_lock_id}"') + @handle_grpc_error def _unlock_using_xattr( self, auth_token: tuple, resource: Resource, app_name: str, lock_id: Union[str, int] ) -> None: @@ -603,6 +667,7 @@ def _unlock_using_xattr( self.remove_xattr(auth_token, resource, LOCK_ATTR_KEY, None) return + @handle_grpc_error def unlock(self, auth_token: tuple, resource: Resource, app_name, lock_id: Union[str, int]): """ Remove the lock for the given filepath