diff --git a/docker/Dockerfile b/docker/Dockerfile index e38a48a..df316d6 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,6 +21,8 @@ FROM python:${PYTHON_VERSION} # build config ARG HTCONDOR_VERSION=9.0 +ARG DISABLE_NODEJS= + # switch to root to do root-level config USER root @@ -39,9 +41,9 @@ ENV USER=mapper \ PATH="/home/mapper/.local/bin:${PATH}" \ PYTHONPATH="/home/mapper/htmap:${PYTHONPATH}" RUN : \ - && groupadd ${USER} \ - && useradd -m -g ${USER} ${USER} \ - && : + && groupadd ${USER} \ + && useradd -m -g ${USER} ${USER} \ + && : # switch to the user, don't need root anymore USER ${USER} @@ -66,6 +68,9 @@ RUN : \ requirement="htcondor~=${htcondor_version_major}.0.0"; \ # ^^ gets translated into e.g. >=9.0.0,<9.1 \ fi \ - && python -m pip install --user --no-cache-dir --disable-pip-version-check "/home/${USER}/htmap[tests,docs]" "$requirement" + && python -m pip install --user --no-cache-dir --disable-pip-version-check "/home/${USER}/htmap[tests,docs,widgets]" "$requirement" \ + && jupyter nbextension enable --py widgetsnbextension \ + && [ "X${DISABLE_NODEJS}" != X ] || jupyter labextension install --minimize=False @jupyter-widgets/jupyterlab-manager \ + && : WORKDIR /home/${USER}/htmap diff --git a/docker/install-htcondor.sh b/docker/install-htcondor.sh index c6801d7..071632c 100755 --- a/docker/install-htcondor.sh +++ b/docker/install-htcondor.sh @@ -13,6 +13,9 @@ export DEBIAN_FRONTEND=noninteractive apt-get update apt-get -y install --no-install-recommends vim less git gnupg wget ca-certificates locales graphviz pandoc strace +if [[ ! $DISABLE_NODEJS ]]; then + apt-get -y install --no-install-recommends nodejs npm +fi echo "en_US.UTF-8 UTF-8" > /etc/locale.gen locale-gen wget -qO - "https://research.cs.wisc.edu/htcondor/repo/keys/HTCondor-${HTCONDOR_VERSION}-Key" | apt-key add - diff --git a/dr b/dr index d2af731..0779865 100755 --- a/dr +++ b/dr @@ -6,4 +6,10 @@ set -e docker build -t ${CONTAINER_TAG} --file docker/Dockerfile . -docker run -it --rm --mount type=bind,src="$PWD",dst=/home/mapper/htmap -p 8000:8000 ${CONTAINER_TAG} $@ +docker run \ + -it --rm \ + --mount type=bind,src="$PWD",dst=/home/mapper/htmap \ + -p 8000:8000 \ + -p 8888:8888 \ + ${CONTAINER_TAG} \ + $@ diff --git a/htmap/mapping.py b/htmap/mapping.py index 8d62d48..7bb5505 100644 --- a/htmap/mapping.py +++ b/htmap/mapping.py @@ -309,7 +309,7 @@ def create_map( tags.tag_file_path(tag).write_text(str(uid)) - m = maps.Map(tag=tag, map_dir=map_dir,) + m = maps.Map(tag=tag, map_dir=map_dir) if transient: m._make_transient() diff --git a/htmap/maps.py b/htmap/maps.py index 6c3b723..8f0b07d 100644 --- a/htmap/maps.py +++ b/htmap/maps.py @@ -20,6 +20,7 @@ import inspect import logging import shutil +import threading import time import weakref from copy import copy @@ -70,6 +71,32 @@ def maps_by_tag() -> Dict[str, "Map"]: return {m.tag: m for m in MAPS} +def update_widgets(): + while True: + for map in MAPS.copy(): + try: + _, update = map._widget() + + if update is not None: + update() + except: + logger.exception("Widget update thread encountered error!") + + time.sleep(settings["WAIT_TIME"]) + + +WIDGET_UPDATE_THREAD = threading.Thread(target=update_widgets, daemon=True) + + +def start_widget_update_thread(): + try: + if not WIDGET_UPDATE_THREAD.is_alive(): + WIDGET_UPDATE_THREAD.start() + except RuntimeError: + # Someone else started the thread before we did, no worries + pass + + @_protect_map_after_remove class Map(collections.abc.Sequence): """ @@ -105,6 +132,8 @@ def __init__( self._stderr: MapStdErr = MapStdErr(self) self._output_files: MapOutputFiles = MapOutputFiles(self) + self._cached_widget = (None, None) + MAPS.add(self) @property @@ -140,10 +169,121 @@ def load(cls, tag: str) -> "Map": logger.debug(f"Loaded map {tag} from {map_dir}") - return cls(tag=tag, map_dir=map_dir,) + return cls(tag=tag, map_dir=map_dir) def __repr__(self): - return f"{self.__class__.__name__}(tag = {self.tag})" + return f"{self.__class__.__name__}(tag={self.tag})" + + def status(self): + """Display a string containing the number of jobs in each status.""" + counts = collections.Counter(self.component_statuses) + stat = " | ".join( + f"{str(js)} = {counts[js]}" for js in state.ComponentStatus.display_statuses() + ) + plain = f"{self.__class__.__name__} {self.tag} ({len(self)} components): {stat}" + + if not utils.is_jupyter(): + print(plain) + return + + from IPython.display import display + + widget, _ = self._widget() + + if widget is not None: + display(widget) + return + + data = {"text/plain": plain, "text/html": self._repr_html_()} + + display(data, raw=True) + + def _ipython_display_(self, **kwargs): + self.status() + + def _widget(self): + try: + from ipywidgets import Layout, VBox, widgets + except ImportError: + return self._cached_widget + + if self._cached_widget != (None, None): + return self._cached_widget + + table = widgets.HTML(value=self._repr_html_(), layout=Layout(min_width="150px")) + + pbar = widgets.IntProgress( + value=0, min=0, max=len(self), orientation="horizontal", layout=Layout(width="90%"), + ) + widget = VBox([table, pbar]) + + def update(): + table.value = self._repr_html_() + pbar.value = len(self.components_by_status().get(state.ComponentStatus.COMPLETED, [])) + + update() + + self._cached_widget = widget, update + + start_widget_update_thread() + + return self._cached_widget + + def _repr_html_(self): + return self._html_table() + + def _html_table(self): + table = [ + # Hacked together by looking at the classes of the parent div in + # the version formatted by Jupyter... probably not very stable. + '
', + '', + " ", + f" {self._html_table_header()}", + " ", + " ", + f" {self._html_table_body()}", + " ", + "
", + "
", + ] + + return "\n".join(table) + + @staticmethod + def _html_table_header(): + return " TAG " + "".join( + f" {h} " + for h in [ + *state.ComponentStatus.display_statuses(), + "Local Data", + "Max Memory", + "Max Runtime", + "Total Runtime", + ] + ) + + def _html_table_body(self): + sc = collections.Counter(self.component_statuses) + + local_data = utils.num_bytes_to_str(self.local_data) + max_memory = utils.num_bytes_to_str(max(self.memory_usage) * 1024 * 1024) + max_runtime = str(max(self.runtime)) + total_runtime = str(sum(self.runtime, datetime.timedelta())) + + return f' {self.tag} ' + "".join( + f' {h} ' + for h in [ + *[ + sc[component_state] + for component_state in state.ComponentStatus.display_statuses() + ], + local_data, + max_memory, + max_runtime, + total_runtime, + ] + ) def __gt__(self, other): return self.tag > other.tag @@ -222,7 +362,7 @@ def is_active(self) -> bool: def wait( self, timeout: utils.Timeout = None, - show_progress_bar: bool = False, + show_progress_bar: Optional[bool] = None, holds_ok: bool = False, errors_ok: bool = False, ) -> None: @@ -240,6 +380,8 @@ def wait( If ``None``, wait forever. show_progress_bar If ``True``, a progress bar will be displayed. + If ``None`` (the default), a progress bar will be displayed if you + are running Python interactively (e.g., in a REPL or Jupyter session). holds_ok If ``True``, will not raise exceptions if components are held. errors_ok @@ -248,11 +390,22 @@ def wait( start_time = time.time() timeout = utils.timeout_to_seconds(timeout) + if show_progress_bar is None and utils.is_interactive_session(): + show_progress_bar = True + try: + pbar = None if show_progress_bar: - pbar = tqdm(desc=self.tag, total=len(self), unit="component", ascii=True,) + # TODO: what if no widget + widget, update = self._widget() + if utils.is_jupyter() and widget is not None: + from IPython.display import display + + display(widget) + else: + pbar = tqdm(desc=self.tag, total=len(self), unit="component", ascii=True,) - previous_pbar_len = 0 + previous_pbar_len = 0 ok_statuses = {state.ComponentStatus.COMPLETED} if holds_ok: @@ -262,10 +415,15 @@ def wait( while True: num_incomplete = sum(cs not in ok_statuses for cs in self.component_statuses) + if show_progress_bar: - pbar_len = self._num_components - num_incomplete - pbar.update(pbar_len - previous_pbar_len) - previous_pbar_len = pbar_len + if pbar: + pbar_len = self._num_components - num_incomplete + pbar.update(pbar_len - previous_pbar_len) + previous_pbar_len = pbar_len + else: + update() + if num_incomplete == 0: break @@ -284,7 +442,7 @@ def wait( time.sleep(settings["WAIT_TIME"]) finally: - if show_progress_bar: + if show_progress_bar and pbar: pbar.close() def _wait_for_component(self, component: int, timeout: utils.Timeout = None) -> None: @@ -603,16 +761,6 @@ def job(x): status: tuple(sorted(components)) for status, components in status_to_components.items() } - def status(self) -> str: - """Return a string containing the number of jobs in each status.""" - counts = collections.Counter(self.component_statuses) - stat = " | ".join( - f"{str(js)} = {counts[js]}" for js in state.ComponentStatus.display_statuses() - ) - msg = f"{self.__class__.__name__} {self.tag} ({len(self)} components): {stat}" - - return utils.rstr(msg) - @property def holds(self) -> Dict[int, holds.ComponentHold]: """ @@ -922,8 +1070,18 @@ def _submit(self, components: Optional[Iterable[int]] = None) -> None: # if we fail to write the cluster id for any reason, abort the submit try: htio.append_cluster_id(self._map_dir, new_cluster_id) - except BaseException as e: - condor.get_schedd().act(htcondor.JobAction.Remove, f"ClusterId=={new_cluster_id}") + except BaseException as write_exception: + logger.exception( + f"Failed to write new cluster id {new_cluster_id} for map {self.tag}, aborting submission" + ) + try: + condor.get_schedd().act(htcondor.JobAction.Remove, f"ClusterId=={new_cluster_id}") + except BaseException as remove_exception: + logger.exception( + f"Was not able to abort submission of cluster id {new_cluster_id} for map {self.tag}" + ) + raise remove_exception + raise write_exception logger.debug( f"Submitted {len(sliced_itemdata)} components (out of {self._num_components}) from map {self.tag}" diff --git a/htmap/state.py b/htmap/state.py index 7a0c194..1238b29 100644 --- a/htmap/state.py +++ b/htmap/state.py @@ -112,8 +112,9 @@ def _event_log_path(self): def _read_events(self): with self._event_reader_lock: # no thread can be in here at the same time as another if self._event_reader is None: - logger.debug(f"Created event log reader for map {self.map.tag}") + self._event_log_path.touch(exist_ok=True) self._event_reader = htcondor.JobEventLog(self._event_log_path.as_posix()) + logger.debug(f"Created event log reader for map {self.map.tag}") with utils.Timer() as timer: handled_events = self._handle_events() diff --git a/htmap/utils.py b/htmap/utils.py index ee66ce7..943624c 100644 --- a/htmap/utils.py +++ b/htmap/utils.py @@ -239,6 +239,22 @@ def is_interactive_session() -> bool: ) +def is_jupyter() -> bool: + # https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook/24937408 + # This seems quite fragile, but it also seems hard to determine otherwise... + # I would not be shocked if this breaks in the future. + try: + shell = get_ipython().__class__.__name__ + if shell == "ZMQInteractiveShell": + return True # Jupyter notebook or qtconsole + elif shell == "TerminalInteractiveShell": + return False # Terminal running IPython + else: + return False # Something else... + except NameError: + return False # Probably standard Python interpreter + + def enable_debug_logging(): logger = logging.getLogger("htmap") logger.setLevel(logging.DEBUG) diff --git a/setup.cfg b/setup.cfg index f0e6d5a..3e3aea8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -65,6 +65,9 @@ tests = pytest-timeout pytest-watch pytest-xdist +widgets = + ipywidgets + jupyterlab>=2 [options.package_data] * = diff --git a/tests/integration/test_widgets.py b/tests/integration/test_widgets.py new file mode 100644 index 0000000..06f4bd4 --- /dev/null +++ b/tests/integration/test_widgets.py @@ -0,0 +1,22 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import htmap + + +def test_repr_html_contains_map_tag(map_that_never_finishes): + assert map_that_never_finishes.tag in map_that_never_finishes._repr_html_()