4747from lib .core .types import PriorityScoreEntity
4848from lib .core .usecases .base import BaseReportableUseCase
4949from lib .core .video_convertor import VideoFrameGenerator
50+ from lib .infrastructure .utils import divide_to_chunks
5051from pydantic import BaseModel
5152
5253logger = logging .getLogger ("sa" )
@@ -119,11 +120,6 @@ def get_or_raise(response: ServiceResponse):
119120 raise AppException (response .error )
120121
121122
122- def divide_to_chunks (it , size ):
123- it = iter (it )
124- return iter (lambda : tuple (islice (it , size )), ())
125-
126-
127123def log_report (
128124 report : Report ,
129125):
@@ -148,7 +144,6 @@ class ItemToUpload(BaseModel):
148144 item : BaseItemEntity
149145 annotation_json : Optional [dict ]
150146 path : Optional [str ]
151- file : Optional [io .StringIO ]
152147 file_size : Optional [int ]
153148 mask : Optional [io .BytesIO ]
154149
@@ -186,7 +181,7 @@ async def upload_small_annotations(
186181 report : Report ,
187182 callback : Callable = None ,
188183):
189- async def upload (_chunk ):
184+ async def upload (_chunk : List [ ItemToUpload ] ):
190185 failed_annotations , missing_classes , missing_attr_groups , missing_attrs = (
191186 [],
192187 [],
@@ -197,7 +192,7 @@ async def upload(_chunk):
197192 response = await service_provider .annotations .upload_small_annotations (
198193 project = project ,
199194 folder = folder ,
200- items_name_file_map = {i .item .name : i .file for i in chunk },
195+ items_name_data_map = {i .item .name : i .annotation_json for i in chunk },
201196 )
202197 if response .ok :
203198 if response .data .failed_items : # noqa
@@ -221,9 +216,9 @@ async def upload(_chunk):
221216 reporter .update_progress (len (chunk ))
222217
223218 _size = 0
224- chunk = []
219+ chunk : List [ ItemToUpload ] = []
225220 while True :
226- item_data = await queue .get ()
221+ item_data : ItemToUpload = await queue .get ()
227222 queue .task_done ()
228223 if not item_data :
229224 queue .put_nowait (None )
@@ -253,11 +248,14 @@ async def upload_big_annotations(
253248):
254249 async def _upload_big_annotation (item_data : ItemToUpload ) -> Tuple [str , bool ]:
255250 try :
251+ buff = io .StringIO ()
252+ json .dump (item_data .annotation_json , buff , allow_nan = False )
253+ buff .seek (0 )
256254 is_uploaded = await service_provider .annotations .upload_big_annotation (
257255 project = project ,
258256 folder = folder ,
259257 item_id = item_data .item .id ,
260- data = item_data . file ,
258+ data = buff ,
261259 chunk_size = 5 * 1024 * 1024 ,
262260 )
263261 if is_uploaded and callback :
@@ -335,15 +333,14 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
335333 for idx , (item_to_upload , processed ) in enumerate (data ):
336334 if not processed :
337335 try :
338- item_to_upload . file = io .StringIO ()
336+ file = io .StringIO ()
339337 json .dump (
340338 item_to_upload .annotation_json ,
341- item_to_upload . file ,
339+ file ,
342340 allow_nan = False ,
343341 )
344- item_to_upload .file .seek (0 , os .SEEK_END )
345- item_to_upload .file_size = item_to_upload .file .tell ()
346- item_to_upload .file .seek (0 )
342+ file .seek (0 , os .SEEK_END )
343+ item_to_upload .file_size = file .tell ()
347344 while True :
348345 if item_to_upload .file_size > BIG_FILE_THRESHOLD :
349346 if self ._big_files_queue .qsize () > 32 :
@@ -723,13 +720,10 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
723720 if not processed :
724721 try :
725722 (
726- annotation ,
723+ item_to_upload . annotation_json ,
727724 item_to_upload .mask ,
728725 item_to_upload .file_size ,
729726 ) = await self .get_annotation (item_to_upload .path )
730- item_to_upload .file = io .StringIO ()
731- json .dump (annotation , item_to_upload .file , allow_nan = False )
732- item_to_upload .file .seek (0 )
733727 while True :
734728 if item_to_upload .file_size > BIG_FILE_THRESHOLD :
735729 if self ._big_files_queue .qsize () > 32 :
@@ -1018,7 +1012,7 @@ def execute(self):
10181012 self ._service_provider .annotations .upload_small_annotations (
10191013 project = self ._project ,
10201014 folder = self ._folder ,
1021- items_name_file_map = {self ._image .name : annotation_file },
1015+ items_name_data_map = {self ._image .name : annotation_json },
10221016 )
10231017 )
10241018 if response .ok :
0 commit comments