NOTE(review): this patch was re-expanded from a whitespace-collapsed copy.
Indentation of context lines and the whitespace-only removed line in
local_stack/send_hello_world_job.py are reconstructed -- confirm against the
tree (or apply with --ignore-whitespace). Index hashes are the original ones.

diff --git a/compute_horde/compute_horde/miner_client/organic.py b/compute_horde/compute_horde/miner_client/organic.py
index 54f28bf73..3cac55e3f 100644
--- a/compute_horde/compute_horde/miner_client/organic.py
+++ b/compute_horde/compute_horde/miner_client/organic.py
@@ -493,6 +493,7 @@ def total(self):
     job_timing: TimingDetails | None = None
     docker_run_options_preset: DockerRunOptionsPreset = "nvidia_all"
     docker_run_cmd: list[str] = field(default_factory=list)
+    docker_run_env: dict[str, str] | None = None
     total_job_timeout: int = 300  # Deprecated, use job_timing instead.
     volume: Volume | None = None
     output: OutputUpload | None = None
@@ -605,6 +606,7 @@ async def execute_organic_job_on_miner(
                 docker_image=job_details.docker_image,
                 docker_run_options_preset=job_details.docker_run_options_preset,
                 docker_run_cmd=job_details.docker_run_cmd,
+                docker_run_env=job_details.docker_run_env,
                 volume=None,  # Was sent in the initial request
                 output_upload=job_details.output,
                 artifacts_dir=job_details.artifacts_dir,
diff --git a/compute_horde/compute_horde/protocol_messages.py b/compute_horde/compute_horde/protocol_messages.py
index 07de598eb..575c3c518 100644
--- a/compute_horde/compute_horde/protocol_messages.py
+++ b/compute_horde/compute_horde/protocol_messages.py
@@ -180,6 +180,7 @@ class V0JobRequest(BaseModel):
     raw_script: str | None = None
     docker_run_options_preset: DockerRunOptionsPreset
     docker_run_cmd: list[str]
+    docker_run_env: dict[str, str] | None = None
     volume: Volume | None = None
     output_upload: OutputUpload | None = None
     artifacts_dir: str | None = None
diff --git a/compute_horde_sdk/src/compute_horde_sdk/_internal/sdk.py b/compute_horde_sdk/src/compute_horde_sdk/_internal/sdk.py
index b6162d12e..b2f8e1a91 100644
--- a/compute_horde_sdk/src/compute_horde_sdk/_internal/sdk.py
+++ b/compute_horde_sdk/src/compute_horde_sdk/_internal/sdk.py
@@ -183,6 +183,13 @@ class ComputeHordeJobSpec:
     If the limit is reached, the job will fail.
     """
 
+    def __post_init__(self) -> None:
+        """Drop NUL characters from env keys, then reject empty keys and keys containing '='."""
+        self.env = {key.replace("\0", ""): value for key, value in self.env.items()}
+        for key in self.env:
+            if not key or "=" in key:
+                raise ValueError(f"{key!r} is not a valid environment variable")
+
 
 class ComputeHordeJob:
     """
diff --git a/executor/app/src/compute_horde_executor/executor/job_runner.py b/executor/app/src/compute_horde_executor/executor/job_runner.py
index 0ba08b353..264b00486 100644
--- a/executor/app/src/compute_horde_executor/executor/job_runner.py
+++ b/executor/app/src/compute_horde_executor/executor/job_runner.py
@@ -498,6 +498,12 @@ async def get_docker_run_args(self) -> dict[str, Any]:
                 f"{raw_script_path.absolute().as_posix()}:/script.py"
             ]
 
+        # "Env" is a plain list of "KEY=VALUE" strings; "-e" is a CLI-only flag and must not appear.
+        docker_kwargs["Env"] = [
+            f"{key}={value}"
+            for key, value in (self.full_job_request.docker_run_env or {}).items()
+        ]
+
         return docker_kwargs
 
     async def get_docker_run_cmd(self) -> list[str]:
diff --git a/local_stack/send_hello_world_job.py b/local_stack/send_hello_world_job.py
index 2972d3f38..d79ca7f7c 100644
--- a/local_stack/send_hello_world_job.py
+++ b/local_stack/send_hello_world_job.py
@@ -115,7 +115,7 @@ async def main() -> None:
             streaming_job.streaming_private_key, streaming_job.streaming_server_cert
         )
 
-        
+
         max_terminate_retries = 3
         for attempt in range(1, max_terminate_retries + 1):
             try:
@@ -143,7 +143,8 @@ async def main() -> None:
         executor_class=ExecutorClass.always_on__llm__a6000,
         job_namespace="SN123.0",
         docker_image="alpine",
-        args=["sh", "-c", "echo 'Hello, World!' > /artifacts/stuff"],
+        args=["sh", "-c", 'echo "Hello, ${WHOM}!" > /artifacts/stuff'],
+        env={"WHOM": "World"},
         artifacts_dir="/artifacts",
         download_time_limit_sec=5,
         execution_time_limit_sec=10,
diff --git a/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py b/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py
index a190d8a8c..532a35df0 100644
--- a/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py
+++ b/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py
@@ -470,6 +470,15 @@ async def handle_job_request(self, msg: V0JobRequest):
             await self.send(GenericError(details=error_msg).model_dump_json())
             return
 
+        if msg.docker_run_env:
+            for key in msg.docker_run_env:
+                # msg is forwarded unmodified below, so check the raw key, not a cleaned copy.
+                if not key or "=" in key or "\0" in key:
+                    error_msg = f"{key!r} is not a valid environment variable"
+                    logger.error(error_msg)
+                    await self.send(GenericError(details=error_msg).model_dump_json())
+                    return
+
         await self.send_job_request(job.executor_token, msg)
         logger.debug(f"Passing job details to executor consumer job_uuid: {msg.job_uuid}")
         job.status = AcceptedJob.Status.RUNNING
diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py
index b57288e6e..a7932acfb 100644
--- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py
+++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py
@@ -312,6 +312,7 @@ async def streaming_ready_callback(msg: V0StreamingJobReadyRequest) -> None:
         docker_image=job_request.docker_image,
         docker_run_options_preset="nvidia_all" if job_request.use_gpu else "none",
         docker_run_cmd=job_request.get_args(),
+        docker_run_env=job_request.env,
         total_job_timeout=job_request.timeout
         if isinstance(job_request, AdminJobRequest)
         else OrganicJobDetails.total_job_timeout,