33
44Authors: Rasmus Welander, Diogo Castro, Giuseppe Lo Presti.
55Emails: rasmus.oscar.welander@cern.ch, diogo.castro@cern.ch, giuseppe.lopresti@cern.ch
6- Last updated: 30/08/2024
6+ Last updated: 16/12/2025
77"""
88
99import time
1010import logging
1111import http
1212import requests
1313from typing import Union , Optional , Generator
14+ from functools import wraps
1415import cs3 .storage .provider .v1beta1 .resources_pb2 as cs3spr
1516import cs3 .storage .provider .v1beta1 .provider_api_pb2 as cs3sp
1617from cs3 .gateway .v1beta1 .gateway_api_pb2_grpc import GatewayAPIStub
1718import cs3 .types .v1beta1 .types_pb2 as types
1819import cs3 .rpc .v1beta1 .code_pb2 as cs3code
20+ import cs3 .rpc .v1beta1 .status_pb2 as cs3status
21+ import grpc
1922
2023
2124from .config import Config
2629LOCK_ATTR_KEY = 'cs3client.advlock'
2730
2831
32+ _GRPC_TO_CS3 = {
33+ grpc .StatusCode .NOT_FOUND : cs3code .CODE_NOT_FOUND ,
34+ grpc .StatusCode .UNAUTHENTICATED : cs3code .CODE_UNAUTHENTICATED ,
35+ grpc .StatusCode .PERMISSION_DENIED : cs3code .CODE_PERMISSION_DENIED ,
36+ grpc .StatusCode .ALREADY_EXISTS : cs3code .CODE_ALREADY_EXISTS ,
37+ grpc .StatusCode .UNIMPLEMENTED : cs3code .CODE_UNIMPLEMENTED ,
38+ grpc .StatusCode .FAILED_PRECONDITION : cs3code .CODE_FAILED_PRECONDITION ,
39+ grpc .StatusCode .ABORTED : cs3code .CODE_ABORTED ,
40+ grpc .StatusCode .INVALID_ARGUMENT : cs3code .CODE_INVALID_ARGUMENT ,
41+ grpc .StatusCode .INTERNAL : cs3code .CODE_INTERNAL ,
42+ }
43+
44+ def _grpc_exc_to_cs3_status (rpc_error : grpc .RpcError ) -> cs3status .Status :
45+ code = getattr (rpc_error , "code" , lambda : None )()
46+ details = getattr (rpc_error , "details" , lambda : None )()
47+ return cs3status .Status (
48+ code = _GRPC_TO_CS3 .get (code , cs3code .CODE_INTERNAL ),
49+ message = details or str (rpc_error ),
50+ trace = "Converted from gRPC exception"
51+ )
52+
2953class File :
3054 """
3155 File class to interact with the CS3 API.
@@ -48,6 +72,30 @@ def __init__(
4872 self ._gateway : GatewayAPIStub = gateway
4973 self ._status_code_handler : StatusCodeHandler = status_code_handler
5074
75+ def handle_grpc_error (func ):
76+ @wraps (func )
77+ def wrapper (self , * args , ** kwargs ):
78+ try :
79+ return func (self , * args , ** kwargs )
80+
81+ # Transport / gRPC-layer failures (no CS3 rpc.Status came back)
82+ except grpc .RpcError as e :
83+ # Log gRPC-layer info
84+ self ._log .error (
85+ f"gRPC-layer error in { func .__name__ } : code={ getattr (e ,'code' ,lambda :None )()} "
86+ f"details={ getattr (e ,'details' ,lambda :None )()} "
87+ )
88+ status = _grpc_exc_to_cs3_status (e )
89+ # Reuse existing mapping
90+ self ._status_code_handler .handle_errors (status , operation = func .__name__ )
91+ raise # fallback: should not be reached if handle_errors always raises
92+ except Exception as e :
93+ self ._log .error (f"Client error in { func .__name__ } : { e } " )
94+ raise
95+
96+ return wrapper
97+
98+ @handle_grpc_error
5199 def stat (self , auth_token : tuple , resource : Resource ) -> cs3spr .ResourceInfo :
52100 """
53101 Stat a file and return the ResourceInfo object.
@@ -70,6 +118,7 @@ def stat(self, auth_token: tuple, resource: Resource) -> cs3spr.ResourceInfo:
70118 )
71119 return res .info
72120
121+ @handle_grpc_error
73122 def set_xattr (self , auth_token : tuple , resource : Resource , key : str , value : str , lock_id : Optional [str ] = None ) -> None :
74123 """
75124 Set the extended attribute <key> to <value> for a resource.
@@ -93,6 +142,7 @@ def set_xattr(self, auth_token: tuple, resource: Resource, key: str, value: str,
93142 self ._status_code_handler .handle_errors (res .status , "set extended attribute" , resource .get_file_ref_str ())
94143 self ._log .debug (f'msg="Invoked setxattr" trace="{ res .status .trace } "' )
95144
145+ @handle_grpc_error
96146 def remove_xattr (self , auth_token : tuple , resource : Resource , key : str , lock_id : Optional [str ] = None ) -> None :
97147 """
98148 Remove the extended attribute <key>.
@@ -111,6 +161,7 @@ def remove_xattr(self, auth_token: tuple, resource: Resource, key: str, lock_id:
111161 self ._status_code_handler .handle_errors (res .status , "remove extended attribute" , resource .get_file_ref_str ())
112162 self ._log .debug (f'msg="Invoked UnsetArbitraryMetaData" trace="{ res .status .trace } "' )
113163
164+ @handle_grpc_error
114165 def rename_file (
115166 self , auth_token : tuple , resource : Resource , newresource : Resource , lock_id : Optional [str ] = None
116167 ) -> None :
@@ -132,6 +183,7 @@ def rename_file(
132183 self ._status_code_handler .handle_errors (res .status , "rename file" , resource .get_file_ref_str ())
133184 self ._log .debug (f'msg="Invoked Move" trace="{ res .status .trace } "' )
134185
186+ @handle_grpc_error
135187 def remove_file (self , auth_token : tuple , resource : Resource , lock_id : Optional [str ] = None ) -> None :
136188 """
137189 Remove a resource.
@@ -149,6 +201,7 @@ def remove_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[s
149201 self ._status_code_handler .handle_errors (res .status , "remove file" , resource .get_file_ref_str ())
150202 self ._log .debug (f'msg="Invoked Delete" trace="{ res .status .trace } "' )
151203
204+ @handle_grpc_error
152205 def touch_file (self , auth_token : tuple , resource : Resource ) -> None :
153206 """
154207 Create a resource.
@@ -168,6 +221,7 @@ def touch_file(self, auth_token: tuple, resource: Resource) -> None:
168221 self ._status_code_handler .handle_errors (res .status , "touch file" , resource .get_file_ref_str ())
169222 self ._log .debug (f'msg="Invoked TouchFile" trace="{ res .status .trace } "' )
170223
224+ @handle_grpc_error
171225 def write_file (
172226 self , auth_token : tuple , resource : Resource , content : Union [str , bytes ], size : int ,
173227 app_name : Optional [str ] = None , lock_id : Optional [str ] = None ,
@@ -276,6 +330,7 @@ def write_file(
276330 f'headers="{ headers } "'
277331 )
278332
333+ @handle_grpc_error
279334 def read_file (self , auth_token : tuple , resource : Resource , lock_id : Optional [str ] = None ) -> Generator [bytes , None , None ]:
280335 """
281336 Read a file. Note that the function is a generator, managed by the app server.
@@ -332,6 +387,7 @@ def read_file(self, auth_token: tuple, resource: Resource, lock_id: Optional[str
332387 for chunk in data :
333388 yield chunk
334389
390+ @handle_grpc_error
335391 def make_dir (self , auth_token : tuple , resource : Resource ) -> None :
336392 """
337393 Create a directory.
@@ -348,6 +404,7 @@ def make_dir(self, auth_token: tuple, resource: Resource) -> None:
348404 self ._status_code_handler .handle_errors (res .status , "make directory" , resource .get_file_ref_str ())
349405 self ._log .debug (f'msg="Invoked CreateContainer" trace="{ res .status .trace } "' )
350406
407+ @handle_grpc_error
351408 def list_dir (
352409 self , auth_token : tuple , resource : Resource
353410 ) -> Generator [cs3spr .ResourceInfo , None , None ]:
@@ -368,6 +425,7 @@ def list_dir(
368425 for info in res .infos :
369426 yield info
370427
428+ @handle_grpc_error
371429 def _set_lock_using_xattr (self , auth_token , resource : Resource , app_name : str , lock_id : Union [int , str ]) -> None :
372430 """"
373431 Set a lock to a resource with the given value metadata and appname as holder
@@ -390,6 +448,7 @@ def _set_lock_using_xattr(self, auth_token, resource: Resource, app_name: str, l
390448 self .set_xattr (auth_token , resource , LOCK_ATTR_KEY , f"{ app_name } !{ lock_id } !{ expiration } " , None )
391449 return
392450
451+ @handle_grpc_error
393452 def set_lock (self , auth_token : tuple , resource : Resource , app_name : str , lock_id : Union [int , str ]) -> None :
394453 """
395454 Set a lock to a resource with the given value and appname as holder
@@ -427,6 +486,7 @@ def set_lock(self, auth_token: tuple, resource: Resource, app_name: str, lock_id
427486 self ._log .debug (f'msg="Invoked SetLock" { resource .get_file_ref_str ()} '
428487 f'value="{ lock_id } " result="{ res .status .trace } "' )
429488
489+ @handle_grpc_error
430490 def _get_lock_using_xattr (self , auth_token : tuple , resource : Resource ) -> dict :
431491 """
432492 Get the lock metadata for the given filepath
@@ -453,6 +513,7 @@ def _get_lock_using_xattr(self, auth_token: tuple, resource: Resource) -> dict:
453513 except KeyError :
454514 return None
455515
516+ @handle_grpc_error
456517 def get_lock (self , auth_token : tuple , resource : Resource ) -> Union [cs3spr .Lock , dict , None ]:
457518 """
458519 Get the lock for the given filepath
@@ -493,6 +554,7 @@ def get_lock(self, auth_token: tuple, resource: Resource) -> Union[cs3spr.Lock,
493554 "expiration" : {"seconds" : res .lock .expiration .seconds },
494555 }
495556
557+ @handle_grpc_error
496558 def _refresh_lock_using_xattr (
497559 self , auth_token : tuple , resource : Resource , app_name : str , lock_id : Union [str , int ],
498560 existing_lock_id : Union [str , int ] = None
@@ -529,6 +591,7 @@ def _refresh_lock_using_xattr(
529591 self .set_xattr (auth_token , resource , LOCK_ATTR_KEY , f"{ app_name } !{ lock_id } !{ expiration } " , None )
530592 return
531593
594+ @handle_grpc_error
532595 def refresh_lock (
533596 self , auth_token : tuple , resource : Resource , app_name : str , lock_id : Union [str , int ],
534597 existing_lock_id : Union [str , int ] = None
@@ -570,6 +633,7 @@ def refresh_lock(
570633 self ._log .debug (f'msg="Invoked RefreshLock" { resource .get_file_ref_str ()} result="{ res .status .trace } " '
571634 f'value="{ lock_id } " old_value="{ existing_lock_id } "' )
572635
636+ @handle_grpc_error
573637 def _unlock_using_xattr (
574638 self , auth_token : tuple , resource : Resource , app_name : str , lock_id : Union [str , int ]
575639 ) -> None :
@@ -603,6 +667,7 @@ def _unlock_using_xattr(
603667 self .remove_xattr (auth_token , resource , LOCK_ATTR_KEY , None )
604668 return
605669
670+ @handle_grpc_error
606671 def unlock (self , auth_token : tuple , resource : Resource , app_name , lock_id : Union [str , int ]):
607672 """
608673 Remove the lock for the given filepath
0 commit comments