diff --git a/extract_flac.py b/extract_flac.py new file mode 100644 index 0000000..161531a --- /dev/null +++ b/extract_flac.py @@ -0,0 +1,389 @@ +import io +import logging +import re +import base64 +import struct +import mutagen +from collections import namedtuple +from utils import convert_key_to_camelot + +FMT_VERSION = "BB" +NonTerminalBeatgridMarker = namedtuple( + "NonTerminalBeatgridMarker", ["position", "beats_till_next_marker"] +) +TerminalBeatgridMarker = namedtuple("TerminalBeatgridMarker", ["position", "bpm"]) + + +class Entry(object): + def __init__(self, *args): + assert len(args) == len(self.FIELDS) + for field, value in zip(self.FIELDS, args): + setattr(self, field, value) + + def __repr__(self): + return "{name}({data})".format( + name=self.__class__.__name__, + data=", ".join( + "{}={!r}".format(name, getattr(self, name)) for name in self.FIELDS + ), + ) + + +class UnknownEntry(Entry): + NAME = None + FIELDS = ("data",) + + @classmethod + def load(cls, data): + return cls(data) + + def dump(self): + return self.data + + +class CueEntry(Entry): + NAME = "CUE" + FMT = ">cBIc3s2s" + FIELDS = ( + "field1", + "index", + "position", + "field4", + "color", + "field6", + "name", + ) + + @classmethod + def load(cls, data): + info_size = struct.calcsize(cls.FMT) + info = struct.unpack(cls.FMT, data[:info_size]) + name, nullbyte, other = data[info_size:].partition(b"\x00") + assert nullbyte == b"\x00" + assert other == b"" + return cls(*info, name.decode("utf-8")) + + def dump(self): + struct_fields = self.FIELDS[:-1] + return b"".join( + ( + struct.pack(self.FMT, *(getattr(self, f) for f in struct_fields)), + self.name.encode("utf-8"), + b"\x00", + ) + ) + +def pad_base64_string(b64_string: str): + + while b64_string.endswith(b'='): + b64_string = b64_string[:-1] + + padding_needed = 4 - (len(b64_string) % 4) + + if padding_needed != 4: + b64_string += b'=' * padding_needed + return b64_string + +def parse_beatgrid_markers(fp): + version = struct.unpack("BB", fp.read(2)) + + if version != (0x01, 0x00): + raise ValueError("Unsupported version: " + str(version)) + + num_markers = struct.unpack(">I", fp.read(4))[0] + markers = [] + + for i in range(num_markers): + pos = struct.unpack(">f", fp.read(4))[0] + data = fp.read(4) + + if len(data) < 4: + logging.error("Insufficient data for beatgrid marker.") + break + + if i == num_markers - 1: + bpm = struct.unpack(">f", data)[0] + markers.append(TerminalBeatgridMarker(pos, bpm)) + + else: + beats_till_next_marker = struct.unpack(">I", data)[0] + markers.append(NonTerminalBeatgridMarker(pos, beats_till_next_marker)) + + fp.read(1) + return markers + + +def get_beatgrid(base64_data): + if not base64_data: + logging.debug("No hot cue data provided.") + return [] + + if isinstance(base64_data, str): + base64_data = base64_data.encode("utf-8") + + clean_data = re.sub(rb"[^a-zA-Z0-9+/=]", b"", base64_data) + + try: + data = base64.b64decode(clean_data) + logging.debug(f"Decoded hot cue data length: {len(data)} bytes.") + except Exception: + cut_off_data = clean_data[:(len(clean_data) // 4) * 4] + try: + data = base64.b64decode(cut_off_data) + except Exception as e: + logging.error(f"Base64 decode error for beatgrid data: {e}") + return [] + + if not data.startswith(b"application/octet-stream\0"): + logging.error("Failed to parse tag") + return [] + fieldname_endpos = data.index(b"\0", 26) + fielddata = data[fieldname_endpos + 1 :] + + data = fielddata.replace(b"\n", b"") + + fp = io.BytesIO(data) + with fp: + markers = parse_beatgrid_markers(fp) + + result = {"markers": {"non_terminal": [], "terminal": None}} + + if not markers: + return result + + if len(markers) == 1: + m = markers[0] + result["markers"]["terminal"] = {"position": m.position, "bpm": m.bpm} + + else: + non_terminal = [] + + for m in markers[:-1]: + non_terminal.append( + { + "position": m.position, + "beats_till_next_marker": m.beats_till_next_marker, + } + ) + + result["markers"]["non_terminal"] = non_terminal + terminal = markers[-1] + result["markers"]["terminal"] = { + "position": terminal.position, + "bpm": terminal.bpm, + } + + return result + + +def readbytes(fp): + for x in iter(lambda: fp.read(1), b""): + if x == b"\00": + break + yield x + + +def get_entry_type(entry_name): + entry_type = UnknownEntry + if CueEntry.NAME == entry_name: + entry_type = CueEntry + return entry_type + + +def parse_serato_hot_cues(base64_data): + if not base64_data: + logging.debug("No hot cue data provided.") + return [] + + if isinstance(base64_data, str): + base64_data = base64_data.encode("utf-8") + + clean_data = re.sub(rb"[^a-zA-Z0-9+/=]", b"", base64_data) + clean_data = pad_base64_string(clean_data) + + try: + data = base64.b64decode(clean_data) + logging.debug(f"Decoded hot cue data length: {len(data)} bytes.") + except Exception as e: + logging.error(f"Base64 decode error for serato markers data: {e}") + return [] + + if not data.startswith(b"application/octet-stream\0"): + logging.error("Failed to parse tag") + return [] + fieldname_endpos = data.index(b"\0", 26) + fielddata = data[fieldname_endpos + 1 :] + + fielddata = fielddata.replace(b"\n", b"") + data = fielddata[2:fielddata.find(b'\x00')] + try: + b64_decoded_data = base64.b64decode(data) + except Exception as e1: + logging.debug(f"Base64 decode error for serato markers content: {e1}. Trying again..") + try: + cut_off_data = data[:(len(data) // 4) * 4] + b64_decoded_data = base64.b64decode(cut_off_data) + except Exception as e2: + logging.error(f"Base64 decode error for serato markers content: {e2}") + return [] + + hot_cues = [] + fp = io.BytesIO(b64_decoded_data) + assert struct.unpack(FMT_VERSION, fp.read(2)) == (0x01, 0x01) + while True: + entry_bytes = b"".join(readbytes(fp)) + entry_name = entry_bytes.decode("utf-8") + if not entry_name: + break + entry_len = struct.unpack(">I", fp.read(4))[0] + assert entry_len > 0 + + entry_type = get_entry_type(entry_name) + if entry_type is UnknownEntry: + logging.debug( + f"Unknown entry type '{entry_name}' encountered. Skipping {entry_len} bytes." + ) + fp.read(entry_len) + continue + entry = CueEntry.load(fp.read(entry_len)) + logging.debug(f"Parsed hot cue entry: {entry}") + color_data = entry.color + if len(color_data) == 3: + color_hex = "#{:02X}{:02X}{:02X}".format( + color_data[0], color_data[1], color_data[2] + ) + else: + color_hex = "#000000" + + hot_cues.append( + { + "index": entry.index, + "position_ms": entry.position, + "color": color_hex, + "name": entry.name, + } + ) + return hot_cues + +def extract_metadata(input_file: str) -> dict: + audio_metadata = { + "title": "Unknown", + "artist": "Unknown", + "bpm": 0.0, + "key": "Unknown", + "duration_sec": 0.0, + } + hot_cues = [] + beatgrid_data = {"markers": {"non_terminal": [], "terminal": None}} + + try: + tagfile = mutagen.File(input_file) + if tagfile is None: + logging.warning( + f"Unable to open or read tags from {input_file} using mutagen." + ) + + return { + "metadata": audio_metadata, + "hot_cues": [], + "beatgrid": beatgrid_data, + } + + tags = tagfile.tags if tagfile.tags is not None else {} + audio_metadata["title"] = str( + tags.get("TITLE")[0] + if len(tags.get("TITLE", [])) > 0 + else tags.get("TITLE", "Unknown") + ) + audio_metadata["artist"] = str( + tags.get("ARTIST")[0] + if len(tags.get("ARTIST", [])) > 0 + else tags.get("ARTIST", "Unknown") + ) + bpm_tag = ( + tags.get("BPM")[0] + if len(tags.get("BPM", [])) > 0 + else tags.get("BPM", None) + ) + + if bpm_tag: + try: + bpm_value = None + if hasattr(bpm_tag, "text") and bpm_tag.text: + bpm_str = str(bpm_tag.text[0]).strip() + bpm_str_cleaned = re.sub(r"[^0-9.]", "", bpm_str) + + if bpm_str_cleaned: + bpm_value = float(bpm_str_cleaned) + elif isinstance(bpm_tag, (int, float, str)): + bpm_value = float(bpm_tag) + + if bpm_value is not None: + audio_metadata["bpm"] = bpm_value + else: + logging.warning( + f"Could not extract valid BPM value from tag(s) '{bpm_tag}' for {input_file}." + ) + + except (ValueError, TypeError) as e: + logging.warning( + f"Could not convert BPM tag '{bpm_tag}' to float for {input_file}: {e}." + ) + audio_metadata["bpm"] = 0.0 + + key_tag = tags.get("TKEY") + key = "Unknown" + + if key_tag and hasattr(key_tag, "text") and key_tag.text: + key = str(key_tag.text[0]).strip() + else: + key_tag_initialkey = ( + tags.get("INITIALKEY")[0] + if len(tags.get("INITIALKEY", [])) > 0 + else tags.get("INITIALKEY", None) + ) + if key_tag_initialkey and isinstance(key_tag_initialkey, str): + key = str(key_tag_initialkey).strip() + + audio_metadata["key"] = convert_key_to_camelot(key) + + if tagfile.info and hasattr(tagfile.info, "length"): + audio_metadata["duration_sec"] = round(tagfile.info.length, 3) + else: + logging.warning(f"Could not get duration from file info for {input_file}.") + + hotcues_tag = ( + tags.get("SERATO_MARKERS_V2")[0] + if len(tags.get("SERATO_MARKERS_V2", [])) > 0 + else tags.get("SERATO_MARKERS_V2", None) + ) + if hotcues_tag: + try: + hot_cues = parse_serato_hot_cues(hotcues_tag) + + except Exception as e: + logging.error( + f"Error reading Serato Markers2 (hot cues) from {input_file}: {e}", + exc_info=True, + ) + + beatgrid_tag = ( + tags.get("SERATO_BEATGRID")[0] + if len(tags.get("SERATO_BEATGRID", [])) > 0 + else tags.get("SERATO_BEATGRID", None) + ) + beatgrid_data = get_beatgrid(beatgrid_tag) + + except FileNotFoundError: + logging.error(f"File not found: {input_file}") + + return {"metadata": audio_metadata, "hot_cues": [], "beatgrid": beatgrid_data} + except Exception as e: + logging.error( + f"An unexpected error occurred while processing {input_file}: {e}", + exc_info=True, + ) + + return {"metadata": audio_metadata, "hot_cues": [], "beatgrid": beatgrid_data} + + return {"metadata": audio_metadata, "hot_cues": hot_cues, "beatgrid": beatgrid_data} diff --git a/extract_wav.py b/extract_wav.py index a46c6f3..ef63707 100644 --- a/extract_wav.py +++ b/extract_wav.py @@ -36,9 +36,15 @@ def parse_serato_hot_cues(base64_data): try: data = base64.b64decode(clean_data) logging.debug(f"Decoded hot cue data length: {len(data)} bytes.") - except Exception as e: - logging.error(f"Base64 decode error for hot cue data: {e}") - return [] + except Exception: + try: + # Attempt to decode after removing padding characters and cutting until multiple of 4 + clean_data_unpadded = clean_data.replace(b'=', b'') + cut_off_data = clean_data_unpadded[:(len(clean_data_unpadded) // 4) * 4] + data = base64.b64decode(cut_off_data) + except Exception as e: + logging.error(f"Base64 decode error for hot cue data: {e}") + return [] index = 0 hot_cues = [] diff --git a/serato2rekordbox.py b/serato2rekordbox.py index d6bd7fc..d167335 100644 --- a/serato2rekordbox.py +++ b/serato2rekordbox.py @@ -24,6 +24,7 @@ import extract_mp3 import extract_m4a import extract_wav +import extract_flac import urllib.request import ssl @@ -323,6 +324,9 @@ def extract_file_paths_from_crate(crate_file_path, encoding: str = "utf-16-be"): elif file_extension == '.wav': extracted_data = extract_wav.extract_metadata(full_system_path) + + elif file_extension == '.flac': + extracted_data = extract_flac.extract_metadata(full_system_path) else: unsuccessfulConversions.append({'type': 'unsupported_format', 'path': full_system_path, 'error': f"Unsupported format: {file_extension}"})