|
| 1 | +''' |
| 2 | +''' |
| 3 | +import json |
| 4 | +import logging |
| 5 | +from collections import namedtuple |
| 6 | +from datetime import datetime |
| 7 | +from pathlib import Path |
| 8 | +from PIL import Image |
| 9 | +import numpy as np |
| 10 | +import cv2 |
| 11 | + |
| 12 | +from ..baseStrategy import baseStrategy |
| 13 | + |
| 14 | +from ....common import id2rgb, write_to_json |
| 15 | + |
| 16 | +logger = logging.getLogger("superannotate-python-sdk") |
| 17 | + |
| 18 | + |
class CocoBaseStrategy(baseStrategy):
    """Shared helpers for converting between SuperAnnotate and COCO data.

    The methods below read ``self.project_type``, ``self.task``,
    ``self.export_root``, ``self.output_dir``, ``self.dataset_name`` and call
    ``self.conversion_algorithm``. None of these are assigned in this class;
    presumably they are provided by ``baseStrategy`` (confirm there).
    """

    # Annotation-file suffix per project type. Keys are lowercase, so look
    # them up with a value returned by ``_normalize_project_type``.
    project_type_to_json_ending = {
        'pixel': '___pixel.json',
        'vector': '___objects.json'
    }

    # Record types are built once here instead of on every method call.
    _CategoryItem = namedtuple('Class', ['class_name', 'id'])
    _PixelImgCommons = namedtuple(
        'ImgCommons',
        ['image_info', 'ann_mask', 'sa_bluemask_rgb', 'flat_mask']
    )
    _VectorImgCommons = namedtuple('ImgCommons', ['image_info'])

    def __init__(self, args):
        """Initialise the image counter, then delegate to the base class."""
        self.total_images_num = 0
        super().__init__(args)

    @staticmethod
    def _normalize_project_type(project_type):
        """Return ``project_type`` lowercased when it is a string.

        The class historically mixed ``'Pixel'``/``'Vector'`` comparisons with
        a lowercase-keyed suffix table; normalising in one place lets both
        spellings work everywhere. Non-string values are returned unchanged.
        """
        if isinstance(project_type, str):
            return project_type.lower()
        return project_type

    def set_num_total_images(self, num):
        """Store the number of annotation files discovered for this run."""
        self.total_images_num = num

    def get_num_total_images(self):
        """Return the value last stored by ``set_num_total_images``."""
        return self.total_images_num

    def _create_categories(self, path_to_classes):
        """Build the COCO ``categories`` list from a JSON file.

        Args:
            path_to_classes: path to a JSON file holding a mapping of
                class name -> class id.

        Returns:
            A list with one category dict per entry of the mapping.
        """
        with open(path_to_classes, 'r') as fp:
            classes = json.load(fp)

        return [
            self._create_single_category(
                self._CategoryItem(class_name, class_id)
            ) for class_name, class_id in classes.items()
        ]

    def _create_single_category(self, item):
        """Return the COCO category dict for ``item``.

        Args:
            item: object exposing ``id`` and ``class_name`` attributes.
        """
        return {
            'id': item.id,
            'name': item.class_name,
            'supercategory': item.class_name,
            'isthing': 1,
            'color': id2rgb(item.id)
        }

    def _make_id_generator(self):
        """Yield the integers 1, 2, 3, ... without end."""
        cur_id = 0
        while True:
            cur_id += 1
            yield cur_id

    def _create_skeleton(self):
        """Return an empty COCO document with ``info``/``licenses`` filled.

        ``images``, ``annotations`` and ``categories`` start as empty lists.
        """
        # Sample the clock once so 'year' and 'date_created' always agree.
        now = datetime.now()
        info = {
            'description': 'This is {} dataset.'.format(self.dataset_name),
            'url': 'https://superannotate.ai',
            'version': '1.0',
            'year': now.year,
            'contributor': 'Superannotate AI',
            'date_created': now.strftime("%d/%m/%Y")
        }
        licenses = [
            {
                'url': 'https://superannotate.ai',
                'id': 1,
                'name': 'Superannotate AI'
            }
        ]
        return {
            'info': info,
            'licenses': licenses,
            'images': [],
            'annotations': [],
            'categories': []
        }

    def convert_from_old_sa_to_new(self, old_json_data, project_type):
        """Convert a list-style annotation into the dict-style layout.

        Args:
            old_json_data: iterable of dicts, each optionally carrying a
                ``"type"`` key (``"meta"``, ``"tag"``, ``"comment"``; anything
                else is treated as an instance).
            project_type: project type name; ``"pixel"`` in any letter case
                adds the ``isSegmented`` metadata key.

        Returns:
            A dict with ``metadata``, ``instances``, ``tags`` and ``comments``.
            The input dicts are not modified.
        """
        meta_keys = [
            "name", "width", "height", "status", "pinned", "isPredicted",
            "projectId", "annotatorEmail", "qaEmail"
        ]
        # Case-insensitive: the rest of the class spells it 'Pixel'.
        if self._normalize_project_type(project_type) == "pixel":
            meta_keys.append("isSegmented")

        metadata = dict.fromkeys(meta_keys)
        new_json_data = {
            "metadata": metadata,
            "instances": [],
            "tags": [],
            "comments": []
        }

        for item in old_json_data:
            object_type = item.get("type")

            if object_type == "meta":
                meta_name = item.get("name")
                if meta_name == "imageAttributes":
                    for key in ("height", "width", "status", "pinned"):
                        metadata[key] = item.get(key)
                elif meta_name == "lastAction":
                    metadata["lastAction"] = {
                        "email": item.get("userId"),
                        "timestamp": item.get("timestamp")
                    }
            elif object_type == "tag":
                new_json_data["tags"].append(item.get("name"))
            elif object_type == "comment":
                # Work on copies so the caller's data is left untouched.
                correspondence = []
                for comment in item["comments"]:
                    entry = dict(comment)
                    # The old layout stores the author under 'id'.
                    entry["email"] = entry.pop("id")
                    correspondence.append(entry)
                converted = {
                    key: value
                    for key, value in item.items()
                    if key not in ("type", "comments")
                }
                converted["correspondence"] = correspondence
                new_json_data["comments"].append(converted)
            else:
                new_json_data["instances"].append(item)

        return new_json_data

    def _parse_json_into_common_format(self, sa_annotation_json, fpath):
        """Normalise one annotation document for the converters.

        List-style (old) documents are converted first; missing top-level
        sections are created; a missing image name is derived from the
        annotation file name; helper paths are added to the metadata; and
        missing or non-integer image dimensions are read from the image file.

        Args:
            sa_annotation_json: parsed annotation JSON (dict or old-style
                list).
            fpath: path of the annotation file, as ``str`` or ``Path``.

        Returns:
            The normalised annotation dict.
        """
        if isinstance(sa_annotation_json, list):
            sa_annotation_json = self.convert_from_old_sa_to_new(
                sa_annotation_json, self.project_type
            )

        for key, factory in (
            ('metadata', dict), ('tags', list), ('instances', list),
            ('comments', list)
        ):
            if key not in sa_annotation_json:
                sa_annotation_json[key] = factory()

        metadata = sa_annotation_json['metadata']
        project_key = self._normalize_project_type(self.project_type)
        annotation_path = Path(fpath)

        if metadata.get('name') is None:
            # Derive the image name by dropping the annotation suffix.
            # ``Path.name`` works for both str and Path inputs and for any
            # path separator.
            ending = self.project_type_to_json_ending[project_key]
            fname = annotation_path.name
            if fname.endswith(ending):
                fname = fname[:-len(ending)]
            metadata['name'] = fname

        metadata['image_path'] = str(annotation_path.parent / metadata['name'])
        metadata['annotation_json'] = fpath

        if self.task == 'panoptic_segmentation':
            metadata['panoptic_mask'] = str(
                Path(self.export_root) / (metadata['name'] + '.png')
            )

        if project_key == 'pixel':
            metadata['sa_bluemask_path'] = str(
                Path(self.export_root) / (metadata['name'] + '___save.png')
            )

        has_dimensions = isinstance(metadata.get('height'), int) and \
            isinstance(metadata.get('width'), int)
        if not has_dimensions:
            image_height, image_width = self.get_image_dimensions(
                metadata['image_path']
            )
            metadata['height'] = image_height
            metadata['width'] = image_width

        return sa_annotation_json

    def get_image_dimensions(self, image_path):
        """Return ``(height, width)`` of the image stored at ``image_path``.

        OpenCV is tried first; when ``cv2.imread`` returns ``None`` the image
        is opened with Pillow instead. Any exception raised by Pillow
        propagates to the caller.
        """
        img = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
        if img is not None:
            return img.shape[0], img.shape[1]

        with Image.open(image_path) as pil_img:
            # ``size`` is an attribute holding (width, height), not a method.
            img_width, img_height = pil_img.size
        return img_height, img_width

    def _prepare_single_image_commons_pixel(self, id_, metadata):
        """Load the per-image data shared by the pixel converters.

        Args:
            id_: image id stored in the resulting ``image_info``.
            metadata: dict providing ``name``, ``height``, ``width`` and
                ``sa_bluemask_path``.

        Returns:
            A named tuple with ``image_info``, an all-zero ``ann_mask`` of
            shape (height, width), the mask image as an RGB ``uint32`` array
            (``sa_bluemask_rgb``) and ``flat_mask``, which packs the three
            channels into one integer per pixel as ``R << 16 | G << 8 | B``.
        """
        image_info = self._make_image_info(
            metadata['name'], metadata['height'], metadata['width'], id_
        )

        # Context manager closes the file handle once the pixels are copied.
        with Image.open(metadata['sa_bluemask_path']) as mask_img:
            sa_bluemask_rgb = np.asarray(
                mask_img.convert('RGB'), dtype=np.uint32
            )

        ann_mask = np.zeros(
            (image_info['height'], image_info['width']), dtype=np.uint32
        )
        red = sa_bluemask_rgb[:, :, 0]
        green = sa_bluemask_rgb[:, :, 1]
        blue = sa_bluemask_rgb[:, :, 2]
        flat_mask = (red << 16) | (green << 8) | blue

        return self._PixelImgCommons(
            image_info, ann_mask, sa_bluemask_rgb, flat_mask
        )

    def _prepare_single_image_commons_vector(self, id_, metadata):
        """Return a named tuple holding only ``image_info`` for one image.

        Args:
            id_: image id stored in the resulting ``image_info``.
            metadata: dict providing ``name``, ``height`` and ``width``.
        """
        image_info = self._make_image_info(
            metadata['name'], metadata['height'], metadata['width'], id_
        )
        return self._VectorImgCommons(image_info)

    def _prepare_single_image_commons(self, id_, metadata):
        """Dispatch to the pixel or vector helper based on the project type.

        Returns ``None`` when the project type is neither pixel nor vector.
        """
        project_key = self._normalize_project_type(self.project_type)
        if project_key == 'pixel':
            return self._prepare_single_image_commons_pixel(id_, metadata)
        if project_key == 'vector':
            return self._prepare_single_image_commons_vector(id_, metadata)
        return None

    def _make_image_info(self, pname, pheight, pwidth, id_):
        """Return the COCO ``images`` entry for one image (license id 1)."""
        return {
            'id': id_,
            'file_name': pname,
            'height': pheight,
            'width': pwidth,
            'license': 1
        }

    def _create_sa_classes(self, json_path):
        """Build class definitions from the ``categories`` of a JSON file.

        Each class gets a randomly chosen ``#rrggbb`` colour and an empty
        ``attribute_groups`` list.

        Args:
            json_path: path to a JSON file with a top-level ``"categories"``
                list whose items have a ``"name"`` key.

        Returns:
            A list of class dicts.
        """
        with open(json_path) as fp:
            json_data = json.load(fp)

        classes = []
        for category in json_data["categories"]:
            color = np.random.choice(range(256), size=3)
            # Plain ints keep the %x formatting independent of numpy types.
            hexcolor = "#%02x%02x%02x" % tuple(int(c) for c in color)
            classes.append(
                {
                    'name': category["name"],
                    'color': hexcolor,
                    'attribute_groups': []
                }
            )
        return classes

    def to_sa_format(self):
        """Write ``classes/classes.json`` and run the conversion algorithm.

        The input is ``<export_root>/<dataset_name>.json``; output goes under
        ``self.output_dir``.
        """
        output_dir = Path(self.output_dir)
        json_data = Path(self.export_root) / (self.dataset_name + ".json")

        sa_classes = self._create_sa_classes(json_data)
        classes_dir = output_dir / 'classes'
        classes_dir.mkdir(parents=True, exist_ok=True)
        write_to_json(classes_dir / 'classes.json', sa_classes)

        self.conversion_algorithm(json_data, output_dir)

    def make_anno_json_generator(self):
        """Yield every annotation under ``export_root`` in common format.

        Files are selected with ``*pixel.json`` or ``*objects.json`` depending
        on the project type, and their count is stored through
        ``set_num_total_images`` before the first item is yielded.

        Raises:
            ValueError: if the project type is neither pixel nor vector.
        """
        project_key = self._normalize_project_type(self.project_type)
        if project_key == 'pixel':
            pattern = '*pixel.json'
        elif project_key == 'vector':
            pattern = '*objects.json'
        else:
            raise ValueError(
                'Unsupported project type: {!r}'.format(self.project_type)
            )

        jsons = list(Path(self.export_root).glob(pattern))
        self.set_num_total_images(len(jsons))
        # Blank line on stdout kept from the original implementation; its
        # purpose is not visible from this file.
        print()

        for fpath in jsons:
            with open(fpath, 'r') as fp:
                json_data = json.load(fp)
            yield self._parse_json_into_common_format(json_data, fpath)
0 commit comments