@@ -963,6 +963,7 @@ def execute(self):
963963 self .reporter .warning_messages ("Empty scores." )
964964 return self ._response
965965
966+
966967class DownloadAnnotations (BaseReportableUseCase ):
967968 def __init__ (
968969 self ,
@@ -1053,96 +1054,100 @@ def coroutine_wrapper(coroutine):
10531054 loop .close ()
10541055 return count
10551056
1056- async def _download_big_annotation (self , item , export_path ):
1057+ async def _download_big_annotation (self , item , export_path , folder_id ):
10571058 postfix = self .get_postfix ()
1058- response = await self ._backend_client .download_big_annotation (
1059- item = item ,
1059+ await self ._backend_client .download_big_annotation (
1060+ item = item ,
10601061 team_id = self ._project .team_id ,
10611062 project_id = self ._project .id ,
1062- folder_id = self . _folder . uuid ,
1063+ folder_id = folder_id ,
10631064 reporter = self .reporter ,
10641065 download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
10651066 postfix = postfix ,
10661067 callback = self ._callback ,
10671068 )
10681069
1069- return
1070-
1071- async def download_big_annotations (self , queue_idx , export_path ):
1070+ async def download_big_annotations (self , queue_idx , export_path , folder_id ):
10721071 while True :
10731072 cur_queue = self ._big_file_queues [queue_idx ]
10741073 item = await cur_queue .get ()
10751074 cur_queue .task_done ()
10761075 if item :
1077- await self ._download_big_annotation (item , export_path )
1076+ await self ._download_big_annotation (item , export_path , folder_id )
10781077 else :
10791078 cur_queue .put_nowait (None )
10801079 break
10811080
1082- async def download_small_annotations (self , queue_idx , export_path ):
1083- max_chunk_size = 50000
1084-
1081+ async def download_small_annotations (self , queue_idx , export_path , folder_id ):
10851082 cur_queue = self ._small_file_queues [queue_idx ]
10861083
10871084 items = []
1088- i = 0
10891085 item = ""
1090-
10911086 postfix = self .get_postfix ()
10921087 while item is not None :
10931088 item = await cur_queue .get ()
10941089 if item :
10951090 items .append (item )
1096-
1097-
10981091 await self ._backend_client .download_small_annotations (
1099- team_id = self ._project .team_id ,
1100- project_id = self ._project .id ,
1101- folder_id = self ._folder .uuid ,
1102- items = items ,
1103- reporter = self .reporter ,
1104- download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1105- postfix = postfix ,
1106- callback = self ._callback ,
1107- )
1108-
1109- async def distribute_to_queues (self , item_names , sm_queue_id , l_queue_id , folder_id ):
1110-
1111- team_id = self ._project .team_id
1112- project_id = self ._project .id
1113-
1114- resp = self ._backend_client .sort_items_by_size (item_names , team_id , project_id , folder_id )
1092+ team_id = self ._project .team_id ,
1093+ project_id = self ._project .id ,
1094+ folder_id = folder_id ,
1095+ items = items ,
1096+ reporter = self .reporter ,
1097+ download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1098+ postfix = postfix ,
1099+ callback = self ._callback ,
1100+ )
11151101
1116- for item in resp ['large' ]:
1117- await self ._big_file_queues [l_queue_id ].put (item )
1102+ async def distribute_to_queues (
1103+ self , item_names , sm_queue_id , l_queue_id , folder_id
1104+ ):
1105+ try :
1106+ team_id = self ._project .team_id
1107+ project_id = self ._project .id
11181108
1119- for item in resp ['small' ]:
1120- await self ._small_file_queues [sm_queue_id ].put (item ['name' ])
1109+ resp = self ._backend_client .sort_items_by_size (
1110+ item_names , team_id , project_id , folder_id
1111+ )
11211112
1113+ for item in resp ["large" ]:
1114+ await self ._big_file_queues [l_queue_id ].put (item )
11221115
1123- await self ._big_file_queues [l_queue_id ].put (None )
1124- await self ._small_file_queues [sm_queue_id ].put (None )
1116+ for item in resp ["small" ]:
1117+ await self ._small_file_queues [sm_queue_id ].put (item ["name" ])
1118+ finally :
1119+ await self ._big_file_queues [l_queue_id ].put (None )
1120+ await self ._small_file_queues [sm_queue_id ].put (None )
11251121
11261122 async def run_workers (self , item_names , folder_id , export_path ):
11271123 try :
11281124 self ._big_file_queues .append (asyncio .Queue ())
11291125 self ._small_file_queues .append (asyncio .Queue ())
11301126 small_file_queue_idx = len (self ._small_file_queues ) - 1
11311127 big_file_queue_idx = len (self ._big_file_queues ) - 1
1132-
11331128 res = await asyncio .gather (
1134- self .distribute_to_queues (item_names , small_file_queue_idx , big_file_queue_idx , folder_id ),
1135- self .download_big_annotations (big_file_queue_idx , export_path ),
1136- self .download_big_annotations (big_file_queue_idx , export_path ),
1137- self .download_big_annotations (big_file_queue_idx , export_path ),
1138- self .download_small_annotations (small_file_queue_idx , export_path ),
1139- return_exceptions = True
1129+ self .distribute_to_queues (
1130+ item_names , small_file_queue_idx , big_file_queue_idx , folder_id
1131+ ),
1132+ self .download_big_annotations (
1133+ big_file_queue_idx , export_path , folder_id
1134+ ),
1135+ self .download_big_annotations (
1136+ big_file_queue_idx , export_path , folder_id
1137+ ),
1138+ self .download_big_annotations (
1139+ big_file_queue_idx , export_path , folder_id
1140+ ),
1141+ self .download_small_annotations (
1142+ small_file_queue_idx , export_path , folder_id
1143+ ),
1144+ return_exceptions = True ,
11401145 )
1141-
1146+ if any (res ):
1147+ self .reporter .log_error (f"Error { str ([i for i in res if i ])} " )
11421148 except Exception as e :
11431149 self .reporter .log_error (f"Error { str (e )} " )
11441150
1145-
11461151 def per_folder_execute (self , item_names , folder_id , export_path ):
11471152 asyncio .run (self .run_workers (item_names , folder_id , export_path ))
11481153
@@ -1167,43 +1172,41 @@ def execute(self):
11671172 & Condition ("project_id" , self ._project .id , EQ ),
11681173 )
11691174 folders .append (self ._folder )
1170- postfix = self .get_postfix ()
1171-
1172- import nest_asyncio
1173- import platform
1174-
1175- if platform .system ().lower () == "windows" :
1176- asyncio .set_event_loop_policy (asyncio .WindowsSelectorEventLoopPolicy ())
1177-
1178- nest_asyncio .apply ()
11791175
11801176 if not folders :
11811177 folders .append (self ._folder )
1182-
1183-
11841178 with concurrent .futures .ThreadPoolExecutor (max_workers = 5 ) as executor :
11851179 futures = []
1186-
11871180 for folder in folders :
11881181 if not self ._item_names :
11891182 condition = (
11901183 Condition ("team_id" , self ._project .team_id , EQ )
11911184 & Condition ("project_id" , self ._project .id , EQ )
11921185 & Condition ("folder_id" , folder .uuid , EQ )
11931186 )
1194- item_names = [item .name for item in self ._images .get_all (condition )]
1187+ item_names = [
1188+ item .name for item in self ._images .get_all (condition )
1189+ ]
11951190 else :
11961191 item_names = self ._item_names
11971192
11981193 new_export_path = export_path
1199- if folder .name != 'root' :
1200- new_export_path += f'/{ folder .name } '
1201- future = executor .submit (self .per_folder_execute , item_names , folder .uuid , new_export_path )
1194+ if folder .name != "root" :
1195+ new_export_path += f"/{ folder .name } "
1196+
1197+ # TODO check
1198+ if not item_names :
1199+ continue
1200+ future = executor .submit (
1201+ self .per_folder_execute ,
1202+ item_names ,
1203+ folder .uuid ,
1204+ new_export_path ,
1205+ )
12021206 futures .append (future )
12031207
12041208 for future in concurrent .futures .as_completed (futures ):
1205- pass
1206-
1209+ print (future .result ())
12071210
12081211 self .reporter .stop_spinner ()
12091212 count = self .get_items_count (export_path )
0 commit comments