Skip to content

Commit e979b38

Browse files
Narek MkhitaryanNarek Mkhitaryan
authored andcommitted
change upload annotations assets data
1 parent 8e4ecb4 commit e979b38

File tree

4 files changed

+49
-36
lines changed

4 files changed

+49
-36
lines changed

src/superannotate/lib/core/serviceproviders.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ async def upload_small_annotations(
367367
self,
368368
project: entities.ProjectEntity,
369369
folder: entities.FolderEntity,
370-
items_name_file_map: Dict[str, io.StringIO],
370+
items_name_data_map: Dict[str, dict],
371371
) -> UploadAnnotationsResponse:
372372
raise NotImplementedError
373373

src/superannotate/lib/core/usecases/annotations.py

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from lib.core.types import PriorityScoreEntity
4848
from lib.core.usecases.base import BaseReportableUseCase
4949
from lib.core.video_convertor import VideoFrameGenerator
50+
from lib.infrastructure.utils import divide_to_chunks
5051
from pydantic import BaseModel
5152

5253
logger = logging.getLogger("sa")
@@ -119,11 +120,6 @@ def get_or_raise(response: ServiceResponse):
119120
raise AppException(response.error)
120121

121122

122-
def divide_to_chunks(it, size):
123-
it = iter(it)
124-
return iter(lambda: tuple(islice(it, size)), ())
125-
126-
127123
def log_report(
128124
report: Report,
129125
):
@@ -148,7 +144,6 @@ class ItemToUpload(BaseModel):
148144
item: BaseItemEntity
149145
annotation_json: Optional[dict]
150146
path: Optional[str]
151-
file: Optional[io.StringIO]
152147
file_size: Optional[int]
153148
mask: Optional[io.BytesIO]
154149

@@ -186,7 +181,7 @@ async def upload_small_annotations(
186181
report: Report,
187182
callback: Callable = None,
188183
):
189-
async def upload(_chunk):
184+
async def upload(_chunk: List[ItemToUpload]):
190185
failed_annotations, missing_classes, missing_attr_groups, missing_attrs = (
191186
[],
192187
[],
@@ -197,7 +192,7 @@ async def upload(_chunk):
197192
response = await service_provider.annotations.upload_small_annotations(
198193
project=project,
199194
folder=folder,
200-
items_name_file_map={i.item.name: i.file for i in chunk},
195+
items_name_data_map={i.item.name: i.annotation_json for i in chunk},
201196
)
202197
if response.ok:
203198
if response.data.failed_items: # noqa
@@ -221,9 +216,9 @@ async def upload(_chunk):
221216
reporter.update_progress(len(chunk))
222217

223218
_size = 0
224-
chunk = []
219+
chunk: List[ItemToUpload] = []
225220
while True:
226-
item_data = await queue.get()
221+
item_data: ItemToUpload = await queue.get()
227222
queue.task_done()
228223
if not item_data:
229224
queue.put_nowait(None)
@@ -253,11 +248,14 @@ async def upload_big_annotations(
253248
):
254249
async def _upload_big_annotation(item_data: ItemToUpload) -> Tuple[str, bool]:
255250
try:
251+
buff = io.StringIO()
252+
json.dump(item_data.annotation_json, buff, allow_nan=False)
253+
buff.seek(0)
256254
is_uploaded = await service_provider.annotations.upload_big_annotation(
257255
project=project,
258256
folder=folder,
259257
item_id=item_data.item.id,
260-
data=item_data.file,
258+
data=buff,
261259
chunk_size=5 * 1024 * 1024,
262260
)
263261
if is_uploaded and callback:
@@ -335,15 +333,14 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
335333
for idx, (item_to_upload, processed) in enumerate(data):
336334
if not processed:
337335
try:
338-
item_to_upload.file = io.StringIO()
336+
file = io.StringIO()
339337
json.dump(
340338
item_to_upload.annotation_json,
341-
item_to_upload.file,
339+
file,
342340
allow_nan=False,
343341
)
344-
item_to_upload.file.seek(0, os.SEEK_END)
345-
item_to_upload.file_size = item_to_upload.file.tell()
346-
item_to_upload.file.seek(0)
342+
file.seek(0, os.SEEK_END)
343+
item_to_upload.file_size = file.tell()
347344
while True:
348345
if item_to_upload.file_size > BIG_FILE_THRESHOLD:
349346
if self._big_files_queue.qsize() > 32:
@@ -723,13 +720,10 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
723720
if not processed:
724721
try:
725722
(
726-
annotation,
723+
item_to_upload.annotation_json,
727724
item_to_upload.mask,
728725
item_to_upload.file_size,
729726
) = await self.get_annotation(item_to_upload.path)
730-
item_to_upload.file = io.StringIO()
731-
json.dump(annotation, item_to_upload.file, allow_nan=False)
732-
item_to_upload.file.seek(0)
733727
while True:
734728
if item_to_upload.file_size > BIG_FILE_THRESHOLD:
735729
if self._big_files_queue.qsize() > 32:
@@ -1018,7 +1012,7 @@ def execute(self):
10181012
self._service_provider.annotations.upload_small_annotations(
10191013
project=self._project,
10201014
folder=self._folder,
1021-
items_name_file_map={self._image.name: annotation_file},
1015+
items_name_data_map={self._image.name: annotation_json},
10221016
)
10231017
)
10241018
if response.ok:

