@@ -169,9 +169,9 @@ def get_annotation_from_s3(bucket, path: str):
169169 file .seek (0 )
170170 return file
171171
172- def prepare_annotation (self , annotation : dict ) -> dict :
172+ def prepare_annotation (self , annotation : dict , size ) -> dict :
173173 errors = None
174- if sys . getsizeof ( annotation ) < BIG_FILE_THRESHOLD :
174+ if size < BIG_FILE_THRESHOLD :
175175 use_case = ValidateAnnotationUseCase (
176176 reporter = self .reporter ,
177177 team_id = self ._project .team_id ,
@@ -192,12 +192,12 @@ def get_annotation(
192192 self , path : str
193193 ) -> (Optional [Tuple [io .StringIO ]], Optional [io .BytesIO ]):
194194 mask = None
195+
195196 if self ._client_s3_bucket :
196- annotation = json .load (
197- self .get_annotation_from_s3 (self ._client_s3_bucket , path )
198- )
197+ file = self .get_annotation_from_s3 (self ._client_s3_bucket , path )
198+
199199 else :
200- annotation = json . load ( open (path ) )
200+ file = open (path )
201201 if self ._project .type == constants .ProjectType .PIXEL .value :
202202 mask = open (
203203 path .replace (
@@ -206,11 +206,18 @@ def get_annotation(
206206 ),
207207 "rb" ,
208208 )
209- annotation = self .prepare_annotation (annotation )
209+ _tmp = file .read ()
210+ if not isinstance (_tmp , bytes ):
211+ _tmp = _tmp .encode ("utf8" )
212+ file = io .BytesIO (_tmp )
213+ file .seek (0 )
214+ size = file .getbuffer ().nbytes
215+ annotation = json .load (file )
216+ annotation = self .prepare_annotation (annotation , size )
210217 if not annotation :
211218 self .reporter .store_message ("invalid_jsons" , path )
212- return None , None
213- return annotation , mask
219+ raise AppException ( "Invalid json" )
220+ return annotation , mask , size
214221
215222 @staticmethod
216223 def chunks (data , size : int = 10000 ):
@@ -299,25 +306,29 @@ async def _upload_small_annotations(self, chunk) -> Report:
299306 [],
300307 [],
301308 )
302- response = await self ._backend_service .upload_annotations (
303- team_id = self ._project .team_id ,
304- project_id = self ._project .id ,
305- folder_id = self ._folder .id ,
306- items_name_file_map = {i .name : i .data for i in chunk },
307- )
308- self .reporter .update_progress (len (chunk ))
309- if response .ok :
310- if response .data .failed_items : # noqa
311- failed_annotations = response .data .failed_items
312- missing_classes = response .data .missing_resources .classes
313- missing_attr_groups = response .data .missing_resources .attribute_groups
314- missing_attrs = response .data .missing_resources .attributes
315- if self ._project .type == constants .ProjectType .PIXEL .value :
316- for i in chunk :
317- if i .mask :
318- self ._upload_mask (i )
319- else :
309+ try :
310+ response = await self ._backend_service .upload_annotations (
311+ team_id = self ._project .team_id ,
312+ project_id = self ._project .id ,
313+ folder_id = self ._folder .id ,
314+ items_name_file_map = {i .name : i .data for i in chunk },
315+ )
316+ if response .ok :
317+ if response .data .failed_items : # noqa
318+ failed_annotations = response .data .failed_items
319+ missing_classes = response .data .missing_resources .classes
320+ missing_attr_groups = response .data .missing_resources .attribute_groups
321+ missing_attrs = response .data .missing_resources .attributes
322+ if self ._project .type == constants .ProjectType .PIXEL .value :
323+ for i in chunk :
324+ if i .mask :
325+ self ._upload_mask (i )
326+ else :
327+ failed_annotations .extend ([i .name for i in chunk ])
328+ except Exception : # noqa
320329 failed_annotations .extend ([i .name for i in chunk ])
330+ finally :
331+ self .reporter .update_progress (len (chunk ))
321332 return Report (
322333 failed_annotations , missing_classes , missing_attr_groups , missing_attrs
323334 )
@@ -350,7 +361,6 @@ async def upload(_chunk):
350361 ):
351362 await upload (chunk )
352363 chunk = []
353-
354364 if chunk :
355365 await upload (chunk )
356366
@@ -364,24 +374,27 @@ async def _upload_big_annotation(self, item) -> Tuple[str, bool]:
364374 data = item .data ,
365375 chunk_size = 5 * 1024 * 1024 ,
366376 )
367- self .reporter .update_progress ()
368377 if is_uploaded and (
369378 self ._project .type == constants .ProjectType .PIXEL .value and item .mask
370379 ):
371380 self ._upload_mask (item .mask )
372381 return item .name , is_uploaded
373- except AppException as e :
374- self .reporter .log_error (str (e ))
382+ except Exception as e :
383+ self .reporter .log_debug (str (e ))
375384 self ._report .failed_annotations .append (item .name )
385+ raise
386+ finally :
387+ self .reporter .update_progress ()
376388
377389 async def upload_big_annotations (self ):
378390 while True :
379391 item = await self ._big_files_queue .get ()
380392 self ._big_files_queue .task_done ()
381- if not item :
382- self ._big_files_queue .put_nowait (None )
383- return
384- await self ._upload_big_annotation (item )
393+ if item :
394+ await self ._upload_big_annotation (item )
395+ else :
396+ await self ._big_files_queue .put (None )
397+ break
385398
386399 async def distribute_queues (self , items_to_upload : list ):
387400 data : List [List [Any , bool ]] = [[i , False ] for i in items_to_upload ]
@@ -390,12 +403,12 @@ async def distribute_queues(self, items_to_upload: list):
390403 for idx , (item , processed ) in enumerate (data ):
391404 if not processed :
392405 try :
393- annotation , mask = self .get_annotation (item .path )
406+ annotation , mask , size = self .get_annotation (item .path )
394407 t_item = copy .copy (item )
395- size = sys .getsizeof (annotation )
396408 annotation_file = io .StringIO ()
397409 json .dump (annotation , annotation_file )
398410 annotation_file .seek (0 )
411+ del annotation
399412 t_item .data = annotation_file
400413 t_item .mask = mask
401414 if size > BIG_FILE_THRESHOLD :
@@ -407,10 +420,9 @@ async def distribute_queues(self, items_to_upload: list):
407420 self ._small_files_queue .put_nowait (t_item )
408421 except Exception as e :
409422 self .reporter .log_debug (str (e ))
410- data [idx ][1 ] = True
411- self .reporter .update_progress ()
412423 self ._report .failed_annotations .append (item .name )
413424 finally :
425+ self .reporter .update_progress ()
414426 data [idx ][1 ] = True
415427 processed_count += 1
416428 self ._big_files_queue .put_nowait (None )
@@ -426,7 +438,6 @@ async def run_workers(self, items_to_upload):
426438 self .upload_big_annotations (),
427439 self .upload_big_annotations (),
428440 self .upload_big_annotations (),
429- self .upload_big_annotations (),
430441 return_exceptions = True ,
431442 )
432443 except Exception as e :
@@ -466,10 +477,16 @@ def execute(self):
466477 if not statuses_changed :
467478 self ._response .errors = AppException ("Failed to change status." )
468479
480+ if missing_annotations :
481+ logger .warning (
482+ f"Couldn't find { len (missing_annotations )} /{ len (name_path_mappings .keys ())} "
483+ "items on the platform that match the annotations you want to upload."
484+ )
469485 if self ._report .failed_annotations :
470486 self .reporter .log_warning (
471487 f"Couldn't validate annotations. { constants .USE_VALIDATE_MESSAGE } "
472488 )
489+
473490 self ._response .data = (
474491 uploaded_annotations ,
475492 self ._report .failed_annotations ,
@@ -1007,7 +1024,12 @@ def download_annotation_classes(self, path: str):
10071024 classes_path .mkdir (parents = True , exist_ok = True )
10081025 with open (classes_path / "classes.json" , "w+" ) as file :
10091026 json .dump (
1010- [i .dict (exclude_unset = True ) for i in response .data ], file , indent = 4
1027+ [
1028+ i .dict (exclude_unset = True , by_alias = True , fill_enum_values = True )
1029+ for i in response .data
1030+ ],
1031+ file ,
1032+ indent = 4 ,
10111033 )
10121034 else :
10131035 self ._response .errors = AppException ("Cant download classes." )
@@ -1056,9 +1078,9 @@ def execute(self):
10561078 loop = asyncio .new_event_loop ()
10571079 if not self ._item_names :
10581080 condition = (
1059- Condition ("team_id" , self ._project .team_id , EQ )
1060- & Condition ("project_id" , self ._project .id , EQ )
1061- & Condition ("folder_id" , self ._folder .uuid , EQ )
1081+ Condition ("team_id" , self ._project .team_id , EQ )
1082+ & Condition ("project_id" , self ._project .id , EQ )
1083+ & Condition ("folder_id" , self ._folder .uuid , EQ )
10621084 )
10631085 item_names = [item .name for item in self ._images .get_all (condition )]
10641086 else :
@@ -1081,11 +1103,13 @@ def execute(self):
10811103 for folder in folders :
10821104 if not self ._item_names :
10831105 condition = (
1084- Condition ("team_id" , self ._project .team_id , EQ )
1085- & Condition ("project_id" , self ._project .id , EQ )
1086- & Condition ("folder_id" , folder .uuid , EQ )
1106+ Condition ("team_id" , self ._project .team_id , EQ )
1107+ & Condition ("project_id" , self ._project .id , EQ )
1108+ & Condition ("folder_id" , folder .uuid , EQ )
10871109 )
1088- item_names = [item .name for item in self ._images .get_all (condition )]
1110+ item_names = [
1111+ item .name for item in self ._images .get_all (condition )
1112+ ]
10891113 else :
10901114 item_names = self ._item_names
10911115 coroutines .append (
0 commit comments