diff --git a/.github/workflows/pr-test.yaml b/.github/workflows/pr-test.yaml index 27ea6f0..4ebae9c 100644 --- a/.github/workflows/pr-test.yaml +++ b/.github/workflows/pr-test.yaml @@ -22,6 +22,9 @@ jobs: steps: - name: Check out repository uses: actions/checkout@v4 + with: + submodules: recursive + fetch-depth: 0 - name: Verify required folders exist run: | diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..f2f9edb --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "slide2vec/hs2p"] + path = slide2vec/hs2p + url = https://github.com/clemsgrs/hs2p.git diff --git a/Dockerfile b/Dockerfile index 3a96a4f..8b4d039 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,27 +35,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ zip unzip \ git \ openssh-server \ - build-essential \ - ninja-build \ python3-pip python3-dev python-is-python3 \ && mkdir /var/run/sshd \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -# install ASAP -ARG ASAP_URL=https://github.com/computationalpathologygroup/ASAP/releases/download/ASAP-2.2-(Nightly)/ASAP-2.2-Ubuntu2204.deb -RUN apt-get update && curl -L ${ASAP_URL} -o /tmp/ASAP.deb && apt-get install --assume-yes /tmp/ASAP.deb && \ - SITE_PACKAGES=`python3 -c "import sysconfig; print(sysconfig.get_paths()['purelib'])"` && \ - printf "/opt/ASAP/bin/\n" > "${SITE_PACKAGES}/asap.pth" && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* - # clone & install relevant repositories RUN git clone https://github.com/prov-gigapath/prov-gigapath.git /home/user/prov-gigapath -# add folders to python path -ENV PYTHONPATH="/home/user/prov-gigapath:/home/user/CONCH:/home/user/MUSK:$PYTHONPATH" - WORKDIR /opt/app/ # you can add any Python dependencies to requirements.in @@ -70,9 +57,16 @@ RUN python -m pip install \ --requirement /opt/app/requirements.in \ && rm -rf /home/user/.cache/pip -COPY --chown=user:user . /opt/app/ +COPY --chown=user:user slide2vec /opt/app/slide2vec +COPY --chown=user:user setup.py /opt/app/setup.py +COPY --chown=user:user setup.cfg /opt/app/setup.cfg +COPY --chown=user:user pyproject.toml /opt/app/pyproject.toml +COPY --chown=user:user MANIFEST.in /opt/app/MANIFEST.in +COPY --chown=user:user README.md /opt/app/README.md +COPY --chown=user:user LICENSE /opt/app/LICENSE + RUN python -m pip install /opt/app -RUN python -m pip install flash-attn>=2.5.8 --no-build-isolation +RUN python -m pip install 'flash-attn>=2.7.1,<=2.8.0' --no-build-isolation ########################## @@ -112,12 +106,19 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* +# install ASAP +ARG ASAP_URL=https://github.com/computationalpathologygroup/ASAP/releases/download/ASAP-2.2-(Nightly)/ASAP-2.2-Ubuntu2204.deb +RUN apt-get update && curl -L ${ASAP_URL} -o /tmp/ASAP.deb && apt-get install --assume-yes /tmp/ASAP.deb && \ + SITE_PACKAGES=`python3 -c "import sysconfig; print(sysconfig.get_paths()['purelib'])"` && \ + printf "/opt/ASAP/bin/\n" > "${SITE_PACKAGES}/asap.pth" && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + # copy Python libs & entrypoints from build stage (includes flash-attn, your deps, ASAP .pth) COPY --from=build /usr/local/lib/python3.10/dist-packages /usr/local/lib/python3.10/dist-packages COPY --from=build /usr/local/bin /usr/local/bin -# copy ASAP installation, app code, and prov-gigapath -COPY --from=build /opt/ASAP /opt/ASAP +# copy app code, and prov-gigapath COPY --from=build /opt/app /opt/app COPY --from=build /home/user/prov-gigapath /home/user/prov-gigapath diff --git a/Dockerfile.ci b/Dockerfile.ci index fb8e8a8..dd2803b 100755 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -58,7 +58,14 @@ RUN python -m pip install \ --requirement /opt/app/requirements.in \ && rm -rf /root/.cache/pip -COPY --chown=user:user . /opt/app/ +COPY --chown=user:user slide2vec /opt/app/slide2vec +COPY --chown=user:user setup.py /opt/app/setup.py +COPY --chown=user:user setup.cfg /opt/app/setup.cfg +COPY --chown=user:user pyproject.toml /opt/app/pyproject.toml +COPY --chown=user:user MANIFEST.in /opt/app/MANIFEST.in +COPY --chown=user:user README.md /opt/app/README.md +COPY --chown=user:user LICENSE /opt/app/LICENSE + RUN python -m pip install /opt/app USER user diff --git a/slide2vec/aggregate.py b/slide2vec/aggregate.py index 1eeae65..9e5f5f7 100644 --- a/slide2vec/aggregate.py +++ b/slide2vec/aggregate.py @@ -28,14 +28,20 @@ def get_args_parser(add_help: bool = True): "--config-file", default="", metavar="FILE", help="path to config file" ) parser.add_argument( - "--run-id", + "--output-dir", type=str, - default="", - help="Name of output subdirectory", + default=None, + help="output directory to save logs and checkpoints", ) parser.add_argument( "--run-on-cpu", action="store_true", help="run inference on cpu" ) + parser.add_argument( + "opts", + help="Modify config options at the end of the command using \"path.key=value\".", + default=None, + nargs=argparse.REMAINDER, + ) return parser @@ -54,7 +60,7 @@ def main(args): # setup configuration run_on_cpu = args.run_on_cpu cfg = get_cfg_from_file(args.config_file) - output_dir = Path(cfg.output_dir, args.run_id) + output_dir = Path(cfg.output_dir, args.output_dir) cfg.output_dir = str(output_dir) coordinates_dir = Path(cfg.output_dir, "coordinates") @@ -71,6 +77,11 @@ def main(args): process_list.is_file() ), "Process list CSV not found. Ensure tiling has been run." process_df = pd.read_csv(process_list) + if "aggregation_status" not in process_df.columns: + process_df["aggregation_status"] = ["tbp"] * len(process_df) + cols = ["wsi_name", "wsi_path", "mask_path", "tiling_status", "feature_status", "aggregation_status", "error", "traceback"] + process_df = process_df[cols] + skip_feature_aggregation = process_df["aggregation_status"].str.contains("success").all() if skip_feature_aggregation and distributed.is_main_process(): diff --git a/slide2vec/embed.py b/slide2vec/embed.py index 3c3d643..06fb022 100644 --- a/slide2vec/embed.py +++ b/slide2vec/embed.py @@ -28,14 +28,20 @@ def get_args_parser(add_help: bool = True): "--config-file", default="", metavar="FILE", help="path to config file" ) parser.add_argument( - "--run-id", + "--output-dir", type=str, - default="", - help="Name of output subdirectory", + default=None, + help="output directory to save logs and checkpoints", ) parser.add_argument( "--run-on-cpu", action="store_true", help="run inference on cpu" ) + parser.add_argument( + "opts", + help="Modify config options at the end of the command using \"path.key=value\".", + default=None, + nargs=argparse.REMAINDER, + ) return parser @@ -123,7 +129,7 @@ def main(args): # setup configuration run_on_cpu = args.run_on_cpu cfg = get_cfg_from_file(args.config_file) - output_dir = Path(cfg.output_dir, args.run_id) + output_dir = Path(cfg.output_dir, args.output_dir) cfg.output_dir = str(output_dir) if not run_on_cpu: @@ -148,6 +154,11 @@ def main(args): process_list.is_file() ), "Process list CSV not found. Ensure tiling has been run." process_df = pd.read_csv(process_list) + if "feature_status" not in process_df.columns: + process_df["feature_status"] = ["tbp"] * len(process_df) + cols = ["wsi_name", "wsi_path", "mask_path", "tiling_status", "feature_status", "error", "traceback"] + process_df = process_df[cols] + skip_feature_extraction = process_df["feature_status"].str.contains("success").all() if skip_feature_extraction: diff --git a/slide2vec/hs2p b/slide2vec/hs2p new file mode 160000 index 0000000..f1a9def --- /dev/null +++ b/slide2vec/hs2p @@ -0,0 +1 @@ +Subproject commit f1a9defa3528c129b933b9b7b3fbea0dddd9056e diff --git a/slide2vec/main.py b/slide2vec/main.py index d82a48d..1a15914 100644 --- a/slide2vec/main.py +++ b/slide2vec/main.py @@ -48,24 +48,26 @@ def log_progress(features_dir: Path, stop_event: threading.Event, log_interval: time.sleep(log_interval) -def run_tiling(config_file, run_id): - print("Running tiling.py...") +def run_tiling(root_dir, config_file, output_dir): + print(f"Running tiling.py from {root_dir}...") cmd = [ sys.executable, - "slide2vec/tiling.py", - "--run-id", - run_id, + "hs2p/tiling.py", "--config-file", - config_file, + os.path.abspath(config_file), + "--output-dir", + os.path.abspath(output_dir), + "--skip-datetime", + "--skip-logging", + "wandb.enable=false", # disable wandb to avoid dupliacte logging ] - proc = subprocess.Popen(cmd) - proc.wait() + proc = subprocess.run(cmd, cwd=root_dir) if proc.returncode != 0: print("Slide tiling failed. Exiting.") sys.exit(proc.returncode) -def run_feature_extraction(config_file, run_id, run_on_cpu: False): +def run_feature_extraction(config_file, output_dir, run_on_cpu: False): print("Running embed.py...") # find a free port with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -78,19 +80,19 @@ def run_feature_extraction(config_file, run_id, run_on_cpu: False): f"--master_port={free_port}", "--nproc_per_node=gpu", "slide2vec/embed.py", - "--run-id", - run_id, "--config-file", - config_file, + os.path.abspath(config_file), + "--output-dir", + os.path.abspath(output_dir), ] if run_on_cpu: cmd = [ sys.executable, "slide2vec/embed.py", - "--run-id", - run_id, "--config-file", - config_file, + os.path.abspath(config_file), + "--output-dir", + os.path.abspath(output_dir), "--run-on-cpu", ] # launch in its own process group. @@ -107,16 +109,16 @@ def run_feature_extraction(config_file, run_id, run_on_cpu: False): sys.exit(proc.returncode) -def run_feature_aggregation(config_file, run_id, run_on_cpu: False): +def run_feature_aggregation(config_file, output_dir, run_on_cpu: False): print("Running aggregate.py...") # find a free port cmd = [ sys.executable, "slide2vec/aggregate.py", - "--run-id", - run_id, "--config-file", - config_file, + os.path.abspath(config_file), + "--output-dir", + os.path.abspath(output_dir), ] if run_on_cpu: cmd.append("--run-on-cpu") @@ -137,15 +139,17 @@ def run_feature_aggregation(config_file, run_id, run_on_cpu: False): def main(args): run_on_cpu = args.run_on_cpu - cfg, cfg_path, run_id = setup(args) + cfg, cfg_path = setup(args) + output_dir = Path(cfg.output_dir) + hf_login() - run_tiling(cfg_path, run_id) + root_dir = "slide2vec/hs2p" + run_tiling(root_dir, cfg_path, output_dir) print("Tiling completed.") print("=+=" * 10) - output_dir = Path(cfg.output_dir) features_dir = output_dir / "features" if cfg.wandb.enable: stop_event = threading.Event() @@ -154,10 +158,10 @@ def main(args): ) log_thread.start() - run_feature_extraction(cfg_path, run_id, run_on_cpu) + run_feature_extraction(cfg_path, output_dir, run_on_cpu) if cfg.model.level == "slide": - run_feature_aggregation(cfg_path, run_id, run_on_cpu) + run_feature_aggregation(cfg_path, output_dir, run_on_cpu) print("Feature extraction completed.") print("=+=" * 10) else: diff --git a/slide2vec/tiling.py b/slide2vec/tiling.py deleted file mode 100644 index 5dcbdd7..0000000 --- a/slide2vec/tiling.py +++ /dev/null @@ -1,228 +0,0 @@ -import os -import tqdm -import argparse -import traceback -import numpy as np -import pandas as pd -import multiprocessing as mp -from pathlib import Path - -from slide2vec.utils import load_csv, fix_random_seeds -from slide2vec.utils.config import get_cfg_from_file -from slide2vec.wsi import extract_coordinates, save_coordinates, visualize_coordinates - - -def get_args_parser(add_help: bool = True): - parser = argparse.ArgumentParser("slide2vec", add_help=add_help) - parser.add_argument( - "--config-file", default="", metavar="FILE", help="path to config file" - ) - parser.add_argument( - "--run-id", - type=str, - default="", - help="Name of output subdirectory", - ) - return parser - - -def process_slide_wrapper(kwargs): - return process_slide(**kwargs) - - -def process_slide( - *, - wsi_path: Path, - mask_path: Path, - cfg, - mask_visualize_dir, - tile_visualize_dir, - disable_tqdm: bool = False, - num_workers: int = 4, -): - """ - Process a single slide: extract tile coordinates and visualize if needed. - """ - wsi_name = wsi_path.stem.replace(" ", "_") - try: - tissue_mask_visu_path = None - if cfg.visualize and mask_visualize_dir is not None: - tissue_mask_visu_path = Path(mask_visualize_dir, f"{wsi_name}.jpg") - coordinates, tile_level, resize_factor, tile_size_lv0 = extract_coordinates( - wsi_path=wsi_path, - mask_path=mask_path, - backend=cfg.tiling.backend, - tiling_params=cfg.tiling.params, - segment_params=cfg.tiling.seg_params, - filter_params=cfg.tiling.filter_params, - mask_visu_path=tissue_mask_visu_path, - disable_tqdm=disable_tqdm, - num_workers=num_workers, - ) - coordinates_dir = Path(cfg.output_dir, "coordinates") - coordinates_path = Path(coordinates_dir, f"{wsi_name}.npy") - save_coordinates( - coordinates=coordinates, - target_spacing=cfg.tiling.params.spacing, - tile_level=tile_level, - tile_size=cfg.tiling.params.tile_size, - resize_factor=resize_factor, - tile_size_lv0=tile_size_lv0, - save_path=coordinates_path, - ) - if cfg.visualize and tile_visualize_dir is not None: - visualize_coordinates( - wsi_path=wsi_path, - coordinates=coordinates, - tile_size_lv0=tile_size_lv0, - save_dir=tile_visualize_dir, - downsample=cfg.tiling.visu_params.downsample, - backend=cfg.tiling.backend, - ) - return str(wsi_path), {"status": "success"} - - except Exception as e: - return str(wsi_path), { - "status": "failed", - "error": str(e), - "traceback": str(traceback.format_exc()), - } - - -def main(args): - # setup configuration - cfg = get_cfg_from_file(args.config_file) - output_dir = Path(cfg.output_dir, args.run_id) - cfg.output_dir = str(output_dir) - - fix_random_seeds(cfg.seed) - - wsi_paths, mask_paths = load_csv(cfg) - - parallel_workers = min(mp.cpu_count(), cfg.speed.num_workers_tiling) - if "SLURM_JOB_CPUS_PER_NODE" in os.environ: - parallel_workers = min( - parallel_workers, int(os.environ["SLURM_JOB_CPUS_PER_NODE"]) - ) - - process_list = Path(cfg.output_dir, "process_list.csv") - if process_list.is_file() and cfg.resume: - process_df = pd.read_csv(process_list) - else: - data = { - "wsi_name": [p.stem for p in wsi_paths], - "wsi_path": [str(p) for p in wsi_paths], - "mask_path": [str(p) if p is not None else p for p in mask_paths], - "tiling_status": ["tbp"] * len(wsi_paths), - "feature_status": ["tbp"] * len(wsi_paths), - "error": [str(np.nan)] * len(wsi_paths), - "traceback": [str(np.nan)] * len(wsi_paths), - } - if getattr(cfg.model, "level", None) == "slide": - data["aggregation_status"] = ["tbp"] * len(wsi_paths) - process_df = pd.DataFrame(data) - - skip_tiling = process_df["tiling_status"].str.contains("success").all() - - if not skip_tiling: - if cfg.tiling.read_coordinates_from is not None: - coordinates_dir = Path(cfg.tiling.read_coordinates_from) - coordinates_files = list(coordinates_dir.glob("*.npy")) - for coordinates_file in coordinates_files: - wsi_name = coordinates_file.stem - if wsi_name in process_df["wsi_name"].values: - process_df.loc[ - process_df["wsi_name"] == wsi_name, "tiling_status" - ] = "success" - process_df.to_csv(process_list, index=False) - - else: - mask = process_df["tiling_status"] != "success" - process_stack = process_df[mask] - total = len(process_stack) - - wsi_paths_to_process = [ - Path(x) for x in process_stack.wsi_path.values.tolist() - ] - mask_paths_to_process = [ - Path(x) if x is not None else x - for x in process_stack.mask_path.values.tolist() - ] - - # setup directories for coordinates and visualization - coordinates_dir = Path(cfg.output_dir, "coordinates") - coordinates_dir.mkdir(exist_ok=True, parents=True) - mask_visualize_dir = None - tile_visualize_dir = None - if cfg.visualize: - visualize_dir = Path(cfg.output_dir, "visualization") - mask_visualize_dir = Path(visualize_dir, "mask") - tile_visualize_dir = Path(visualize_dir, "tiling") - mask_visualize_dir.mkdir(exist_ok=True, parents=True) - tile_visualize_dir.mkdir(exist_ok=True, parents=True) - - tiling_updates = {} - with mp.Pool(processes=parallel_workers) as pool: - args_list = [ - { - "wsi_path": wsi_fp, - "mask_path": mask_fp, - "cfg": cfg, - "mask_visualize_dir": mask_visualize_dir, - "tile_visualize_dir": tile_visualize_dir, - "disable_tqdm": True, - "num_workers": parallel_workers, - } - for wsi_fp, mask_fp in zip( - wsi_paths_to_process, mask_paths_to_process - ) - ] - results = list( - tqdm.tqdm( - pool.imap(process_slide_wrapper, args_list), - total=total, - desc="Slide tiling", - unit="slide", - leave=True, - ) - ) - for wsi_path, status_info in results: - tiling_updates[wsi_path] = status_info - - for wsi_path, status_info in tiling_updates.items(): - process_df.loc[ - process_df["wsi_path"] == wsi_path, "tiling_status" - ] = status_info["status"] - if "error" in status_info: - process_df.loc[ - process_df["wsi_path"] == wsi_path, "error" - ] = status_info["error"] - process_df.loc[ - process_df["wsi_path"] == wsi_path, "traceback" - ] = status_info["traceback"] - process_df.to_csv(process_list, index=False) - - # summary logging - total_slides = len(process_df) - slides_with_tiles = [ - str(p) - for p in wsi_paths - if Path(coordinates_dir, f"{p.stem}.npy").is_file() - ] - failed_tiling = process_df[process_df["tiling_status"] == "failed"] - no_tiles = process_df[~process_df["wsi_path"].isin(slides_with_tiles)] - print("=+=" * 10) - print(f"Total number of slides: {total_slides}") - print(f"Failed tiling: {len(failed_tiling)}") - print(f"No tiles after tiling step: {len(no_tiles)}") - print("=+=" * 10) - - else: - print("=+=" * 10) - print("All slides have been tiled. Skipping tiling step.") - print("=+=" * 10) - - -if __name__ == "__main__": - args = get_args_parser(add_help=True).parse_args() - main(args) diff --git a/slide2vec/utils/config.py b/slide2vec/utils/config.py index e61c811..f396b79 100644 --- a/slide2vec/utils/config.py +++ b/slide2vec/utils/config.py @@ -78,7 +78,7 @@ def setup(args): cfg_path = write_config(cfg, cfg.output_dir) if cfg.wandb.enable: wandb_run.save(cfg_path) - return cfg, cfg_path, run_id + return cfg, cfg_path def setup_distributed(): diff --git a/slide2vec/wsi/__init__.py b/slide2vec/wsi/__init__.py deleted file mode 100644 index d62e461..0000000 --- a/slide2vec/wsi/__init__.py +++ /dev/null @@ -1,261 +0,0 @@ -from pathlib import Path - -import cv2 -import numpy as np -from PIL import Image, ImageOps - -from .wsi import ( - FilterParameters, - SegmentationParameters, - TilingParameters, - WholeSlideImage, -) - - -def sort_coordinates_with_tissue(coordinates, tissue_percentages): - # mock region filenames - mocked_filenames = [f"{x}_{y}.jpg" for x, y in coordinates] - # combine mocked filenames with coordinates and tissue percentages - combined = list(zip(mocked_filenames, coordinates, tissue_percentages)) - # sort combined list by mocked filenames - sorted_combined = sorted(combined, key=lambda x: x[0]) - # extract sorted coordinates and tissue percentages - sorted_coordinates = [coord for _, coord, _ in sorted_combined] - sorted_tissue_percentages = [tissue for _, _, tissue in sorted_combined] - return sorted_coordinates, sorted_tissue_percentages - - -def extract_coordinates( - *, - wsi_path: Path, - mask_path: Path, - backend: str, - segment_params: SegmentationParameters, - tiling_params: TilingParameters, - filter_params: FilterParameters, - mask_visu_path: Path | None = None, - disable_tqdm: bool = False, - num_workers: int = 1, -): - wsi = WholeSlideImage( - path=wsi_path, - mask_path=mask_path, - backend=backend, - segment=True, - segment_params=segment_params, - ) - tolerance = tiling_params.tolerance - starting_spacing = wsi.spacings[0] - desired_spacing = tiling_params.spacing - if desired_spacing < starting_spacing: - relative_diff = abs(starting_spacing - desired_spacing) / desired_spacing - if relative_diff > tolerance: - raise ValueError( - f"Desired spacing ({desired_spacing}) is smaller than the whole-slide image starting spacing ({starting_spacing}) and does not fall within tolerance ({tolerance})" - ) - ( - contours, - holes, - coordinates, - tissue_percentages, - tile_level, - resize_factor, - tile_size_lv0, - ) = wsi.get_tile_coordinates(tiling_params, filter_params, disable_tqdm=disable_tqdm,num_workers=num_workers) - sorted_coordinates, _ = sort_coordinates_with_tissue( - coordinates, tissue_percentages - ) - if mask_visu_path is not None: - wsi.visualize_mask(contours, holes).save(mask_visu_path) - return ( - sorted_coordinates, - tile_level, - resize_factor, - tile_size_lv0, - ) - - -def save_coordinates( - *, - coordinates: list[tuple[int, int]], - target_spacing: float, - tile_level: int, - tile_size: int, - resize_factor: float, - tile_size_lv0: int, - save_path: Path, -): - x = [x for x, _ in coordinates] # defined w.r.t level 0 - y = [y for _, y in coordinates] # defined w.r.t level 0 - ntile = len(x) - tile_size_resized = int(round(tile_size * resize_factor,0)) - dtype = [ - ("x", int), - ("y", int), - ("tile_size_resized", int), - ("tile_level", int), - ("resize_factor", float), - ("tile_size_lv0", int), - ("target_spacing", float), - ] - data = np.zeros(ntile, dtype=dtype) - for i in range(ntile): - data[i] = ( - x[i], - y[i], - tile_size_resized, - tile_level, - resize_factor, - tile_size_lv0, - target_spacing, - ) - data_arr = np.array(data) - np.save(save_path, data_arr) - return save_path - - -def draw_grid(img, coord, shape, thickness=2, color=(0, 0, 0, 255)): - cv2.rectangle( - img, - tuple(np.maximum([0, 0], coord - thickness // 2)), - tuple(coord - thickness // 2 + np.array(shape)), - color, - thickness=thickness, - ) - return img - - -def draw_grid_from_coordinates( - canvas, - wsi, - coords, - tile_size_at_0, - vis_level: int, - thickness: int = 2, - indices: list[int] | None = None, -): - downsamples = wsi.level_downsamples[vis_level] - if indices is None: - indices = np.arange(len(coords)) - total = len(indices) - - tile_size = tuple( - np.ceil((np.array(tile_size_at_0) / np.array(downsamples))).astype(np.int32) - ) # defined w.r.t vis_level - - wsi_width_at_0, wsi_height_at_0 = wsi.level_dimensions[ - 0 - ] # retrieve slide dimension at level 0 - - for idx in range(total): - tile_id = indices[idx] - coord = coords[tile_id] - x, y = coord - vis_spacing = wsi.get_level_spacing(vis_level) - - width, height = tile_size - tile = np.ones((height, width, 3), dtype=np.uint8) * 255 # White background - - # compute valid tile area - if x + tile_size_at_0[0] > wsi_width_at_0: - valid_width_at_0 = max( - 0, wsi_width_at_0 - x - ) # how much of the tile width is inside the wsi - valid_width = int(valid_width_at_0 / downsamples[0]) - else: - valid_width = width - - if y + tile_size_at_0[1] > wsi_height_at_0: - valid_height_at_0 = max( - 0, wsi_height_at_0 - y - ) # how much of the tile height is inside the wsi - valid_height = int(valid_height_at_0 / downsamples[1]) - else: - valid_height = height - - # extract only the valid portion of the tile - if valid_width > 0 and valid_height > 0: - valid_tile = wsi.get_tile( - x, y, valid_width, valid_height, spacing=vis_spacing - ) - valid_tile = Image.fromarray(valid_tile).convert("RGB") - valid_tile = np.array(valid_tile) - # paste the valid part into the white tile - tile[:valid_height, :valid_width, :] = valid_tile - - coord = np.ceil( - tuple(coord[i] / downsamples[i] for i in range(len(coord))) - ).astype(np.int32) - canvas_crop_shape = canvas[ - coord[1] : coord[1] + tile_size[1], - coord[0] : coord[0] + tile_size[0], - :3, - ].shape[:2] - canvas[ - coord[1] : coord[1] + tile_size[1], - coord[0] : coord[0] + tile_size[0], - :3, - ] = tile[: canvas_crop_shape[0], : canvas_crop_shape[1], :] - draw_grid(canvas, coord, tile_size, thickness=thickness) - - return Image.fromarray(canvas) - - -def pad_to_patch_size(canvas: Image.Image, patch_size: tuple[int, int]) -> Image.Image: - width, height = canvas.size - # compute amount of padding required for width and height - pad_width = (patch_size[0] - (width % patch_size[0])) % patch_size[0] - pad_height = (patch_size[1] - (height % patch_size[1])) % patch_size[1] - # apply the padding to canvas - padded_canvas = ImageOps.expand( - canvas, (0, 0, pad_width, pad_height), fill=(255, 255, 255) - ) # white padding - return padded_canvas - - -def visualize_coordinates( - *, - wsi_path: Path, - coordinates: list[tuple[int, int]], - tile_size_lv0: int, - save_dir: Path, - downsample: int = 64, - backend: str = "asap", - grid_thickness: int = 1, -): - wsi = WholeSlideImage(wsi_path, backend=backend) - vis_level = wsi.get_best_level_for_downsample_custom(downsample) - vis_spacing = wsi.spacings[vis_level] - - canvas = wsi.get_slide(spacing=vis_spacing) - canvas = Image.fromarray(canvas).convert("RGB") - if len(coordinates) == 0: - return canvas - - w, h = wsi.level_dimensions[vis_level] - if w * h > Image.MAX_IMAGE_PIXELS: - raise Image.DecompressionBombError( - f"Visualization downsample ({downsample}) is too large" - ) - - tile_size_at_0 = (tile_size_lv0, tile_size_lv0) - tile_size_at_vis_level = tuple( - (np.array(tile_size_at_0) / np.array(wsi.level_downsamples[vis_level])).astype( - np.int32 - ) - ) # defined w.r.t vis_level - - canvas = pad_to_patch_size(canvas, tile_size_at_vis_level) - canvas = np.array(canvas) - canvas = draw_grid_from_coordinates( - canvas, - wsi, - coordinates, - tile_size_at_0, - vis_level, - indices=None, - thickness=grid_thickness, - ) - wsi_name = wsi_path.stem.replace(" ", "_") - visu_path = Path(save_dir, f"{wsi_name}.jpg") - canvas.save(visu_path) diff --git a/slide2vec/wsi/utils.py b/slide2vec/wsi/utils.py deleted file mode 100644 index 5ac2e6a..0000000 --- a/slide2vec/wsi/utils.py +++ /dev/null @@ -1,111 +0,0 @@ -import cv2 -import numpy as np - - -class HasEnoughTissue(object): - def __init__(self, contour, contour_holes, tissue_mask, tile_size, scale, pct=0.01): - self.cont = contour - self.holes = contour_holes - self.mask = tissue_mask // 255 - self.tile_size = tile_size - self.scale = scale - self.pct = pct - - # Precompute the combined tissue mask - self.precomputed_mask = self._precompute_tissue_mask() - - def _precompute_tissue_mask(self): - """ - Precompute a binary mask for the entire region, combining the contour and holes. - - Returns: - np.ndarray: A binary mask where tissue regions are 1 and non-tissue regions are 0. - """ - contour_mask = np.zeros_like(self.mask, dtype=np.uint8) - - # Draw white filled contour on black background - cv2.drawContours(contour_mask, [self.cont], 0, 255, -1) - - # Draw black filled holes on white filled contour - cv2.drawContours(contour_mask, self.holes, 0, 0, -1) - - # Combine with the tissue mask - return cv2.bitwise_and(self.mask, contour_mask) - - def __call__(self, pt): - """ - Check if a single tile at the given point has enough tissue. - - Args: - pt (tuple): The (x, y) coordinates of the top-left corner of the tile. - - Returns: - tuple: (keep_flag, tissue_pct), where: - - keep_flag is 1 if the tile has enough tissue, otherwise 0. - - tissue_pct is the percentage of tissue in the tile. - """ - downsampled_tile_size = int(round(self.tile_size * 1 / self.scale[0], 0)) - assert ( - downsampled_tile_size > 0 - ), "downsampled tile_size is equal to zero, aborting; please consider using a smaller seg_params.downsample parameter" - downsampled_pt = pt * 1 / self.scale[0] - x_tile, y_tile = map(int, downsampled_pt) - - # Extract the sub-mask for the tile - sub_mask = self.precomputed_mask[ - y_tile : y_tile + downsampled_tile_size, - x_tile : x_tile + downsampled_tile_size, - ] - - tile_area = downsampled_tile_size**2 - tissue_area = np.sum(sub_mask) - tissue_pct = round(tissue_area / tile_area, 3) - - if tissue_pct >= self.pct: - return 1, tissue_pct - else: - return 0, tissue_pct - - def check_coordinates(self, coords): - """ - Check multiple tile coordinates for tissue coverage in a vectorized manner. - - Args: - coords (np.ndarray): An array of shape (N, 2), where each row is (x, y). - - Returns: - tuple: (keep_flags, tissue_pcts), where: - - keep_flags is a list of 1s and 0s indicating whether each tile has enough tissue. - - tissue_pcts is a list of tissue percentages for each tile. - """ - downsampled_tile_size = int(round(self.tile_size * 1 / self.scale[0], 0)) - assert ( - downsampled_tile_size > 0 - ), "downsampled tile_size is equal to zero, aborting; please consider using a smaller seg_params.downsample parameter" - - # Downsample coordinates - downsampled_coords = coords * 1 / self.scale[0] - downsampled_coords = downsampled_coords.astype(int) - - keep_flags = [] - tissue_pcts = [] - - for x_tile, y_tile in downsampled_coords: - # Extract the sub-mask for the tile - sub_mask = self.precomputed_mask[ - y_tile : y_tile + downsampled_tile_size, - x_tile : x_tile + downsampled_tile_size, - ] - - tile_area = downsampled_tile_size**2 - tissue_area = np.sum(sub_mask) - tissue_pct = round(tissue_area / tile_area, 3) - - if tissue_pct >= self.pct: - keep_flags.append(1) - tissue_pcts.append(tissue_pct) - else: - keep_flags.append(0) - tissue_pcts.append(tissue_pct) - - return keep_flags, tissue_pcts diff --git a/slide2vec/wsi/wsi.py b/slide2vec/wsi/wsi.py deleted file mode 100644 index 83137ab..0000000 --- a/slide2vec/wsi/wsi.py +++ /dev/null @@ -1,983 +0,0 @@ -import math -import sys -import warnings -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from typing import NamedTuple - -import cv2 -import numpy as np -import tqdm -import wholeslidedata as wsd -from PIL import Image - -from slide2vec.wsi.utils import HasEnoughTissue - -# ignore all warnings from wholeslidedata -warnings.filterwarnings("ignore", module="wholeslidedata") - -Image.MAX_IMAGE_PIXELS = 933120000 - - -class SegmentationParameters(NamedTuple): - """ - Parameters for filtering contours. - """ - - downsample: int # dowsample factor for loading segmentation mask - sthresh: int # segmentation threshold (positive integer, using a higher threshold leads to less foreground and more background detection) (not used when use_otsu=True) - sthresh_up: int # upper threshold value for scaling the binary mask - mthresh: int # median filter size (positive, odd integer) - close: int # additional morphological closing to apply following initial thresholding (positive integer) - use_otsu: bool # whether to use Otsu's method for thresholding - tissue_pixel_value: int # when loading mask from disk, what pixel value corresponds to tissue - - -class FilterParameters(NamedTuple): - """ - Parameters for filtering contours. - """ - - ref_tile_size: int # reference tile size for filtering - a_t: int # contour area threshold for filtering - a_h: int # hole area threshold for filtering - max_n_holes: int # maximum number of holes allowed - - -class TilingParameters(NamedTuple): - """ - Parameters for tiling. - """ - - spacing: float # spacing at which to tile the slide, in microns per pixel - tolerance: float # for matching the spacing, deciding how much spacing can deviate from those specified in the slide metadata. - tile_size: int # size of the tiles to extract, in pixels - overlap: float # overlap between tiles - min_tissue_percentage: float # minimum tissue percentage required for a tile - drop_holes: bool # whether to drop tiles that fall within holes - use_padding: bool # whether to use padding for tiles at the edges - - -class WholeSlideImage(object): - """ - A class for handling Whole Slide Images (wsi) and tile extraction. - Attributes: - path (Path): Full path to the wsi. - name (str): Name of the wsi (stem of the path). - fmt (str): File format of the wsi. - wsi (wsd.WholeSlideImage): wsi object. - spacing_at_level_0 (float): Manually set spacing at level 0. - spacings (list[float]): List of spacings for each level. - level_dimensions (list[tuple[int, int]]): Dimensions at each level. - level_downsamples (list[tuple[float, float]]): Downsample factors for each level. - backend (str): Backend used for opening the wsi (default: "asap"). - mask_path (Path, optional): Path to the segmentation mask. - mask (wsd.WholeSlideImage, optional): Segmentation mask object. - seg_level (int): Level for segmentation. - binary_mask (np.ndarray): Binary segmentation mask as a numpy array. - """ - - def __init__( - self, - path: Path, - # mask_path: Path | None = None, - mask_path: Path = None, - # spacing_at_level_0: float | None = None, - spacing_at_level_0: float = None, - backend: str = "asap", - segment: bool = False, - # segment_params: SegmentationParameters | None = None, - segment_params: SegmentationParameters = None, - ): - """ - Initializes a Whole Slide Image object with optional mask and spacing. - - Args: - path (Path): Path to the wsi. - mask_path (Path, optional): Path to the tissue mask, if available. Defaults to None. - spacing_at_level_0 (float, optional): Manually set spacing at level 0, if speficied. Defaults to None. - backend (str): Backend to use for opening the wsi. Defaults to "asap". - segment (bool): Whether to segment the slide if tissue mask is not provided. Defaults to False. - segment_params (NamedTuple, optional): Segmentation parameters. Used for either loading an existing mask or segmenting the slide. - """ - - self.path = path - self.name = path.stem.replace(" ", "_") - self.fmt = path.suffix - self.wsi = wsd.WholeSlideImage(path, backend=backend) - - self._scaled_contours_cache = {} # add a cache for scaled contours - self._scaled_holes_cache = {} # add a cache for scaled holes - self._level_spacing_cache = {} # add a cache for level spacings - - self.spacing_at_level_0 = spacing_at_level_0 # manually set spacing at level 0 - self.spacings = self.get_spacings() - self.level_dimensions = self.wsi.shapes - self.level_downsamples = self.get_downsamples() - self.backend = backend - - self.mask_path = mask_path - if mask_path is not None: - self.mask = wsd.WholeSlideImage(mask_path, backend=backend) - self.seg_level = self.load_segmentation(segment_params) - elif segment: - self.seg_level = self.segment_tissue(segment_params) - - def get_slide(self, spacing: float): - return self.wsi.get_slide(spacing=spacing) - - def get_tile(self, x: int, y: int, width: int, height: int, spacing: float): - """ - Extracts a tile from a whole slide image at the specified coordinates, size, and spacing. - - Args: - x (int): The x-coordinate of the top-left corner of the tile. - y (int): The y-coordinate of the top-left corner of the tile. - width (int): Tile width. - height (int): Tile height. - spacing (float): The spacing (resolution) at which the tile should be extracted. - - Returns: - numpy.ndarray: The extracted tile as a numpy array. - """ - return self.wsi.get_patch( - x, - y, - width, - height, - spacing=spacing, - center=False, - ) - - def get_downsamples(self): - """ - Calculate the downsample factors for each level of the image pyramid. - - This method computes the downsample factors for each level in the image - pyramid relative to the base level (level 0). The downsample factor for - each level is represented as a tuple of two values, corresponding to the - downsampling in the width and height dimensions. - - Returns: - list of tuple: A list of tuples where each tuple contains two float - values representing the downsample factors (width_factor, height_factor) - for each level relative to the base level. - """ - level_downsamples = [] - dim_0 = self.level_dimensions[0] - for dim in self.level_dimensions: - level_downsample = (dim_0[0] / float(dim[0]), dim_0[1] / float(dim[1])) - level_downsamples.append(level_downsample) - return level_downsamples - - def get_spacings(self): - """ - Retrieve the spacings for the whole slide image. - - If the `spacing` attribute is not set, the method returns the original spacings - from the wsi. Otherwise, it calculates adjusted spacings based on the provided - `spacing` value and the original spacings. - - Returns: - list: A list of spacings, either the original or adjusted based on the - `spacing` attribute. - """ - if self.spacing_at_level_0 is None: - spacings = self.wsi.spacings - else: - spacings = [ - self.spacing_at_level_0 * s / self.wsi.spacings[0] - for s in self.wsi.spacings - ] - return spacings - - def get_level_spacing(self, level: int): - """ - Retrieve the spacing value for a specified level. - - Args: - level (int): Level for which to retrieve the spacing. - - Returns: - float: Spacing value corresponding to the specified level. - """ - if level not in self._level_spacing_cache: - self._level_spacing_cache[level] = self.spacings[level] - return self._level_spacing_cache[level] - - def get_best_level_for_spacing( - self, target_spacing: float, tolerance: float - ): - """ - Determines the best level in a multi-resolution image pyramid for a given target spacing. - - Ensures that the spacing of the returned level is either within the specified tolerance of the target - spacing or smaller than the target spacing to avoid upsampling. - - Args: - target_spacing (float): Desired spacing. - tolerance (float, optional): Tolerance for matching the spacing, deciding how much - spacing can deviate from those specified in the slide metadata. - - Returns: - level (int): Index of the best matching level in the image pyramid. - """ - spacing = self.get_level_spacing(0) - target_downsample = target_spacing / spacing - level = self.get_best_level_for_downsample_custom(target_downsample) - level_spacing = self.get_level_spacing(level) - - # check if the level_spacing is within the tolerance of the target_spacing - is_within_tolerance = False - if abs(level_spacing - target_spacing) / target_spacing <= tolerance: - is_within_tolerance = True - return level, is_within_tolerance - - # otherwise, look for a spacing smaller than or equal to the target_spacing - else: - while level > 0 and level_spacing > target_spacing: - level -= 1 - level_spacing = self.get_level_spacing(level) - if abs(level_spacing - target_spacing) / target_spacing <= tolerance: - is_within_tolerance = True - break - - assert ( - level_spacing <= target_spacing - or abs(level_spacing - target_spacing) / target_spacing <= tolerance - ), f"Unable to find a spacing less than or equal to the target spacing ({target_spacing}) or within {int(tolerance * 100)}% of the target spacing." - return level, is_within_tolerance - - # def get_best_level_for_downsample_custom(self, downsample: float | int): - def get_best_level_for_downsample_custom(self, downsample: int): - """ - Determines the best level for a given downsample factor based on the available - level downsample values. - - Args: - downsample (float): Target downsample factor. - - Returns: - int: Index of the best matching level for the given downsample factor. - """ - level = int(np.argmin([abs(x - downsample) for x, _ in self.level_downsamples])) - return level - - def load_segmentation( - self, - segment_params: SegmentationParameters, - ): - """ - Load and process a segmentation mask for a whole slide image. - - This method ensures that the segmentation mask and the slide have at least one - common spacing, determines the best level for the given downsample factor, and - processes the segmentation mask to create a binary mask. - - Args: - downsample (int): Downsample factor for finding best level for tissue segmentation. - sthresh_up (int, optional): Upper threshold value for scaling the binary - mask. Defaults to 255. - tissue_pixel_value (int, optional): Pixel value in the segmentation mask that - represents tissue. Defaults to 1. - - Returns: - int: Level at which the tissue mask was loaded. - """ - mask_spacing_at_level_0 = self.mask.spacings[0] - seg_level = self.get_best_level_for_downsample_custom(segment_params.downsample) - seg_spacing = self.get_level_spacing(seg_level) - - mask_downsample = seg_spacing / mask_spacing_at_level_0 - mask_level = int( - np.argmin([abs(x - mask_downsample) for x in self.mask.downsamplings]) - ) - mask_spacing = self.mask.spacings[mask_level] - - scale = seg_spacing / mask_spacing - while scale < 1 and mask_level > 0: - mask_level -= 1 - mask_spacing = self.mask.spacings[mask_level] - scale = seg_spacing / mask_spacing - - mask = self.mask.get_slide(spacing=mask_spacing) - width, height, _ = mask.shape - - # resize the mask to the size of the slide at seg_spacing - mask = cv2.resize( - mask.astype(np.uint8), - (int(round(height / scale, 0)), int(round(width / scale, 0))), - interpolation=cv2.INTER_NEAREST, - ) - - m = (mask == segment_params.tissue_pixel_value).astype("uint8") - if np.max(m) <= 1: - m = m * segment_params.sthresh_up - - self.binary_mask = m - return seg_level - - def segment_tissue( - self, - segment_params: SegmentationParameters, - ): - """ - Segment the tissue via HSV -> Median thresholding -> Binary thresholding -> Morphological closing. - - Args: - downsample (int): Downsample factor for finding best level for tissue segmentation. - sthresh (int, optional): Lower threshold for binary thresholding. Defaults to 20. - sthresh_up (int, optional): Upper threshold for binary thresholding. Defaults to 255. - mthresh (int, optional): Kernel size for median blurring. Defaults to 7. - close (int, optional): Size of the kernel for morphological closing. - If 0, no morphological closing is applied. Defaults to 0. - use_otsu (bool, optional): Whether to use Otsu's method for thresholding. Defaults to False. - - Returns: - int: Level at which the tissue mask was created. - """ - - seg_level = self.get_best_level_for_downsample_custom(segment_params.downsample) - seg_spacing = self.get_level_spacing(seg_level) - - img = self.wsi.get_slide(spacing=seg_spacing) - img = np.array(Image.fromarray(img).convert("RGBA")) - img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV) # convert to HSV space - img_med = cv2.medianBlur( - img_hsv[:, :, 1], segment_params.mthresh - ) # apply median blurring - - # thresholding - if segment_params.use_otsu: - _, img_thresh = cv2.threshold( - img_med, - 0, - segment_params.sthresh_up, - cv2.THRESH_OTSU + cv2.THRESH_BINARY, - ) - else: - _, img_thresh = cv2.threshold( - img_med, - segment_params.sthresh, - segment_params.sthresh_up, - cv2.THRESH_BINARY, - ) - - # morphological closing - if segment_params.close > 0: - kernel = np.ones((segment_params.close, segment_params.close), np.uint8) - img_thresh = cv2.morphologyEx(img_thresh, cv2.MORPH_CLOSE, kernel) - - self.binary_mask = img_thresh - return seg_level - - def visualize_mask( - self, - contours, - holes, - downsample: int = 32, - color: tuple[int] = (0, 255, 0), - hole_color: tuple[int] = (0, 0, 255), - line_thickness: int = 250, - # max_size: int | None = None, - max_size: int = None, - number_contours: bool = False, - ): - vis_level = self.get_best_level_for_downsample_custom(downsample) - level_downsample = self.level_downsamples[vis_level] - scale = [1 / level_downsample[0], 1 / level_downsample[1]] - - s = self.spacings[vis_level] - img = self.wsi.get_slide(spacing=s) - if self.backend == "openslide": - img = np.ascontiguousarray(img) - - offset = tuple(-(np.array((0, 0)) * scale).astype(int)) - line_thickness = int(line_thickness * math.sqrt(scale[0] * scale[1])) - if contours is not None: - if not number_contours: - cv2.drawContours( - img, - self.scaleContourDim(contours, scale), - -1, - color, - line_thickness, - lineType=cv2.LINE_8, - offset=offset, - ) - - else: - # add numbering to each contour - for idx, cont in enumerate(contours): - contour = np.array(self.scaleContourDim(cont, scale)) - M = cv2.moments(contour) - cX = int(M["m10"] / (M["m00"] + 1e-9)) - cY = int(M["m01"] / (M["m00"] + 1e-9)) - # draw the contour and put text next to center - cv2.drawContours( - img, - [contour], - -1, - color, - line_thickness, - lineType=cv2.LINE_8, - offset=offset, - ) - cv2.putText( - img, - "{}".format(idx), - (cX, cY), - cv2.FONT_HERSHEY_SIMPLEX, - 2, - (255, 0, 0), - 10, - ) - - for holes in holes: - cv2.drawContours( - img, - self.scaleContourDim(holes, scale), - -1, - hole_color, - line_thickness, - lineType=cv2.LINE_8, - ) - - img = Image.fromarray(img) - - w, h = img.size - if max_size is not None and (w > max_size or h > max_size): - resizeFactor = max_size / w if w > h else max_size / h - img = img.resize((int(w * resizeFactor), int(h * resizeFactor))) - - return img - - def get_tile_coordinates( - self, - tiling_params: TilingParameters, - filter_params: FilterParameters, - disable_tqdm: bool = False, - num_workers: int = 1, - ): - """ - Extract tile coordinates based on the specified target spacing, tile size, overlap, - and additional tiling and filtering parameters. - - Args: - tiling_params (NamedTuple): Parameters for tiling, including: - - spacing (float): Desired spacing of the tiles. - - tolerance (float): Tolerance for matching the spacing, deciding how much - spacing can deviate from those specified in the slide metadata. - - tile_size (int): Desired size of the tiles at the target spacing. - - overlap (float, optional): Overlap between adjacent tiles. Defaults to 0.0. - - "drop_holes" (bool): If True, tiles falling within a hole will be excluded. Defaults to False. - - "min_tissue_percentage" (float): Minimum amount pixels covered with tissue required for a tile. Defaults to 0.25 (25 percent). - - "use_padding" (bool): Whether to use padding for tiles at the edges. Defaults to True. - filter_params (NamedTuple): Parameters for filtering contours, including: - - "ref_tile_size" (int): Reference tile size for filtering. Defaults to 256. - - "a_t" (int): Contour area threshold for filtering. Defaults to 4. - - "a_h" (int): Hole area threshold for filtering. Defaults to 2. - - "max_n_holes" (int): Maximum number of holes allowed. Defaults to 8. - num_workers (int, optional): Number of workers to use for parallel processing. - Defaults to 1. - - Returns: - tuple: - - tile_coordinates (list[tuple[int, int]]): List of (x, y) coordinates for the extracted tiles. - - tissue_percentages (list[float]): List of tissue percentages for each tile. - - tile_level (int): Level of the wsi used for tile extraction. - - resize_factor (float): The factor by which the tile size was resized. - - tile_size_lv0 (int): The tile size at level 0 of the wsi pyramid. - """ - scale = tiling_params.spacing / self.get_level_spacing(0) - tile_size_lv0 = int(round(tiling_params.tile_size * scale, 0)) - - contours, holes = self.detect_contours( - target_spacing=tiling_params.spacing, - tolerance=tiling_params.tolerance, - filter_params=filter_params - ) - ( - running_x_coords, - running_y_coords, - tissue_percentages, - tile_level, - resize_factor, - ) = self.process_contours( - contours, - holes, - spacing=tiling_params.spacing, - tolerance=tiling_params.tolerance, - tile_size=tiling_params.tile_size, - overlap=tiling_params.overlap, - drop_holes=tiling_params.drop_holes, - min_tissue_percentage=tiling_params.min_tissue_percentage, - use_padding=tiling_params.use_padding, - disable_tqdm=disable_tqdm, - num_workers=num_workers, - ) - tile_coordinates = list(zip(running_x_coords, running_y_coords)) - return ( - contours, - holes, - tile_coordinates, - tissue_percentages, - tile_level, - resize_factor, - tile_size_lv0, - ) - - @staticmethod - def filter_contours(contours, hierarchy, filter_params: FilterParameters): - """ - Filter contours by area using FilterParameters. - """ - filtered = [] - - # find indices of foreground contours (parent == -1) - hierarchy_1 = np.flatnonzero(hierarchy[:, 1] == -1) - all_holes = [] - - # loop through foreground contour indices - for cont_idx in hierarchy_1: - # actual contour - cont = contours[cont_idx] - # indices of holes contained in this contour (children of parent contour) - holes = np.flatnonzero(hierarchy[:, 1] == cont_idx) - # take contour area (includes holes) - a = cv2.contourArea(cont) - # calculate the contour area of each hole - hole_areas = [cv2.contourArea(contours[hole_idx]) for hole_idx in holes] - # actual area of foreground contour region - a = a - np.array(hole_areas).sum() - if a == 0: - continue - if a > filter_params.a_t: # Use named tuple instead of dictionary - filtered.append(cont_idx) - all_holes.append(holes) - - foreground_contours = [contours[cont_idx] for cont_idx in filtered] - - hole_contours = [] - for hole_ids in all_holes: - unfiltered_holes = [contours[idx] for idx in hole_ids] - unfilered_holes = sorted( - unfiltered_holes, key=cv2.contourArea, reverse=True - ) - # take max_n_holes largest holes by area - unfilered_holes = unfilered_holes[ - : filter_params.max_n_holes - ] # Use named tuple - filtered_holes = [] - - # filter these holes - for hole in unfilered_holes: - if cv2.contourArea(hole) > filter_params.a_h: # Use named tuple - filtered_holes.append(hole) - - hole_contours.append(filtered_holes) - - return foreground_contours, hole_contours - - def detect_contours( - self, - target_spacing: float, - tolerance: float, - filter_params: FilterParameters, - ): - """ - Detect and filter contours from a binary mask based on specified parameters. - - This method identifies contours in a binary mask, filters them based on area - thresholds, and scales the contours to a specified target resolution. - - Args: - target_spacing (float): Desired spacing at which tiles should be extracted. - tolerance (float): Tolerance for matching the spacing, deciding how much - spacing can deviate from those specified in the slide metadata. - filter_params (NamedTuple): A NamedTuple containing filtering parameters: - - "a_t" (int): Minimum area threshold for foreground contours. - - "a_h" (int): Minimum area threshold for holes within contours. - - "max_n_holes" (int): Maximum number of holes to retain per contour. - - "ref_tile_size" (int): Reference tile size for computing areas. - - Returns: - tuple[list[np.ndarray], list[list[np.ndarray]]]: - - A list of scaled foreground contours. - - A list of lists containing scaled hole contours for each foreground contour. - """ - - spacing_level, _ = self.get_best_level_for_spacing(target_spacing, tolerance) - current_scale = self.level_downsamples[spacing_level] - target_scale = self.level_downsamples[self.seg_level] - scale = tuple(a / b for a, b in zip(target_scale, current_scale)) - ref_tile_size = filter_params.ref_tile_size - scaled_ref_tile_area = int(round(ref_tile_size**2 / (scale[0] * scale[1]),0)) - - adjusted_filter_params = FilterParameters( - ref_tile_size=filter_params.ref_tile_size, - a_t=filter_params.a_t * scaled_ref_tile_area, - a_h=filter_params.a_h * scaled_ref_tile_area, - max_n_holes=filter_params.max_n_holes, - ) - - # find and filter contours - contours, hierarchy = cv2.findContours( - self.binary_mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE - ) - hierarchy = np.squeeze(hierarchy, axis=(0,))[:, 2:] - - foreground_contours, hole_contours = self.filter_contours( - contours, hierarchy, adjusted_filter_params - ) - - # scale detected contours to level 0 - contours = self.scaleContourDim(foreground_contours, target_scale) - holes = self.scaleHolesDim(hole_contours, target_scale) - return contours, holes - - @staticmethod - def isInHoles(holes, pt, tile_size): - """ - Check if a given tile is inside any of the specified polygonal holes. - - Args: - holes (list): A list of polygonal contours, where each contour is represented - as a list of points (e.g., from OpenCV's findContours function). - pt (tuple): The (x, y) coordinates of the top-left corner of the tile to check. - tile_size (int or float): The size of the tile, used to calculate the center - of the point being tested. - - Returns: - int: Returns 1 if the point is inside any of the holes, otherwise returns 0. - """ - for hole in holes: - if ( - cv2.pointPolygonTest( - hole, (pt[0] + tile_size / 2, pt[1] + tile_size / 2), False - ) - > 0 - ): - return 1 - - return 0 - - @staticmethod - def isInContours(cont_check_fn, pt, holes=None, drop_holes=True, tile_size=256): - """ - Determines whether a given tile is within contours (and optionally outside of holes). - - Args: - cont_check_fn (callable): A function that checks if a tile is within contours. - It should accept a (x,y) coordinates as input and return a tuple (keep_flag, tissue_pct), - where `keep_flag` is a boolean indicating if the tile is within contours, - and `tissue_pct` is the percentage of tissue coverage of the tile. - pt (tuple): The (x, y) coordinates of the top-left corner of the tile to check. - holes (list, optional): A list of holes (e.g., regions to exclude) to check against. - Defaults to None. - drop_holes (bool, optional): If True, tiles falling within a hole will be excluded. - Defaults to True. - tile_size (int, optional): The size of the tile to consider. - Defaults to 256. - - Returns: - tuple: A tuple (keep_flag, tissue_pct), where: - - `keep_flag` is 1 if the tile is within contours and not in holes (if applicable), - otherwise 0. - - `tissue_pct` is the percentage of tissue coverage of the tile. - """ - keep_flag, tissue_pct = cont_check_fn(pt) - if keep_flag: - if holes is not None and drop_holes: - return not WholeSlideImage.isInHoles(holes, pt, tile_size), tissue_pct - else: - return 1, tissue_pct - return 0, tissue_pct - - @staticmethod - def scaleContourDim(contours, scale): - """ - Scales the dimensions of a list of contours by a given factor. - - Args: - contours (list of numpy.ndarray): A list of contours, where each contour is - represented as a numpy array of coordinates. - scale (float): The scaling factor to apply to the contours. - - Returns: - list of numpy.ndarray: A list of scaled contours, where each contour's - coordinates are multiplied by the scaling factor and converted to integers. - """ - return [np.array(cont * scale, dtype="int32") for cont in contours] - - @staticmethod - def scaleHolesDim(contours, scale): - """ - Scales the dimensions of holes within a set of contours by a given factor. - - Args: - contours (list of list of numpy.ndarray): A list of contours, where each contour - is represented as a list of holes, and each hole is a numpy array of coordinates. - scale (float): The scaling factor to apply to the dimensions of the holes. - - Returns: - list of list of numpy.ndarray: A new list of contours with the dimensions of - the holes scaled by the specified factor. - """ - return [ - [np.array(hole * scale, dtype="int32") for hole in holes] - for holes in contours - ] - - def process_contours( - self, - contours, - holes, - spacing: float, - tolerance: float, - tile_size: int, - overlap: float, - drop_holes: bool, - min_tissue_percentage: float, - use_padding: bool, - disable_tqdm: bool = False, - num_workers: int = 1, - ): - """ - Processes a list of contours and their corresponding holes to generate tile coordinates, - tissue percentages, and other metadata. - - Args: - contours (list): List of contours representing tissue blobs in the wsi. - holes (list): List of tissue holes in each contour. - spacing (float): Desired spacing for tiling. - tolerance (float): Tolerance for matching the spacing, deciding how much - spacing can deviate from those specified in the slide metadata. - tile_size (int): Desired tile size in pixels. - overlap (float): Overlap between adjacent tiles. - drop_holes (bool): Whether to drop tiles that fall within holes. - min_tissue_percentage (float): Minimum amount pixels covered with tissue required for a tile. - use_padding (bool): Whether to pad the tiles to ensure full coverage. - num_workers (int, optional): Number of workers to use for parallel processing. Defaults to 1. - - Returns: - tuple: A tuple containing: - - running_x_coords (list): The x-coordinates of the extracted tiles. - - running_y_coords (list): The y-coordinates of the extracted tiles. - - running_tissue_pct (list): List of tissue percentages for each extracted tile. - - tile_level (int): Level of the wsi used for tile extraction. - - resize_factor (float): The factor by which the tile size was resized. - """ - running_x_coords, running_y_coords = [], [] - running_tissue_pct = [] - tile_level = None - resize_factor = None - - def process_single_contour(i): - return self.process_contour( - contours[i], - holes[i], - spacing, - tolerance, - tile_size, - overlap, - drop_holes, - min_tissue_percentage, - use_padding, - ) - - with ThreadPoolExecutor(max_workers=num_workers) as executor: - results = list( - tqdm.tqdm( - executor.map(process_single_contour, range(len(contours))), - desc="Extracting tissue tiles", - unit=" tissue blob", - total=len(contours), - leave=True, - disable=disable_tqdm, - file=sys.stdout, - ) - ) - - for ( - x_coords, - y_coords, - tissue_pct, - cont_tile_level, - cont_resize_factor, - ) in results: - if len(x_coords) > 0: - if tile_level is not None: - assert ( - tile_level == cont_tile_level - ), "Tile level should be the same for all contours" - tile_level = cont_tile_level - resize_factor = cont_resize_factor - running_x_coords.extend(x_coords) - running_y_coords.extend(y_coords) - running_tissue_pct.extend(tissue_pct) - - return ( - running_x_coords, - running_y_coords, - running_tissue_pct, - tile_level, - resize_factor, - ) - - def process_contour( - self, - contour, - contour_holes, - spacing: float, - tolerance: float, - tile_size: int, - overlap: float, - drop_holes: bool, - min_tissue_percentage: float, - use_padding: bool, - ): - """ - Processes a contour to generate tile coordinates and associated metadata. - - Args: - contour (numpy.ndarray): Contour to process, defined as a set of points. - contour_holes (list): List of holes within the contour. - spacing (float): Target spacing for the tiles. - tolerance (float): Tolerance for matching the spacing, deciding how much - spacing can deviate from those specified in the slide metadata. - tile_size (int): Size of the tiles in pixels. - overlap (float): Overlap between tiles. - drop_holes (bool): Whether to drop tiles that fall within holes. - min_tissue_percentage (float): Minimum amount pixels covered with tissue required for a tile. - use_padding (bool): Whether to pad the image to ensure full coverage. - - Returns: - tuple: A tuple containing: - - x_coords (list): List of x-coordinates for each tile. - - y_coords (list): List of y-coordinates for each tile. - - filtered_tissue_percentages (list): List of tissue percentages for each tile. - - tile_level (int): Level of the image used for tile extraction. - - resize_factor (float): The factor by which the tile size was resized. - """ - tile_level, is_within_tolerance = self.get_best_level_for_spacing( - spacing, tolerance - ) - tile_spacing = self.get_level_spacing(tile_level) - resize_factor = spacing / tile_spacing - if is_within_tolerance: - resize_factor = 1.0 - - assert ( - resize_factor >= 1 - ), f"Resize factor should be greater than or equal to 1. Got {resize_factor}" - - tile_size_resized = int(round(tile_size * resize_factor,0)) - step_size = int(tile_size_resized * (1.0 - overlap)) - - if contour is not None: - start_x, start_y, w, h = cv2.boundingRect(contour) - else: - start_x, start_y, w, h = ( - 0, - 0, - self.level_dimensions[tile_level][0], - self.level_dimensions[tile_level][1], - ) - - tile_downsample = ( - int(self.level_downsamples[tile_level][0]), - int(self.level_downsamples[tile_level][1]), - ) - ref_tile_size = ( - tile_size_resized * tile_downsample[0], - tile_size_resized * tile_downsample[1], - ) - - img_w, img_h = self.level_dimensions[0] - if use_padding: - stop_y = int(start_y + h) - stop_x = int(start_x + w) - else: - stop_y = min(start_y + h, img_h - ref_tile_size[1] + 1) - stop_x = min(start_x + w, img_w - ref_tile_size[0] + 1) - - scale = self.level_downsamples[self.seg_level] - cont = self.scaleContourDim([contour], (1.0 / scale[0], 1.0 / scale[1]))[0] - - tissue_checker = HasEnoughTissue( - contour=cont, - contour_holes=contour_holes, - tissue_mask=self.binary_mask, - tile_size=ref_tile_size[0], - scale=scale, - pct=min_tissue_percentage, - ) - - ref_step_size_x = int(round(step_size * tile_downsample[0], 0)) - ref_step_size_y = int(round(step_size * tile_downsample[1], 0)) - - x_range = np.arange(start_x, stop_x, step=ref_step_size_x) - y_range = np.arange(start_y, stop_y, step=ref_step_size_y) - x_coords, y_coords = np.meshgrid(x_range, y_range, indexing="ij") - coord_candidates = np.array( - [x_coords.flatten(), y_coords.flatten()] - ).transpose() - - # vectorized processing of coordinates using the tissue_checker - keep_flags, tissue_pcts = tissue_checker.check_coordinates(coord_candidates) - - if drop_holes: - keep_flags = [ - flag and not self.isInHoles(contour_holes, coord, ref_tile_size[0]) - for flag, coord in zip(keep_flags, coord_candidates) - ] - - filtered_coordinates = coord_candidates[np.array(keep_flags) == 1] - filtered_tissue_percentages = np.array(tissue_pcts)[np.array(keep_flags) == 1] - - ntile = len(filtered_coordinates) - - if ntile > 0: - x_coords = list(filtered_coordinates[:, 0]) - y_coords = list(filtered_coordinates[:, 1]) - return ( - x_coords, - y_coords, - filtered_tissue_percentages, - tile_level, - resize_factor, - ) - - else: - return [], [], [], None, None - - @staticmethod - def process_coord_candidate( - coord, contour_holes, tile_size, cont_check_fn, drop_holes - ): - """ - Processes a candidate coordinate to determine if it should be kept based on - its location relative to contours and the percentage of tissue it contains. - - Args: - coord (tuple): (x, y) coordinate to be processed. - contour_holes (list): A list of contours and holes to check against. - tile_size (int): Size of the tile to consider. - cont_check_fn (callable): A function to check if the coordinate is within - the contours or holes. - drop_holes (bool): A flag indicating whether to drop tiles falling in holes during the check. - - Returns: - tuple: A tuple containing: - - coord (tuple or None): Input coordinate if it passes the check, - otherwise None. - - tissue_pct (float): Percentage of tissue in the tile. - """ - keep_flag, tissue_pct = WholeSlideImage.isInContours( - cont_check_fn, coord, contour_holes, drop_holes, tile_size - ) - if keep_flag: - return coord, tissue_pct - else: - return None, tissue_pct