Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions compute_horde/compute_horde/miner_client/organic.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ def total(self):
job_timing: TimingDetails | None = None
docker_run_options_preset: DockerRunOptionsPreset = "nvidia_all"
docker_run_cmd: list[str] = field(default_factory=list)
docker_run_env: dict[str, str] | None = None
total_job_timeout: int = 300 # Deprecated, use job_timing instead.
volume: Volume | None = None
output: OutputUpload | None = None
Expand Down Expand Up @@ -605,6 +606,7 @@ async def execute_organic_job_on_miner(
docker_image=job_details.docker_image,
docker_run_options_preset=job_details.docker_run_options_preset,
docker_run_cmd=job_details.docker_run_cmd,
docker_run_env=job_details.docker_run_env,
volume=None, # Was sent in the initial request
output_upload=job_details.output,
artifacts_dir=job_details.artifacts_dir,
Expand Down
1 change: 1 addition & 0 deletions compute_horde/compute_horde/protocol_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class V0JobRequest(BaseModel):
raw_script: str | None = None
docker_run_options_preset: DockerRunOptionsPreset
docker_run_cmd: list[str]
docker_run_env: dict[str, str] | None = None
volume: Volume | None = None
output_upload: OutputUpload | None = None
artifacts_dir: str | None = None
Expand Down
6 changes: 6 additions & 0 deletions compute_horde_sdk/src/compute_horde_sdk/_internal/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ class ComputeHordeJobSpec:
If the limit is reached, the job will fail.
"""

def __post_init__(self) -> None:
self.env = {key.replace("\0", ""): value for key, value in self.env.items()}
for key in self.env:
if not key or "=" in key:
raise ValueError(f"{key!r} is not a valid environment variable")


class ComputeHordeJob:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ async def get_docker_run_args(self) -> dict[str, Any]:
f"{raw_script_path.absolute().as_posix()}:/script.py"
]

docker_kwargs["Env"] = []
if self.full_job_request.docker_run_env:
for key, value in self.full_job_request.docker_run_env.items():
docker_kwargs["Env"].extend(("-e", f"{key}={value}"))

return docker_kwargs

async def get_docker_run_cmd(self) -> list[str]:
Expand Down
5 changes: 3 additions & 2 deletions local_stack/send_hello_world_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def main() -> None:
streaming_job.streaming_private_key,
streaming_job.streaming_server_cert
)

max_terminate_retries = 3
for attempt in range(1, max_terminate_retries + 1):
try:
Expand Down Expand Up @@ -143,7 +143,8 @@ async def main() -> None:
executor_class=ExecutorClass.always_on__llm__a6000,
job_namespace="SN123.0",
docker_image="alpine",
args=["sh", "-c", "echo 'Hello, World!' > /artifacts/stuff"],
args=["sh", "-c", 'echo "Hello, ${WHOM}!" > /artifacts/stuff'],
env={"WHOM": "World"},
artifacts_dir="/artifacts",
download_time_limit_sec=5,
execution_time_limit_sec=10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,15 @@ async def handle_job_request(self, msg: V0JobRequest):
await self.send(GenericError(details=error_msg).model_dump_json())
return

if msg.docker_run_env:
for key in msg.docker_run_env:
clean_key = key.replace("\0", "")
if not clean_key or "=" in clean_key:
error_msg = f"{key!r} is not a valid environment variable"
logger.error(error_msg)
await self.send(GenericError(details=error_msg).model_dump_json())
return

await self.send_job_request(job.executor_token, msg)
logger.debug(f"Passing job details to executor consumer job_uuid: {msg.job_uuid}")
job.status = AcceptedJob.Status.RUNNING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ async def streaming_ready_callback(msg: V0StreamingJobReadyRequest) -> None:
docker_image=job_request.docker_image,
docker_run_options_preset="nvidia_all" if job_request.use_gpu else "none",
docker_run_cmd=job_request.get_args(),
docker_run_env=job_request.env,
total_job_timeout=job_request.timeout
if isinstance(job_request, AdminJobRequest)
else OrganicJobDetails.total_job_timeout,
Expand Down