Skip to content

Commit 3e2bf89

Browse files
authored
Merge pull request #496 from superannotateai/1260_error_handling
Added error handeling
2 parents 45c5c47 + e184b86 commit 3e2bf89

File tree

3 files changed

+69
-90
lines changed

3 files changed

+69
-90
lines changed

src/superannotate/lib/core/usecases/annotations.py

Lines changed: 55 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ async def _upload_small_annotations(self, chunk) -> Report:
329329
except Exception: # noqa
330330
failed_annotations.extend([i.name for i in chunk])
331331
finally:
332+
print(1111, len(chunk))
332333
self.reporter.update_progress(len(chunk))
333334
return Report(
334335
failed_annotations, missing_classes, missing_attr_groups, missing_attrs
@@ -345,7 +346,7 @@ async def upload(_chunk):
345346
self._report.missing_attr_groups.extend(report.missing_attr_groups)
346347
self._report.missing_attrs.extend(report.missing_attrs)
347348
except Exception as e:
348-
self.reporter.log_debug(str(e))
349+
self.reporter.log_error(str(e))
349350
self._report.failed_annotations.extend([i.name for i in _chunk])
350351

351352
_size = 0
@@ -386,7 +387,6 @@ async def _upload_big_annotation(self, item) -> Tuple[str, bool]:
386387
except Exception as e:
387388
self.reporter.log_debug(str(e))
388389
self._report.failed_annotations.append(item.name)
389-
raise
390390
finally:
391391
self.reporter.update_progress()
392392

@@ -397,7 +397,7 @@ async def upload_big_annotations(self):
397397
if item:
398398
await self._upload_big_annotation(item)
399399
else:
400-
await self._big_files_queue.put_nowait(None)
400+
self._big_files_queue.put_nowait(None)
401401
break
402402

403403
async def distribute_queues(self, items_to_upload: list):
@@ -431,26 +431,21 @@ async def distribute_queues(self, items_to_upload: list):
431431
self.reporter.update_progress()
432432
data[idx][1] = True
433433
processed_count += 1
434-
self.reporter.update_progress()
435434
data[idx][1] = True
436435
processed_count += 1
437436
self._big_files_queue.put_nowait(None)
438437
self._small_files_queue.put_nowait(None)
439438

440439
async def run_workers(self, items_to_upload):
441-
try:
442-
self._big_files_queue = asyncio.Queue()
443-
self._small_files_queue = asyncio.Queue()
444-
await asyncio.gather(
445-
self.distribute_queues(items_to_upload),
446-
self.upload_big_annotations(),
447-
self.upload_big_annotations(),
448-
self.upload_big_annotations(),
449-
return_exceptions=True,
450-
)
451-
await asyncio.gather(self.upload_small_annotations())
452-
except Exception as e:
453-
self.reporter.log_error(f"Error {str(e)}")
440+
self._big_files_queue = asyncio.Queue()
441+
self._small_files_queue = asyncio.Queue()
442+
await asyncio.gather(
443+
self.distribute_queues(items_to_upload),
444+
self.upload_big_annotations(),
445+
self.upload_big_annotations(),
446+
self.upload_big_annotations(),
447+
)
448+
await asyncio.gather(self.upload_small_annotations())
454449

455450
def execute(self):
456451
missing_annotations = []
@@ -472,7 +467,11 @@ def execute(self):
472467
items_to_upload.append(self.AnnotationToUpload(item.uuid, name, path))
473468
except KeyError:
474469
missing_annotations.append(name)
475-
asyncio.run(self.run_workers(items_to_upload))
470+
try:
471+
asyncio.run(self.run_workers(items_to_upload))
472+
except Exception as e:
473+
self.reporter.log_error(str(e))
474+
self._response.errors = AppException("Can't upload annotations.")
476475
self.reporter.finish_progress()
477476
self._log_report()
478477
uploaded_annotations = list(
@@ -675,7 +674,6 @@ def execute(self):
675674
)
676675
if not uploaded:
677676
self._response.errors = constants.INVALID_JSON_MESSAGE
678-
679677
else:
680678
response = asyncio.run(
681679
self._backend_service.upload_annotations(
@@ -750,6 +748,7 @@ def __init__(
750748
self._client = backend_service_provider
751749
self._show_process = show_process
752750
self._item_names_provided = True
751+
self._big_annotations_queue = None
753752

754753
def validate_project_type(self):
755754
if self._project.type == constants.ProjectType.PIXEL.value:
@@ -775,76 +774,64 @@ def validate_item_names(self):
775774
self._item_names = [item.name for item in self._images.get_all(condition)]
776775

777776
def _prettify_annotations(self, annotations: List[dict]):
778-
restruct = {}
777+
re_struct = {}
779778

780779
if self._item_names_provided:
781780
for annotation in annotations:
782-
restruct[annotation["metadata"]["name"]] = annotation
781+
re_struct[annotation["metadata"]["name"]] = annotation
783782
try:
784-
return [restruct[x] for x in self._item_names if x in restruct]
783+
return [re_struct[x] for x in self._item_names if x in re_struct]
785784
except KeyError:
786785
raise AppException("Broken data.")
787786

788787
return annotations
789788

790-
async def get_big_annotation(
791-
self,
792-
):
789+
async def get_big_annotation(self):
793790

794791
large_annotations = []
795792
while True:
796793
item = await self._big_annotations_queue.get()
797794
if not item:
798795
await self._big_annotations_queue.put(None)
799796
break
800-
801797
large_annotation = await self._client.get_big_annotation(
802798
team_id=self._project.team_id,
803799
project_id=self._project.id,
804-
folder_id=self._folder.uuid,
805800
item=item,
806801
reporter=self.reporter,
807802
)
808-
809803
large_annotations.append(large_annotation)
810-
811804
return large_annotations
812805

813806
async def get_small_annotations(self, item_names):
814-
small_annotations = await self._client.get_small_annotations(
807+
return await self._client.get_small_annotations(
815808
team_id=self._project.team_id,
816809
project_id=self._project.id,
817810
folder_id=self._folder.uuid,
818811
items=item_names,
819812
reporter=self.reporter,
820813
)
821814

822-
return small_annotations
823-
824815
async def distribute_to_queue(self, big_annotations):
825-
826816
for item in big_annotations:
827817
await self._big_annotations_queue.put(item)
828-
829818
await self._big_annotations_queue.put(None)
830819

831820
async def run_workers(self, big_annotations, small_annotations):
832-
821+
self._big_annotations_queue = asyncio.Queue()
833822
annotations = await asyncio.gather(
834823
self.distribute_to_queue(big_annotations),
835824
self.get_small_annotations(small_annotations),
836825
self.get_big_annotation(),
837826
self.get_big_annotation(),
838827
self.get_big_annotation(),
839-
return_exceptions=True,
840828
)
841829

842830
annotations = [i for x in annotations[1:] for i in x if x]
843831
return annotations
844832

845833
def execute(self):
846834
if self.is_valid():
847-
self._big_annotations_queue = asyncio.Queue()
848835
items_count = len(self._item_names)
849836
self.reporter.log_info(
850837
f"Getting {items_count} annotations from "
@@ -858,12 +845,15 @@ def execute(self):
858845
folder_id=self._folder.uuid,
859846
project_id=self._project.id,
860847
)
861-
862848
small_annotations = [x["name"] for x in items["small"]]
863-
annotations = asyncio.run(
864-
self.run_workers(items["large"], small_annotations)
865-
)
866-
849+
try:
850+
annotations = asyncio.run(
851+
self.run_workers(items["large"], small_annotations)
852+
)
853+
except Exception as e:
854+
self.reporter.log_error(str(e))
855+
self._response.errors = AppException("Can't upload annotations.")
856+
return self._response
867857
received_items_count = len(annotations)
868858
self.reporter.finish_progress()
869859
if items_count > received_items_count:
@@ -1122,31 +1112,27 @@ def coroutine_wrapper(coroutine):
11221112
loop.close()
11231113
return count
11241114

1125-
async def _download_big_annotation(self, item, export_path, folder_id):
1126-
postfix = self.get_postfix()
1127-
await self._backend_client.download_big_annotation(
1128-
item=item,
1129-
team_id=self._project.team_id,
1130-
project_id=self._project.id,
1131-
download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
1132-
postfix=postfix,
1133-
callback=self._callback,
1134-
)
1135-
1136-
async def download_big_annotations(self, queue_idx, export_path, folder_id):
1115+
async def download_big_annotations(self, queue_idx, export_path):
11371116
while True:
11381117
cur_queue = self._big_file_queues[queue_idx]
11391118
item = await cur_queue.get()
11401119
cur_queue.task_done()
11411120
if item:
1142-
await self._download_big_annotation(item, export_path, folder_id)
1121+
postfix = self.get_postfix()
1122+
await self._backend_client.download_big_annotation(
1123+
item=item,
1124+
team_id=self._project.team_id,
1125+
project_id=self._project.id,
1126+
download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
1127+
postfix=postfix,
1128+
callback=self._callback,
1129+
)
11431130
else:
11441131
cur_queue.put_nowait(None)
11451132
break
11461133

11471134
async def download_small_annotations(self, queue_idx, export_path, folder_id):
11481135
cur_queue = self._small_file_queues[queue_idx]
1149-
11501136
items = []
11511137
item = ""
11521138
postfix = self.get_postfix()
@@ -1195,15 +1181,9 @@ async def run_workers(self, item_names, folder_id, export_path):
11951181
self.distribute_to_queues(
11961182
item_names, small_file_queue_idx, big_file_queue_idx, folder_id
11971183
),
1198-
self.download_big_annotations(
1199-
big_file_queue_idx, export_path, folder_id
1200-
),
1201-
self.download_big_annotations(
1202-
big_file_queue_idx, export_path, folder_id
1203-
),
1204-
self.download_big_annotations(
1205-
big_file_queue_idx, export_path, folder_id
1206-
),
1184+
self.download_big_annotations(big_file_queue_idx, export_path),
1185+
self.download_big_annotations(big_file_queue_idx, export_path),
1186+
self.download_big_annotations(big_file_queue_idx, export_path),
12071187
self.download_small_annotations(
12081188
small_file_queue_idx, export_path, folder_id
12091189
),
@@ -1214,11 +1194,7 @@ async def run_workers(self, item_names, folder_id, export_path):
12141194
except Exception as e:
12151195
self.reporter.log_error(f"Error {str(e)}")
12161196

1217-
def per_folder_execute(self, item_names, folder_id, export_path):
1218-
asyncio.run(self.run_workers(item_names, folder_id, export_path))
1219-
12201197
def execute(self):
1221-
12221198
if self.is_valid():
12231199
export_path = str(
12241200
self.destination
@@ -1255,22 +1231,22 @@ def execute(self):
12551231
]
12561232
else:
12571233
item_names = self._item_names
1258-
12591234
new_export_path = export_path
1260-
12611235
if not folder.is_root and self._folder.is_root:
12621236
new_export_path += f"/{folder.name}"
1263-
1264-
# TODO check
12651237
if not item_names:
12661238
continue
1267-
future = executor.submit(
1268-
self.per_folder_execute,
1269-
item_names,
1270-
folder.uuid,
1271-
new_export_path,
1239+
futures.append(
1240+
executor.submit(
1241+
asyncio.run,
1242+
self.run_workers(item_names, folder.uuid, new_export_path),
1243+
)
12721244
)
1273-
futures.append(future)
1245+
1246+
for future in concurrent.futures.as_completed(futures):
1247+
exception = future.exception()
1248+
if exception:
1249+
self._response.errors = exception
12741250

12751251
self.reporter.stop_spinner()
12761252
count = self.get_items_count(export_path)

src/superannotate/lib/infrastructure/services.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ async def _sync_large_annotation(self, team_id, project_id, item_id):
339339
headers=self.default_headers,
340340
raise_for_status=True,
341341
) as session:
342-
res = await session.post(sync_url, params=sync_params)
342+
await session.post(sync_url, params=sync_params)
343343

344344
sync_params.pop("current_source")
345345
sync_params.pop("desired_source")
@@ -1165,10 +1165,8 @@ async def get_big_annotation(
11651165
self,
11661166
project_id: int,
11671167
team_id: int,
1168-
folder_id: int,
11691168
item: dict,
11701169
reporter: Reporter,
1171-
callback: Callable = None,
11721170
):
11731171

11741172
url = urljoin(
@@ -1183,13 +1181,14 @@ async def get_big_annotation(
11831181
"version": "V1.00",
11841182
}
11851183

1186-
synced = await self._sync_large_annotation(
1184+
await self._sync_large_annotation(
11871185
team_id=team_id, project_id=project_id, item_id=item["id"]
11881186
)
11891187

11901188
async with aiohttp.ClientSession(
11911189
connector=aiohttp.TCPConnector(ssl=False),
11921190
headers=self.default_headers,
1191+
raise_for_status=True,
11931192
) as session:
11941193
start_response = await session.post(url, params=query_params)
11951194
large_annotation = await start_response.json()
@@ -1253,13 +1252,14 @@ async def download_big_annotation(
12531252
self.URL_DOWNLOAD_LARGE_ANNOTATION.format(item_id=item_id),
12541253
)
12551254

1256-
synced = await self._sync_large_annotation(
1255+
await self._sync_large_annotation(
12571256
team_id=team_id, project_id=project_id, item_id=item_id
12581257
)
12591258

12601259
async with aiohttp.ClientSession(
12611260
connector=aiohttp.TCPConnector(ssl=False),
12621261
headers=self.default_headers,
1262+
raise_for_status=True,
12631263
) as session:
12641264
start_response = await session.post(url, params=query_params)
12651265
res = await start_response.json()
@@ -1485,7 +1485,9 @@ async def upload_annotations(
14851485
headers = copy.copy(self.default_headers)
14861486
del headers["Content-Type"]
14871487
async with aiohttp.ClientSession(
1488-
headers=headers, connector=aiohttp.TCPConnector(ssl=self._verify_ssl)
1488+
headers=headers,
1489+
connector=aiohttp.TCPConnector(ssl=self._verify_ssl),
1490+
raise_for_status=True,
14891491
) as session:
14901492
data = aiohttp.FormData(quote_fields=False)
14911493
for key, file in items_name_file_map.items():
@@ -1526,7 +1528,9 @@ async def upload_big_annotation(
15261528
chunk_size: int,
15271529
) -> bool:
15281530
async with aiohttp.ClientSession(
1529-
connector=aiohttp.TCPConnector(ssl=False), headers=self.default_headers
1531+
connector=aiohttp.TCPConnector(ssl=False),
1532+
headers=self.default_headers,
1533+
raise_for_status=True,
15301534
) as session:
15311535
params = {
15321536
"team_id": team_id,

0 commit comments

Comments
 (0)