Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 389 additions & 0 deletions extract_flac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,389 @@
import io
import logging
import re
import base64
import struct
import mutagen
from collections import namedtuple
from utils import convert_key_to_camelot

FMT_VERSION = "BB"
NonTerminalBeatgridMarker = namedtuple(
"NonTerminalBeatgridMarker", ["position", "beats_till_next_marker"]
)
TerminalBeatgridMarker = namedtuple("TerminalBeatgridMarker", ["position", "bpm"])


class Entry(object):
def __init__(self, *args):
assert len(args) == len(self.FIELDS)
for field, value in zip(self.FIELDS, args):
setattr(self, field, value)

def __repr__(self):
return "{name}({data})".format(
name=self.__class__.__name__,
data=", ".join(
"{}={!r}".format(name, getattr(self, name)) for name in self.FIELDS
),
)


class UnknownEntry(Entry):
NAME = None
FIELDS = ("data",)

@classmethod
def load(cls, data):
return cls(data)

def dump(self):
return self.data


class CueEntry(Entry):
NAME = "CUE"
FMT = ">cBIc3s2s"
FIELDS = (
"field1",
"index",
"position",
"field4",
"color",
"field6",
"name",
)

@classmethod
def load(cls, data):
info_size = struct.calcsize(cls.FMT)
info = struct.unpack(cls.FMT, data[:info_size])
name, nullbyte, other = data[info_size:].partition(b"\x00")
assert nullbyte == b"\x00"
assert other == b""
return cls(*info, name.decode("utf-8"))

def dump(self):
struct_fields = self.FIELDS[:-1]
return b"".join(
(
struct.pack(self.FMT, *(getattr(self, f) for f in struct_fields)),
self.name.encode("utf-8"),
b"\x00",
)
)

def pad_base64_string(b64_string: str):

while b64_string.endswith(b'='):
b64_string = b64_string[:-1]

padding_needed = 4 - (len(b64_string) % 4)

if padding_needed != 4:
b64_string += b'=' * padding_needed
return b64_string

def parse_beatgrid_markers(fp):
version = struct.unpack("BB", fp.read(2))

if version != (0x01, 0x00):
raise ValueError("Unsupported version: " + str(version))

num_markers = struct.unpack(">I", fp.read(4))[0]
markers = []

for i in range(num_markers):
pos = struct.unpack(">f", fp.read(4))[0]
data = fp.read(4)

if len(data) < 4:
logging.error("Insufficient data for beatgrid marker.")
break

if i == num_markers - 1:
bpm = struct.unpack(">f", data)[0]
markers.append(TerminalBeatgridMarker(pos, bpm))

else:
beats_till_next_marker = struct.unpack(">I", data)[0]
markers.append(NonTerminalBeatgridMarker(pos, beats_till_next_marker))

fp.read(1)
return markers


def get_beatgrid(base64_data):
if not base64_data:
logging.debug("No hot cue data provided.")
return []

if isinstance(base64_data, str):
base64_data = base64_data.encode("utf-8")

clean_data = re.sub(rb"[^a-zA-Z0-9+/=]", b"", base64_data)

