diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..80a8b74 --- /dev/null +++ b/.gitignore @@ -0,0 +1,59 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +pytestdebug.log + +# Sphinx documentation +docs/_build/ +doc/_build/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IDE specific +.vscode + +# Development +poetry.lock diff --git a/cluster_files/placefield_job.sh b/cluster_files/placefield_job.sh old mode 100644 new mode 100755 index 2017f84..b7e4a0d --- a/cluster_files/placefield_job.sh +++ b/cluster_files/placefield_job.sh @@ -8,6 +8,7 @@ REWARD_ID=$6 TRIAL_STATUS=$7 BIN_SIZE=$8 FORCE_RECALC=$9 +CONDA_ENV=${10:-suite2p} # Default to 'suite2p' if argument is not provided echo "input path: $INPUT_PATH" echo "output path: $OUTPUT_PATH" @@ -17,12 +18,14 @@ echo "reward id: $REWARD_ID" echo "trial status: $TRIAL_STATUS" echo "bin size: $BIN_SIZE" echo "force recalc: $FORCE_RECALC" -source ~/anaconda3/etc/profile.d/conda.sh -conda activate vr2p + +source ~/anaconda3/etc/profile.d/conda.sh # 'conda init bash' only edits rc files; sourcing conda.sh enables 'conda activate' in this non-interactive shell +conda activate $CONDA_ENV echo $CONDA_DEFAULT_ENV + RESULT=$(python < logs/pf-log-{job_id}.txt"' stdin, stdout, stderr = ssh.exec_command(run_command) + print(run_command) # find job id. 
stdout = stdout.read().decode('utf-8') stdout = re.search('Job <(\d*)>',stdout) @@ -140,6 +141,37 @@ def send_placefield_job(info, server): raise NameError("Could not find job id (was job submit succesfull?)") pass +def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/placefield_job.sh'): + import uuid, re + from pathlib import Path + from IPython import get_ipython + + job_id = f"{info['session_id']}-{info['reward_id']}-{uuid.uuid4().hex[:3].upper()}" + n_cores = server.get('n_cores', 1) + + logs_dir = Path('logs') + if not logs_dir.exists(): + logs_dir.mkdir(exist_ok=True) + + run_command = ( + f"sbatch -n {n_cores} -J {job_id} {moreargs} " + f"-o logs/out-{job_id}.txt " + f'--wrap="source {pf_script_path}' + ) + + for key in ['input_path','output_path','settings_file','session_id','cueset', + 'reward_id','trial_status','bin_size','force_recalc']: + run_command += f" '{info[key]}'" + run_command += f' > logs/pf-log-{job_id}.txt"' + + output = get_ipython().getoutput(run_command) + output_str = "\n".join(output) + match = re.search(r"Submitted batch job (\d+)", output_str) + if match: + return {'info': info, 'job_id': int(match.group(1))} + else: + raise NameError("Could not find job id (was job submit successful?)") + def check_placefield_job_status(info,local_output_path, server, verbose=True): """Checks on status of job generated by send_placefield_job. 
@@ -180,7 +212,7 @@ def check_placefield_results(data_path,local_output_path): with zarr.open(local_output_path.as_posix(), mode="r") as zarr_output: return (f"{data_path}/significant" in zarr_output) -def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc): +def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc, jobargs=''):  # -> list data = vr2p.ExperimentData(local_input_path) jobs = [] for session_id, vr in enumerate(data.vr): @@ -188,15 +220,21 @@ def run_placefield_cluster(local_input_path, server_input_path, local_output_pat for reward_id in [1,2]: for trial_status in ['correct','incorrect','excl_no_response','all']: # gather info. - process_info = {'input_path':server_input_path.as_posix(),'output_path':server_output_path.as_posix(), - 'settings_file': server_settings_file.as_posix(), - 'session_id':session_id,'cueset':cueset,'reward_id':reward_id, - 'trial_status':trial_status,'bin_size':settings['placefield']['bin_size'], - 'force_recalc':force_recalc} + process_info = { + 'input_path':Path(server_input_path).as_posix(), + 'output_path':Path(server_output_path).as_posix(), + 'settings_file': Path(server_settings_file).as_posix(), + 'session_id':session_id, + 'cueset':cueset, + 'reward_id':reward_id, + 'trial_status':trial_status, + 'bin_size':settings['placefield']['bin_size'], + 'force_recalc':force_recalc + } # check if results are already present. data_path = f"{cueset}/{reward_id}/{trial_status}/{session_id}" - if (not check_placefield_results(data_path,local_output_path)) | (force_recalc): - jobs.append(send_placefield_job(process_info, settings['server'])) + if (not check_placefield_results(data_path,Path(local_output_path))) or force_recalc: + jobs.append(send_placefield_job(process_info, settings['server'], jobargs)) job_str = "Submitted" else: job_str = "Skipped"