diff --git a/src/psij-descriptors/nqsv_descriptor.py b/src/psij-descriptors/nqsv_descriptor.py new file mode 100644 index 00000000..680db765 --- /dev/null +++ b/src/psij-descriptors/nqsv_descriptor.py @@ -0,0 +1,5 @@ +from packaging.version import Version +from psij.descriptor import Descriptor + +__PSI_J_EXECUTORS__ = [Descriptor(name='nqsv', version=Version('0.0.1'), + cls='psij.executors.batch.nqsv.NQSVJobExecutor')] diff --git a/src/psij/executors/batch/nqsv.py b/src/psij/executors/batch/nqsv.py new file mode 100644 index 00000000..f9cddc73 --- /dev/null +++ b/src/psij/executors/batch/nqsv.py @@ -0,0 +1,235 @@ +from pathlib import Path +from psij import Job, JobState, JobStatus, SubmitException +from typing import IO, Optional, List, Dict, Collection, Union, Sequence, Any, cast +from psij.executors.batch.script_generator import TemplatedScriptGenerator +from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutor +from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutorConfig +from psij.executors.batch.batch_scheduler_executor import check_status_exit_code + +import re +import subprocess +from threading import Thread +from datetime import timedelta + +_NQSV_DIR = '/opt/nec/nqsv/bin/' +_QDEL_COMMAND = _NQSV_DIR + 'qdel' +_QSUB_COMMAND = _NQSV_DIR + 'qsub' +_QSTAT_COMMAND = _NQSV_DIR + 'qstat' +_QWAIT_COMMAND = _NQSV_DIR + 'qwait' + +LARGE_TIMEOUT = timedelta(days=3650) + + +class _NQSJobWaitingThread(Thread): + """A thread that waits for a job to finish and updates its status.""" + + def __init__(self, job: Job, ex: Any) -> None: + super().__init__() + self._job = job + self._ex = ex + + def run(self) -> None: + """Wait for the job to finish and update its status.""" + st = self._wait() + self._ex._set_job_status(self._job, st) + + def _enable_wait_status(self, + target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \ + -> bool: + """Check if the target states are valid for waiting.""" + if target_states is None: + return True + if isinstance(target_states, JobState): + target_states = [target_states] + # NQSV's qwait command is not support ACTIVE/QUEUED state + for state1 in target_states: + if state1 is JobState.ACTIVE or state1 is JobState.QUEUED: + return False + return True + + def _parse_wait_output(self, out: str) -> JobStatus: + """Parse the output of the qwait command.""" + state = JobState.FAILED + exit_code = None + if 'exited' in out: + s = out.split(' ') + if int(s[1]) == 0: + state = JobState.COMPLETED + else: + state = JobState.FAILED + exit_code = int(s[1]) + elif 'deleted' in out: + state = JobState.CANCELED + elif 'error' in out or 'time out' in out or 'qwait error' in out: + state = JobState.FAILED + # killed by signal or rerun or system failure or resource limit exceeded + else: + # The job has already finished... + state = JobState.COMPLETED + r = JobStatus(state=state, exit_code=exit_code, message=None) + return r + + def _run_command_using_stderr(self, cmd: List[str]) -> str: + """Run a command and return the stderr output.""" + res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return res.stderr + + def _wait(self, timeout: Optional[timedelta] = None, + target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \ + -> Any: + + # NQSV's qwait command is not support ACTIVE/QUEUED state, then use the orignal wait func. + if self._enable_wait_status(target_states) is False: + return self._ex._job_wait(timeout, target_states) + + if timeout: + command = [_QWAIT_COMMAND, '-w', 'exited', '-t', str(timeout.total_seconds()), + str(self._job.native_id)] + else: + command = [_QWAIT_COMMAND, '-w', 'exited', str(self._job.native_id)] + + out = self._run_command_using_stderr(command) + return self._parse_wait_output(out) + + +class NQSVExecutorConfig(BatchSchedulerExecutorConfig): + """Configuration for the NQSV executor.""" + + pass + + +class NQSVJobExecutor(BatchSchedulerExecutor): + """ + An executor for the NEC NQSV batch scheduler. + This executor uses NQSV to submit jobs. It is + assumed that NQSV is installed and available in + the system path. NQSV is a batch job scheduler + developed by NEC Corporation. + """ + + _STATE_MAP = { + 'QUE': JobState.QUEUED, + 'RUN': JobState.ACTIVE, + 'WAT': JobState.QUEUED, + 'HLD': JobState.QUEUED, + 'SUS': JobState.QUEUED, + 'ARI': JobState.QUEUED, + 'TRS': JobState.QUEUED, + 'EXT': JobState.COMPLETED, + 'PRR': JobState.QUEUED, + 'POR': JobState.COMPLETED, + 'MIG': JobState.QUEUED, + 'STG': JobState.QUEUED, + } + + def __init__(self, url: Optional[str] = None, config: Optional[NQSVExecutorConfig] = None): + """Initialize the NQSV executor.""" + if config is None: + config = NQSVExecutorConfig() + super().__init__(url=url, config=config) + path = Path(__file__).parent / 'nqsv/nqsv.mustache' + self.generator = TemplatedScriptGenerator(config, path) + self.submit_frag = False + self.cancel_frag = False + self.use_wait_command = False + self._wait_threads: List[_NQSJobWaitingThread] = [] + + # Override submit function. + def submit(self, job: Job) -> None: + """Submit a job to the NQSV scheduler.""" + super().submit(job) + if self.use_wait_command: + thread = _NQSJobWaitingThread(job, self) + thread.start() + self._wait_threads.append(thread) + return None + + def generate_submit_script(self, + job: Job, context: Dict[str, object], submit_file: IO[str]) -> None: + """Generate a submit script for the NQSV scheduler.""" + self.generator.generate_submit_script(job, context, submit_file) + + def get_submit_command(self, job: Job, submit_file_path: Path) -> List[str]: + """Get the command to submit a job to the NQSV scheduler.""" + return [_QSUB_COMMAND, str(submit_file_path.absolute())] + + def job_id_from_submit_output(self, out: str) -> str: + """Extract the job ID from the output of the submit command.""" + self.submit_frag = True + s = out.strip().split()[1] + out = "" + for char in s: + if char.isdigit(): + out += char + return out + + def get_cancel_command(self, native_id: str) -> List[str]: + """Get the command to cancel a job in the NQSV scheduler.""" + self.cancel_frag = True + return [_QDEL_COMMAND, native_id] + + def process_cancel_command_output(self, exit_code: int, out: str) -> None: + """See :meth:`~.BatchSchedulerExecutor.process_cancel_command_output`.""" + raise SubmitException('Failed job cancel job: %s' % out) + + def get_status_command(self, native_ids: Collection[str]) -> List[str]: + """Get the command to check the status of a job in the NQSV scheduler.""" + return [_QSTAT_COMMAND, '-F', 'rid,stt', '-n', '-l'] + list(native_ids) + + def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]: + """Parse the output of the status command.""" + check_status_exit_code('qstat', exit_code, out) + r = {} + lines = iter(out.split('\n')) + for line in lines: + if not line: + continue + + cols = line.split() + + if (len(cols) == 8 and self.cancel_frag): + s = cols[2] + native_id = "" + for char in s: + if char.isdigit(): + native_id += char + state = JobState.CANCELED + r[native_id] = JobStatus(state=state, message=None) + + elif (len(cols) == 8): + s = cols[1] + native_id = "" + for char in s: + if char.isdigit(): + native_id += char + state = JobState.COMPLETED + r[native_id] = JobStatus(state=state, message=None) + + else: + assert len(cols) == 2 + match = re.search(r'\b(\d+)\b', cols[0]) + native_id = cast(str, match.group(1) if match else None) + native_state = cols[1] + state = self._get_state(native_state) + msg = None + r[native_id] = JobStatus(state=state, message=msg) + + return r + + def _get_state(self, state: str) -> JobState: + """Convert the state string to a JobState enum.""" + assert state in NQSVJobExecutor._STATE_MAP + return NQSVJobExecutor._STATE_MAP[state] + + def get_list_command(self) -> List[str]: + """Get the command to list jobs in the NQSV scheduler.""" + return [_QSTAT_COMMAND, '-F', 'rid', '-n', '-l'] + + def parse_list_output(self, out: str) -> List[str]: + """Parse the output of the list command.""" + r = [] + lines = iter(out.split('\n')) + for line in lines: + c = line.split('.') + r.append(c[0]) + return r diff --git a/src/psij/executors/batch/nqsv/nqsv.mustache b/src/psij/executors/batch/nqsv/nqsv.mustache new file mode 100644 index 00000000..a6184020 --- /dev/null +++ b/src/psij/executors/batch/nqsv/nqsv.mustache @@ -0,0 +1,84 @@ +#!/bin/bash + +{{#job.spec.name}} +#PBS -N {{.}} +{{/job.spec.name}} + +{{#job.spec.inherit_environment}} +#PBS -V +{{/job.spec.inherit_environment}} +{{#env}} +#PBS -v {{name}}="{{value}}" +{{/env}} + +{{#job.spec.resources}} + {{#exclusive_node_use}} +#PBS --exclusive + {{/exclusive_node_use}} +#PBS --cpunum-lhost={{computed_processes_per_node}} -b {{computed_node_count}} + {{#gpu_cores_per_process}} +#PBS --gpunum-lhost=${{.}} + {{/gpu_cores_per_process}} +{{/job.spec.resources}} + +{{#job.spec.attributes}} + {{#duration}} +#PBS -l elapstim_req={{.}} + {{/duration}} + {{#queue_name}} +#PBS -q {{.}} + {{/queue_name}} + {{#project_name}} +#PBS -A {{.}} + {{/project_name}} + {{#reservation_id}} +#PBS -y {{.}} + {{/reservation_id}} +{{/job.spec.attributes}} + +#custom_attributes + {{#custom_attributes}} + {{#nqsv}} +#PBS -{{key}} {{value}} + {{/nqsv}} + {{/custom_attributes}} + +{{!we replace the follow environment variable to cpus when the job is submitted.}} +#PBS -e /dev/null +#PBS -o /dev/null + +{{#job.spec.directory}} +cd "{{.}}" +{{/job.spec.directory}} + +J=`echo $PBS_JOBID | awk -F ':' '{print $1}'` +ID=`echo $PBS_JOBID | awk -F ':' '{print $2}' | awk -F '.' '{print $1}'` +if [ "$J" = "0" ]; then + exec &>> "{{psij.script_dir}}/$ID.out" +fi + +# create node file for PSIJ +{{#job.spec.resources}} +_PSIJ_PPN={{computed_processes_per_node}} +{{/job.spec.resources}} + +PSIJ_NODEFILE="{{psij.script_dir}}/$ID.nodefile" +while read line +do + for i in `seq 1 $_PSIJ_PPN`; + do + echo $line >> $PSIJ_NODEFILE + done +done < $PBS_NODEFILE +export PSIJ_NODEFILE + +{{#psij.launch_command}}{{.}} {{/psij.launch_command}} + +E=$? + +{{!we redirect to a file tied to the native ID so that we can reach the file with attach().}} +if [ "$J" = "0" ]; then + echo "$E" > "{{psij.script_dir}}/$ID.ec" +fi + +exit $E