src/superannotate/lib/infrastructure/services/annotation.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ def get_upload_chunks(
162162
response = self.client.request(
163163
url=urljoin(self.assets_provider_url, self.URL_CLASSIFY_ITEM_SIZE),
164164
method="POST",
165-
params={"project_id": project.id, "limit": len(item_ids)},
166-
data={"item_ids": item_ids},
165+
params={"limit": len(item_ids)},
166+
data={"project_id": project.id, "item_ids": item_ids},
167167
)
168168
if not response.ok:
169169
raise AppException(response.error)
@@ -246,12 +246,12 @@ async def upload_small_annotations(
246246
self,
247247
project: entities.ProjectEntity,
248248
folder: entities.FolderEntity,
249-
items_name_file_map: Dict[str, io.StringIO],
249+
items_name_data_map: Dict[str, dict],
250250
) -> UploadAnnotationsResponse:
251251
url = urljoin(
252252
self.assets_provider_url,
253253
(
254-
f"{self.URL_UPLOAD_ANNOTATIONS}?{'&'.join(f'image_names[]={item_name}' for item_name in items_name_file_map.keys())}"
254+
f"{self.URL_UPLOAD_ANNOTATIONS}?{'&'.join(f'image_names[]={item_name}' for item_name in items_name_data_map.keys())}"
255255
),
256256
)
257257

@@ -261,12 +261,19 @@ async def upload_small_annotations(
261261
headers=headers,
262262
connector=aiohttp.TCPConnector(ssl=False),
263263
) as session:
264-
data = aiohttp.FormData(quote_fields=False)
265-
for key, file in items_name_file_map.items():
266-
file.seek(0)
267-
data.add_field(
264+
form_data = aiohttp.FormData(
265+
quote_fields=False,
266+
)
267+
tmp = {}
268+
for name, data in items_name_data_map.items():
269+
tmp[name] = io.StringIO()
270+
json.dump({"data": data}, tmp[name], allow_nan=False)
271+
tmp[name].seek(0)
272+
273+
for key, data in tmp.items():
274+
form_data.add_field(
268275
key,
269-
bytes(file.read(), "ascii"),
276+
data,
270277
filename=key,
271278
content_type="application/json",
272279
)
@@ -276,7 +283,9 @@ async def upload_small_annotations(
276283
"project_id": project.id,
277284
"folder_id": folder.id,
278285
}
279-
_response = await session.request("post", url, params=params, data=data)
286+
_response = await session.request(
287+
"post", url, params=params, data=form_data
288+
)
280289
if not _response.ok:
281290
logger.debug(f"Status code {str(_response.status)}")
282291
logger.debug(await _response.text())
@@ -324,7 +333,8 @@ async def upload_big_annotation(
324333
params["chunk_id"] = chunk_id
325334
if chunk:
326335
data_sent = True
327-
response = await session.post(
336+
response = await session.request(
337+
"post",
328338
urljoin(
329339
self.assets_provider_url,
330340
self.URL_START_FILE_SEND_PART.format(item_id=item_id),
@@ -341,7 +351,8 @@ async def upload_big_annotation(
341351
if len(chunk) < chunk_size:
342352
break
343353
del params["chunk_id"]
344-
response = await session.post(
354+
response = await session.request(
355+
"post",
345356
urljoin(
346357
self.assets_provider_url,
347358
self.URL_START_FILE_SEND_FINISH.format(item_id=item_id),
@@ -352,7 +363,8 @@ async def upload_big_annotation(
352363
if not response.ok:
353364
raise AppException(str(await response.text()))
354365
del params["path"]
355-
response = await session.post(
366+
response = await session.request(
367+
"post",
356368
urljoin(
357369
self.assets_provider_url,
358370
self.URL_START_FILE_SYNC.format(item_id=item_id),
@@ -363,7 +375,8 @@ async def upload_big_annotation(
363375
if not response.ok:
364376
raise AppException(str(await response.text()))
365377
while True:
366-
response = await session.get(
378+
response = await session.request(
379+
"get",
367380
urljoin(
368381
self.assets_provider_url,
369382
self.URL_START_FILE_SYNC_STATUS.format(item_id=item_id),

src/superannotate/lib/infrastructure/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from itertools import islice
12
from pathlib import Path
23
from typing import Optional
34
from typing import Tuple
@@ -6,6 +7,11 @@
67
from superannotate.lib.app.exceptions import PathError
78

89

10+
def divide_to_chunks(it, size):
11+
it = iter(it)
12+
return iter(lambda: tuple(islice(it, size)), ())
13+
14+
915
def split_project_path(project_path: str) -> Tuple[str, Optional[str]]:
1016
path = Path(project_path)
1117
if len(path.parts) > 3:

0 commit comments

Comments
 (0)