From 618323feacefb042017a42d8a927fe547d0a65a6 Mon Sep 17 00:00:00 2001 From: vshekar1 Date: Thu, 6 Mar 2025 18:31:00 -0500 Subject: [PATCH 1/4] Auto-autoproc initial implementation --- daq_lib.py | 30 +++++++++++++++++++++--------- runFastDPH5.py | 28 +++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/daq_lib.py b/daq_lib.py index 63adf37a..b4b7e701 100644 --- a/daq_lib.py +++ b/daq_lib.py @@ -22,6 +22,7 @@ import bluesky.plan_stubs as bps import logging from utils import validation +import requests logger = logging.getLogger(__name__) try: @@ -446,6 +447,14 @@ def runDCQueue(): #maybe don't run rasters from here??? logger.info("running queue in daq server") while (1): currentRequest = db_lib.popNextRequest(daq_utils.beamline) + if (currentRequest == {}): + break + elif currentRequest is None: + gui_message("Queue contains collection requests from different proposals" + "and not using commissioning directory." + "Please remove invalid requests or switch to" + "commissioning directory to continue") + break if (getBlConfig("queueCollect") == 1): if (getBlConfig(BEAM_CHECK) == 1): waitBeam() @@ -462,14 +471,6 @@ def runDCQueue(): #maybe don't run rasters from here??? if (abort_flag): abort_flag = 0 #careful about when to reset this return - if (currentRequest == {}): - break - elif currentRequest is None: - gui_message("Queue contains collection requests from different proposals" - "and not using commissioning directory." - "Please remove invalid requests or switch to" - "commissioning directory to continue") - break logger.info("processing request " + str(time.time())) reqObj = currentRequest["request_obj"] gov_lib.set_detz_in(gov_robot, reqObj["detDist"]) @@ -753,8 +754,19 @@ def collectData(currentRequest): node = getBlConfig(nodeName) dimpleNode = getBlConfig("dimpleNode") if (daq_utils.detector_id == "EIGER-16"): + run_autoproc = 0 + try: + r = requests.get(f"{os.environ['NSLS2_API_URL']}/v1/proposal/{currentRequest['proposalID']}") + r.raise_for_status() + response = r.json()['proposal'] + if "proprietary" in response["type"].lower(): + run_autoproc = 0 + else: + run_autoproc = 1 + except Exception as e: + run_autoproc = 0 seqNum = flyer.detector.cam.sequence_id.get() - comm_s = os.environ["LSDCHOME"] + "/runFastDPH5.py " + data_directory_name + " " + str(seqNum) + " " + str(currentRequest["uid"]) + " " + str(fastEPFlag) + " " + node + " " + str(dimpleFlag) + " " + dimpleNode + " " + str(currentIspybDCID)+ "&" + comm_s = os.environ["LSDCHOME"] + "/runFastDPH5.py " + data_directory_name + " " + str(seqNum) + " " + str(currentRequest["uid"]) + " " + str(fastEPFlag) + " " + node + " " + str(dimpleFlag) + " " + dimpleNode + " " + str(currentIspybDCID)+ " " + str(run_autoproc) + " &" else: comm_s = os.environ["LSDCHOME"] + "/runFastDP.py " + data_directory_name + " " + file_prefix + " " + str(file_number_start) + " " + str(int(round(range_degrees/img_width))) + " " + str(currentRequest["uid"]) + " " + str(fastEPFlag) + " " + node + " " + str(dimpleFlag) + " " + dimpleNode + "&" logger.info(f'Running fastdp command: {comm_s}') diff --git a/runFastDPH5.py b/runFastDPH5.py index 66c845a0..f0bf446f 100755 --- a/runFastDPH5.py +++ b/runFastDPH5.py @@ -2,9 +2,10 @@ import os import sys import db_lib -from daq_utils import getBlConfig +from daq_utils import getBlConfig, setBlConfig import xmltodict import logging +import subprocess logger = logging.getLogger() logging.getLogger().setLevel(logging.INFO) handler1 = logging.FileHandler('fast_dp.txt') @@ -32,6 +33,31 @@ runDimple = int(sys.argv[6]) dimpleNode = sys.argv[7] ispybDCID = 1 #int(sys.argv[8]) +runAutoProc = int(sys.argv[9]) + +setBlConfig("auto_proc_lock", True) +try: + queue = getBlConfig("auto_proc_queue") + queue.append((directory, request_id)) + setBlConfig("auto_proc_queue", queue) +except Exception as e: + logger.exception("Could not add request to autoproc queue") +finally: + setBlConfig("auto_proc_lock", False) + +if runAutoProc: + start_in_proc = None + for proc_num in ['uranus-cpu044', 'uranus-cpu041', 'uranus-cpu021']: + if not getBlConfig(proc_num): + start_in_proc = proc_num + break + if start_in_proc: + comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} amx & \" " + # comm_s = f"ssh {start_in_proc} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} amx & \" " + logger.info(f"Initializing AUTO-AUTOPROC {comm_s} \n In ({directory}, {request_id})") + # os.system(comm_s) + subprocess.Popen(comm_s, shell=True) + comm_s = f"ssh -q {node} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}fast_dp.sh {request_id} {numstart}\"" logger.info(comm_s) From e0f1aefe3323d8a406f6c0ee4f758d9e3f9849df Mon Sep 17 00:00:00 2001 From: vshekar1 Date: Thu, 6 Mar 2025 18:36:54 -0500 Subject: [PATCH 2/4] Auto-autoproc set beamline id in fastdph5.py --- runFastDPH5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runFastDPH5.py b/runFastDPH5.py index f0bf446f..a6b60e59 100755 --- a/runFastDPH5.py +++ b/runFastDPH5.py @@ -52,7 +52,7 @@ start_in_proc = proc_num break if start_in_proc: - comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} amx & \" " + comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} {os.environ['BEAMLINE_ID']} & \" " # comm_s = f"ssh {start_in_proc} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} amx & \" " logger.info(f"Initializing AUTO-AUTOPROC {comm_s} \n In ({directory}, {request_id})") # os.system(comm_s) From fc50737fa2843846ca88e538c6a6c747bc2bdbf9 Mon Sep 17 00:00:00 2001 From: vshekar1 Date: Mon, 14 Jul 2025 14:23:53 -0400 Subject: [PATCH 3/4] Added autoproc blacklist and running after fastdp --- daq_lib.py | 5 ++++- runFastDPH5.py | 45 +++++++++++++++++++++------------------------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/daq_lib.py b/daq_lib.py index b4b7e701..c2705065 100644 --- a/daq_lib.py +++ b/daq_lib.py @@ -754,12 +754,15 @@ def collectData(currentRequest): node = getBlConfig(nodeName) dimpleNode = getBlConfig("dimpleNode") if (daq_utils.detector_id == "EIGER-16"): + with open(f"{os.environ['CONFIGDIR']}/autoproc_blacklist.json") as autoproc_blacklist_file: + autoproc_proposal_data = json.load(autoproc_blacklist_file) run_autoproc = 0 try: r = requests.get(f"{os.environ['NSLS2_API_URL']}/v1/proposal/{currentRequest['proposalID']}") r.raise_for_status() response = r.json()['proposal'] - if "proprietary" in response["type"].lower(): + if ("proprietary" in response["type"].lower() + or int(currentRequest["proposalID"]) in autoproc_proposal_data["blacklist"]): run_autoproc = 0 else: run_autoproc = 1 diff --git a/runFastDPH5.py b/runFastDPH5.py index a6b60e59..bf40c327 100755 --- a/runFastDPH5.py +++ b/runFastDPH5.py @@ -34,30 +34,7 @@ dimpleNode = sys.argv[7] ispybDCID = 1 #int(sys.argv[8]) runAutoProc = int(sys.argv[9]) - -setBlConfig("auto_proc_lock", True) -try: - queue = getBlConfig("auto_proc_queue") - queue.append((directory, request_id)) - setBlConfig("auto_proc_queue", queue) -except Exception as e: - logger.exception("Could not add request to autoproc queue") -finally: - setBlConfig("auto_proc_lock", False) - -if runAutoProc: - start_in_proc = None - for proc_num in ['uranus-cpu044', 'uranus-cpu041', 'uranus-cpu021']: - if not getBlConfig(proc_num): - start_in_proc = proc_num - break - if start_in_proc: - comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} {os.environ['BEAMLINE_ID']} & \" " - # comm_s = f"ssh {start_in_proc} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} amx & \" " - logger.info(f"Initializing AUTO-AUTOPROC {comm_s} \n In ({directory}, {request_id})") - # os.system(comm_s) - subprocess.Popen(comm_s, shell=True) - +# runAutoProc = 0 comm_s = f"ssh -q {node} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}fast_dp.sh {request_id} {numstart}\"" logger.info(comm_s) @@ -81,3 +58,23 @@ comm_s = f"ssh -q {dimpleNode} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}dimple.sh {request_id} {numstart}\"" logger.info(f"running dimple: {comm_s}") os.system(comm_s) + +if runAutoProc: + setBlConfig("auto_proc_lock", True) + try: + queue = getBlConfig("auto_proc_queue") + queue.append((directory, request_id)) + setBlConfig("auto_proc_queue", queue) + except Exception as e: + logger.exception("Could not add request to autoproc queue") + finally: + setBlConfig("auto_proc_lock", False) + start_in_proc = None + for proc_num in ['uranus-cpu044', 'uranus-cpu041', 'uranus-cpu021']: + if not getBlConfig(proc_num): + start_in_proc = proc_num + break + if start_in_proc: + comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} {os.environ['BEAMLINE_ID']} & \" " + logger.info(f"Initializing AUTO-AUTOPROC {comm_s} \n In ({directory}, {request_id})") + subprocess.Popen(comm_s, shell=True) \ No newline at end of file From bbbbb33ea0a2ba435556fddb6039de871df2a906 Mon Sep 17 00:00:00 2001 From: vshekar1 Date: Mon, 12 Jan 2026 13:11:26 -0500 Subject: [PATCH 4/4] Updated runFastDP to handle failure in fastdp execution --- runFastDPH5.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/runFastDPH5.py b/runFastDPH5.py index bf40c327..047a5c6a 100755 --- a/runFastDPH5.py +++ b/runFastDPH5.py @@ -4,6 +4,7 @@ import db_lib from daq_utils import getBlConfig, setBlConfig import xmltodict +import json import logging import subprocess logger = logging.getLogger() @@ -36,30 +37,40 @@ runAutoProc = int(sys.argv[9]) # runAutoProc = 0 -comm_s = f"ssh -q {node} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}fast_dp.sh {request_id} {numstart}\"" -logger.info(comm_s) -os.system(comm_s) +try: + comm_s = f"ssh -q {node} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}fast_dp.sh {request_id} {numstart}\"" + logger.info(comm_s) + os.system(comm_s) + fastDPResultFile = runningDir+"/fast_dp.xml" + fd = open(fastDPResultFile) + resultObj = xmltodict.parse(fd.read()) + logger.info(f"finished fast_dp {request_id}") + resultID = db_lib.addResultforRequest("fastDP",request_id,owner,resultObj,beamline=os.environ["BEAMLINE_ID"]) + newResult = db_lib.getResult(resultID) + visitName = getBlConfig("visitName") +except Exception as e: + logger.error("runfastdph5 error running fastdp: %s" % e) -fastDPResultFile = runningDir+"/fast_dp.xml" -fd = open(fastDPResultFile) -resultObj = xmltodict.parse(fd.read()) -logger.info(f"finished fast_dp {request_id}") -resultID = db_lib.addResultforRequest("fastDP",request_id,owner,resultObj,beamline=os.environ["BEAMLINE_ID"]) -newResult = db_lib.getResult(resultID) -visitName = getBlConfig("visitName") try: ispybLib.insertResult(newResult,"fastDP",request,visitName,ispybDCID,fastDPResultFile) except Exception as e: logger.error("runfastdph5 insert result ispyb error: %s" % e) + if (runFastEP): os.system("fast_ep") #looks very bad! running on ca1! -if (runDimple): - dimpleComm = getBlConfig("dimpleComm") - comm_s = f"ssh -q {dimpleNode} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}dimple.sh {request_id} {numstart}\"" - logger.info(f"running dimple: {comm_s}") - os.system(comm_s) +if (runDimple): + try: + dimpleComm = getBlConfig("dimpleComm") + comm_s = f"ssh -q {dimpleNode} \"{os.environ['MXPROCESSINGSCRIPTSDIR']}dimple.sh {request_id} {numstart}\"" + logger.info(f"running dimple: {comm_s}") + # os.system(comm_s) + subprocess.Popen(comm_s, shell=True) + except Exception as e: + logger.error("runfastdph5 error running dimple: %s" % e) + if runAutoProc: + logger.info("Running AUTO-AUTOPROC...") setBlConfig("auto_proc_lock", True) try: queue = getBlConfig("auto_proc_queue") @@ -70,11 +81,12 @@ finally: setBlConfig("auto_proc_lock", False) start_in_proc = None - for proc_num in ['uranus-cpu044', 'uranus-cpu041', 'uranus-cpu021']: + autoproc_processor_list = json.loads(getBlConfig("autoprocNodes")) + for proc_num in autoproc_processor_list: if not getBlConfig(proc_num): start_in_proc = proc_num break if start_in_proc: comm_s = f"ssh {start_in_proc} \"nohup {os.environ['MXPROCESSINGSCRIPTSDIR']}autoproc.sh {start_in_proc} {os.environ['BEAMLINE_ID']} & \" " logger.info(f"Initializing AUTO-AUTOPROC {comm_s} \n In ({directory}, {request_id})") - subprocess.Popen(comm_s, shell=True) \ No newline at end of file + subprocess.Popen(comm_s, shell=True)