 import os
 import platform
 import queue
-import time
+import sys
 import threading
 from datetime import datetime
 from itertools import islice
 from pathlib import Path
+from typing import Any
 from typing import Callable
 from typing import Dict
 from typing import List
-from typing import Any
 from typing import Optional
 from typing import Tuple

@@ -117,7 +117,7 @@ def __init__(
         self._s3_bucket = None
         self._big_files_queue = queue.Queue()
         self._small_files_queue = queue.Queue()
-        self.await = threading.Condition()
+        self._report = Report([], [], [], [])

     @staticmethod
     def get_name_path_mappings(annotation_paths):
@@ -130,22 +130,22 @@ def get_name_path_mappings(annotation_paths):
         return name_path_mappings

     def _log_report(
-        self, missing_classes: list, missing_attr_groups: list, missing_attrs: list
+        self,
     ):
-        if missing_classes:
+        if self._report.missing_classes:
             logger.warning(
                 "Could not find annotation classes matching existing classes on the platform: "
-                f"[{', '.join(missing_classes)}]"
+                f"[{', '.join(self._report.missing_classes)}]"
             )
-        if missing_attr_groups:
+        if self._report.missing_attr_groups:
             logger.warning(
                 "Could not find attribute groups matching existing attribute groups on the platform: "
-                f"[{', '.join(missing_attr_groups)}]"
+                f"[{', '.join(self._report.missing_attr_groups)}]"
             )
-        if missing_attrs:
+        if self._report.missing_attrs:
             logger.warning(
                 "Could not find attributes matching existing attributes on the platform: "
-                f"[{', '.join(missing_attrs)}]"
+                f"[{', '.join(self._report.missing_attrs)}]"
             )

         if self.reporter.custom_messages.get("invalid_jsons"):
@@ -164,23 +164,22 @@ def get_annotation_from_s3(bucket, path: str):
         file.seek(0)
         return file

-    def prepare_annotation(self, annotation: dict):
-        use_case = ValidateAnnotationUseCase(
-            reporter=self.reporter,
-            team_id=self._project.team_id,
-            project_type=self._project.type,
-            annotation=annotation,
-            backend_service_provider=self._backend_service,
-        )
-        errors = use_case.execute().data
+    def prepare_annotation(self, annotation: dict) -> dict:
+        errors = None
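+        # Only annotations below BIG_FILE_THRESHOLD go through schema validation;
+        # larger payloads skip it and are passed through as-is.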
+        if sys.getsizeof(annotation) < BIG_FILE_THRESHOLD:
+            use_case = ValidateAnnotationUseCase(
+                reporter=self.reporter,
+                team_id=self._project.team_id,
+                project_type=self._project.type,
+                annotation=annotation,
+                backend_service_provider=self._backend_service,
+            )
+            errors = use_case.execute().data
         if not errors:
             annotation = UploadAnnotationUseCase.set_defaults(
                 annotation, self._project.type
             )
-        annotation_file = io.StringIO()
-        json.dump(annotation, annotation_file)
-        annotation_file.seek(0)
-        return annotation_file
+        return annotation

     def get_annotation(
         self, path: str
@@ -286,7 +285,7 @@ def _upload_mask(self, item: AnnotationToUpload):
             Body=item.mask,
         )

-    def upload_small_annotations(self, chunk) -> Report:
+    def _upload_small_annotations(self, chunk) -> Report:
         failed_annotations, missing_classes, missing_attr_groups, missing_attrs = [], [], [], []
         response = self._backend_service.upload_annotations(
             team_id=self._project.team_id,
@@ -311,11 +310,31 @@ def upload_small_annotations(self, chunk) -> Report:
             failed_annotations, missing_classes, missing_attr_groups, missing_attrs
         )

-    def upload_big_annotation(
-        self, item, queue
-    ) -> Tuple[str, bool]:
+    def upload_small_annotations(self):
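+        # Queue consumer: batch items from the small-files queue into CHUNK_SIZE
+        # uploads and merge each batch Report into self._report; the None sentinel
+        # is re-enqueued so sibling worker threads also stop.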
+        chunk = []
+        while True:
+            item = self._small_files_queue.get()
+            if not item:
+                self._small_files_queue.put(None)
+                break
+            chunk.append(item)
+            if len(chunk) >= self.CHUNK_SIZE:
+                report = self._upload_small_annotations(chunk)
+                self._report.failed_annotations.extend(report.failed_annotations)
+                self._report.missing_classes.extend(report.missing_classes)
+                self._report.missing_attr_groups.extend(report.missing_attr_groups)
+                self._report.missing_attrs.extend(report.missing_attrs)
+
+                chunk = []
+        if chunk:
+            report = self._upload_small_annotations(chunk)
+            self._report.failed_annotations.extend(report.failed_annotations)
+            self._report.missing_classes.extend(report.missing_classes)
+            self._report.missing_attr_groups.extend(report.missing_attr_groups)
+            self._report.missing_attrs.extend(report.missing_attrs)
+
+    def _upload_big_annotation(self, item) -> Tuple[str, bool]:
         try:
-            queue.get_nowait()
             is_uploaded = self._backend_service.upload_big_annotation(
                 team_id=self._project.team_id,
                 project_id=self._project.id,
@@ -324,40 +343,51 @@ def upload_big_annotation(
                 data=item.data,
                 chunk_size=5 * 1024 * 1024,
             )
+            self.reporter.update_progress()
             if is_uploaded and (
                 self._project.type == constants.ProjectType.PIXEL.value
                 and item.mask
             ):
                 self._upload_mask(item.mask)
             return item.name, is_uploaded
-        finally:
-            return item.name, False
+        except AppException as e:
+            self.reporter.log_error(str(e))
+            self._report.failed_annotations.append(item.name)

-    def qsize(self, n):
-        if self._big_files_queue.qsize() != n:
-            return False
-        else:
-            return True
-    def _put_to_queue(self):
-
-    def get_annotation(self, items_to_upload: list):
-        data: List[Any, bool] = [(i, False) for i in items_to_upload]
-        unprocessed = False
+    def upload_big_annotations(self):
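+        # Queue consumer: upload big annotations one at a time until the None
+        # sentinel appears, re-enqueueing it for the other workers.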
         while True:
-            for ixd, (item, processed) in enumerate(data):
+            item = self._big_files_queue.get()
+            if not item:
+                self._big_files_queue.put(None)
+                break
+            self._upload_big_annotation(item)
+
+    def distribute_queues(self, items_to_upload: list):
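+        # Producer: serialize each annotation to an in-memory JSON buffer and route
+        # it by size to the big- or small-files queue (skipping while more than four
+        # big files are pending), then close both queues with a None sentinel.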
+        data: List[List[Any]] = [[i, False] for i in items_to_upload]
+        processed_count = 0
+        while processed_count < len(data):
+            for idx, (item, processed) in enumerate(data):
                 if processed:
                     continue
-                data, mask = self.get_annotation(item.path)
-                size = item.data.seek(0, os.SEEK_END)
-                item.data.seek(0)
+                annotation, mask = self.get_annotation(item.path)
+                t_item = copy.copy(item)
+                size = sys.getsizeof(annotation)
+                annotation_file = io.StringIO()
+                json.dump(annotation, annotation_file)
+                annotation_file.seek(0)
+                t_item.data = annotation_file
+                t_item.mask = mask
                 if size > BIG_FILE_THRESHOLD:
                     if self._big_files_queue.qsize() > 4:
                         continue
                     else:
-                        self._big_files_queue.put(item)
+                        self._big_files_queue.put(t_item)
                 else:
-                    self._small_files_queue.put(item)
-
+                    self._small_files_queue.put(t_item)
+                data[idx][1] = True
+                processed_count += 1
+        self._big_files_queue.put(None)
+        self._small_files_queue.put(None)

     def execute(self):
         failed_annotations = []
@@ -378,53 +408,28 @@ def execute(self):
                 self._item_ids.append(item.uuid)
                 items_to_upload.append(self.AnnotationToUpload(item.uuid, name, path))
             except KeyError:
-                missing_annotations.append(path)
-
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.MAX_WORKERS
-        ) as executor:
-            small_futures = []
-            big_futures = []
-            big_files_queue = queue.Queue()
-            for item in items_to_upload:
-                item.data, item.mask = self.get_annotation(item.path)
-                size = item.data.seek(0, os.SEEK_END)
-                item.data.seek(0)
-                small_chunk = []
-                self.reporter.update_progress()
-                if size > BIG_FILE_THRESHOLD:
-                    if big_files_queue.qsize() > 4:
-                        time.sleep(2)
-                    big_files_queue.put(1)
-                    big_futures.append(executor.submit(self.upload_big_annotation, item, big_files_queue))
-                else:
-                    if len(small_chunk) >= self.CHUNK_SIZE:
-                        small_futures.append(executor.submit(self.upload_small_annotations, small_chunk))
-                        small_chunk = []
-            if small_chunk:
-                small_futures.append(executor.submit(self.upload_small_annotations, small_chunk))
-
-            missing_classes, missing_attr_groups, missing_attrs = [], [], []
-            uploaded_annotations_names = []
-            for future in concurrent.futures.as_completed(small_futures):
-                report: Report = future.result()  # noqa
-                failed_annotations.extend(report.failed_annotations)
-                missing_classes.extend(report.missing_classes)
-                missing_attr_groups.extend(report.missing_attr_groups)
-                missing_attrs.extend(report.missing_attrs)
-            for future in concurrent.futures.as_completed(big_futures):
-                item_name, uploaded = future.result()
-                if not uploaded:
-                    failed_annotations.extend(item_name)
-            self.reporter.finish_progress()
-            self._log_report(missing_classes, missing_attr_groups, missing_attrs)
+                missing_annotations.append(name)
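+        # Start THREADS_COUNT consumer pairs (small/big uploads) plus a single
+        # producer thread that feeds both queues, then wait for all of them.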
+        threads = []
+        for _ in range(self.THREADS_COUNT):
+            t_s = threading.Thread(target=self.upload_small_annotations)
+            t_b = threading.Thread(target=self.upload_big_annotations)
+            t_s.start()
+            t_b.start()
+            threads.append(t_s)
+            threads.append(t_b)
+        queue_distributor = threading.Thread(target=self.distribute_queues, args=(items_to_upload,))
+        queue_distributor.start()
+        for i in threads:
+            i.join()
+        queue_distributor.join()
+        self._log_report()
         uploaded_annotations = list(
             name_path_mappings.keys()
-            - set(failed_annotations).union(set(missing_annotations))
+            - set(self._report.failed_annotations).union(set(missing_annotations))
         )
         if uploaded_annotations:
             statuses_changed = self._set_annotation_statuses_in_progress(
-                uploaded_annotations_names
+                uploaded_annotations
             )
             if not statuses_changed:
                 self._response.errors = AppException("Failed to change status.")