@@ -963,6 +963,7 @@ def execute(self):
            self.reporter.warning_messages("Empty scores.")
        return self._response
 
+
 class DownloadAnnotations(BaseReportableUseCase):
     def __init__(
         self,
@@ -1056,96 +1057,100 @@ def coroutine_wrapper(coroutine):
         loop.close()
         return count
 
-    async def _download_big_annotation(self, item, export_path):
+    async def _download_big_annotation(self, item, export_path, folder_id):
         postfix = self.get_postfix()
-        response = await self._backend_client.download_big_annotation(
-            item=item,
+        await self._backend_client.download_big_annotation(
+            item=item,
             team_id=self._project.team_id,
             project_id=self._project.id,
-            folder_id=self._folder.uuid,
+            folder_id=folder_id,
             reporter=self.reporter,
             download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
             postfix=postfix,
             callback=self._callback,
         )
 
-        return
-
-    async def download_big_annotations(self, queue_idx, export_path):
+    async def download_big_annotations(self, queue_idx, export_path, folder_id):
         while True:
             cur_queue = self._big_file_queues[queue_idx]
             item = await cur_queue.get()
             cur_queue.task_done()
             if item:
-                await self._download_big_annotation(item, export_path)
+                await self._download_big_annotation(item, export_path, folder_id)
             else:
                 cur_queue.put_nowait(None)
                 break
 
-    async def download_small_annotations(self, queue_idx, export_path):
-        max_chunk_size = 50000
-
+    async def download_small_annotations(self, queue_idx, export_path, folder_id):
         cur_queue = self._small_file_queues[queue_idx]
 
         items = []
-        i = 0
         item = ""
-
         postfix = self.get_postfix()
         while item is not None:
             item = await cur_queue.get()
             if item:
                 items.append(item)
-
-
         await self._backend_client.download_small_annotations(
-            team_id=self._project.team_id,
-            project_id=self._project.id,
-            folder_id=self._folder.uuid,
-            items=items,
-            reporter=self.reporter,
-            download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
-            postfix=postfix,
-            callback=self._callback,
-        )
-
-    async def distribute_to_queues(self, item_names, sm_queue_id, l_queue_id, folder_id):
-
-        team_id = self._project.team_id
-        project_id = self._project.id
-
-        resp = self._backend_client.sort_items_by_size(item_names, team_id, project_id, folder_id)
+            team_id=self._project.team_id,
+            project_id=self._project.id,
+            folder_id=folder_id,
+            items=items,
+            reporter=self.reporter,
+            download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
+            postfix=postfix,
+            callback=self._callback,
+        )
 
-        for item in resp['large']:
-            await self._big_file_queues[l_queue_id].put(item)
+    async def distribute_to_queues(
+        self, item_names, sm_queue_id, l_queue_id, folder_id
+    ):
+        try:
+            team_id = self._project.team_id
+            project_id = self._project.id
 
-        for item in resp['small']:
-            await self._small_file_queues[sm_queue_id].put(item['name'])
+            resp = self._backend_client.sort_items_by_size(
+                item_names, team_id, project_id, folder_id
+            )
 
+            for item in resp["large"]:
+                await self._big_file_queues[l_queue_id].put(item)
 
-        await self._big_file_queues[l_queue_id].put(None)
-        await self._small_file_queues[sm_queue_id].put(None)
+            for item in resp["small"]:
+                await self._small_file_queues[sm_queue_id].put(item["name"])
+        finally:
+            await self._big_file_queues[l_queue_id].put(None)
+            await self._small_file_queues[sm_queue_id].put(None)
 
     async def run_workers(self, item_names, folder_id, export_path):
         try:
             self._big_file_queues.append(asyncio.Queue())
             self._small_file_queues.append(asyncio.Queue())
             small_file_queue_idx = len(self._small_file_queues) - 1
             big_file_queue_idx = len(self._big_file_queues) - 1
-
             res = await asyncio.gather(
-                self.distribute_to_queues(item_names, small_file_queue_idx, big_file_queue_idx, folder_id),
-                self.download_big_annotations(big_file_queue_idx, export_path),
-                self.download_big_annotations(big_file_queue_idx, export_path),
-                self.download_big_annotations(big_file_queue_idx, export_path),
-                self.download_small_annotations(small_file_queue_idx, export_path),
-                return_exceptions=True
+                self.distribute_to_queues(
+                    item_names, small_file_queue_idx, big_file_queue_idx, folder_id
+                ),
+                self.download_big_annotations(
+                    big_file_queue_idx, export_path, folder_id
+                ),
+                self.download_big_annotations(
+                    big_file_queue_idx, export_path, folder_id
+                ),
+                self.download_big_annotations(
+                    big_file_queue_idx, export_path, folder_id
+                ),
+                self.download_small_annotations(
+                    small_file_queue_idx, export_path, folder_id
+                ),
+                return_exceptions=True,
             )
-
+            if any(res):
+                self.reporter.log_error(f"Error {str([i for i in res if i])}")
         except Exception as e:
             self.reporter.log_error(f"Error {str(e)}")
 
-
     def per_folder_execute(self, item_names, folder_id, export_path):
         asyncio.run(self.run_workers(item_names, folder_id, export_path))
 
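The rewritten distribute_to_queues above wraps the producer in try/finally so the None sentinels always reach both queues, even when sort_items_by_size raises; without that, the consumer loops in download_big_annotations and download_small_annotations would block on get() forever. A minimal, standalone sketch of this sentinel-terminated producer/consumer pattern, using only the standard library (producer/consumer are illustrative names, not the SDK's API):

import asyncio


async def producer(queue: asyncio.Queue) -> None:
    # Fill the queue; the finally block guarantees the sentinel is enqueued
    # even if producing fails, so consumers can always shut down.
    try:
        for name in ("a.json", "b.json", "c.json"):
            await queue.put(name)
    finally:
        await queue.put(None)  # sentinel: no more work


async def consumer(queue: asyncio.Queue, worker_id: int) -> None:
    while True:
        item = await queue.get()
        queue.task_done()
        if item is None:
            # Re-enqueue the sentinel so sibling consumers also stop.
            queue.put_nowait(None)
            break
        print(f"worker {worker_id} downloaded {item}")


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    # One producer feeds several consumers, mirroring the three
    # download_big_annotations workers gathered in run_workers.
    await asyncio.gather(producer(queue), *(consumer(queue, i) for i in range(3)))


asyncio.run(main())
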
@@ -1170,43 +1175,41 @@ def execute(self):
             & Condition("project_id", self._project.id, EQ),
         )
         folders.append(self._folder)
-        postfix = self.get_postfix()
-
-        import nest_asyncio
-        import platform
-
-        if platform.system().lower() == "windows":
-            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
-
-        nest_asyncio.apply()
 
         if not folders:
             folders.append(self._folder)
-
-
         with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
             futures = []
-
             for folder in folders:
                 if not self._item_names:
                     condition = (
                         Condition("team_id", self._project.team_id, EQ)
                         & Condition("project_id", self._project.id, EQ)
                         & Condition("folder_id", folder.uuid, EQ)
                     )
-                    item_names = [item.name for item in self._images.get_all(condition)]
+                    item_names = [
+                        item.name for item in self._images.get_all(condition)
+                    ]
                 else:
                     item_names = self._item_names
 
                 new_export_path = export_path
-                if folder.name != 'root':
-                    new_export_path += f'/{folder.name}'
-                future = executor.submit(self.per_folder_execute, item_names, folder.uuid, new_export_path)
+                if folder.name != "root":
+                    new_export_path += f"/{folder.name}"
+
+                # TODO check
+                if not item_names:
+                    continue
+                future = executor.submit(
+                    self.per_folder_execute,
+                    item_names,
+                    folder.uuid,
+                    new_export_path,
+                )
                 futures.append(future)
 
             for future in concurrent.futures.as_completed(futures):
-                pass
-
+                print(future.result())
 
         self.reporter.stop_spinner()
         count = self.get_items_count(export_path)
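The execute() hunk above fans folders out to a ThreadPoolExecutor, and per_folder_execute starts a fresh event loop in each worker thread via asyncio.run, which is why the nest_asyncio and Windows selector-policy shims could be dropped. A minimal sketch of that thread-per-event-loop pattern, using hypothetical download_folder/per_folder helpers rather than the SDK's methods:

import asyncio
import concurrent.futures


async def download_folder(folder: str, items: list) -> str:
    # Stand-in for run_workers(): one event loop's worth of async downloads.
    await asyncio.sleep(0)
    return f"{folder}: {len(items)} annotations"


def per_folder(folder: str, items: list) -> str:
    # Each worker thread owns a fresh event loop, so loops never nest
    # and no nest_asyncio patching is required.
    return asyncio.run(download_folder(folder, items))


folders = {"root": ["a.json"], "train": ["b.json", "c.json"]}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(per_folder, name, items) for name, items in folders.items()
    ]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())  # re-raises any exception from the worker

Calling future.result() here, as the diff now does, also surfaces exceptions that the previous bare "pass" silently discarded.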