From 0ade6f67a0dec4ad8eaa126d69b1700f10139bf5 Mon Sep 17 00:00:00 2001 From: Jim Date: Mon, 17 May 2021 03:55:41 +0000 Subject: [PATCH] Global phase staggering --- src/plotman/configuration.py | 3 + src/plotman/manager.py | 177 +++++++++++++++-------------- src/plotman/resources/plotman.yaml | 7 ++ 3 files changed, 102 insertions(+), 85 deletions(-) diff --git a/src/plotman/configuration.py b/src/plotman/configuration.py index f081e75d..1d0da91b 100644 --- a/src/plotman/configuration.py +++ b/src/plotman/configuration.py @@ -87,6 +87,9 @@ def get_dst_directories(self): class Scheduling: global_max_jobs: int global_stagger_m: int + global_stagger_phase_major: int + global_stagger_phase_minor: int + global_stagger_phase_limit: int polling_time_s: int tmpdir_max_jobs: int tmpdir_stagger_phase_major: int diff --git a/src/plotman/manager.py b/src/plotman/manager.py index 8c418f35..fd537e77 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -84,95 +84,102 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg): elif len(jobs) >= sched_cfg.global_max_jobs: wait_reason = 'max jobs (%d) - (%ds/%ds)' % (sched_cfg.global_max_jobs, youngest_job_age, global_stagger) else: - tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp] - eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases - if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ] - rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False)) - for (d, phases) in eligible ] - - if not eligible: - wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger) + milestone = job.Phase( + major=sched_cfg.global_stagger_phase_major, + minor=sched_cfg.global_stagger_phase_minor, + ) + if len([j for j in jobs if j.progress() < milestone]) >= sched_cfg.global_stagger_phase_limit: + wait_reason = 'max jobs (%d) before phase [%d : %d] - (%ds/%ds)' % (sched_cfg.global_stagger_phase_limit, sched_cfg.global_stagger_phase_major, 
sched_cfg.global_stagger_phase_minor, youngest_job_age, global_stagger) else: - # Plot to oldest tmpdir. - tmpdir = max(rankable, key=operator.itemgetter(1))[0] - - # Select the dst dir least recently selected - dst_dir = dir_cfg.get_dst_directories() - if dir_cfg.dst_is_tmp(): - dstdir = tmpdir + tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp] + eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases + if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ] + rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False)) + for (d, phases) in eligible ] + + if not eligible: + wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger) else: - dir2ph = { d:ph for (d, ph) in dstdirs_to_youngest_phase(jobs).items() - if d in dst_dir and ph is not None} - unused_dirs = [d for d in dst_dir if d not in dir2ph.keys()] - dstdir = '' - if unused_dirs: - dstdir = random.choice(unused_dirs) + # Plot to oldest tmpdir. + tmpdir = max(rankable, key=operator.itemgetter(1))[0] + + # Select the dst dir least recently selected + dst_dir = dir_cfg.get_dst_directories() + if dir_cfg.dst_is_tmp(): + dstdir = tmpdir else: - dstdir = max(dir2ph, key=dir2ph.get) - - logfile = os.path.join( - dir_cfg.log, pendulum.now().isoformat(timespec='microseconds').replace(':', '_') + '.log' - ) - - plot_args = ['chia', 'plots', 'create', - '-k', str(plotting_cfg.k), - '-r', str(plotting_cfg.n_threads), - '-u', str(plotting_cfg.n_buckets), - '-b', str(plotting_cfg.job_buffer), - '-t', tmpdir, - '-d', dstdir ] - if plotting_cfg.e: - plot_args.append('-e') - if plotting_cfg.farmer_pk is not None: - plot_args.append('-f') - plot_args.append(plotting_cfg.farmer_pk) - if plotting_cfg.pool_pk is not None: - plot_args.append('-p') - plot_args.append(plotting_cfg.pool_pk) - if dir_cfg.tmp2 is not None: - plot_args.append('-2') - plot_args.append(dir_cfg.tmp2) - if plotting_cfg.x: - plot_args.append('-x') - - logmsg = ('Starting plot 
job: %s ; logging to %s' % (' '.join(plot_args), logfile)) - - try: - open_log_file = open(logfile, 'x') - except FileExistsError: - # The desired log file name already exists. Most likely another - # plotman process already launched a new process in response to - # the same scenario that triggered us. Let's at least not - # confuse things further by having two plotting processes - # logging to the same file. If we really should launch another - # plotting process, we'll get it at the next check cycle anyways. - message = ( - f'Plot log file already exists, skipping attempt to start a' - f' new plot: {logfile!r}' + dir2ph = { d:ph for (d, ph) in dstdirs_to_youngest_phase(jobs).items() + if d in dst_dir and ph is not None} + unused_dirs = [d for d in dst_dir if d not in dir2ph.keys()] + dstdir = '' + if unused_dirs: + dstdir = random.choice(unused_dirs) + else: + dstdir = max(dir2ph, key=dir2ph.get) + + logfile = os.path.join( + dir_cfg.log, pendulum.now().isoformat(timespec='microseconds').replace(':', '_') + '.log' ) - return (False, logmsg) - except FileNotFoundError as e: - message = ( - f'Unable to open log file. Verify that the directory exists' - f' and has proper write permissions: {logfile!r}' - ) - raise Exception(message) from e - - # Preferably, do not add any code between the try block above - # and the with block below. IOW, this space intentionally left - # blank... As is, this provides a good chance that our handle - # of the log file will get closed explicitly while still - # allowing handling of just the log file opening error. - - with open_log_file: - # start_new_sessions to make the job independent of this controlling tty. 
- p = subprocess.Popen(plot_args, - stdout=open_log_file, - stderr=subprocess.STDOUT, - start_new_session=True) - - psutil.Process(p.pid).nice(15) - return (True, logmsg) + + plot_args = ['chia', 'plots', 'create', + '-k', str(plotting_cfg.k), + '-r', str(plotting_cfg.n_threads), + '-u', str(plotting_cfg.n_buckets), + '-b', str(plotting_cfg.job_buffer), + '-t', tmpdir, + '-d', dstdir ] + if plotting_cfg.e: + plot_args.append('-e') + if plotting_cfg.farmer_pk is not None: + plot_args.append('-f') + plot_args.append(plotting_cfg.farmer_pk) + if plotting_cfg.pool_pk is not None: + plot_args.append('-p') + plot_args.append(plotting_cfg.pool_pk) + if dir_cfg.tmp2 is not None: + plot_args.append('-2') + plot_args.append(dir_cfg.tmp2) + if plotting_cfg.x: + plot_args.append('-x') + + logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), logfile)) + + try: + open_log_file = open(logfile, 'x') + except FileExistsError: + # The desired log file name already exists. Most likely another + # plotman process already launched a new process in response to + # the same scenario that triggered us. Let's at least not + # confuse things further by having two plotting processes + # logging to the same file. If we really should launch another + # plotting process, we'll get it at the next check cycle anyways. + message = ( + f'Plot log file already exists, skipping attempt to start a' + f' new plot: {logfile!r}' + ) + return (False, logmsg) + except FileNotFoundError as e: + message = ( + f'Unable to open log file. Verify that the directory exists' + f' and has proper write permissions: {logfile!r}' + ) + raise Exception(message) from e + + # Preferably, do not add any code between the try block above + # and the with block below. IOW, this space intentionally left + # blank... As is, this provides a good chance that our handle + # of the log file will get closed explicitly while still + # allowing handling of just the log file opening error. 
+ + with open_log_file: + # start_new_sessions to make the job independent of this controlling tty. + p = subprocess.Popen(plot_args, + stdout=open_log_file, + stderr=subprocess.STDOUT, + start_new_session=True) + + psutil.Process(p.pid).nice(15) + return (True, logmsg) return (False, wait_reason) diff --git a/src/plotman/resources/plotman.yaml b/src/plotman/resources/plotman.yaml index 060989b8..3e06cde0 100644 --- a/src/plotman/resources/plotman.yaml +++ b/src/plotman/resources/plotman.yaml @@ -107,6 +107,13 @@ scheduling: # Don't run more than this many jobs at a time in total. global_max_jobs: 12 + # You can also limit the number of jobs by phase globally. This is useful + # to limit memory or CPU core use. These values allow only three jobs that + # have not yet reached phase 2:1, across all temp dirs. + global_stagger_phase_major: 2 + global_stagger_phase_minor: 1 + global_stagger_phase_limit: 3 + # Don't run any jobs (across all temp dirs) more often than this, in minutes. global_stagger_m: 30