diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst
index d471cf968..105335ca4 100644
--- a/docs/data_structures/libE_specs.rst
+++ b/docs/data_structures/libE_specs.rst
@@ -28,7 +28,11 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
         Manager/Worker communications mode: ``'mpi'``, ``'local'``, or ``'tcp'``.

     **nworkers** [int]:
-        Number of worker processes in ``"local"`` or ``"tcp"``.
+        Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``.
+
+    **gen_on_manager** [bool] = ``False``:
+        Instructs the manager process to run generator functions.
+        The generator function can then access/modify user objects by reference.

     **mpi_comm** [MPI communicator] = ``MPI.COMM_WORLD``:
         libEnsemble MPI communicator.
@@ -51,6 +55,10 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
     **disable_log_files** [bool] = ``False``:
         Disable ``ensemble.log`` and ``libE_stats.txt`` log files.

+    **gen_workers** [list of ints]:
+        List of workers that should only run generators. All other workers will
+        only run simulator functions.
+
     .. tab-item:: Directories

         .. tab-set::
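Note (illustrative, not part of the patch): the two options documented above are plain ``LibeSpecs`` fields. A minimal sketch of setting them; the worker counts are arbitrary, and normally only one of the two options would be used at a time:

    from libensemble.specs import LibeSpecs

    # Run the generator on the manager process (it becomes "worker" 0):
    specs = LibeSpecs(comms="local", nworkers=4, gen_on_manager=True)

    # Or dedicate worker 2 to generator calls; workers 1, 3, and 4 run sims:
    specs = LibeSpecs(comms="local", nworkers=4, gen_workers=[2])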
diff --git a/libensemble/alloc_funcs/fast_alloc.py b/libensemble/alloc_funcs/fast_alloc.py
index ccb2fec56..bb009740c 100644
--- a/libensemble/alloc_funcs/fast_alloc.py
+++ b/libensemble/alloc_funcs/fast_alloc.py
@@ -32,10 +32,9 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
     Work = {}
     gen_in = gen_specs.get("in", [])

-    for wid in support.avail_worker_ids():
+    # Give sim work if possible
+    for wid in support.avail_worker_ids(gen_workers=False):
         persis_info = support.skip_canceled_points(H, persis_info)
-
-        # Give sim work if possible
         if persis_info["next_to_give"] < len(H):
             try:
                 Work[wid] = support.sim_work(wid, H, sim_specs["in"], [persis_info["next_to_give"]], [])
@@ -43,14 +42,16 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
                 break
             persis_info["next_to_give"] += 1

-        elif gen_count < user.get("num_active_gens", gen_count + 1):
-            # Give gen work
-            return_rows = range(len(H)) if gen_in else []
-            try:
-                Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
-            except InsufficientFreeResources:
-                break
-            gen_count += 1
-            persis_info["total_gen_calls"] += 1
+    # Give gen work if possible
+    if persis_info["next_to_give"] >= len(H):
+        for wid in support.avail_worker_ids(gen_workers=True):
+            if wid not in Work and gen_count < user.get("num_active_gens", gen_count + 1):
+                return_rows = range(len(H)) if gen_in else []
+                try:
+                    Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
+                except InsufficientFreeResources:
+                    break
+                gen_count += 1
+                persis_info["total_gen_calls"] += 1

     return Work, persis_info
diff --git a/libensemble/alloc_funcs/fast_alloc_and_pausing.py b/libensemble/alloc_funcs/fast_alloc_and_pausing.py
index 4a85f69fb..dfb747cc7 100644
--- a/libensemble/alloc_funcs/fast_alloc_and_pausing.py
+++ b/libensemble/alloc_funcs/fast_alloc_and_pausing.py
@@ -43,9 +43,10 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
         for pt_id in persis_info["pt_ids"]:
             persis_info["inds_of_pt_ids"][pt_id] = H["pt_id"] == pt_id

-    idle_workers = support.avail_worker_ids()
+    idle_sim_workers = support.avail_worker_ids(gen_workers=False)
+    idle_gen_workers = support.avail_worker_ids(gen_workers=True)

-    while len(idle_workers):
+    while len(idle_sim_workers):
         pt_ids_to_pause = set()

         # Find indices of H that are not yet given out to be evaluated
@@ -106,15 +107,19 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
         if len(persis_info["need_to_give"]) != 0:
             next_row = persis_info["need_to_give"].pop()
-            i = idle_workers[0]
+            i = idle_sim_workers[0]
             try:
                 Work[i] = support.sim_work(i, H, sim_specs["in"], [next_row], [])
             except InsufficientFreeResources:
                 persis_info["need_to_give"].add(next_row)
                 break
-            idle_workers = idle_workers[1:]
+            idle_sim_workers = idle_sim_workers[1:]

-        elif gen_count < alloc_specs["user"].get("num_active_gens", gen_count + 1):
+        else:
+            break
+
+    while len(idle_gen_workers):
+        if gen_count < alloc_specs["user"].get("num_active_gens", gen_count + 1):
             lw = persis_info["last_worker"]

             last_size = persis_info.get("last_size")
@@ -126,18 +131,18 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
                 break

             # Give gen work
-            i = idle_workers[0]
+            i = idle_gen_workers[0]
             try:
                 Work[i] = support.gen_work(i, gen_specs["in"], range(len(H)), persis_info[lw])
             except InsufficientFreeResources:
                 break
-            idle_workers = idle_workers[1:]
+            idle_gen_workers = idle_gen_workers[1:]
             gen_count += 1
             persis_info["total_gen_calls"] += 1
             persis_info["last_worker"] = i
             persis_info["last_size"] = len(H)
         elif gen_count >= alloc_specs["user"].get("num_active_gens", gen_count + 1):
-            idle_workers = []
+            idle_gen_workers = []

     return Work, persis_info
diff --git a/libensemble/alloc_funcs/give_pregenerated_work.py b/libensemble/alloc_funcs/give_pregenerated_work.py
index 1d6edb160..d2b1ee7aa 100644
--- a/libensemble/alloc_funcs/give_pregenerated_work.py
+++ b/libensemble/alloc_funcs/give_pregenerated_work.py
@@ -23,7 +23,7 @@ def give_pregenerated_sim_work(W, H, sim_specs, gen_specs, alloc_specs, persis_i
     if persis_info["next_to_give"] >= len(H):
         return Work, persis_info, 1

-    for i in support.avail_worker_ids():
+    for i in support.avail_worker_ids(gen_workers=False):
         persis_info = support.skip_canceled_points(H, persis_info)

         # Give sim work
diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py
index a7aa74d3b..1e528917b 100644
--- a/libensemble/alloc_funcs/give_sim_work_first.py
+++ b/libensemble/alloc_funcs/give_sim_work_first.py
@@ -64,15 +64,19 @@ def give_sim_work_first(
     Work = {}
     points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]

-    for wid in support.avail_worker_ids():
-        if np.any(points_to_evaluate):
+
+    if np.any(points_to_evaluate):
+        for wid in support.avail_worker_ids(gen_workers=False):
             sim_ids_to_send = support.points_by_priority(H, points_avail=points_to_evaluate, batch=batch_give)
             try:
                 Work[wid] = support.sim_work(wid, H, sim_specs["in"], sim_ids_to_send, persis_info.get(wid))
             except InsufficientFreeResources:
                 break
             points_to_evaluate[sim_ids_to_send] = False
-        else:
+            if not np.any(points_to_evaluate):
+                break
+    else:
+        for wid in support.avail_worker_ids(gen_workers=True):
             # Allow at most num_active_gens active generator instances
             if gen_count >= user.get("num_active_gens", gen_count + 1):
                 break
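Note (illustrative, not part of the patch): the allocation functions above, and those that follow, share one pattern - sim work goes only to workers that are not gen-only, and gen work is routed through ``avail_worker_ids(gen_workers=True)``, which passes every worker when no gen-only workers are designated. A distilled sketch of a hypothetical alloc function; the ``AllocSupport`` construction is assumed to follow the existing alloc functions:

    from libensemble.tools.alloc_support import AllocSupport, InsufficientFreeResources


    def split_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info, libE_info):
        """Hypothetical alloc function distilling the split-loop pattern."""
        support = AllocSupport(W, libE_info["use_resource_sets"], persis_info, libE_info)
        Work = {}
        # Sim work first: gen_workers=False skips any gen-only workers.
        for wid in support.avail_worker_ids(gen_workers=False):
            ...  # support.sim_work(...) as in the functions above
        # Then gen work: gen_workers=True yields the gen-only workers, or every
        # idle worker when libE_specs["gen_workers"] is unset.
        for wid in support.avail_worker_ids(gen_workers=True):
            ...  # support.gen_work(...)
        return Work, persis_info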
diff --git a/libensemble/alloc_funcs/inverse_bayes_allocf.py b/libensemble/alloc_funcs/inverse_bayes_allocf.py
index 56a3f6e79..e0521df6f 100644
--- a/libensemble/alloc_funcs/inverse_bayes_allocf.py
+++ b/libensemble/alloc_funcs/inverse_bayes_allocf.py
@@ -42,8 +42,9 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, alloc_spe
             Work[wid] = support.gen_work(wid, ["like"], inds_to_send_back, persis_info.get(wid), persistent=True)

     points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
-    for wid in support.avail_worker_ids(persistent=False):
-        if np.any(points_to_evaluate):
+    if np.any(points_to_evaluate):
+        for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
+            # perform sim evaluations (if any point hasn't been given).
             sim_subbatches = H["subbatch"][points_to_evaluate]
             sim_inds = sim_subbatches == np.min(sim_subbatches)
@@ -54,13 +55,11 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, alloc_spe
             except InsufficientFreeResources:
                 break
             points_to_evaluate[sim_ids_to_send] = False
-
-        elif gen_count == 0:
-            # Finally, generate points since there is nothing else to do.
-            try:
-                Work[wid] = support.gen_work(wid, gen_specs["in"], [], persis_info.get(wid), persistent=True)
-            except InsufficientFreeResources:
+            if not np.any(points_to_evaluate):
                 break
-            gen_count += 1
+
+    elif gen_count == 0:
+        wid = support.avail_worker_ids(persistent=False, gen_workers=True)[0]
+        Work[wid] = support.gen_work(wid, gen_specs["in"], [], persis_info.get(wid), persistent=True)

     return Work, persis_info
diff --git a/libensemble/alloc_funcs/only_one_gen_alloc.py b/libensemble/alloc_funcs/only_one_gen_alloc.py
index fe9a0f59c..7eb6a91e0 100644
--- a/libensemble/alloc_funcs/only_one_gen_alloc.py
+++ b/libensemble/alloc_funcs/only_one_gen_alloc.py
@@ -21,27 +21,30 @@ def ensure_one_active_gen(W, H, sim_specs, gen_specs, alloc_specs, persis_info,
     gen_flag = True
     gen_in = gen_specs.get("in", [])

-    for wid in support.avail_worker_ids():
-        persis_info = support.skip_canceled_points(H, persis_info)
-
-        if persis_info["next_to_give"] < len(H):
+    if persis_info["next_to_give"] < len(H):
+        for wid in support.avail_worker_ids(gen_workers=False):
+            persis_info = support.skip_canceled_points(H, persis_info)
             try:
                 Work[wid] = support.sim_work(wid, H, sim_specs["in"], [persis_info["next_to_give"]], [])
             except InsufficientFreeResources:
                 break
             persis_info["next_to_give"] += 1
-
-        elif not support.test_any_gen() and gen_flag:
-            if not support.all_sim_ended(H):
+            if persis_info["next_to_give"] >= len(H):
                 break

-            # Give gen work
-            return_rows = range(len(H)) if gen_in else []
-            try:
-                Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
-            except InsufficientFreeResources:
-                break
-            gen_flag = False
-            persis_info["total_gen_calls"] += 1
+    elif not support.test_any_gen() and gen_flag:
+        # Give gen work
+        return_rows = range(len(H)) if gen_in else []
+        wid = support.avail_worker_ids(gen_workers=True)[0]
+
+        if not support.all_sim_ended(H):
+            return Work, persis_info
+
+        try:
+            Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
+        except InsufficientFreeResources:
+            return Work, persis_info
+        gen_flag = False
+        persis_info["total_gen_calls"] += 1

     return Work, persis_info
diff --git a/libensemble/alloc_funcs/persistent_aposmm_alloc.py b/libensemble/alloc_funcs/persistent_aposmm_alloc.py
index 8327d3975..3b87d5b5b 100644
--- a/libensemble/alloc_funcs/persistent_aposmm_alloc.py
+++ b/libensemble/alloc_funcs/persistent_aposmm_alloc.py
@@ -53,7 +53,7 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
             )
             returned_but_not_given[point_ids] = False

-    for wid in support.avail_worker_ids(persistent=False):
+    for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
         persis_info = support.skip_canceled_points(H, persis_info)

         if persis_info["next_to_give"] < len(H):
@@ -63,8 +63,11 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
             except InsufficientFreeResources:
                 break
             persis_info["next_to_give"] += 1
+            if persis_info["next_to_give"] >= len(H):
+                break

-        elif persis_info.get("gen_started") is None:
+    if persis_info.get("gen_started") is None:
+        for wid in support.avail_worker_ids(persistent=False, gen_workers=True):
             # Finally, call a persistent generator as there is nothing else to do.
             persis_info.get(wid)["nworkers"] = len(W)
             try:
@@ -74,5 +77,6 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
             except InsufficientFreeResources:
                 break
             persis_info["gen_started"] = True  # Must set after - in case break on resources
+            break

     return Work, persis_info
diff --git a/libensemble/alloc_funcs/start_fd_persistent.py b/libensemble/alloc_funcs/start_fd_persistent.py
index 0c2e939d3..36fba0a73 100644
--- a/libensemble/alloc_funcs/start_fd_persistent.py
+++ b/libensemble/alloc_funcs/start_fd_persistent.py
@@ -49,8 +49,8 @@ def finite_diff_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info, libE
             )

     points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
-    for wid in support.avail_worker_ids(persistent=False):
-        if np.any(points_to_evaluate):
+    if np.any(points_to_evaluate):
+        for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
             # perform sim evaluations (if they exist in History).
             sim_ids_to_send = np.nonzero(points_to_evaluate)[0][0]  # oldest point
             try:
@@ -58,13 +58,12 @@ def finite_diff_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info, libE
             except InsufficientFreeResources:
                 break
             points_to_evaluate[sim_ids_to_send] = False
-
-        elif gen_count == 0:
-            # Finally, call a persistent generator as there is nothing else to do.
-            try:
-                Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info.get(wid), persistent=True)
-            except InsufficientFreeResources:
+            if not np.any(points_to_evaluate):
                 break
-            gen_count += 1
+
+    if gen_count == 0:
+        wid = support.avail_worker_ids(persistent=False, gen_workers=True)[0]
+        Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info.get(wid), persistent=True)
+        gen_count += 1

     return Work, persis_info, 0
diff --git a/libensemble/alloc_funcs/start_only_persistent.py b/libensemble/alloc_funcs/start_only_persistent.py
index ee9d4105f..101a90f51 100644
--- a/libensemble/alloc_funcs/start_only_persistent.py
+++ b/libensemble/alloc_funcs/start_only_persistent.py
@@ -89,7 +89,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info, l
     # Now the give_sim_work_first part
     points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
-    avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=False)
+    avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=False, gen_workers=False)
     if user.get("alt_type"):
         avail_workers = list(
             set(support.avail_worker_ids(persistent=False, zero_resource_workers=False))
@@ -115,7 +115,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info, l
     # Start persistent gens if no worker to give out. Uses zero_resource_workers if defined.
     if not np.any(points_to_evaluate):
-        avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=True)
+        avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=True, gen_workers=True)

         for wid in avail_workers:
             if gen_count < user.get("num_active_gens", 1):
diff --git a/libensemble/alloc_funcs/start_persistent_local_opt_gens.py b/libensemble/alloc_funcs/start_persistent_local_opt_gens.py
index 12ad45100..918ebbb75 100644
--- a/libensemble/alloc_funcs/start_persistent_local_opt_gens.py
+++ b/libensemble/alloc_funcs/start_persistent_local_opt_gens.py
@@ -54,7 +54,7 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, alloc_specs, per
             Work[wid] = support.gen_work(wid, gen_specs["persis_in"], last_ind, persis_info[wid], persistent=True)
             persis_info[wid]["run_order"].append(last_ind)

-    for wid in support.avail_worker_ids(persistent=False):
+    for wid in support.avail_worker_ids(persistent=False, gen_workers=True):
         # Find candidates to start local opt runs if a sample has been evaluated
         if np.any(np.logical_and(~H["local_pt"], H["sim_ended"], ~H["cancel_requested"])):
             n = len(H["x"][0])
@@ -78,7 +78,8 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, alloc_specs, per
                 persis_info[wid]["run_order"] = [ind]
                 gen_count += 1

-        elif np.any(points_to_evaluate):
+    if np.any(points_to_evaluate):
+        for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
             # Perform sim evaluations from existing runs
             q_inds_logical = np.logical_and(points_to_evaluate, H["local_pt"])
             if not np.any(q_inds_logical):
@@ -89,10 +90,13 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, alloc_specs, per
             except InsufficientFreeResources:
                 break
             points_to_evaluate[sim_ids_to_send] = False
+            if not np.any(points_to_evaluate):
+                break

-        elif gen_count == 0 and not np.any(np.logical_and(W["active"] == EVAL_GEN_TAG, W["persis_state"] == 0)):
-            # Finally, generate points since there is nothing else to do (no resource sets req.)
-            Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info[wid], rset_team=[])
-            gen_count += 1
+    if gen_count == 0 and not np.any(np.logical_and(W["active"] == EVAL_GEN_TAG, W["persis_state"] == 0)):
+        # Finally, generate points since there is nothing else to do (no resource sets req.)
+        wid = support.avail_worker_ids(persistent=False, gen_workers=True)[0]
+        Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info[wid], rset_team=[])
+        gen_count += 1

     return Work, persis_info
diff --git a/libensemble/comms/comms.py b/libensemble/comms/comms.py
index 9bf14e98a..51042c463 100644
--- a/libensemble/comms/comms.py
+++ b/libensemble/comms/comms.py
@@ -146,7 +146,7 @@ def mail_flag(self):

 class QCommLocal(Comm):
-    def __init__(self, main, nworkers, *args, **kwargs):
+    def __init__(self, main, *args, **kwargs):
         self._result = None
         self._exception = None
         self._done = False
@@ -207,16 +207,6 @@ def result(self, timeout=None):
             raise RemoteException(self._exception.msg, self._exception.exc)
         return self._result

-    @staticmethod
-    def _qcomm_main(comm, main, *args, **kwargs):
-        """Main routine -- handles return values and exceptions."""
-        try:
-            _result = main(comm, *args, **kwargs)
-            comm.send(CommResult(_result))
-        except Exception as e:
-            comm.send(CommResultErr(str(e), format_exc()))
-            raise e
-
     @property
     def running(self):
         """Check if the thread/process is running."""
@@ -230,15 +220,28 @@ def __exit__(self, etype, value, traceback):
         self.handle.join()


+def _qcomm_main(comm, main, *args, **kwargs):
+    """Main routine -- handles return values and exceptions."""
+    try:
+        if not kwargs.get("user_function"):
+            _result = main(comm, *args, **kwargs)
+        else:
+            _result = main(*args)
+        comm.send(CommResult(_result))
+    except Exception as e:
+        comm.send(CommResultErr(str(e), format_exc()))
+        raise e
+
+
 class QCommThread(QCommLocal):
     """Launch a user function in a thread with an attached QComm."""

     def __init__(self, main, nworkers, *args, **kwargs):
         self.inbox = thread_queue.Queue()
         self.outbox = thread_queue.Queue()
-        super().__init__(self, main, nworkers, *args, **kwargs)
+        super().__init__(self, main, *args, **kwargs)
         comm = QComm(self.inbox, self.outbox, nworkers)
-        self.handle = Thread(target=QCommThread._qcomm_main, args=(comm, main) + args, kwargs=kwargs)
+        self.handle = Thread(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs)

     def terminate(self, timeout=None):
         """Terminate the thread.
@@ -260,9 +263,9 @@ class QCommProcess(QCommLocal):
     def __init__(self, main, nworkers, *args, **kwargs):
         self.inbox = Queue()
         self.outbox = Queue()
-        super().__init__(self, main, nworkers, *args, **kwargs)
+        super().__init__(self, main, *args, **kwargs)
         comm = QComm(self.inbox, self.outbox, nworkers)
-        self.handle = Process(target=QCommProcess._qcomm_main, args=(comm, main) + args, kwargs=kwargs)
+        self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs)

     def terminate(self, timeout=None):
         """Terminate the process."""
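Note (illustrative, not part of the patch): the relocated ``_qcomm_main`` is what lets a ``QCommThread`` drive a plain user function - with ``user_function=True`` the target is called with only its own arguments (no leading ``comm``). A minimal sketch of that path, mirroring how ``ThreadRunner`` later in this patch calls it (``nworkers`` is passed as ``None`` there, so the same is assumed here):

    from libensemble.comms.comms import QCommThread


    def add(a, b):
        return a + b


    handle = QCommThread(add, None, 2, 3, user_function=True)  # no comm arg for user functions
    handle.run()                 # starts the thread
    assert handle.result() == 5  # result captured via the internal QComm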
diff --git a/libensemble/comms/logs.py b/libensemble/comms/logs.py
index 10acbae07..47f85f351 100644
--- a/libensemble/comms/logs.py
+++ b/libensemble/comms/logs.py
@@ -203,6 +203,7 @@ def manager_logging_config(specs={}):
     def exit_logger():
         stat_timer.stop()
         stat_logger.info(f"Exiting ensemble at: {stat_timer.date_end} Time Taken: {stat_timer.elapsed}")
+        stat_logger.handlers[0].close()

         # If closing logs - each libE() call will log to a new file.
         # fh.close()
diff --git a/libensemble/ensemble.py b/libensemble/ensemble.py
index b037d0bc3..545443851 100644
--- a/libensemble/ensemble.py
+++ b/libensemble/ensemble.py
@@ -326,8 +326,14 @@ def libE_specs(self, new_specs):
             return

         # Cast new libE_specs temporarily to dict
-        if not isinstance(new_specs, dict):
-            new_specs = specs_dump(new_specs, by_alias=True, exclude_none=True, exclude_unset=True)
+        if not isinstance(new_specs, dict):  # exclude_defaults should only be enabled with Pydantic v2
+            platform_specs_set = False
+            if new_specs.platform_specs != {}:  # Pydantic versions differ in recursively casting models to dicts
+                platform_specs_set = True
+                platform_specs = new_specs.platform_specs
+            new_specs = specs_dump(new_specs, exclude_none=True, exclude_defaults=True)
+            if platform_specs_set:
+                new_specs["platform_specs"] = specs_dump(platform_specs, exclude_none=True)

         # Unset "comms" if we already have a libE_specs that contains that field, that came from parse_args
         if new_specs.get("comms") and hasattr(self._libE_specs, "comms") and self.parsed:
diff --git a/libensemble/executors/executor.py b/libensemble/executors/executor.py
index bff39873b..b2b7012b6 100644
--- a/libensemble/executors/executor.py
+++ b/libensemble/executors/executor.py
@@ -667,7 +667,7 @@ def set_workerID(self, workerid) -> None:
         """Sets the worker ID for this executor"""
         self.workerID = workerid

-    def set_worker_info(self, comm, workerid=None) -> None:
+    def set_worker_info(self, comm=None, workerid=None) -> None:
         """Sets info for this executor"""
         self.workerID = workerid
         self.comm = comm
diff --git a/libensemble/manager.py b/libensemble/manager.py
index bc0a3114c..aa9c53b0c 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -18,7 +18,8 @@
 import numpy.typing as npt
 from numpy.lib.recfunctions import repack_fields

-from libensemble.comms.comms import CommFinishedException
+from libensemble.comms.comms import CommFinishedException, QCommThread
+from libensemble.executors.executor import Executor
 from libensemble.message_numbers import (
     EVAL_GEN_TAG,
     EVAL_SIM_TAG,
@@ -33,10 +34,10 @@
 from libensemble.resources.resources import Resources
 from libensemble.tools.fields_keys import protected_libE_fields
 from libensemble.tools.tools import _PERSIS_RETURN_WARNING, _USER_CALC_DIR_WARNING
-from libensemble.utils.misc import extract_H_ranges
+from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges
 from libensemble.utils.output_directory import EnsembleDirectory
 from libensemble.utils.timer import Timer
-from libensemble.worker import WorkerErrMsg
+from libensemble.worker import WorkerErrMsg, worker_main

 logger = logging.getLogger(__name__)

 # For debug messages - uncomment
@@ -158,13 +159,36 @@ class Manager:
     worker_dtype = [
         ("worker_id", int),
+        ("gen_worker", bool),
         ("active", int),
         ("persis_state", int),
-        ("active_recv", int),
+        ("active_recv", bool),
         ("gen_started_time", float),
         ("zero_resource_worker", bool),
     ]

+    def _run_additional_worker(self, hist, sim_specs, gen_specs, libE_specs):
+        dtypes = {
+            EVAL_SIM_TAG: repack_fields(hist.H[sim_specs["in"]]).dtype,
+            EVAL_GEN_TAG: repack_fields(hist.H[gen_specs["in"]]).dtype,
+        }
+        local_worker_comm = QCommThread(
+            worker_main,
+            len(self.wcomms),
+            sim_specs,
+            gen_specs,
+            libE_specs,
+            0,
+            False,
+            Resources.resources,
+            Executor.executor,
+        )
+        local_worker_comm.run()
+        local_worker_comm.send(0, dtypes)
+        if libE_specs.get("use_workflow_dir"):
+            local_worker_comm.send(0, libE_specs.get("workflow_dir_path"))
+        return local_worker_comm
+
     def __init__(
         self,
         hist,
@@ -199,8 +223,6 @@ def __init__(
         self.gen_num_procs = libE_specs.get("gen_num_procs", 0)
         self.gen_num_gpus = libE_specs.get("gen_num_gpus", 0)

-        self.W = np.zeros(len(self.wcomms), dtype=Manager.worker_dtype)
-        self.W["worker_id"] = np.arange(len(self.wcomms)) + 1
         self.term_tests = [
             (2, "wallclock_max", self.term_test_wallclock),
             (1, "sim_max", self.term_test_sim_max),
@@ -208,6 +230,20 @@ def __init__(
             (1, "stop_val", self.term_test_stop_val),
         ]

+        gen_on_manager = self.libE_specs.get("gen_on_manager", False)
+
+        self.W = np.zeros(len(self.wcomms) + gen_on_manager, dtype=Manager.worker_dtype)
+        if gen_on_manager:
+            self.W["worker_id"] = np.arange(len(self.wcomms) + 1)  # [0, 1, 2, ...]
+            self.W[0]["gen_worker"] = True
+            local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs)
+            self.wcomms = [local_worker_comm] + self.wcomms
+        else:
+            self.W["worker_id"] = np.arange(len(self.wcomms)) + 1  # [1, 2, 3, ...]
+
+        self.W = _WorkerIndexer(self.W, gen_on_manager)
+        self.wcomms = _WorkerIndexer(self.wcomms, gen_on_manager)
+
         temp_EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs)
         self.resources = Resources.resources
         self.scheduler_opts = self.libE_specs.get("scheduler_opts", {})
@@ -218,6 +254,10 @@ def __init__(
             if wrk["worker_id"] in gresource.zero_resource_workers:
                 wrk["zero_resource_worker"] = True

+        for wrk in self.W:
+            if wrk["worker_id"] in self.libE_specs.get("gen_workers", []):
+                wrk["gen_worker"] = True
+
         try:
             temp_EnsembleDirectory.make_copyback()
         except AssertionError as e:  # Ensemble dir exists and isn't empty.
@@ -265,7 +305,9 @@ def term_test(self, logged: bool = True) -> Union[bool, int]:
     def _kill_workers(self) -> None:
         """Kills the workers"""
         for w in self.W["worker_id"]:
-            self.wcomms[w - 1].send(STOP_TAG, MAN_SIGNAL_FINISH)
+            self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_FINISH)
+            if w == 0:
+                self.wcomms[0].result()

     # --- Checkpointing logic
@@ -319,15 +361,15 @@ def _init_every_k_save(self, complete=False) -> None:
     def _check_work_order(self, Work: dict, w: int, force: bool = False) -> None:
         """Checks validity of an allocation function order"""
-        assert w != 0, "Can't send to worker 0; this is the manager."
-        if self.W[w - 1]["active_recv"]:
+        # assert w != 0, "Can't send to worker 0; this is the manager."
+        if self.W[w]["active_recv"]:
             assert "active_recv" in Work["libE_info"], (
                 "Messages to a worker in active_recv mode should have active_recv"
                 f"set to True in libE_info. Work['libE_info'] is {Work['libE_info']}"
             )
         else:
             if not force:
-                assert self.W[w - 1]["active"] == 0, (
+                assert not self.W[w]["active"], (
                     "Allocation function requested work be sent to worker %d, an already active worker." % w
                 )
         work_rows = Work["libE_info"]["H_rows"]
@@ -372,10 +414,10 @@ def _send_work_order(self, Work: dict, w: int) -> None:
         if self.resources:
             self._set_resources(Work, w)

-        self.wcomms[w - 1].send(Work["tag"], Work)
+        self.wcomms[w].send(Work["tag"], Work)

         if Work["tag"] == EVAL_GEN_TAG:
-            self.W[w - 1]["gen_started_time"] = time.time()
+            self.W[w]["gen_started_time"] = time.time()

         work_rows = Work["libE_info"]["H_rows"]
         work_name = calc_type_strings[Work["tag"]]
@@ -385,18 +427,19 @@ def _send_work_order(self, Work: dict, w: int) -> None:
             H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype)
             for i, row in enumerate(work_rows):
                 H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row])
-            self.wcomms[w - 1].send(0, H_to_be_sent)
+
+            self.wcomms[w].send(0, H_to_be_sent)

     def _update_state_on_alloc(self, Work: dict, w: int):
         """Updates a workers' active/idle status following an allocation order"""
-        self.W[w - 1]["active"] = Work["tag"]
-        if "libE_info" in Work:
-            if "persistent" in Work["libE_info"]:
-                self.W[w - 1]["persis_state"] = Work["tag"]
-            if Work["libE_info"].get("active_recv", False):
-                self.W[w - 1]["active_recv"] = Work["tag"]
-            else:
-                assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent"
+
+        self.W[w]["active"] = Work["tag"]
+        if "persistent" in Work["libE_info"]:
+            self.W[w]["persis_state"] = Work["tag"]
+        if Work["libE_info"].get("active_recv", False):
+            self.W[w]["active_recv"] = True
+        else:
+            assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent"

         work_rows = Work["libE_info"]["H_rows"]
         if Work["tag"] == EVAL_SIM_TAG:
@@ -417,7 +460,7 @@ def _receive_from_workers(self, persis_info: dict) -> dict:
         while new_stuff:
             new_stuff = False
             for w in self.W["worker_id"]:
-                if self.wcomms[w - 1].mail_flag():
+                if self.wcomms[w].mail_flag():
                     new_stuff = True
                     self._handle_msg_from_worker(persis_info, w)
@@ -430,37 +473,37 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
         calc_status = D_recv["calc_status"]
         keep_state = D_recv["libE_info"].get("keep_state", False)

-        if w not in self.persis_pending and not self.W[w - 1]["active_recv"] and not keep_state:
-            self.W[w - 1]["active"] = 0
+        if w not in self.persis_pending and not self.W[w]["active_recv"] and not keep_state:
+            self.W[w]["active"] = 0

         if calc_status in [FINISHED_PERSISTENT_SIM_TAG, FINISHED_PERSISTENT_GEN_TAG]:
             final_data = D_recv.get("calc_out", None)
             if isinstance(final_data, np.ndarray):
                 if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False):
-                    self.hist.update_history_x_in(w, final_data, self.W[w - 1]["gen_started_time"])
+                    self.hist.update_history_x_in(w, final_data, self.W[w]["gen_started_time"])
                 elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False):
                     self.hist.update_history_f(D_recv, self.kill_canceled_sims)
                 else:
                     logger.info(_PERSIS_RETURN_WARNING)
-            self.W[w - 1]["persis_state"] = 0
-            if self.W[w - 1]["active_recv"]:
-                self.W[w - 1]["active"] = 0
-                self.W[w - 1]["active_recv"] = 0
+            self.W[w]["persis_state"] = 0
+            if self.W[w]["active_recv"]:
+                self.W[w]["active"] = 0
+                self.W[w]["active_recv"] = False
             if w in self.persis_pending:
                 self.persis_pending.remove(w)
-                self.W[w - 1]["active"] = 0
+                self.W[w]["active"] = 0
             self._freeup_resources(w)
         else:
             if calc_type == EVAL_SIM_TAG:
                 self.hist.update_history_f(D_recv, self.kill_canceled_sims)
             if calc_type == EVAL_GEN_TAG:
-                self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w - 1]["gen_started_time"])
+                self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w]["gen_started_time"])
                 assert (
-                    len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w - 1]["persis_state"]
+                    len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w]["persis_state"]
                 ), "Gen must return work when it is the only thing active and not persistent."
             if "libE_info" in D_recv and "persistent" in D_recv["libE_info"]:
                 # Now a waiting, persistent worker
-                self.W[w - 1]["persis_state"] = calc_type
+                self.W[w]["persis_state"] = D_recv["calc_type"]
             else:
                 self._freeup_resources(w)
@@ -470,13 +513,13 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
     def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None:
         """Handles a message from worker w"""
         try:
-            msg = self.wcomms[w - 1].recv()
+            msg = self.wcomms[w].recv()
             tag, D_recv = msg
         except CommFinishedException:
             logger.debug(f"Finalizing message from Worker {w}")
             return

         if isinstance(D_recv, WorkerErrMsg):
-            self.W[w - 1]["active"] = 0
+            self.W[w]["active"] = 0
             logger.debug(f"Manager received exception from worker {w}")
             if not self.WorkerExc:
                 self.WorkerExc = True
@@ -509,7 +552,7 @@ def _kill_cancelled_sims(self) -> None:
             kill_ids = self.hist.H["sim_id"][kill_sim_rows]
             kill_on_workers = self.hist.H["sim_worker"][kill_sim_rows]
             for w in kill_on_workers:
-                self.wcomms[w - 1].send(STOP_TAG, MAN_SIGNAL_KILL)
+                self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL)
             self.hist.H["kill_sent"][kill_ids] = True

     # --- Handle termination
@@ -539,10 +582,10 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
                     self._send_work_order(work, w)
                     self.hist.update_history_to_gen(rows_to_send)
                 else:
-                    self.wcomms[w - 1].send(PERSIS_STOP, MAN_SIGNAL_KILL)
-                if not self.W[w - 1]["active"]:
+                    self.wcomms[w].send(PERSIS_STOP, MAN_SIGNAL_KILL)
+                if not self.W[w]["active"]:
                     # Re-activate if necessary
-                    self.W[w - 1]["active"] = self.W[w - 1]["persis_state"]
+                    self.W[w]["active"] = self.W[w]["persis_state"]
                 self.persis_pending.append(w)

         exit_flag = 0
@@ -585,6 +628,7 @@ def _get_alloc_libE_info(self) -> dict:
             "use_resource_sets": self.use_resource_sets,
             "gen_num_procs": self.gen_num_procs,
             "gen_num_gpus": self.gen_num_gpus,
+            "gen_on_manager": self.libE_specs.get("gen_on_manager", False),
         }

     def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
@@ -653,6 +697,7 @@ def run(self, persis_info: dict) -> (dict, int, int):
         finally:
             # Return persis_info, exit_flag, elapsed time
             result = self._final_receive_and_kill(persis_info)
+            self.wcomms = None
             sys.stdout.flush()
             sys.stderr.flush()
             return result
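Note (illustrative, not part of the patch): the manager-side bookkeeping above hinges on the extended ``worker_dtype`` - with ``gen_on_manager`` a manager-hosted gen is prepended as worker 0 and flagged ``gen_worker``. A standalone sketch of the resulting worker array (dtype copied from the patch; ``nworkers=4`` is arbitrary):

    import numpy as np

    worker_dtype = [
        ("worker_id", int),
        ("gen_worker", bool),
        ("active", int),
        ("persis_state", int),
        ("active_recv", bool),
        ("gen_started_time", float),
        ("zero_resource_worker", bool),
    ]

    nworkers = 4
    W = np.zeros(nworkers + 1, dtype=worker_dtype)  # +1 for the manager-hosted gen
    W["worker_id"] = np.arange(nworkers + 1)        # [0, 1, 2, 3, 4]
    W[0]["gen_worker"] = True                       # worker 0 only runs the gen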
diff --git a/libensemble/specs.py b/libensemble/specs.py
index ae115e216..f0a401fe6 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -55,6 +55,11 @@ class SimSpecs(BaseModel):
     calling them locally.
     """

+    threaded: Optional[bool] = False
+    """
+    Instruct the worker process to launch the user function in a thread.
+    """
+
     user: Optional[dict] = {}
     """
     A user-data dictionary to place bounds, constants, settings, or other parameters for customizing
@@ -100,6 +105,11 @@ class GenSpecs(BaseModel):
     calling them locally.
     """

+    threaded: Optional[bool] = False
+    """
+    Instruct the worker process to launch the user function in a thread.
+    """
+
     user: Optional[dict] = {}
     """
     A user-data dictionary to place bounds, constants, settings, or other parameters for
@@ -160,7 +170,12 @@ class LibeSpecs(BaseModel):
     """ Manager/Worker communications mode. ``'mpi'``, ``'local'``, ``'threads'``, or ``'tcp'`` """

     nworkers: Optional[int] = 0
-    """ Number of worker processes in ``"local"`` or ``"tcp"``."""
+    """ Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``."""
+
+    gen_on_manager: Optional[bool] = False
+    """ Instructs the manager process to run generator functions.
+    The generator function can then access/modify user objects by reference.
+    """

     mpi_comm: Optional[Any] = None
     """ libEnsemble MPI communicator. Default: ``MPI.COMM_WORLD``"""
@@ -442,6 +457,12 @@ class LibeSpecs(BaseModel):
     For use with supported allocation functions.
     """

+    gen_workers: Optional[List[int]] = []
+    """
+    List of workers that should only run generators. All other workers will
+    only run simulator functions.
+    """
+
     resource_info: Optional[dict] = {}
     """
     Resource information to override automatically detected resources.
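Note (illustrative, not part of the patch): a sketch of the new ``threaded`` toggle. The patch itself marks this interface as undecided (see the TODO in runners.py below), so treat it as provisional; ``my_sim`` is a placeholder user function:

    from libensemble.specs import SimSpecs

    sim_specs = SimSpecs(
        sim_f=my_sim,   # placeholder; any sim function
        inputs=["x"],
        out=[("f", float)],
        threaded=True,  # worker runs my_sim in a thread via ThreadRunner
    )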
diff --git a/libensemble/tests/functionality_tests/check_libE_stats.py b/libensemble/tests/functionality_tests/check_libE_stats.py
index 8e4e9c0cc..424c07d8b 100644
--- a/libensemble/tests/functionality_tests/check_libE_stats.py
+++ b/libensemble/tests/functionality_tests/check_libE_stats.py
@@ -42,7 +42,7 @@ def check_start_end_times(start="Start:", end="End:", everyline=True):
             s_cnt = 0
             e_cnt = 0
             lst = line.split()
-            if lst[0] == "Manager":
+            if line.startswith("Manager : Starting") or line.startswith("Manager : Exiting"):
                 check_datetime(lst[5], lst[6])
                 continue
             for i, val in enumerate(lst):
diff --git a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py
index 6e692dfa2..0fc8192f7 100644
--- a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py
+++ b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py
@@ -100,30 +100,34 @@
     libE_specs["resource_info"] = {"cores_on_node": (nworkers * 2, nworkers * 4), "gpus_on_node": nworkers}

     base_libE_specs = libE_specs.copy()
-    for run in range(5):
-        # reset
-        libE_specs = base_libE_specs.copy()
-        persis_info = add_unique_random_streams({}, nworkers + 1)
-
-        if run == 0:
-            libE_specs["gen_num_procs"] = 2
-        elif run == 1:
-            libE_specs["gen_num_gpus"] = 1
-        elif run == 2:
-            persis_info["gen_num_gpus"] = 1
-        elif run == 3:
-            # Two GPUs per resource set
-            libE_specs["resource_info"]["gpus_on_node"] = nworkers * 2
-            persis_info["gen_num_gpus"] = 1
-        elif run == 4:
-            # Two GPUs requested for gen
-            persis_info["gen_num_procs"] = 2
-            persis_info["gen_num_gpus"] = 2
-            gen_specs["user"]["max_procs"] = max(nworkers - 2, 1)
-
-        # Perform the run
-        H, persis_info, flag = libE(
-            sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs
-        )
+    for gen_on_manager in [False, True]:
+        for run in range(5):
+            # reset
+            libE_specs = base_libE_specs.copy()
+            libE_specs["gen_on_manager"] = gen_on_manager
+            persis_info = add_unique_random_streams({}, nworkers + 1)
+
+            if run == 0:
+                libE_specs["gen_num_procs"] = 2
+            elif run == 1:
+                if gen_on_manager:
+                    print("SECOND LIBE CALL WITH GEN ON MANAGER")
+                libE_specs["gen_num_gpus"] = 1
+            elif run == 2:
+                persis_info["gen_num_gpus"] = 1
+            elif run == 3:
+                # Two GPUs per resource set
+                libE_specs["resource_info"]["gpus_on_node"] = nworkers * 2
+                persis_info["gen_num_gpus"] = 1
+            elif run == 4:
+                # Two GPUs requested for gen
+                persis_info["gen_num_procs"] = 2
+                persis_info["gen_num_gpus"] = 2
+                gen_specs["user"]["max_procs"] = max(nworkers - 2, 1)
+
+            # Perform the run
+            H, persis_info, flag = libE(
+                sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs
+            )

 # All asserts are in gen and sim funcs
diff --git a/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py
new file mode 100644
index 000000000..2b601efe3
--- /dev/null
+++ b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py
@@ -0,0 +1,70 @@
+"""
+Test libEnsemble's capability to evaluate existing points and then generate
+new samples via gen_on_manager.
+
+Execute via one of the following commands (e.g. 3 workers):
+   mpiexec -np 4 python test_evaluate_existing_plus_gen.py
+   python test_evaluate_existing_plus_gen.py --nworkers 3 --comms local
+   python test_evaluate_existing_plus_gen.py --nworkers 3 --comms tcp
+
+The number of concurrent evaluations of the objective function will be 4-1=3.
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: mpi local tcp
+# TESTSUITE_NPROCS: 2 4
+
+import numpy as np
+
+# Import libEnsemble items for this test
+from libensemble import Ensemble
+from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f
+from libensemble.sim_funcs.six_hump_camel import six_hump_camel as sim_f
+from libensemble.specs import ExitCriteria, GenSpecs, SimSpecs
+from libensemble.tools import add_unique_random_streams
+
+
+def create_H0(persis_info, gen_specs, H0_size):
+    """Create an H0 for give_pregenerated_sim_work"""
+    # Manually creating H0
+    ub = gen_specs["user"]["ub"]
+    lb = gen_specs["user"]["lb"]
+    n = len(lb)
+    b = H0_size
+
+    H0 = np.zeros(b, dtype=[("x", float, 2), ("sim_id", int), ("sim_started", bool)])
+    H0["x"] = persis_info[0]["rand_stream"].uniform(lb, ub, (b, n))
+    H0["sim_id"] = range(b)
+    H0["sim_started"] = False
+    return H0
+
+
+# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
+if __name__ == "__main__":
+
+    sampling = Ensemble(parse_args=True)
+    sampling.libE_specs.gen_on_manager = True
+    sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)])
+
+    gen_specs = {
+        "gen_f": gen_f,
+        "outputs": [("x", float, (2,))],
+        "user": {
+            "gen_batch_size": 50,
+            "lb": np.array([-3, -3]),
+            "ub": np.array([3, 3]),
+        },
+    }
+    sampling.gen_specs = GenSpecs(**gen_specs)
+    sampling.exit_criteria = ExitCriteria(sim_max=100)
+    sampling.persis_info = add_unique_random_streams({}, sampling.nworkers + 1)
+    sampling.H0 = create_H0(sampling.persis_info, gen_specs, 50)
+    sampling.run()
+
+    if sampling.is_manager:
+        assert len(sampling.H) == 2 * len(sampling.H0)
+        assert np.array_equal(sampling.H0["x"][:50], sampling.H["x"][:50])
+        assert np.all(sampling.H["sim_ended"])
+        assert np.all(sampling.H["gen_worker"] == 0)
+        print("\nlibEnsemble correctly appended to the initial sample via an additional gen.")
+        sampling.save_output(__file__)
diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py
index bd381f3ae..305521a02 100644
--- a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py
+++ b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py
@@ -62,7 +62,7 @@

     libE_specs["kill_canceled_sims"] = False

-    for run in range(3):
+    for run in range(5):
         persis_info = add_unique_random_streams({}, nworkers + 1)
         for i in persis_info:
             persis_info[i]["get_grad"] = True
@@ -86,6 +86,11 @@
             sim_specs["out"] = [("f_i", float), ("gradf_i", float, 2 * m)]
             sim_specs["in"] = ["x", "obj_component"]
             # sim_specs["out"] = [("f", float), ("grad", float, n)]
+        elif run == 3:
+            libE_specs["gen_on_manager"] = True
+        elif run == 4:
+            libE_specs["gen_on_manager"] = False
+            libE_specs["gen_workers"] = [2]

         # Perform the run
         H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs)
diff --git a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py
index 38998baa7..1574e8d57 100644
--- a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py
+++ b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py
@@ -44,6 +44,7 @@
     H0["sim_ended"][:500] = True

     sampling = Ensemble(parse_args=True)
+    sampling.libE_specs.gen_on_manager = True
     sampling.H0 = H0
     sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)])
    sampling.alloc_specs = AllocSpecs(alloc_f=alloc_f)
diff --git a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
index 631c0a60b..6d056b1e0 100644
--- a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
+++ b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
@@ -11,18 +11,25 @@
 from libensemble.tools import add_unique_random_streams
 from libensemble.tools.alloc_support import AllocException, AllocSupport
 from libensemble.tools.fields_keys import libE_fields
+from libensemble.utils.misc import _WorkerIndexer

 al = {"alloc_f": give_sim_work_first}
 libE_specs = {"comms": "local", "nworkers": 4}
 H0 = []

 W = np.array(
-    [(1, 0, 0, 0, False), (2, 0, 0, 0, False), (3, 0, 0, 0, False), (4, 0, 0, 0, False)],
+    [
+        (1, False, 0, 0, False, False),
+        (2, False, 0, 0, False, False),
+        (3, False, 0, 0, False, False),
+        (4, False, 0, 0, False, False),
+    ],
     dtype=[
         ("worker_id", " 1
+    assert ensemble.libE_specs.platform_specs == specs_dump(platform_specs, exclude_none=True)
+
+
 if __name__ == "__main__":
     test_ensemble_init()
     test_ensemble_parse_args_false()
@@ -173,3 +197,4 @@ def test_flakey_workflow():
     test_bad_func_loads()
     test_full_workflow()
     test_flakey_workflow()
+    test_ensemble_specs_update_libE_specs()
diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py
index 85b986d39..1d3cbb4b2 100644
--- a/libensemble/tests/unit_tests/test_ufunc_runners.py
+++ b/libensemble/tests/unit_tests/test_ufunc_runners.py
@@ -3,9 +3,8 @@
 import pytest

 import libensemble.tests.unit_tests.setup as setup
-from libensemble.message_numbers import EVAL_GEN_TAG, EVAL_SIM_TAG
 from libensemble.tools.fields_keys import libE_fields
-from libensemble.utils.runners import Runners
+from libensemble.utils.runners import Runner


 def get_ufunc_args():
@@ -19,7 +18,7 @@ def get_ufunc_args():
     sim_ids = np.zeros(1, dtype=int)

     Work = {
-        "tag": EVAL_SIM_TAG,
+        "tag": 1,
         "persis_info": {},
         "libE_info": {"H_rows": sim_ids},
         "H_fields": sim_specs["in"],
@@ -28,29 +27,31 @@ def get_ufunc_args():
     return calc_in, sim_specs, gen_specs


-@pytest.mark.extra
 def test_normal_runners():
     calc_in, sim_specs, gen_specs = get_ufunc_args()

-    runners = Runners(sim_specs, gen_specs)
-    assert (
-        not runners.has_globus_compute_sim and not runners.has_globus_compute_gen
+    simrunner = Runner(sim_specs)
+    genrunner = Runner(gen_specs)
+    assert not hasattr(simrunner, "globus_compute_executor") and not hasattr(
+        genrunner, "globus_compute_executor"
     ), "Globus Compute use should not be detected without setting endpoint fields"

-    ro = runners.make_runners()
-    assert all(
-        [i in ro for i in [EVAL_SIM_TAG, EVAL_GEN_TAG]]
-    ), "Both user function tags should be included in runners dictionary"

-@pytest.mark.extra
-def test_normal_no_gen():
+def test_thread_runners():
     calc_in, sim_specs, gen_specs = get_ufunc_args()

-    runners = Runners(sim_specs, {})
-    ro = runners.make_runners()
+    def tupilize(arg1, arg2):
+        return (arg1, arg2)
+
+    sim_specs["threaded"] = True  # TODO: undecided interface
+    sim_specs["sim_f"] = tupilize
+    persis_info = {"hello": "threads"}

-    assert not ro[2], "generator function shouldn't be provided if not using gen_specs"
+    simrunner = Runner(sim_specs)
+    result = simrunner._result(calc_in, persis_info, {})
+    assert result == (calc_in, persis_info)
+    assert hasattr(simrunner, "thread_handle")
+    simrunner.shutdown()


 @pytest.mark.extra
@@ -60,10 +61,10 @@ def test_globus_compute_runner_init():
     sim_specs["globus_compute_endpoint"] = "1234"

     with mock.patch("globus_compute_sdk.Executor"):
-        runners = Runners(sim_specs, gen_specs)
+        runner = Runner(sim_specs)

-    assert (
-        runners.sim_globus_compute_executor is not None
+    assert hasattr(
+        runner, "globus_compute_executor"
     ), "Globus ComputeExecutor should have been instantiated when globus_compute_endpoint found in specs"


@@ -74,7 +75,7 @@ def test_globus_compute_runner_pass():
     sim_specs["globus_compute_endpoint"] = "1234"

     with mock.patch("globus_compute_sdk.Executor"):
-        runners = Runners(sim_specs, gen_specs)
+        runner = Runner(sim_specs)

     # Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception
     globus_compute_mock = mock.Mock()
@@ -83,12 +84,12 @@ def test_globus_compute_runner_pass():
     globus_compute_future.exception.return_value = None
     globus_compute_future.result.return_value = (True, True)

-    runners.sim_globus_compute_executor = globus_compute_mock
-    ro = runners.make_runners()
+    runner.globus_compute_executor = globus_compute_mock
+    runners = {1: runner.run}

     libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}

-    out, persis_info = ro[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1})
+    out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1})

     assert all([out, persis_info]), "Globus Compute runner correctly returned results"

@@ -100,7 +101,7 @@ def test_globus_compute_runner_fail():
     gen_specs["globus_compute_endpoint"] = "4321"

     with mock.patch("globus_compute_sdk.Executor"):
-        runners = Runners(sim_specs, gen_specs)
+        runner = Runner(gen_specs)

     # Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception
     globus_compute_mock = mock.Mock()
@@ -108,19 +109,19 @@ def test_globus_compute_runner_fail():
     globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future
     globus_compute_future.exception.return_value = Exception

-    runners.gen_globus_compute_executor = globus_compute_mock
-    ro = runners.make_runners()
+    runner.globus_compute_executor = globus_compute_mock
+    runners = {2: runner.run}

     libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}

     with pytest.raises(Exception):
-        out, persis_info = ro[2](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 2})
+        out, persis_info = runners[2](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 2})
         pytest.fail("Expected exception")


 if __name__ == "__main__":
     test_normal_runners()
-    test_normal_no_gen()
+    test_thread_runners()
     test_globus_compute_runner_init()
     test_globus_compute_runner_pass()
     test_globus_compute_runner_fail()
diff --git a/libensemble/tools/alloc_support.py b/libensemble/tools/alloc_support.py
index d1d8ac802..0d4ce91d8 100644
--- a/libensemble/tools/alloc_support.py
+++ b/libensemble/tools/alloc_support.py
@@ -87,24 +87,19 @@ def assign_resources(self, rsets_req, use_gpus=None, user_params=[]):
             rset_team = self.sched.assign_resources(rsets_req, use_gpus, user_params)
         return rset_team

-    def avail_worker_ids(self, persistent=None, active_recv=False, zero_resource_workers=None):
+    def avail_worker_ids(self, persistent=None, active_recv=False, zero_resource_workers=None, gen_workers=None):
         """Returns available workers as a list of IDs, filtered by the given options.

         :param persistent: (Optional) Int. Only return workers with given ``persis_state`` (1=sim, 2=gen).
         :param active_recv: (Optional) Boolean. Only return workers with given active_recv state.
         :param zero_resource_workers: (Optional) Boolean. Only return workers that require no resources.
+        :param gen_workers: (Optional) Boolean. If True, return gen-only workers. If False, return all other workers.
         :returns: List of worker IDs.

         If there are no zero resource workers defined, then the ``zero_resource_workers`` argument will
         be ignored.
         """

-        def fltr(wrk, field, option):
-            """Filter by condition if supplied"""
-            if option is None:
-                return True
-            return wrk[field] == option
-
         # For abbrev.
         def fltr_persis():
             if persistent is None:
@@ -121,26 +116,33 @@ def fltr_recving():
             if active_recv:
                 return wrk["active_recv"]
             else:
-                return not wrk["active"]
+                return wrk["active"] == 0
+
+        def fltr_gen_workers():
+            if no_gen_workers or gen_workers is None:
+                return True
+            return wrk["gen_worker"] == gen_workers

         if active_recv and not persistent:
             raise AllocException("Cannot ask for non-persistent active receive workers")

         # If there are no zero resource workers - then ignore zrw (i.e., use only if they exist)
         no_zrw = not any(self.W["zero_resource_worker"])
+        no_gen_workers = not any(self.W["gen_worker"])
+
         wrks = []
         for wrk in self.W:
-            if fltr_recving() and fltr_persis() and fltr_zrw():
+            if fltr_recving() and fltr_persis() and fltr_zrw() and fltr_gen_workers():
                 wrks.append(wrk["worker_id"])
         return wrks

     def count_gens(self):
         """Returns the number of active generators."""
-        return sum(self.W["active"] == EVAL_GEN_TAG)
+        return sum((self.W["active"] == EVAL_GEN_TAG))

     def test_any_gen(self):
         """Returns ``True`` if a generator worker is active."""
-        return any(self.W["active"] == EVAL_GEN_TAG)
+        return any((self.W["active"] == EVAL_GEN_TAG))

     def count_persis_gens(self):
         """Return the number of active persistent generators."""
@@ -201,7 +203,7 @@ def _update_rset_team(self, libE_info, wid, H=None, H_rows=None):
         """Add rset_team to libE_info."""
         if self.manage_resources and not libE_info.get("rset_team"):
             num_rsets_req = 0
-            if self.W[wid - 1]["persis_state"]:
+            if self.W[wid]["persis_state"]:
                 # Even if empty list, non-None rset_team stops manager giving default resources
                 libE_info["rset_team"] = []
                 return
@@ -272,7 +274,7 @@ def gen_work(self, wid, H_fields, H_rows, persis_info, **libE_info):
         """
         self._update_rset_team(libE_info, wid)

-        if not self.W[wid - 1]["persis_state"]:
+        if not self.W[wid]["persis_state"]:
             AllocSupport.gen_counter += 1  # Count total gens
             libE_info["gen_count"] = AllocSupport.gen_counter
diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py
index 76e4ccaf2..ca67095ac 100644
--- a/libensemble/utils/misc.py
+++ b/libensemble/utils/misc.py
@@ -33,6 +33,27 @@ def extract_H_ranges(Work: dict) -> str:
         return "_".join(ranges)


+class _WorkerIndexer:
+    def __init__(self, iterable: list, additional_worker=False):
+        self.iterable = iterable
+        self.additional_worker = additional_worker
+
+    def __getitem__(self, key):
+        if self.additional_worker or isinstance(key, str):
+            return self.iterable[key]
+        else:
+            return self.iterable[key - 1]
+
+    def __setitem__(self, key, value):
+        self.iterable[key] = value
+
+    def __len__(self):
+        return len(self.iterable)
+
+    def __iter__(self):
+        return iter(self.iterable)
+
+
 def specs_dump(specs, **kwargs):
     if pydanticV1:
         return specs.dict(**kwargs)
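Note (illustrative, not part of the patch): ``_WorkerIndexer`` hides the off-by-one between worker IDs and list slots. A standalone illustration of the class as defined above:

    wcomms = ["comm_w1", "comm_w2", "comm_w3"]

    # Without a manager-hosted gen, worker IDs start at 1, so ID w maps to slot w - 1:
    idx = _WorkerIndexer(wcomms, additional_worker=False)
    assert idx[1] == "comm_w1"

    # With gen_on_manager, worker 0 is prepended and IDs map directly to slots:
    idx0 = _WorkerIndexer(["comm_w0"] + wcomms, additional_worker=True)
    assert idx0[0] == "comm_w0"
    assert idx0[3] == "comm_w3"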
diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py
index 07897b942..629c733b1 100644
--- a/libensemble/utils/runners.py
+++ b/libensemble/utils/runners.py
@@ -1,76 +1,50 @@
 import inspect
 import logging
 import logging.handlers
-from typing import Callable, Dict, Optional
+from typing import Optional

 import numpy.typing as npt

-from libensemble.message_numbers import EVAL_GEN_TAG, EVAL_SIM_TAG
+from libensemble.comms.comms import QCommThread

 logger = logging.getLogger(__name__)


-class Runners:
-    """Determines and returns methods for workers to run user functions.
-
-    Currently supported: direct-call and Globus Compute
-    """
-
-    def __init__(self, sim_specs: dict, gen_specs: dict) -> None:
-        self.sim_specs = sim_specs
-        self.gen_specs = gen_specs
-        self.sim_f = sim_specs["sim_f"]
-        self.gen_f = gen_specs.get("gen_f")
-        self.has_globus_compute_sim = len(sim_specs.get("globus_compute_endpoint", "")) > 0
-        self.has_globus_compute_gen = len(gen_specs.get("globus_compute_endpoint", "")) > 0
-
-        if any([self.has_globus_compute_sim, self.has_globus_compute_gen]):
-            if self.has_globus_compute_sim:
-                self.sim_globus_compute_executor = self._get_globus_compute_executor()(
-                    endpoint_id=self.sim_specs["globus_compute_endpoint"]
-                )
-                self.globus_compute_simfid = self.sim_globus_compute_executor.register_function(self.sim_f)
-
-            if self.has_globus_compute_gen:
-                self.gen_globus_compute_executor = self._get_globus_compute_executor()(
-                    endpoint_id=self.gen_specs["globus_compute_endpoint"]
-                )
-                self.globus_compute_genfid = self.gen_globus_compute_executor.register_function(self.gen_f)
-
-    def make_runners(self) -> Dict[int, Callable]:
-        """Creates functions to run a sim or gen. These functions are either
-        called directly by the worker or submitted to a Globus Compute endpoint."""
-
-        def run_sim(calc_in, Work):
-            """Determines how to run sim."""
-            if self.has_globus_compute_sim:
-                result = self._globus_compute_result
-            else:
-                result = self._normal_result
+class Runner:
+    def __new__(cls, specs):
+        if len(specs.get("globus_compute_endpoint", "")) > 0:
+            return super(Runner, GlobusComputeRunner).__new__(GlobusComputeRunner)
+        if specs.get("threaded"):  # TODO: undecided interface
+            return super(Runner, ThreadRunner).__new__(ThreadRunner)
+        else:
+            return super().__new__(Runner)

-            return result(calc_in, Work["persis_info"], self.sim_specs, Work["libE_info"], self.sim_f, Work["tag"])
+    def __init__(self, specs):
+        self.specs = specs
+        self.f = specs.get("sim_f") or specs.get("gen_f")

-        if self.gen_specs:
+    def _truncate_args(self, calc_in: npt.NDArray, persis_info, libE_info):
+        nparams = len(inspect.signature(self.f).parameters)
+        args = [calc_in, persis_info, self.specs, libE_info]
+        return args[:nparams]

-            def run_gen(calc_in, Work):
-                """Determines how to run gen."""
-                if self.has_globus_compute_gen:
-                    result = self._globus_compute_result
-                else:
-                    result = self._normal_result
+    def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, Optional[int]):
+        """User function called in-place"""
+        args = self._truncate_args(calc_in, persis_info, libE_info)
+        return self.f(*args)

-                return result(calc_in, Work["persis_info"], self.gen_specs, Work["libE_info"], self.gen_f, Work["tag"])
+    def shutdown(self) -> None:
+        pass

-        else:
-            run_gen = []
+    def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, Optional[int]):
+        return self._result(calc_in, Work["persis_info"], Work["libE_info"])

-        return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen}

-    def shutdown(self) -> None:
-        if self.has_globus_compute_sim:
-            self.sim_globus_compute_executor.shutdown()
-        if self.has_globus_compute_gen:
-            self.gen_globus_compute_executor.shutdown()
+class GlobusComputeRunner(Runner):
+    def __init__(self, specs):
+        super().__init__(specs)
+        self.globus_compute_executor = self._get_globus_compute_executor()(endpoint_id=specs["globus_compute_endpoint"])
+        self.globus_compute_fid = self.globus_compute_executor.register_function(self.f)

     def _get_globus_compute_executor(self):
         try:
@@ -82,42 +56,31 @@ def _get_globus_compute_executor(self):
         else:
             return Executor

-    def _truncate_args(self, calc_in, persis_info, specs, libE_info, user_f):
-        nparams = len(inspect.signature(user_f).parameters)
-        args = [calc_in, persis_info, specs, libE_info]
-        return args[:nparams]
-
-    def _normal_result(
-        self, calc_in: npt.NDArray, persis_info: dict, specs: dict, libE_info: dict, user_f: Callable, tag: int
-    ) -> (npt.NDArray, dict, Optional[int]):
-        """User function called in-place"""
-        args = self._truncate_args(calc_in, persis_info, specs, libE_info, user_f)
-        return user_f(*args)
-
-    def _get_func_uuid(self, tag):
-        if tag == EVAL_SIM_TAG:
-            return self.globus_compute_simfid
-        elif tag == EVAL_GEN_TAG:
-            return self.globus_compute_genfid
-
-    def _get_globus_compute_exctr(self, tag):
-        if tag == EVAL_SIM_TAG:
-            return self.sim_globus_compute_executor
-        elif tag == EVAL_GEN_TAG:
-            return self.gen_globus_compute_executor
-
-    def _globus_compute_result(
-        self, calc_in: npt.NDArray, persis_info: dict, specs: dict, libE_info: dict, user_f: Callable, tag: int
-    ) -> (npt.NDArray, dict, Optional[int]):
-        """User function submitted to Globus Compute"""
+    def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, Optional[int]):
         from libensemble.worker import Worker

         libE_info["comm"] = None  # 'comm' object not pickle-able
         Worker._set_executor(0, None)  # ditto for executor

-        fargs = self._truncate_args(calc_in, persis_info, specs, libE_info, user_f)
-        exctr = self._get_globus_compute_exctr(tag)
-        func_id = self._get_func_uuid(tag)
-
-        task_fut = exctr.submit_to_registered_function(func_id, fargs)
+        args = self._truncate_args(calc_in, persis_info, libE_info)
+        task_fut = self.globus_compute_executor.submit_to_registered_function(self.globus_compute_fid, args)
         return task_fut.result()
+
+    def shutdown(self) -> None:
+        self.globus_compute_executor.shutdown()
+
+
+class ThreadRunner(Runner):
+    def __init__(self, specs):
+        super().__init__(specs)
+        self.thread_handle = None
+
+    def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, Optional[int]):
+        args = self._truncate_args(calc_in, persis_info, libE_info)
+        self.thread_handle = QCommThread(self.f, None, *args, user_function=True)
+        self.thread_handle.run()
+        return self.thread_handle.result()
+
+    def shutdown(self) -> None:
+        if self.thread_handle is not None:
+            self.thread_handle.terminate()
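Note (illustrative, not part of the patch): worker-side dispatch with the new class, roughly. A minimal sketch; the ``Work`` message shape follows the unit tests above, and ``echo`` stands in for a user sim function:

    import numpy as np

    from libensemble.utils.runners import Runner


    def echo(calc_in, persis_info):
        return calc_in, persis_info


    sim_specs = {"sim_f": echo}  # no endpoint, not threaded -> plain in-place Runner
    runner = Runner(sim_specs)   # __new__ picks the subclass from the specs

    calc_in = np.zeros(1, dtype=[("x", float)])
    out, persis_info = runner.run(calc_in, {"persis_info": {}, "libE_info": {}})
    runner.shutdown()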
diff --git a/libensemble/worker.py b/libensemble/worker.py
index 70c9b2266..10823ad8a 100644
--- a/libensemble/worker.py
+++ b/libensemble/worker.py
@@ -32,7 +32,7 @@
 from libensemble.utils.loc_stack import LocationStack
 from libensemble.utils.misc import extract_H_ranges
 from libensemble.utils.output_directory import EnsembleDirectory
-from libensemble.utils.runners import Runners
+from libensemble.utils.runners import Runner
 from libensemble.utils.timer import Timer

 logger = logging.getLogger(__name__)
@@ -97,7 +97,7 @@ def worker_main(
     if libE_specs.get("use_workflow_dir"):
         _, libE_specs["workflow_dir_path"] = comm.recv()

-    workerID = workerID or comm.rank
+    workerID = workerID or getattr(comm, "rank", 0)

     # Initialize logging on comms
     if log_comm:
@@ -166,10 +166,10 @@ def __init__(
         self.workerID = workerID
         self.libE_specs = libE_specs
         self.stats_fmt = libE_specs.get("stats_fmt", {})
-
+        self.sim_runner = Runner(sim_specs)
+        self.gen_runner = Runner(gen_specs)
+        self.runners = {EVAL_SIM_TAG: self.sim_runner.run, EVAL_GEN_TAG: self.gen_runner.run}
         self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0}
-        self.runners = Runners(sim_specs, gen_specs)
-        self._run_calc = self.runners.make_runners()
         Worker._set_executor(self.workerID, self.comm)
         Worker._set_resources(self.workerID, self.comm)
         self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs)
@@ -256,7 +256,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
         try:
             logger.debug(f"Starting {enum_desc}: {calc_id}")
-            calc = self._run_calc[calc_type]
+            calc = self.runners[calc_type]
             with timer:
                 if self.EnsembleDirectory.use_calc_dirs(calc_type):
                     loc_stack, calc_dir = self.EnsembleDirectory.prep_calc_dir(
@@ -412,5 +412,8 @@ def run(self) -> None:
             else:
                 self.comm.kill_pending()
         finally:
-            self.runners.shutdown()
+            self.gen_runner.shutdown()
+            self.sim_runner.shutdown()
             self.EnsembleDirectory.copy_back()
+            if Executor.executor is not None:
+                Executor.executor.comm = None  # so Executor can be pickled upon further libE calls