try:
data = base64.b64decode(clean_data)
logging.debug(f"Decoded hot cue data length: {len(data)} bytes.")
except Exception:
cut_off_data = clean_data[:(len(clean_data) // 4) * 4]
try:
data = base64.b64decode(cut_off_data)
except Exception as e:
logging.error(f"Base64 decode error for beatgrid data: {e}")
return []

if not data.startswith(b"application/octet-stream\0"):
logging.error("Failed to parse tag")
return []
fieldname_endpos = data.index(b"\0", 26)
fielddata = data[fieldname_endpos + 1 :]

data = fielddata.replace(b"\n", b"")

fp = io.BytesIO(data)
with fp:
markers = parse_beatgrid_markers(fp)

result = {"markers": {"non_terminal": [], "terminal": None}}

if not markers:
return result

if len(markers) == 1:
m = markers[0]
result["markers"]["terminal"] = {"position": m.position, "bpm": m.bpm}

else:
non_terminal = []

for m in markers[:-1]:
non_terminal.append(
{
"position": m.position,
"beats_till_next_marker": m.beats_till_next_marker,
}
)

result["markers"]["non_terminal"] = non_terminal
terminal = markers[-1]
result["markers"]["terminal"] = {
"position": terminal.position,
"bpm": terminal.bpm,
}

return result


def readbytes(fp):
for x in iter(lambda: fp.read(1), b""):
if x == b"\00":
break
yield x


def get_entry_type(entry_name):
entry_type = UnknownEntry
if CueEntry.NAME == entry_name:
entry_type = CueEntry
return entry_type


def parse_serato_hot_cues(base64_data):
if not base64_data:
logging.debug("No hot cue data provided.")
return []

if isinstance(base64_data, str):
base64_data = base64_data.encode("utf-8")

clean_data = re.sub(rb"[^a-zA-Z0-9+/=]", b"", base64_data)
clean_data = pad_base64_string(clean_data)

try:
data = base64.b64decode(clean_data)
logging.debug(f"Decoded hot cue data length: {len(data)} bytes.")
except Exception as e:
logging.error(f"Base64 decode error for serato markers data: {e}")
return []

if not data.startswith(b"application/octet-stream\0"):
logging.error("Failed to parse tag")
return []
fieldname_endpos = data.index(b"\0", 26)
fielddata = data[fieldname_endpos + 1 :]

fielddata = fielddata.replace(b"\n", b"")
data = fielddata[2:fielddata.find(b'\x00')]
try:
b64_decoded_data = base64.b64decode(data)
except Exception as e1:
logging.debug(f"Base64 decode error for serato markers content: {e1}. Trying again..")
try:
cut_off_data = data[:(len(data) // 4) * 4]
b64_decoded_data = base64.b64decode(cut_off_data)
except Exception as e2:
logging.error(f"Base64 decode error for serato markers content: {e2}")
return []

hot_cues = []
fp = io.BytesIO(b64_decoded_data)
assert struct.unpack(FMT_VERSION, fp.read(2)) == (0x01, 0x01)
while True:
entry_bytes = b"".join(readbytes(fp))
entry_name = entry_bytes.decode("utf-8")
if not entry_name:
break
entry_len = struct.unpack(">I", fp.read(4))[0]
assert entry_len > 0

entry_type = get_entry_type(entry_name)
if entry_type is UnknownEntry:
logging.debug(
f"Unknown entry type '{entry_name}' encountered. Skipping {entry_len} bytes."
)
fp.read(entry_len)
continue
entry = CueEntry.load(fp.read(entry_len))
logging.debug(f"Parsed hot cue entry: {entry}")
color_data = entry.color
if len(color_data) == 3:
color_hex = "#{:02X}{:02X}{:02X}".format(
color_data[0], color_data[1], color_data[2]
)
else:
color_hex = "#000000"

hot_cues.append(
{
"index": entry.index,
"position_ms": entry.position,
"color": color_hex,
"name": entry.name,
}
)
return hot_cues

def extract_metadata(input_file: str) -> dict:
audio_metadata = {
"title": "Unknown",
"artist": "Unknown",
"bpm": 0.0,
"key": "Unknown",
"duration_sec": 0.0,
}
hot_cues = []
beatgrid_data = {"markers": {"non_terminal": [], "terminal": None}}

try:
tagfile = mutagen.File(input_file)
if tagfile is None:
logging.warning(
f"Unable to open or read tags from {input_file} using mutagen."
)

return {
"metadata": audio_metadata,
"hot_cues": [],
"beatgrid": beatgrid_data,
}

tags = tagfile.tags if tagfile.tags is not None else {}
audio_metadata["title"] = str(
tags.get("TITLE")[0]
if len(tags.get("TITLE", [])) > 0
else tags.get("TITLE", "Unknown")
)
audio_metadata["artist"] = str(
tags.get("ARTIST")[0]
if len(tags.get("ARTIST", [])) > 0
else tags.get("ARTIST", "Unknown")
)
bpm_tag = (
tags.get("BPM")[0]
if len(tags.get("BPM", [])) > 0
else tags.get("BPM", None)
)

if bpm_tag:
try:
bpm_value = None
if hasattr(bpm_tag, "text") and bpm_tag.text:
bpm_str = str(bpm_tag.text[0]).strip()
bpm_str_cleaned = re.sub(r"[^0-9.]", "", bpm_str)

if bpm_str_cleaned:
bpm_value = float(bpm_str_cleaned)
elif isinstance(bpm_tag, (int, float, str)):
bpm_value = float(bpm_tag)

if bpm_value is not None:
audio_metadata["bpm"] = bpm_value
else:
logging.warning(
f"Could not extract valid BPM value from tag(s) '{bpm_tag}' for {input_file}."
)

except (ValueError, TypeError) as e:
logging.warning(
f"Could not convert BPM tag '{bpm_tag}' to float for {input_file}: {e}."
)
audio_metadata["bpm"] = 0.0

key_tag = tags.get("TKEY")
key = "Unknown"

if key_tag and hasattr(key_tag, "text") and key_tag.text:
key = str(key_tag.text[0]).strip()
else:
key_tag_initialkey = (
tags.get("INITIALKEY")[0]
if len(tags.get("INITIALKEY", [])) > 0
else tags.get("INITIALKEY", None)
)
if key_tag_initialkey and isinstance(key_tag_initialkey, str):
key = str(key_tag_initialkey).strip()

audio_metadata["key"] = convert_key_to_camelot(key)

if tagfile.info and hasattr(tagfile.info, "length"):
audio_metadata["duration_sec"] = round(tagfile.info.length, 3)
else:
logging.warning(f"Could not get duration from file info for {input_file}.")

hotcues_tag = (
tags.get("SERATO_MARKERS_V2")[0]
if len(tags.get("SERATO_MARKERS_V2", [])) > 0
else tags.get("SERATO_MARKERS_V2", None)
)
if hotcues_tag:
try:
hot_cues = parse_serato_hot_cues(hotcues_tag)

except Exception as e:
logging.error(
f"Error reading Serato Markers2 (hot cues) from {input_file}: {e}",
exc_info=True,
)

beatgrid_tag = (
tags.get("SERATO_BEATGRID")[0]
if len(tags.get("SERATO_BEATGRID", [])) > 0
else tags.get("SERATO_BEATGRID", None)
)
beatgrid_data = get_beatgrid(beatgrid_tag)

except FileNotFoundError:
logging.error(f"File not found: {input_file}")

return {"metadata": audio_metadata, "hot_cues": [], "beatgrid": beatgrid_data}
except Exception as e:
logging.error(
f"An unexpected error occurred while processing {input_file}: {e}",
exc_info=True,
)

return {"metadata": audio_metadata, "hot_cues": [], "beatgrid": beatgrid_data}

return {"metadata": audio_metadata, "hot_cues": hot_cues, "beatgrid": beatgrid_data}
Loading