66from typing import List
77from typing import Optional
88
9+ from lib .core .enums import AnnotationTypes
10+ from lib .core .exceptions import AppException
911from pydantic import BaseModel
1012
1113
1214class Annotation (BaseModel ):
1315 instanceId : int
1416 type : str
15- className : str
17+ className : Optional [str ]
18+ x : Optional [Any ]
19+ y : Optional [Any ]
1620 points : Optional [Dict ]
1721 attributes : Optional [List [Any ]] = []
1822 keyframe : bool = False
@@ -25,6 +29,7 @@ class FrameAnnotation(BaseModel):
2529
2630class VideoFrameGenerator :
2731 def __init__ (self , annotation_data : dict , fps : int ):
32+ self .validate_annotations (annotation_data )
2833 self .id_generator = iter (itertools .count (0 ))
2934 self ._annotation_data = annotation_data
3035 self .duration = annotation_data ["metadata" ]["duration" ] / (1000 * 1000 )
@@ -36,40 +41,54 @@ def __init__(self, annotation_data: dict, fps: int):
3641 self ._mapping = {}
3742 self ._process ()
3843
44+ @staticmethod
45+ def validate_annotations (annotation_data : dict ):
46+ try :
47+ annotation_data ["metadata" ]["duration" ]
48+ except KeyError :
49+ raise AppException ("Video not annotated yet" )
50+
3951 def get_frame (self , frame_no : int ):
4052 try :
4153 return self .annotations [frame_no ]
4254 except KeyError :
4355 self .annotations [frame_no ] = FrameAnnotation (frame = frame_no )
4456 return self .annotations [frame_no ]
4557
46- def interpolate_annotations (
47- self ,
48- class_name : str ,
49- from_frame : int ,
50- to_frame : int ,
51- data : dict ,
52- instance_id : int ,
53- steps : dict = None ,
54- annotation_type : str = "bbox" ,
58+ def _interpolate (
59+ self ,
60+ class_name : str ,
61+ from_frame : int ,
62+ to_frame : int ,
63+ data : dict ,
64+ instance_id : int ,
65+ steps : dict = None ,
66+ annotation_type : str = "bbox" ,
5567 ) -> dict :
5668 annotations = {}
5769 for idx , frame_idx in enumerate (range (from_frame + 1 , to_frame ), 1 ):
58- points = None
59- if annotation_type == "bbox" and data .get ("points" ) and steps :
60- points = {
70+ tmp_data = {}
71+ if annotation_type == AnnotationTypes . BBOX and data .get ("points" ) and steps :
72+ tmp_data [ " points" ] = {
6173 "x1" : round (data ["points" ]["x1" ] + steps ["x1" ] * idx , 2 ),
6274 "y1" : round (data ["points" ]["y1" ] + steps ["y1" ] * idx , 2 ),
6375 "x2" : round (data ["points" ]["x2" ] + steps ["x2" ] * idx , 2 ),
6476 "y2" : round (data ["points" ]["y2" ] + steps ["y2" ] * idx , 2 ),
6577 }
78+ elif annotation_type == AnnotationTypes .POINT :
79+ tmp_data = {
80+ "x" : round (data ["x" ] + steps ["x" ] * idx , 2 ),
81+ "y" : round (data ["y" ] + steps ["y" ] * idx , 2 )
82+ }
83+ elif annotation_type in (AnnotationTypes .POLYGON , AnnotationTypes .POLYLINE ):
84+ tmp_data ["points" ] = [point + steps [idx ] * 2 for idx , point in enumerate (data ["points" ])]
6685 annotations [frame_idx ] = Annotation (
6786 instanceId = instance_id ,
6887 type = annotation_type ,
6988 className = class_name ,
70- points = points ,
7189 attributes = data ["attributes" ],
7290 keyframe = False ,
91+ ** tmp_data
7392 )
7493 return annotations
7594
@@ -98,6 +117,9 @@ def get_median(self, annotations: List[dict]) -> dict:
98117 median_annotation = annotation
99118 return median_annotation
100119
120+ def calculate_sped (self , from_frame , to_frame ):
121+ pass
122+
101123 @staticmethod
102124 def merge_first_frame (frames_mapping ):
103125 try :
@@ -108,11 +130,37 @@ def merge_first_frame(frames_mapping):
108130 finally :
109131 return frames_mapping
110132
133+ def _interpolate_frames (
134+ self , from_frame , from_frame_no , to_frame , to_frame_no , annotation_type , class_name , instance_id
135+ ):
136+ steps = None
137+ frames_diff = to_frame_no - from_frame_no
138+ if annotation_type == AnnotationTypes .BBOX and from_frame .get ("points" ) and to_frame .get ("points" ):
139+ steps = {}
140+ for point in "x1" , "x2" , "y1" , "y2" :
141+ steps [point ] = round (
142+ (to_frame ["points" ][point ] - from_frame ["points" ][point ]) / frames_diff , 2
143+ )
144+ elif annotation_type == AnnotationTypes .POINT :
145+ steps = {
146+ "x" : (to_frame ["x" ] - from_frame ["x" ]) / frames_diff ,
147+ "y" : (to_frame ["y" ] - from_frame ["y" ]) / frames_diff
148+ }
149+ elif annotation_type in (AnnotationTypes .POLYGON , AnnotationTypes .POLYLINE ):
150+ steps = [
151+ (to_point - from_point ) / frames_diff
152+ for from_point , to_point in zip (from_frame ["points" ], to_frame ["points" ])
153+ ]
154+ return self ._interpolate (
155+ class_name = class_name , from_frame = from_frame_no , to_frame = to_frame_no , data = from_frame ,
156+ instance_id = instance_id , steps = steps , annotation_type = annotation_type
157+ )
158+
111159 def _process (self ):
112160 for instance in self ._annotation_data ["instances" ]:
113161 instance_id = next (self .id_generator )
114162 annotation_type = instance ["meta" ]["type" ]
115- class_name = instance ["meta" ][ "className" ]
163+ class_name = instance ["meta" ]. get ( "className" )
116164 for parameter in instance ["parameters" ]:
117165 frames_mapping = defaultdict (list )
118166 last_frame_no = None
@@ -131,55 +179,15 @@ def _process(self):
131179 )
132180 frames_diff = to_frame_no - from_frame_no
133181 if frames_diff > 1 :
134- steps = None
135- if (
136- annotation_type == "bbox"
137- and from_frame .get ("points" )
138- and to_frame .get ("points" )
139- ):
140- steps = {
141- "y1" : round (
142- (
143- to_frame ["points" ]["y1" ]
144- - from_frame ["points" ]["y1" ]
145- )
146- / frames_diff ,
147- 2 ,
148- ),
149- "x2" : round (
150- (
151- to_frame ["points" ]["x2" ]
152- - from_frame ["points" ]["x2" ]
153- )
154- / frames_diff ,
155- 2 ,
156- ),
157- "x1" : round (
158- (
159- to_frame ["points" ]["x1" ]
160- - from_frame ["points" ]["x1" ]
161- )
162- / frames_diff ,
163- 2 ,
164- ),
165- "y2" : round (
166- (
167- to_frame ["points" ]["y2" ]
168- - from_frame ["points" ]["y2" ]
169- )
170- / frames_diff ,
171- 2 ,
172- ),
173- }
174182 interpolated_frames .update (
175- self .interpolate_annotations (
183+ self ._interpolate_frames (
184+ from_frame = from_frame ,
185+ from_frame_no = from_frame_no ,
186+ to_frame = to_frame ,
187+ to_frame_no = to_frame_no ,
176188 class_name = class_name ,
177- from_frame = from_frame_no ,
178- to_frame = to_frame_no ,
179- data = from_frame ,
180- instance_id = instance_id ,
181- steps = steps ,
182189 annotation_type = annotation_type ,
190+ instance_id = instance_id
183191 )
184192 )
185193 start_median_frame = self .get_median (frames_mapping [from_frame_no ])
@@ -188,6 +196,8 @@ def _process(self):
188196 instanceId = instance_id ,
189197 type = annotation_type ,
190198 className = class_name ,
199+ x = start_median_frame .get ("x" ),
200+ y = start_median_frame .get ("y" ),
191201 points = start_median_frame .get ("points" ),
192202 attributes = start_median_frame ["attributes" ],
193203 keyframe = True ,
@@ -196,6 +206,8 @@ def _process(self):
196206 instanceId = instance_id ,
197207 type = annotation_type ,
198208 className = class_name ,
209+ x = start_median_frame .get ("x" ),
210+ y = start_median_frame .get ("y" ),
199211 points = end_median_frame .get ("points" ),
200212 attributes = end_median_frame ["attributes" ],
201213 keyframe = True ,
@@ -208,4 +220,5 @@ def _process(self):
208220
209221 def __iter__ (self ):
210222 for frame_no in range (1 , int (self .frames_count ) + 1 ):
211- yield self .get_frame (frame_no ).dict ()
223+ frame = self .get_frame (frame_no )
224+ yield {** frame .dict (exclude_unset = True ), ** frame .dict (exclude_none = True )}
0 commit comments