Skip to content

Commit 541587b

Browse files
authored
Merge pull request #8362 from aldbr/9.0_fix-too-many-stamps-ssh
[9.0] feat: SSHCE export inputs/import outputs as a JSON file to handle more jobs in parallel
2 parents 4c6d7b0 + 68bd24b commit 541587b

File tree

4 files changed

+76
-50
lines changed

4 files changed

+76
-50
lines changed

src/DIRAC/Resources/Computing/BatchSystems/Condor.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,6 @@ def submitJob(self, **kwargs):
203203
resultDict["Jobs"] = []
204204
for i in range(submittedJobs):
205205
resultDict["Jobs"].append(".".join([cluster, str(i)]))
206-
# Executable is transferred afterward
207-
# Inform the caller that Condor cannot delete it before the end of the execution
208-
resultDict["ExecutableToKeep"] = executable
209206
else:
210207
resultDict["Status"] = status
211208
resultDict["Message"] = error

src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
from urllib.parse import unquote as urlunquote
3636
3737
38-
arguments = sys.argv[1]
39-
inputDict = json.loads(urlunquote(arguments))
38+
# Read options from JSON file
39+
optionsFilePath = sys.argv[1]
40+
with open(optionsFilePath, 'r') as f:
41+
inputDict = json.load(f)
4042
4143
method = inputDict.pop('Method')
4244
batchSystem = inputDict.pop('BatchSystem')
@@ -45,9 +47,15 @@
4547
try:
4648
result = getattr(batch, method)(**inputDict)
4749
except Exception:
48-
result = traceback.format_exc()
49-
50-
resultJson = urlquote(json.dumps(result))
51-
print("============= Start output ===============")
52-
print(resultJson)
50+
# Wrap the traceback in a proper error structure
51+
result = {
52+
'Status': -1,
53+
'Message': 'Exception during batch method execution',
54+
'Traceback': traceback.format_exc()
55+
}
56+
57+
# Write result to JSON file
58+
resultFilePath = optionsFilePath.replace('.json', '_result.json')
59+
with open(resultFilePath, 'w') as f:
60+
json.dump(result, f)
5361
"""

src/DIRAC/Resources/Computing/LocalComputingElement.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,6 @@ def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
182182
batchSystemName = self.batchSystem.__class__.__name__.lower()
183183
jobIDs = ["ssh" + batchSystemName + "://" + self.ceName + "/" + _id for _id in resultSubmit["Jobs"]]
184184
result = S_OK(jobIDs)
185-
if "ExecutableToKeep" in resultSubmit:
186-
result["ExecutableToKeep"] = resultSubmit["ExecutableToKeep"]
187185
else:
188186
result = S_ERROR(resultSubmit["Message"])
189187

src/DIRAC/Resources/Computing/SSHComputingElement.py

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@
6767
import os
6868
import shutil
6969
import stat
70+
import tempfile
7071
import uuid
7172
from shlex import quote as shlex_quote
72-
from urllib.parse import quote, unquote, urlparse
73+
from urllib.parse import urlparse
7374

7475
import pexpect
7576

@@ -484,47 +485,69 @@ def __executeHostCommand(self, command, options, ssh=None, host=None):
484485
options["User"] = self.user
485486
options["Queue"] = self.queue
486487

487-
options = json.dumps(options)
488-
options = quote(options)
488+
localOptionsFile = None
489+
remoteOptionsFile = None
490+
localResultFile = None
491+
remoteResultFile = None
492+
try:
493+
# Write options to a local temporary file
494+
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
495+
json.dump(options, f)
496+
localOptionsFile = f.name
497+
498+
# Upload the options file to the remote host
499+
remoteOptionsFile = f"{self.sharedArea}/batch_options_{uuid.uuid4().hex}.json"
500+
result = ssh.scpCall(30, localOptionsFile, remoteOptionsFile)
501+
if not result["OK"]:
502+
return result
489503

490-
cmd = (
491-
"bash --login -c 'python3 %s/execute_batch %s || python %s/execute_batch %s || python2 %s/execute_batch %s'"
492-
% (self.sharedArea, options, self.sharedArea, options, self.sharedArea, options)
493-
)
504+
# Execute the batch command with the options file path
505+
cmd = (
506+
f"bash --login -c 'python3 {self.sharedArea}/execute_batch {remoteOptionsFile} || "
507+
f"python {self.sharedArea}/execute_batch {remoteOptionsFile} || "
508+
f"python2 {self.sharedArea}/execute_batch {remoteOptionsFile}'"
509+
)
494510

495-
self.log.verbose(f"CE submission command: {cmd}")
511+
self.log.verbose(f"CE submission command: {cmd}")
496512

497-
result = ssh.sshCall(120, cmd)
498-
if not result["OK"]:
499-
self.log.error(f"{self.ceType} CE job submission failed", result["Message"])
500-
return result
513+
result = ssh.sshCall(120, cmd)
514+
if not result["OK"]:
515+
self.log.error(f"{self.ceType} CE job submission failed", result["Message"])
516+
return result
501517

502-
sshStatus = result["Value"][0]
503-
sshStdout = result["Value"][1]
504-
sshStderr = result["Value"][2]
505-
506-
# Examine results of the job submission
507-
if sshStatus == 0:
508-
output = sshStdout.strip().replace("\r", "").strip()
509-
if not output:
510-
return S_ERROR("No output from remote command")
511-
512-
try:
513-
index = output.index("============= Start output ===============")
514-
output = output[index + 42 :]
515-
except ValueError:
516-
return S_ERROR(f"Invalid output from remote command: {output}")
517-
518-
try:
519-
output = unquote(output)
520-
result = json.loads(output)
521-
if isinstance(result, str) and result.startswith("Exception:"):
522-
return S_ERROR(result)
523-
return S_OK(result)
524-
except Exception:
525-
return S_ERROR("Invalid return structure from job submission")
526-
else:
527-
return S_ERROR("\n".join([sshStdout, sshStderr]))
518+
sshStatus = result["Value"][0]
519+
if sshStatus != 0:
520+
sshStdout = result["Value"][1]
521+
sshStderr = result["Value"][2]
522+
return S_ERROR(f"CE job submission command failed with status {sshStatus}: {sshStdout} {sshStderr}")
523+
524+
# The result should be written to a JSON file by execute_batch
525+
# Compute the expected result file path
526+
remoteResultFile = remoteOptionsFile.replace(".json", "_result.json")
527+
528+
# Try to download the result file
529+
with tempfile.NamedTemporaryFile(mode="r", suffix=".json", delete=False) as f:
530+
localResultFile = f.name
531+
532+
result = ssh.scpCall(30, localResultFile, remoteResultFile, upload=False)
533+
if not result["OK"]:
534+
return result
535+
536+
# Read the result from the downloaded file
537+
with open(localResultFile) as f:
538+
result = json.load(f)
539+
return S_OK(result)
540+
finally:
541+
# Clean up local temporary file
542+
if localOptionsFile and os.path.exists(localOptionsFile):
543+
os.remove(localOptionsFile)
544+
if localResultFile and os.path.exists(localResultFile):
545+
os.remove(localResultFile)
546+
# Clean up remote temporary files
547+
if remoteOptionsFile:
548+
ssh.sshCall(30, f"rm -f {remoteOptionsFile}")
549+
if remoteResultFile:
550+
ssh.sshCall(30, f"rm -f {remoteResultFile}")
528551

529552
def submitJob(self, executableFile, proxy, numberOfJobs=1):
530553
# self.log.verbose( "Executable file path: %s" % executableFile )

0 commit comments

Comments
 (0)