@@ -1391,7 +1391,6 @@ def __init__(
13911391 self ._item_names = item_names
13921392 self ._item_names_provided = True
13931393 self ._big_annotations_queue = None
1394- self ._small_annotations_queue = None
13951394
13961395 def validate_project_type (self ):
13971396 if self ._project .type == constants .ProjectType .PIXEL .value :
@@ -1440,29 +1439,18 @@ async def get_big_annotation(self):
14401439 break
14411440 return large_annotations
14421441
1443- async def get_small_annotations (self ):
1444- small_annotations = []
1445- while True :
1446- items = await self ._small_annotations_queue .get ()
1447- if items :
1448- annotations = (
1449- await self ._service_provider .annotations .list_small_annotations (
1450- project = self ._project ,
1451- folder = self ._folder ,
1452- item_ids = [i .id for i in items ],
1453- reporter = self .reporter ,
1454- )
1455- )
1456- small_annotations .extend (annotations )
1457- else :
1458- await self ._small_annotations_queue .put (None )
1459- break
1460- return small_annotations
1442+ async def get_small_annotations (self , item_ids : List [int ]):
1443+ return await self ._service_provider .annotations .list_small_annotations (
1444+ project = self ._project ,
1445+ folder = self ._folder ,
1446+ item_ids = item_ids ,
1447+ reporter = self .reporter ,
1448+ )
14611449
14621450 async def run_workers (
14631451 self ,
14641452 big_annotations : List [BaseItemEntity ],
1465- small_annotations : List [BaseItemEntity ],
1453+ small_annotations : List [List [ Dict ] ],
14661454 ):
14671455 annotations = []
14681456 if big_annotations :
@@ -1485,26 +1473,16 @@ async def run_workers(
14851473 )
14861474 )
14871475 if small_annotations :
1488- self ._small_annotations_queue = asyncio .Queue ()
1489- small_chunks = divide_to_chunks (
1490- small_annotations , size = self ._config .ANNOTATION_CHUNK_SIZE
1491- )
1492- for chunk in small_chunks :
1493- self ._small_annotations_queue .put_nowait (chunk )
1494- self ._small_annotations_queue .put_nowait (None )
1495-
1496- annotations .extend (
1497- list (
1498- itertools .chain .from_iterable (
1499- await asyncio .gather (
1500- * [
1501- self .get_small_annotations ()
1502- for _ in range (self ._config .MAX_COROUTINE_COUNT )
1503- ]
1504- )
1505- )
1476+ for chunks in divide_to_chunks (
1477+ small_annotations , self ._config .MAX_COROUTINE_COUNT
1478+ ):
1479+ tasks = []
1480+ for chunk in chunks :
1481+ tasks .append (self .get_small_annotations ([i ["id" ] for i in chunk ]))
1482+ annotations .extend (
1483+ list (itertools .chain .from_iterable (await asyncio .gather (* tasks )))
15061484 )
1507- )
1485+
15081486 return list (filter (None , annotations ))
15091487
15101488 def execute (self ):
@@ -1527,7 +1505,6 @@ def execute(self):
15271505 items = get_or_raise (self ._service_provider .items .list (condition ))
15281506 else :
15291507 items = []
1530- id_item_map = {i .id : i for i in items }
15311508 if not items :
15321509 logger .info ("No annotations to download." )
15331510 self ._response .data = []
@@ -1537,18 +1514,20 @@ def execute(self):
15371514 f"Getting { items_count } annotations from "
15381515 f"{ self ._project .name } { f'/{ self ._folder .name } ' if self ._folder .name != 'root' else '' } ."
15391516 )
1517+ id_item_map = {i .id : i for i in items }
15401518 self .reporter .start_progress (
15411519 items_count ,
15421520 disable = logger .level > logging .INFO or self .reporter .log_enabled ,
15431521 )
1544-
1545- sort_response = self ._service_provider . annotations . sort_items_by_size (
1546- project = self . _project , folder = self . _folder , item_ids = list (id_item_map )
1522+ sort_response = self . _service_provider . annotations . get_upload_chunks (
1523+ project = self ._project ,
1524+ item_ids = list (id_item_map ),
15471525 )
15481526 large_item_ids = set (map (itemgetter ("id" ), sort_response ["large" ]))
1549- small_items_ids = set (map (itemgetter ("id" ), sort_response ["small" ]))
1550- large_items = list (filter (lambda item : item .id in large_item_ids , items ))
1551- small_items = list (filter (lambda item : item .id in small_items_ids , items ))
1527+ large_items : List [BaseItemEntity ] = list (
1528+ filter (lambda item : item .id in large_item_ids , items )
1529+ )
1530+ small_items : List [List [dict ]] = sort_response ["small" ]
15521531 try :
15531532 nest_asyncio .apply ()
15541533 annotations = asyncio .run (self .run_workers (large_items , small_items ))
@@ -1585,7 +1564,6 @@ def __init__(
15851564 self ._service_provider = service_provider
15861565 self ._callback = callback
15871566 self ._big_file_queue = None
1588- self ._small_file_queue = None
15891567
15901568 def validate_item_names (self ):
15911569 if self ._item_names :
@@ -1664,28 +1642,24 @@ async def download_big_annotations(self, export_path):
16641642 self ._big_file_queue .put_nowait (None )
16651643 break
16661644
1667- async def download_small_annotations (self , export_path , folder : FolderEntity ):
1645+ async def download_small_annotations (
1646+ self , item_ids : List [int ], export_path , folder : FolderEntity
1647+ ):
16681648 postfix = self .get_postfix ()
1669- while True :
1670- items = await self ._small_file_queue .get ()
1671- if items :
1672- await self ._service_provider .annotations .download_small_annotations (
1673- project = self ._project ,
1674- folder = folder ,
1675- item_ids = [i .id for i in items ],
1676- reporter = self .reporter ,
1677- download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1678- postfix = postfix ,
1679- callback = self ._callback ,
1680- )
1681- else :
1682- self ._small_file_queue .put_nowait (None )
1683- break
1649+ await self ._service_provider .annotations .download_small_annotations (
1650+ project = self ._project ,
1651+ folder = folder ,
1652+ item_ids = item_ids ,
1653+ reporter = self .reporter ,
1654+ download_path = f"{ export_path } { '/' + self ._folder .name if not self ._folder .is_root else '' } " ,
1655+ postfix = postfix ,
1656+ callback = self ._callback ,
1657+ )
16841658
16851659 async def run_workers (
16861660 self ,
16871661 big_annotations : List [BaseItemEntity ],
1688- small_annotations : List [BaseItemEntity ],
1662+ small_annotations : List [List [ dict ] ],
16891663 folder : FolderEntity ,
16901664 export_path ,
16911665 ):
@@ -1702,19 +1676,17 @@ async def run_workers(
17021676 )
17031677
17041678 if small_annotations :
1705- self ._small_file_queue = asyncio .Queue ()
1706- small_chunks = divide_to_chunks (
1707- small_annotations , size = self ._config .ANNOTATION_CHUNK_SIZE
1708- )
1709- for chunk in small_chunks :
1710- self ._small_file_queue .put_nowait (chunk )
1711- self ._small_file_queue .put_nowait (None )
1712- await asyncio .gather (
1713- * [
1714- self .download_small_annotations (export_path , folder )
1715- for _ in range (self ._config .MAX_COROUTINE_COUNT )
1716- ]
1717- )
1679+ for chunks in divide_to_chunks (
1680+ small_annotations , self ._config .MAX_COROUTINE_COUNT
1681+ ):
1682+ tasks = []
1683+ for chunk in chunks :
1684+ tasks .append (
1685+ self .download_small_annotations (
1686+ [i ["id" ] for i in chunk ], export_path , folder
1687+ )
1688+ )
1689+ await asyncio .gather (* tasks )
17181690
17191691 def execute (self ):
17201692 if self .is_valid ():
@@ -1755,26 +1727,23 @@ def execute(self):
17551727 new_export_path += f"/{ folder .name } "
17561728
17571729 id_item_map = {i .id : i for i in items }
1758- sort_response = self ._service_provider .annotations .sort_items_by_size (
1730+ sort_response = self ._service_provider .annotations .get_upload_chunks (
17591731 project = self ._project ,
1760- folder = self ._folder ,
17611732 item_ids = list (id_item_map ),
17621733 )
17631734 large_item_ids = set (map (itemgetter ("id" ), sort_response ["large" ]))
1764- small_items_ids = set (map (itemgetter ("id" ), sort_response ["small" ]))
1765- large_items = list (
1735+ large_items : List [BaseItemEntity ] = list (
17661736 filter (lambda item : item .id in large_item_ids , items )
17671737 )
1768- small_items = list (
1769- filter (lambda item : item .id in small_items_ids , items )
1770- )
1738+ small_items : List [List [dict ]] = sort_response ["small" ]
17711739 try :
17721740 asyncio .run (
17731741 self .run_workers (
17741742 large_items , small_items , folder , new_export_path
17751743 )
17761744 )
17771745 except Exception as e :
1746+ raise e
17781747 logger .error (e )
17791748 self ._response .errors = AppException ("Can't get annotations." )
17801749 return self ._response
0 commit comments