@@ -345,7 +345,7 @@ async def upload(_chunk):
345345 self ._report .missing_attr_groups .extend (report .missing_attr_groups )
346346 self ._report .missing_attrs .extend (report .missing_attrs )
347347 except Exception as e :
348- self .reporter .log_debug (str (e ))
348+ self .reporter .log_error (str (e ))
349349 self ._report .failed_annotations .extend ([i .name for i in _chunk ])
350350
351351 _size = 0
@@ -386,7 +386,6 @@ async def _upload_big_annotation(self, item) -> Tuple[str, bool]:
386386 except Exception as e :
387387 self .reporter .log_debug (str (e ))
388388 self ._report .failed_annotations .append (item .name )
389- raise
390389 finally :
391390 self .reporter .update_progress ()
392391
@@ -397,7 +396,7 @@ async def upload_big_annotations(self):
397396 if item :
398397 await self ._upload_big_annotation (item )
399398 else :
400- await self ._big_files_queue .put_nowait (None )
399+ self ._big_files_queue .put_nowait (None )
401400 break
402401
403402 async def distribute_queues (self , items_to_upload : list ):
@@ -438,19 +437,15 @@ async def distribute_queues(self, items_to_upload: list):
438437 self ._small_files_queue .put_nowait (None )
439438
440439 async def run_workers (self , items_to_upload ):
441- try :
442- self ._big_files_queue = asyncio .Queue ()
443- self ._small_files_queue = asyncio .Queue ()
444- await asyncio .gather (
445- self .distribute_queues (items_to_upload ),
446- self .upload_big_annotations (),
447- self .upload_big_annotations (),
448- self .upload_big_annotations (),
449- return_exceptions = True ,
450- )
451- await asyncio .gather (self .upload_small_annotations ())
452- except Exception as e :
453- self .reporter .log_error (f"Error { str (e )} " )
440+ self ._big_files_queue = asyncio .Queue ()
441+ self ._small_files_queue = asyncio .Queue ()
442+ await asyncio .gather (
443+ self .distribute_queues (items_to_upload ),
444+ self .upload_big_annotations (),
445+ self .upload_big_annotations (),
446+ self .upload_big_annotations (),
447+ )
448+ await asyncio .gather (self .upload_small_annotations ())
454449
455450 def execute (self ):
456451 missing_annotations = []
@@ -472,7 +467,11 @@ def execute(self):
472467 items_to_upload .append (self .AnnotationToUpload (item .uuid , name , path ))
473468 except KeyError :
474469 missing_annotations .append (name )
475- asyncio .run (self .run_workers (items_to_upload ))
470+ try :
471+ asyncio .run (self .run_workers (items_to_upload ))
472+ except Exception as e :
473+ self .reporter .log_error (str (e ))
474+ self ._response .errors = AppException ("Can't upload annotations." )
476475 self .reporter .finish_progress ()
477476 self ._log_report ()
478477 uploaded_annotations = list (
@@ -675,7 +674,6 @@ def execute(self):
675674 )
676675 if not uploaded :
677676 self ._response .errors = constants .INVALID_JSON_MESSAGE
678-
679677 else :
680678 response = asyncio .run (
681679 self ._backend_service .upload_annotations (
@@ -750,6 +748,7 @@ def __init__(
750748 self ._client = backend_service_provider
751749 self ._show_process = show_process
752750 self ._item_names_provided = True
751+ self ._big_annotations_queue = None
753752
754753 def validate_project_type (self ):
755754 if self ._project .type == constants .ProjectType .PIXEL .value :
@@ -775,76 +774,64 @@ def validate_item_names(self):
775774 self ._item_names = [item .name for item in self ._images .get_all (condition )]
776775
777776 def _prettify_annotations (self , annotations : List [dict ]):
778- restruct = {}
777+ re_struct = {}
779778
780779 if self ._item_names_provided :
781780 for annotation in annotations :
782- restruct [annotation ["metadata" ]["name" ]] = annotation
781+ re_struct [annotation ["metadata" ]["name" ]] = annotation
783782 try :
784- return [restruct [x ] for x in self ._item_names if x in restruct ]
783+ return [re_struct [x ] for x in self ._item_names if x in re_struct ]
785784 except KeyError :
786785 raise AppException ("Broken data." )
787786
788787 return annotations
789788
790- async def get_big_annotation (
791- self ,
792- ):
789+ async def get_big_annotation (self ):
793790
794791 large_annotations = []
795792 while True :
796793 item = await self ._big_annotations_queue .get ()
797794 if not item :
798795 await self ._big_annotations_queue .put (None )
799796 break
800-
801797 large_annotation = await self ._client .get_big_annotation (
802798 team_id = self ._project .team_id ,
803799 project_id = self ._project .id ,
804- folder_id = self ._folder .uuid ,
805800 item = item ,
806801 reporter = self .reporter ,
807802 )
808-
809803 large_annotations .append (large_annotation )
810-
811804 return large_annotations
812805
813806 async def get_small_annotations (self , item_names ):
814- small_annotations = await self ._client .get_small_annotations (
807+ return await self ._client .get_small_annotations (
815808 team_id = self ._project .team_id ,
816809 project_id = self ._project .id ,
817810 folder_id = self ._folder .uuid ,
818811 items = item_names ,
819812 reporter = self .reporter ,
820813 )
821814
822- return small_annotations
823-
824815 async def distribute_to_queue (self , big_annotations ):
825-
826816 for item in big_annotations :
827817 await self ._big_annotations_queue .put (item )
828-
829818 await self ._big_annotations_queue .put (None )
830819
831820 async def run_workers (self , big_annotations , small_annotations ):
832-
821+ self . _big_annotations_queue = asyncio . Queue ()
833822 annotations = await asyncio .gather (
834823 self .distribute_to_queue (big_annotations ),
835824 self .get_small_annotations (small_annotations ),
836825 self .get_big_annotation (),
837826 self .get_big_annotation (),
838827 self .get_big_annotation (),
839- return_exceptions = True ,
840828 )
841829
842830 annotations = [i for x in annotations [1 :] for i in x if x ]
843831 return annotations
844832
845833 def execute (self ):
846834 if self .is_valid ():
847- self ._big_annotations_queue = asyncio .Queue ()
848835 items_count = len (self ._item_names )
849836 self .reporter .log_info (
850837 f"Getting { items_count } annotations from "
@@ -858,12 +845,15 @@ def execute(self):
858845 folder_id = self ._folder .uuid ,
859846 project_id = self ._project .id ,
860847 )
861-
862848 small_annotations = [x ["name" ] for x in items ["small" ]]
863- annotations = asyncio .run (
864- self .run_workers (items ["large" ], small_annotations )
865- )
866-
849+ try :
850+ annotations = asyncio .run (
851+ self .run_workers (items ["large" ], small_annotations )
852+ )
853+ except Exception as e :
854+ self .reporter .log_error (str (e ))
855+ self ._response .errors = AppException ("Can't upload annotations." )
856+ return self ._response
867857 received_items_count = len (annotations )
868858 self .reporter .finish_progress ()
869859 if items_count > received_items_count :
@@ -1122,31 +1112,27 @@ def coroutine_wrapper(coroutine):
11221112 loop .close ()
11231113 return count
11241114
1125- async def _download_big_annotation (self , item , export_path , folder_id ):
1126- postfix = self .get_postfix ()
1127- await self ._backend_client .download_big_annotation (
1128- item = item ,
1129- team_id = self ._project .team_id ,
1130- project_id = self ._project .id ,
1131- download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1132- postfix = postfix ,
1133- callback = self ._callback ,
1134- )
1135-
1136- async def download_big_annotations (self , queue_idx , export_path , folder_id ):
1115+ async def download_big_annotations (self , queue_idx , export_path ):
11371116 while True :
11381117 cur_queue = self ._big_file_queues [queue_idx ]
11391118 item = await cur_queue .get ()
11401119 cur_queue .task_done ()
11411120 if item :
1142- await self ._download_big_annotation (item , export_path , folder_id )
1121+ postfix = self .get_postfix ()
1122+ await self ._backend_client .download_big_annotation (
1123+ item = item ,
1124+ team_id = self ._project .team_id ,
1125+ project_id = self ._project .id ,
1126+ download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1127+ postfix = postfix ,
1128+ callback = self ._callback ,
1129+ )
11431130 else :
11441131 cur_queue .put_nowait (None )
11451132 break
11461133
11471134 async def download_small_annotations (self , queue_idx , export_path , folder_id ):
11481135 cur_queue = self ._small_file_queues [queue_idx ]
1149-
11501136 items = []
11511137 item = ""
11521138 postfix = self .get_postfix ()
@@ -1195,15 +1181,9 @@ async def run_workers(self, item_names, folder_id, export_path):
11951181 self .distribute_to_queues (
11961182 item_names , small_file_queue_idx , big_file_queue_idx , folder_id
11971183 ),
1198- self .download_big_annotations (
1199- big_file_queue_idx , export_path , folder_id
1200- ),
1201- self .download_big_annotations (
1202- big_file_queue_idx , export_path , folder_id
1203- ),
1204- self .download_big_annotations (
1205- big_file_queue_idx , export_path , folder_id
1206- ),
1184+ self .download_big_annotations (big_file_queue_idx , export_path ),
1185+ self .download_big_annotations (big_file_queue_idx , export_path ),
1186+ self .download_big_annotations (big_file_queue_idx , export_path ),
12071187 self .download_small_annotations (
12081188 small_file_queue_idx , export_path , folder_id
12091189 ),
@@ -1214,11 +1194,7 @@ async def run_workers(self, item_names, folder_id, export_path):
12141194 except Exception as e :
12151195 self .reporter .log_error (f"Error { str (e )} " )
12161196
1217- def per_folder_execute (self , item_names , folder_id , export_path ):
1218- asyncio .run (self .run_workers (item_names , folder_id , export_path ))
1219-
12201197 def execute (self ):
1221-
12221198 if self .is_valid ():
12231199 export_path = str (
12241200 self .destination
@@ -1255,22 +1231,22 @@ def execute(self):
12551231 ]
12561232 else :
12571233 item_names = self ._item_names
1258-
12591234 new_export_path = export_path
1260-
12611235 if not folder .is_root and self ._folder .is_root :
12621236 new_export_path += f"/{ folder .name } "
1263-
1264- # TODO check
12651237 if not item_names :
12661238 continue
1267- future = executor . submit (
1268- self . per_folder_execute ,
1269- item_names ,
1270- folder .uuid ,
1271- new_export_path ,
1239+ futures . append (
1240+ executor . submit (
1241+ asyncio . run ,
1242+ self . run_workers ( item_names , folder .uuid , new_export_path ) ,
1243+ )
12721244 )
1273- futures .append (future )
1245+
1246+ for future in concurrent .futures .as_completed (futures ):
1247+ exception = future .exception ()
1248+ if exception :
1249+ self ._response .errors = exception
12741250
12751251 self .reporter .stop_spinner ()
12761252 count = self .get_items_count (export_path )
0 commit comments