From 502685ca2843b5570447a9e369fbef53a18eb7e3 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 30 Jan 2023 14:22:34 +0100 Subject: [PATCH 1/2] Use local copy of RunPolicy by MPI-operator Steps performed: - copy the `RunPolicy` from common to `types.go` - fix compilation errors by using the local RunPolicy definition - run `make generate` - run `make all` - regenerate openapi_generated.go by `./hack/python-sdk/gen-sdk.sh` (with commented out rollback) --- pkg/apis/kubeflow/v2beta1/default.go | 2 +- pkg/apis/kubeflow/v2beta1/default_test.go | 10 +- .../kubeflow/v2beta1/openapi_generated.go | 54 +++- pkg/apis/kubeflow/v2beta1/swagger.json | 31 ++- pkg/apis/kubeflow/v2beta1/types.go | 30 ++- .../kubeflow/v2beta1/zz_generated.deepcopy.go | 41 +++ pkg/apis/kubeflow/validation/validation.go | 2 +- .../kubeflow/validation/validation_test.go | 13 +- pkg/controller/mpi_job_controller_test.go | 4 +- sdk/python/v2beta1/README.md | 1 + sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 2 +- sdk/python/v2beta1/docs/V2beta1RunPolicy.md | 16 ++ sdk/python/v2beta1/mpijob/__init__.py | 1 + sdk/python/v2beta1/mpijob/models/__init__.py | 1 + .../mpijob/models/v2beta1_mpi_job_spec.py | 6 +- .../mpijob/models/v2beta1_run_policy.py | 240 ++++++++++++++++++ .../v2beta1/test/test_v2beta1_run_policy.py | 62 +++++ test/integration/mpi_job_controller_test.go | 4 +- 18 files changed, 495 insertions(+), 25 deletions(-) create mode 100644 sdk/python/v2beta1/docs/V2beta1RunPolicy.md create mode 100644 sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py create mode 100644 sdk/python/v2beta1/test/test_v2beta1_run_policy.py diff --git a/pkg/apis/kubeflow/v2beta1/default.go b/pkg/apis/kubeflow/v2beta1/default.go index 4a3adfb68..d93618ea5 100644 --- a/pkg/apis/kubeflow/v2beta1/default.go +++ b/pkg/apis/kubeflow/v2beta1/default.go @@ -49,7 +49,7 @@ func setDefaultsTypeWorker(spec *common.ReplicaSpec) { } } -func setDefaultsRunPolicy(policy *common.RunPolicy) { +func setDefaultsRunPolicy(policy *RunPolicy) { if policy.CleanPodPolicy == nil { policy.CleanPodPolicy = newCleanPodPolicy(common.CleanPodPolicyNone) } diff --git a/pkg/apis/kubeflow/v2beta1/default_test.go b/pkg/apis/kubeflow/v2beta1/default_test.go index 8f434b39a..df831d3d1 100644 --- a/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/pkg/apis/kubeflow/v2beta1/default_test.go @@ -30,7 +30,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { want: MPIJob{ Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), - RunPolicy: common.RunPolicy{ + RunPolicy: RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", @@ -42,7 +42,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { job: MPIJob{ Spec: MPIJobSpec{ SlotsPerWorker: newInt32(10), - RunPolicy: common.RunPolicy{ + RunPolicy: RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), TTLSecondsAfterFinished: newInt32(2), ActiveDeadlineSeconds: newInt64(3), @@ -55,7 +55,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { want: MPIJob{ Spec: MPIJobSpec{ SlotsPerWorker: newInt32(10), - RunPolicy: common.RunPolicy{ + RunPolicy: RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), TTLSecondsAfterFinished: newInt32(2), ActiveDeadlineSeconds: newInt64(3), @@ -77,7 +77,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { want: MPIJob{ Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), - RunPolicy: common.RunPolicy{ + RunPolicy: RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", @@ -102,7 +102,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { want: MPIJob{ Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), - RunPolicy: common.RunPolicy{ + RunPolicy: RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", diff --git a/pkg/apis/kubeflow/v2beta1/openapi_generated.go b/pkg/apis/kubeflow/v2beta1/openapi_generated.go index b0f433c6d..d4810f2e0 100644 --- a/pkg/apis/kubeflow/v2beta1/openapi_generated.go +++ b/pkg/apis/kubeflow/v2beta1/openapi_generated.go @@ -37,6 +37,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJob": schema_pkg_apis_kubeflow_v2beta1_MPIJob(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobList": schema_pkg_apis_kubeflow_v2beta1_MPIJobList(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec": schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy": schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref), } } @@ -460,7 +461,7 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c SchemaProps: spec.SchemaProps{ Description: "RunPolicy encapsulates various runtime policies of the job.", Default: map[string]interface{}{}, - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.RunPolicy"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy"), }, }, "mpiReplicaSpecs": { @@ -496,6 +497,55 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c }, }, Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec", "github.com/kubeflow/common/pkg/apis/common/v1.RunPolicy"}, + "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy"}, + } +} + +func schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "cleanPodPolicy": { + SchemaProps: spec.SchemaProps{ + Description: "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.", + Type: []string{"string"}, + Format: "", + }, + }, + "ttlSecondsAfterFinished": { + SchemaProps: spec.SchemaProps{ + Description: "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "activeDeadlineSeconds": { + SchemaProps: spec.SchemaProps{ + Description: "Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.", + Type: []string{"integer"}, + Format: "int64", + }, + }, + "backoffLimit": { + SchemaProps: spec.SchemaProps{ + Description: "Optional number of retries before marking this job failed.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "schedulingPolicy": { + SchemaProps: spec.SchemaProps{ + Description: "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", + Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"}, } } diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 4aaeb81e8..ea4dbfd78 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -260,7 +260,7 @@ "runPolicy": { "description": "RunPolicy encapsulates various runtime policies of the job.", "default": {}, - "$ref": "#/definitions/v1.RunPolicy" + "$ref": "#/definitions/v2beta1.RunPolicy" }, "slotsPerWorker": { "description": "Specifies the number of slots per worker used in hostfile. Defaults to 1.", @@ -272,6 +272,35 @@ "type": "string" } } + }, + "v2beta1.RunPolicy": { + "description": "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.", + "type": "object", + "properties": { + "activeDeadlineSeconds": { + "description": "Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.", + "type": "integer", + "format": "int64" + }, + "backoffLimit": { + "description": "Optional number of retries before marking this job failed.", + "type": "integer", + "format": "int32" + }, + "cleanPodPolicy": { + "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.", + "type": "string" + }, + "schedulingPolicy": { + "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", + "$ref": "#/definitions/v1.SchedulingPolicy" + }, + "ttlSecondsAfterFinished": { + "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", + "type": "integer", + "format": "int32" + } + } } } } diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index c1d37528c..e6af4d7b6 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -37,6 +37,34 @@ type MPIJobList struct { Items []MPIJob `json:"items"` } +// RunPolicy encapsulates various runtime policies of the distributed training +// job, for example how to clean up resources and how long the job can stay +// active. +type RunPolicy struct { + // CleanPodPolicy defines the policy to kill pods after the job completes. + // Default to Running. + CleanPodPolicy *common.CleanPodPolicy `json:"cleanPodPolicy,omitempty"` + + // TTLSecondsAfterFinished is the TTL to clean up jobs. + // It may take extra ReconcilePeriod seconds for the cleanup, since + // reconcile gets called periodically. + // Default to infinite. + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` + + // Specifies the duration in seconds relative to the startTime that the job may be active + // before the system tries to terminate it; value must be positive integer. + // +optional + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + + // Optional number of retries before marking this job failed. + // +optional + BackoffLimit *int32 `json:"backoffLimit,omitempty"` + + // SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling + // +optional + SchedulingPolicy *common.SchedulingPolicy `json:"schedulingPolicy,omitempty"` +} + type MPIJobSpec struct { // Specifies the number of slots per worker used in hostfile. @@ -46,7 +74,7 @@ type MPIJobSpec struct { SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` // RunPolicy encapsulates various runtime policies of the job. - RunPolicy common.RunPolicy `json:"runPolicy,omitempty"` + RunPolicy RunPolicy `json:"runPolicy,omitempty"` // MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that // specify the MPI replicas to run. diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index b1f8d21be..119af521b 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -121,3 +121,44 @@ func (in *MPIJobSpec) DeepCopy() *MPIJobSpec { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { + *out = *in + if in.CleanPodPolicy != nil { + in, out := &in.CleanPodPolicy, &out.CleanPodPolicy + *out = new(v1.CleanPodPolicy) + **out = **in + } + if in.TTLSecondsAfterFinished != nil { + in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished + *out = new(int32) + **out = **in + } + if in.ActiveDeadlineSeconds != nil { + in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds + *out = new(int64) + **out = **in + } + if in.BackoffLimit != nil { + in, out := &in.BackoffLimit, &out.BackoffLimit + *out = new(int32) + **out = **in + } + if in.SchedulingPolicy != nil { + in, out := &in.SchedulingPolicy, &out.SchedulingPolicy + *out = new(v1.SchedulingPolicy) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunPolicy. +func (in *RunPolicy) DeepCopy() *RunPolicy { + if in == nil { + return nil + } + out := new(RunPolicy) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/kubeflow/validation/validation.go b/pkg/apis/kubeflow/validation/validation.go index 407fbe858..68149dfff 100644 --- a/pkg/apis/kubeflow/validation/validation.go +++ b/pkg/apis/kubeflow/validation/validation.go @@ -81,7 +81,7 @@ func validateMPIJobSpec(spec *kubeflow.MPIJobSpec, path *field.Path) field.Error return errs } -func validateRunPolicy(policy *common.RunPolicy, path *field.Path) field.ErrorList { +func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.ErrorList { var errs field.ErrorList if policy.CleanPodPolicy == nil { errs = append(errs, field.Required(path.Child("cleanPodPolicy"), "must have clean Pod policy")) diff --git a/pkg/apis/kubeflow/validation/validation_test.go b/pkg/apis/kubeflow/validation/validation_test.go index 0df03559a..27b9a3c32 100644 --- a/pkg/apis/kubeflow/validation/validation_test.go +++ b/pkg/apis/kubeflow/validation/validation_test.go @@ -21,6 +21,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" common "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" + kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation/field" @@ -38,7 +39,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, SSHAuthMountPath: "/home/mpiuser/.ssh", @@ -64,7 +65,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, SSHAuthMountPath: "/home/mpiuser/.ssh", @@ -127,7 +128,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy("unknown"), TTLSecondsAfterFinished: newInt32(-1), ActiveDeadlineSeconds: newInt64(-1), @@ -191,7 +192,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", @@ -213,7 +214,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", @@ -258,7 +259,7 @@ func TestValidateMPIJob(t *testing.T) { }, Spec: v2beta1.MPIJobSpec{ SlotsPerWorker: newInt32(2), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 5671e63e5..b38dcecd0 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -96,7 +96,7 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef Namespace: metav1.NamespaceDefault, }, Spec: kubeflow.MPIJobSpec{ - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: &cleanPodPolicyAll, }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ @@ -1091,7 +1091,7 @@ func TestNewLauncherAndWorker(t *testing.T) { SSHAuthMountPath: "/home/mpiuser/.ssh", SlotsPerWorker: newInt32(5), MPIImplementation: kubeflow.MPIImplementationIntel, - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ TTLSecondsAfterFinished: newInt32(1), ActiveDeadlineSeconds: newInt64(2), BackoffLimit: newInt32(3), diff --git a/sdk/python/v2beta1/README.md b/sdk/python/v2beta1/README.md index 381b7f458..bc6a42007 100644 --- a/sdk/python/v2beta1/README.md +++ b/sdk/python/v2beta1/README.md @@ -73,6 +73,7 @@ Class | Method | HTTP request | Description - [V2beta1MPIJob](docs/V2beta1MPIJob.md) - [V2beta1MPIJobList](docs/V2beta1MPIJobList.md) - [V2beta1MPIJobSpec](docs/V2beta1MPIJobSpec.md) + - [V2beta1RunPolicy](docs/V2beta1RunPolicy.md) ## Documentation For Authorization diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index fc32dade9..122791b19 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -6,7 +6,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default) and \"Intel\". | [optional] **mpi_replica_specs** | [**dict(str, V1ReplicaSpec)**](V1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | -**run_policy** | [**V1RunPolicy**](V1RunPolicy.md) | | [optional] +**run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md new file mode 100644 index 000000000..2e98ceb79 --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -0,0 +1,16 @@ +# V2beta1RunPolicy + +RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] +**backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] +**clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional] +**scheduling_policy** | [**V1SchedulingPolicy**](V1SchedulingPolicy.md) | | [optional] +**ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/mpijob/__init__.py b/sdk/python/v2beta1/mpijob/__init__.py index da2ae1c99..eff18d173 100644 --- a/sdk/python/v2beta1/mpijob/__init__.py +++ b/sdk/python/v2beta1/mpijob/__init__.py @@ -37,4 +37,5 @@ from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy diff --git a/sdk/python/v2beta1/mpijob/models/__init__.py b/sdk/python/v2beta1/mpijob/models/__init__.py index 26d5a1f10..2754302dc 100644 --- a/sdk/python/v2beta1/mpijob/models/__init__.py +++ b/sdk/python/v2beta1/mpijob/models/__init__.py @@ -23,3 +23,4 @@ from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 7a0527a5f..098e0c061 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -35,7 +35,7 @@ class V2beta1MPIJobSpec(object): openapi_types = { 'mpi_implementation': 'str', 'mpi_replica_specs': 'dict(str, V1ReplicaSpec)', - 'run_policy': 'V1RunPolicy', + 'run_policy': 'V2beta1RunPolicy', 'slots_per_worker': 'int', 'ssh_auth_mount_path': 'str' } @@ -125,7 +125,7 @@ def run_policy(self): :return: The run_policy of this V2beta1MPIJobSpec. # noqa: E501 - :rtype: V1RunPolicy + :rtype: V2beta1RunPolicy """ return self._run_policy @@ -135,7 +135,7 @@ def run_policy(self, run_policy): :param run_policy: The run_policy of this V2beta1MPIJobSpec. # noqa: E501 - :type run_policy: V1RunPolicy + :type run_policy: V2beta1RunPolicy """ self._run_policy = run_policy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py new file mode 100644 index 000000000..403a02a3d --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -0,0 +1,240 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1RunPolicy(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'active_deadline_seconds': 'int', + 'backoff_limit': 'int', + 'clean_pod_policy': 'str', + 'scheduling_policy': 'V1SchedulingPolicy', + 'ttl_seconds_after_finished': 'int' + } + + attribute_map = { + 'active_deadline_seconds': 'activeDeadlineSeconds', + 'backoff_limit': 'backoffLimit', + 'clean_pod_policy': 'cleanPodPolicy', + 'scheduling_policy': 'schedulingPolicy', + 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' + } + + def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 + """V2beta1RunPolicy - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._active_deadline_seconds = None + self._backoff_limit = None + self._clean_pod_policy = None + self._scheduling_policy = None + self._ttl_seconds_after_finished = None + self.discriminator = None + + if active_deadline_seconds is not None: + self.active_deadline_seconds = active_deadline_seconds + if backoff_limit is not None: + self.backoff_limit = backoff_limit + if clean_pod_policy is not None: + self.clean_pod_policy = clean_pod_policy + if scheduling_policy is not None: + self.scheduling_policy = scheduling_policy + if ttl_seconds_after_finished is not None: + self.ttl_seconds_after_finished = ttl_seconds_after_finished + + @property + def active_deadline_seconds(self): + """Gets the active_deadline_seconds of this V2beta1RunPolicy. # noqa: E501 + + Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. # noqa: E501 + + :return: The active_deadline_seconds of this V2beta1RunPolicy. # noqa: E501 + :rtype: int + """ + return self._active_deadline_seconds + + @active_deadline_seconds.setter + def active_deadline_seconds(self, active_deadline_seconds): + """Sets the active_deadline_seconds of this V2beta1RunPolicy. + + Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. # noqa: E501 + + :param active_deadline_seconds: The active_deadline_seconds of this V2beta1RunPolicy. # noqa: E501 + :type active_deadline_seconds: int + """ + + self._active_deadline_seconds = active_deadline_seconds + + @property + def backoff_limit(self): + """Gets the backoff_limit of this V2beta1RunPolicy. # noqa: E501 + + Optional number of retries before marking this job failed. # noqa: E501 + + :return: The backoff_limit of this V2beta1RunPolicy. # noqa: E501 + :rtype: int + """ + return self._backoff_limit + + @backoff_limit.setter + def backoff_limit(self, backoff_limit): + """Sets the backoff_limit of this V2beta1RunPolicy. + + Optional number of retries before marking this job failed. # noqa: E501 + + :param backoff_limit: The backoff_limit of this V2beta1RunPolicy. # noqa: E501 + :type backoff_limit: int + """ + + self._backoff_limit = backoff_limit + + @property + def clean_pod_policy(self): + """Gets the clean_pod_policy of this V2beta1RunPolicy. # noqa: E501 + + CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. # noqa: E501 + + :return: The clean_pod_policy of this V2beta1RunPolicy. # noqa: E501 + :rtype: str + """ + return self._clean_pod_policy + + @clean_pod_policy.setter + def clean_pod_policy(self, clean_pod_policy): + """Sets the clean_pod_policy of this V2beta1RunPolicy. + + CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. # noqa: E501 + + :param clean_pod_policy: The clean_pod_policy of this V2beta1RunPolicy. # noqa: E501 + :type clean_pod_policy: str + """ + + self._clean_pod_policy = clean_pod_policy + + @property + def scheduling_policy(self): + """Gets the scheduling_policy of this V2beta1RunPolicy. # noqa: E501 + + + :return: The scheduling_policy of this V2beta1RunPolicy. # noqa: E501 + :rtype: V1SchedulingPolicy + """ + return self._scheduling_policy + + @scheduling_policy.setter + def scheduling_policy(self, scheduling_policy): + """Sets the scheduling_policy of this V2beta1RunPolicy. + + + :param scheduling_policy: The scheduling_policy of this V2beta1RunPolicy. # noqa: E501 + :type scheduling_policy: V1SchedulingPolicy + """ + + self._scheduling_policy = scheduling_policy + + @property + def ttl_seconds_after_finished(self): + """Gets the ttl_seconds_after_finished of this V2beta1RunPolicy. # noqa: E501 + + TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. # noqa: E501 + + :return: The ttl_seconds_after_finished of this V2beta1RunPolicy. # noqa: E501 + :rtype: int + """ + return self._ttl_seconds_after_finished + + @ttl_seconds_after_finished.setter + def ttl_seconds_after_finished(self, ttl_seconds_after_finished): + """Sets the ttl_seconds_after_finished of this V2beta1RunPolicy. + + TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. # noqa: E501 + + :param ttl_seconds_after_finished: The ttl_seconds_after_finished of this V2beta1RunPolicy. # noqa: E501 + :type ttl_seconds_after_finished: int + """ + + self._ttl_seconds_after_finished = ttl_seconds_after_finished + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1RunPolicy): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1RunPolicy): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/test/test_v2beta1_run_policy.py b/sdk/python/v2beta1/test/test_v2beta1_run_policy.py new file mode 100644 index 000000000..97a76b8f3 --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_run_policy.py @@ -0,0 +1,62 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1RunPolicy(unittest.TestCase): + """V2beta1RunPolicy unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1RunPolicy + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_run_policy.V2beta1RunPolicy() # noqa: E501 + if include_optional : + return V2beta1RunPolicy( + active_deadline_seconds = 56, + backoff_limit = 56, + clean_pod_policy = '', + scheduling_policy = mpijob.models.v1/scheduling_policy.v1.SchedulingPolicy( + min_available = 56, + min_resources = { + 'key' : None + }, + priority_class = '', + queue = '', + schedule_timeout_seconds = 56, ), + ttl_seconds_after_finished = 56 + ) + else : + return V2beta1RunPolicy( + ) + + def testV2beta1RunPolicy(self): + """Test V2beta1RunPolicy""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index d35ab5493..f3328e760 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -56,7 +56,7 @@ func TestMPIJobSuccess(t *testing.T) { }, Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(1), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ @@ -182,7 +182,7 @@ func TestMPIJobFailure(t *testing.T) { }, Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(1), - RunPolicy: common.RunPolicy{ + RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ From 44db325e3f7a4dc660b23cfef0b2421a07b13ac8 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 30 Jan 2023 15:03:30 +0100 Subject: [PATCH 2/2] Copy SchedulingPolicy and CleanPodPolicy for RunPolicy --- pkg/apis/kubeflow/v2beta1/default.go | 4 +- pkg/apis/kubeflow/v2beta1/default_test.go | 10 +- .../kubeflow/v2beta1/openapi_generated.go | 78 +++++- pkg/apis/kubeflow/v2beta1/swagger.json | 29 ++- pkg/apis/kubeflow/v2beta1/types.go | 25 +- .../kubeflow/v2beta1/zz_generated.deepcopy.go | 43 +++- pkg/apis/kubeflow/validation/validation.go | 6 +- .../kubeflow/validation/validation_test.go | 81 +++--- pkg/controller/mpi_job_controller.go | 6 +- pkg/controller/mpi_job_controller_test.go | 2 +- sdk/python/v2beta1/README.md | 1 + sdk/python/v2beta1/docs/V2beta1RunPolicy.md | 2 +- .../v2beta1/docs/V2beta1SchedulingPolicy.md | 16 ++ sdk/python/v2beta1/mpijob/__init__.py | 1 + sdk/python/v2beta1/mpijob/models/__init__.py | 1 + .../mpijob/models/v2beta1_run_policy.py | 6 +- .../models/v2beta1_scheduling_policy.py | 232 ++++++++++++++++++ .../test/test_v2beta1_scheduling_policy.py | 57 +++++ test/integration/mpi_job_controller_test.go | 6 +- 19 files changed, 527 insertions(+), 79 deletions(-) create mode 100644 sdk/python/v2beta1/docs/V2beta1SchedulingPolicy.md create mode 100644 sdk/python/v2beta1/mpijob/models/v2beta1_scheduling_policy.py create mode 100644 sdk/python/v2beta1/test/test_v2beta1_scheduling_policy.py diff --git a/pkg/apis/kubeflow/v2beta1/default.go b/pkg/apis/kubeflow/v2beta1/default.go index d93618ea5..a63da8df0 100644 --- a/pkg/apis/kubeflow/v2beta1/default.go +++ b/pkg/apis/kubeflow/v2beta1/default.go @@ -51,7 +51,7 @@ func setDefaultsTypeWorker(spec *common.ReplicaSpec) { func setDefaultsRunPolicy(policy *RunPolicy) { if policy.CleanPodPolicy == nil { - policy.CleanPodPolicy = newCleanPodPolicy(common.CleanPodPolicyNone) + policy.CleanPodPolicy = newCleanPodPolicy(CleanPodPolicyNone) } // The remaining fields are passed as-is to the k8s Job API, which does its // own defaulting. @@ -80,6 +80,6 @@ func newInt32(v int32) *int32 { return &v } -func newCleanPodPolicy(policy common.CleanPodPolicy) *common.CleanPodPolicy { +func newCleanPodPolicy(policy CleanPodPolicy) *CleanPodPolicy { return &policy } diff --git a/pkg/apis/kubeflow/v2beta1/default_test.go b/pkg/apis/kubeflow/v2beta1/default_test.go index df831d3d1..a9340e89d 100644 --- a/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/pkg/apis/kubeflow/v2beta1/default_test.go @@ -31,7 +31,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), RunPolicy: RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -43,7 +43,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { Spec: MPIJobSpec{ SlotsPerWorker: newInt32(10), RunPolicy: RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyRunning), TTLSecondsAfterFinished: newInt32(2), ActiveDeadlineSeconds: newInt64(3), BackoffLimit: newInt32(4), @@ -56,7 +56,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { Spec: MPIJobSpec{ SlotsPerWorker: newInt32(10), RunPolicy: RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyRunning), TTLSecondsAfterFinished: newInt32(2), ActiveDeadlineSeconds: newInt64(3), BackoffLimit: newInt32(4), @@ -78,7 +78,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), RunPolicy: RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -103,7 +103,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { Spec: MPIJobSpec{ SlotsPerWorker: newInt32(1), RunPolicy: RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, diff --git a/pkg/apis/kubeflow/v2beta1/openapi_generated.go b/pkg/apis/kubeflow/v2beta1/openapi_generated.go index d4810f2e0..9347831b4 100644 --- a/pkg/apis/kubeflow/v2beta1/openapi_generated.go +++ b/pkg/apis/kubeflow/v2beta1/openapi_generated.go @@ -28,16 +28,17 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ - "github.com/kubeflow/common/pkg/apis/common/v1.JobCondition": schema_pkg_apis_common_v1_JobCondition(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.JobStatus": schema_pkg_apis_common_v1_JobStatus(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec": schema_pkg_apis_common_v1_ReplicaSpec(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaStatus": schema_pkg_apis_common_v1_ReplicaStatus(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.RunPolicy": schema_pkg_apis_common_v1_RunPolicy(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy": schema_pkg_apis_common_v1_SchedulingPolicy(ref), - "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJob": schema_pkg_apis_kubeflow_v2beta1_MPIJob(ref), - "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobList": schema_pkg_apis_kubeflow_v2beta1_MPIJobList(ref), - "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec": schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref), - "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy": schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.JobCondition": schema_pkg_apis_common_v1_JobCondition(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.JobStatus": schema_pkg_apis_common_v1_JobStatus(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec": schema_pkg_apis_common_v1_ReplicaSpec(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaStatus": schema_pkg_apis_common_v1_ReplicaStatus(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.RunPolicy": schema_pkg_apis_common_v1_RunPolicy(ref), + "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy": schema_pkg_apis_common_v1_SchedulingPolicy(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJob": schema_pkg_apis_kubeflow_v2beta1_MPIJob(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobList": schema_pkg_apis_kubeflow_v2beta1_MPIJobList(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec": schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy": schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.SchedulingPolicy": schema_pkg_apis_kubeflow_v2beta1_SchedulingPolicy(ref), } } @@ -539,13 +540,66 @@ func schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref common.ReferenceCallback) co "schedulingPolicy": { SchemaProps: spec.SchemaProps{ Description: "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.SchedulingPolicy"), }, }, }, }, }, Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"}, + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.SchedulingPolicy"}, + } +} + +func schema_pkg_apis_kubeflow_v2beta1_SchedulingPolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "minAvailable": { + SchemaProps: spec.SchemaProps{ + Type: []string{"integer"}, + Format: "int32", + }, + }, + "queue": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "minResources": { + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/api/resource.Quantity"), + }, + }, + }, + }, + }, + "priorityClass": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "scheduleTimeoutSeconds": { + SchemaProps: spec.SchemaProps{ + Type: []string{"integer"}, + Format: "int32", + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/api/resource.Quantity"}, } } diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index ea4dbfd78..7e67e4f2a 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -293,7 +293,7 @@ }, "schedulingPolicy": { "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", - "$ref": "#/definitions/v1.SchedulingPolicy" + "$ref": "#/definitions/v2beta1.SchedulingPolicy" }, "ttlSecondsAfterFinished": { "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", @@ -301,6 +301,33 @@ "format": "int32" } } + }, + "v2beta1.SchedulingPolicy": { + "description": "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.", + "type": "object", + "properties": { + "minAvailable": { + "type": "integer", + "format": "int32" + }, + "minResources": { + "type": "object", + "additionalProperties": { + "default": {}, + "$ref": "#/definitions/resource.Quantity" + } + }, + "priorityClass": { + "type": "string" + }, + "queue": { + "type": "string" + }, + "scheduleTimeoutSeconds": { + "type": "integer", + "format": "int32" + } + } } } } diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index e6af4d7b6..cbe1463fa 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -16,6 +16,7 @@ package v2beta1 import ( common "github.com/kubeflow/common/pkg/apis/common/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -37,13 +38,33 @@ type MPIJobList struct { Items []MPIJob `json:"items"` } +// CleanPodPolicy describes how to deal with pods when the job is finished. +type CleanPodPolicy string + +const ( + CleanPodPolicyUndefined CleanPodPolicy = "" + CleanPodPolicyAll CleanPodPolicy = "All" + CleanPodPolicyRunning CleanPodPolicy = "Running" + CleanPodPolicyNone CleanPodPolicy = "None" +) + +// SchedulingPolicy encapsulates various scheduling policies of the distributed training +// job, for example `minAvailable` for gang-scheduling. +type SchedulingPolicy struct { + MinAvailable *int32 `json:"minAvailable,omitempty"` + Queue string `json:"queue,omitempty"` + MinResources *v1.ResourceList `json:"minResources,omitempty"` + PriorityClass string `json:"priorityClass,omitempty"` + ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"` +} + // RunPolicy encapsulates various runtime policies of the distributed training // job, for example how to clean up resources and how long the job can stay // active. type RunPolicy struct { // CleanPodPolicy defines the policy to kill pods after the job completes. // Default to Running. - CleanPodPolicy *common.CleanPodPolicy `json:"cleanPodPolicy,omitempty"` + CleanPodPolicy *CleanPodPolicy `json:"cleanPodPolicy,omitempty"` // TTLSecondsAfterFinished is the TTL to clean up jobs. // It may take extra ReconcilePeriod seconds for the cleanup, since @@ -62,7 +83,7 @@ type RunPolicy struct { // SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling // +optional - SchedulingPolicy *common.SchedulingPolicy `json:"schedulingPolicy,omitempty"` + SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"` } type MPIJobSpec struct { diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 119af521b..3cba3176a 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -21,6 +21,8 @@ package v2beta1 import ( v1 "github.com/kubeflow/common/pkg/apis/common/v1" + corev1 "k8s.io/api/core/v1" + resource "k8s.io/apimachinery/pkg/api/resource" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -127,7 +129,7 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = *in if in.CleanPodPolicy != nil { in, out := &in.CleanPodPolicy, &out.CleanPodPolicy - *out = new(v1.CleanPodPolicy) + *out = new(CleanPodPolicy) **out = **in } if in.TTLSecondsAfterFinished != nil { @@ -147,7 +149,7 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { } if in.SchedulingPolicy != nil { in, out := &in.SchedulingPolicy, &out.SchedulingPolicy - *out = new(v1.SchedulingPolicy) + *out = new(SchedulingPolicy) (*in).DeepCopyInto(*out) } return @@ -162,3 +164,40 @@ func (in *RunPolicy) DeepCopy() *RunPolicy { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { + *out = *in + if in.MinAvailable != nil { + in, out := &in.MinAvailable, &out.MinAvailable + *out = new(int32) + **out = **in + } + if in.MinResources != nil { + in, out := &in.MinResources, &out.MinResources + *out = new(corev1.ResourceList) + if **in != nil { + in, out := *in, *out + *out = make(map[corev1.ResourceName]resource.Quantity, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } + } + if in.ScheduleTimeoutSeconds != nil { + in, out := &in.ScheduleTimeoutSeconds, &out.ScheduleTimeoutSeconds + *out = new(int32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicy. +func (in *SchedulingPolicy) DeepCopy() *SchedulingPolicy { + if in == nil { + return nil + } + out := new(SchedulingPolicy) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/kubeflow/validation/validation.go b/pkg/apis/kubeflow/validation/validation.go index 68149dfff..2d2872b8e 100644 --- a/pkg/apis/kubeflow/validation/validation.go +++ b/pkg/apis/kubeflow/validation/validation.go @@ -29,9 +29,9 @@ import ( var ( validCleanPolicies = sets.NewString( - string(common.CleanPodPolicyNone), - string(common.CleanPodPolicyRunning), - string(common.CleanPodPolicyAll)) + string(kubeflow.CleanPodPolicyNone), + string(kubeflow.CleanPodPolicyRunning), + string(kubeflow.CleanPodPolicyAll)) validMPIImplementations = sets.NewString( string(kubeflow.MPIImplementationOpenMPI), diff --git a/pkg/apis/kubeflow/validation/validation_test.go b/pkg/apis/kubeflow/validation/validation_test.go index 27b9a3c32..e548c4b8e 100644 --- a/pkg/apis/kubeflow/validation/validation_test.go +++ b/pkg/apis/kubeflow/validation/validation_test.go @@ -20,7 +20,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" common "github.com/kubeflow/common/pkg/apis/common/v1" - "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,23 +28,23 @@ import ( func TestValidateMPIJob(t *testing.T) { cases := map[string]struct { - job v2beta1.MPIJob + job kubeflow.MPIJob wantErrs field.ErrorList }{ "valid": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, SSHAuthMountPath: "/home/mpiuser/.ssh", - MPIImplementation: v2beta1.MPIImplementationIntel, - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ - v2beta1.MPIReplicaTypeLauncher: { + MPIImplementation: kubeflow.MPIImplementationIntel, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), RestartPolicy: common.RestartPolicyNever, Template: corev1.PodTemplateSpec{ @@ -59,19 +58,19 @@ func TestValidateMPIJob(t *testing.T) { }, }, "valid with worker": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, SSHAuthMountPath: "/home/mpiuser/.ssh", - MPIImplementation: v2beta1.MPIImplementationIntel, - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ - v2beta1.MPIReplicaTypeLauncher: { + MPIImplementation: kubeflow.MPIImplementationIntel, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), RestartPolicy: common.RestartPolicyOnFailure, Template: corev1.PodTemplateSpec{ @@ -80,7 +79,7 @@ func TestValidateMPIJob(t *testing.T) { }, }, }, - v2beta1.MPIReplicaTypeWorker: { + kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(3), RestartPolicy: common.RestartPolicyNever, Template: corev1.PodTemplateSpec{ @@ -122,11 +121,11 @@ func TestValidateMPIJob(t *testing.T) { }, }, "invalid fields": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "this-name-is-waaaaaaaay-too-long-for-a-worker-hostname", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: newCleanPodPolicy("unknown"), @@ -135,9 +134,9 @@ func TestValidateMPIJob(t *testing.T) { BackoffLimit: newInt32(-1), }, SSHAuthMountPath: "/root/.ssh", - MPIImplementation: v2beta1.MPIImplementation("Unknown"), - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ - v2beta1.MPIReplicaTypeLauncher: { + MPIImplementation: kubeflow.MPIImplementation("Unknown"), + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), RestartPolicy: common.RestartPolicyNever, Template: corev1.PodTemplateSpec{ @@ -146,7 +145,7 @@ func TestValidateMPIJob(t *testing.T) { }, }, }, - v2beta1.MPIReplicaTypeWorker: { + kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(1000), RestartPolicy: common.RestartPolicyNever, Template: corev1.PodTemplateSpec{ @@ -186,18 +185,18 @@ func TestValidateMPIJob(t *testing.T) { }, }, "empty replica specs": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", - MPIImplementation: v2beta1.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{}, + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{}, }, }, wantErrs: field.ErrorList{ @@ -208,20 +207,20 @@ func TestValidateMPIJob(t *testing.T) { }, }, "missing replica spec fields": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", - MPIImplementation: v2beta1.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ - v2beta1.MPIReplicaTypeLauncher: {}, - v2beta1.MPIReplicaTypeWorker: {}, + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, }, }, }, @@ -253,19 +252,19 @@ func TestValidateMPIJob(t *testing.T) { }, }, "invalid replica fields": { - job: v2beta1.MPIJob{ + job: kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Spec: v2beta1.MPIJobSpec{ + Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(2), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, SSHAuthMountPath: "/root/.ssh", - MPIImplementation: v2beta1.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ - v2beta1.MPIReplicaTypeLauncher: { + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(2), RestartPolicy: common.RestartPolicyAlways, Template: corev1.PodTemplateSpec{ @@ -274,7 +273,7 @@ func TestValidateMPIJob(t *testing.T) { }, }, }, - v2beta1.MPIReplicaTypeWorker: { + kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(0), RestartPolicy: "Invalid", Template: corev1.PodTemplateSpec{ @@ -324,6 +323,6 @@ func newInt64(v int64) *int64 { return &v } -func newCleanPodPolicy(v common.CleanPodPolicy) *common.CleanPodPolicy { +func newCleanPodPolicy(v kubeflow.CleanPodPolicy) *kubeflow.CleanPodPolicy { return &v } diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 055dfe473..eb8301c8b 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -886,7 +886,7 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error { // set to CleanPodPolicyRunning, keep the pod. // Note that pending pod should still be removed under this // situation, since it may turn to running in the future. - if *mpiJob.Spec.RunPolicy.CleanPodPolicy == common.CleanPodPolicyRunning && !isPodRunning(pod) && !isPodPending(pod) { + if *mpiJob.Spec.RunPolicy.CleanPodPolicy == kubeflow.CleanPodPolicyRunning && !isPodRunning(pod) && !isPodPending(pod) { // Keep the worker pod continue } @@ -1492,8 +1492,8 @@ func isPodFailed(p *corev1.Pod) bool { return p.Status.Phase == corev1.PodFailed } -func isCleanUpPods(cleanPodPolicy *common.CleanPodPolicy) bool { - if *cleanPodPolicy == common.CleanPodPolicyAll || *cleanPodPolicy == common.CleanPodPolicyRunning { +func isCleanUpPods(cleanPodPolicy *kubeflow.CleanPodPolicy) bool { + if *cleanPodPolicy == kubeflow.CleanPodPolicyAll || *cleanPodPolicy == kubeflow.CleanPodPolicyRunning { return true } return false diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index b38dcecd0..ab4c1dce4 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -88,7 +88,7 @@ func newFixture(t *testing.T) *fixture { } func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { - cleanPodPolicyAll := common.CleanPodPolicyAll + cleanPodPolicyAll := kubeflow.CleanPodPolicyAll mpiJob := &kubeflow.MPIJob{ TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ diff --git a/sdk/python/v2beta1/README.md b/sdk/python/v2beta1/README.md index bc6a42007..70fe72ab7 100644 --- a/sdk/python/v2beta1/README.md +++ b/sdk/python/v2beta1/README.md @@ -74,6 +74,7 @@ Class | Method | HTTP request | Description - [V2beta1MPIJobList](docs/V2beta1MPIJobList.md) - [V2beta1MPIJobSpec](docs/V2beta1MPIJobSpec.md) - [V2beta1RunPolicy](docs/V2beta1RunPolicy.md) + - [V2beta1SchedulingPolicy](docs/V2beta1SchedulingPolicy.md) ## Documentation For Authorization diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md index 2e98ceb79..54b052d1f 100644 --- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -8,7 +8,7 @@ Name | Type | Description | Notes **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional] -**scheduling_policy** | [**V1SchedulingPolicy**](V1SchedulingPolicy.md) | | [optional] +**scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) | | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/v2beta1/docs/V2beta1SchedulingPolicy.md b/sdk/python/v2beta1/docs/V2beta1SchedulingPolicy.md new file mode 100644 index 000000000..bd46cf939 --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1SchedulingPolicy.md @@ -0,0 +1,16 @@ +# V2beta1SchedulingPolicy + +SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling. + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**min_available** | **int** | | [optional] +**min_resources** | [**dict(str, ResourceQuantity)**](ResourceQuantity.md) | | [optional] +**priority_class** | **str** | | [optional] +**queue** | **str** | | [optional] +**schedule_timeout_seconds** | **int** | | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/mpijob/__init__.py b/sdk/python/v2beta1/mpijob/__init__.py index eff18d173..05e1f6604 100644 --- a/sdk/python/v2beta1/mpijob/__init__.py +++ b/sdk/python/v2beta1/mpijob/__init__.py @@ -38,4 +38,5 @@ from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy +from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/__init__.py b/sdk/python/v2beta1/mpijob/models/__init__.py index 2754302dc..548a03870 100644 --- a/sdk/python/v2beta1/mpijob/models/__init__.py +++ b/sdk/python/v2beta1/mpijob/models/__init__.py @@ -24,3 +24,4 @@ from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy +from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py index 403a02a3d..ae3d4e411 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -36,7 +36,7 @@ class V2beta1RunPolicy(object): 'active_deadline_seconds': 'int', 'backoff_limit': 'int', 'clean_pod_policy': 'str', - 'scheduling_policy': 'V1SchedulingPolicy', + 'scheduling_policy': 'V2beta1SchedulingPolicy', 'ttl_seconds_after_finished': 'int' } @@ -147,7 +147,7 @@ def scheduling_policy(self): :return: The scheduling_policy of this V2beta1RunPolicy. # noqa: E501 - :rtype: V1SchedulingPolicy + :rtype: V2beta1SchedulingPolicy """ return self._scheduling_policy @@ -157,7 +157,7 @@ def scheduling_policy(self, scheduling_policy): :param scheduling_policy: The scheduling_policy of this V2beta1RunPolicy. # noqa: E501 - :type scheduling_policy: V1SchedulingPolicy + :type scheduling_policy: V2beta1SchedulingPolicy """ self._scheduling_policy = scheduling_policy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_scheduling_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_scheduling_policy.py new file mode 100644 index 000000000..07bb7a51d --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_scheduling_policy.py @@ -0,0 +1,232 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1SchedulingPolicy(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'min_available': 'int', + 'min_resources': 'dict(str, ResourceQuantity)', + 'priority_class': 'str', + 'queue': 'str', + 'schedule_timeout_seconds': 'int' + } + + attribute_map = { + 'min_available': 'minAvailable', + 'min_resources': 'minResources', + 'priority_class': 'priorityClass', + 'queue': 'queue', + 'schedule_timeout_seconds': 'scheduleTimeoutSeconds' + } + + def __init__(self, min_available=None, min_resources=None, priority_class=None, queue=None, schedule_timeout_seconds=None, local_vars_configuration=None): # noqa: E501 + """V2beta1SchedulingPolicy - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._min_available = None + self._min_resources = None + self._priority_class = None + self._queue = None + self._schedule_timeout_seconds = None + self.discriminator = None + + if min_available is not None: + self.min_available = min_available + if min_resources is not None: + self.min_resources = min_resources + if priority_class is not None: + self.priority_class = priority_class + if queue is not None: + self.queue = queue + if schedule_timeout_seconds is not None: + self.schedule_timeout_seconds = schedule_timeout_seconds + + @property + def min_available(self): + """Gets the min_available of this V2beta1SchedulingPolicy. # noqa: E501 + + + :return: The min_available of this V2beta1SchedulingPolicy. # noqa: E501 + :rtype: int + """ + return self._min_available + + @min_available.setter + def min_available(self, min_available): + """Sets the min_available of this V2beta1SchedulingPolicy. + + + :param min_available: The min_available of this V2beta1SchedulingPolicy. # noqa: E501 + :type min_available: int + """ + + self._min_available = min_available + + @property + def min_resources(self): + """Gets the min_resources of this V2beta1SchedulingPolicy. # noqa: E501 + + + :return: The min_resources of this V2beta1SchedulingPolicy. # noqa: E501 + :rtype: dict(str, ResourceQuantity) + """ + return self._min_resources + + @min_resources.setter + def min_resources(self, min_resources): + """Sets the min_resources of this V2beta1SchedulingPolicy. + + + :param min_resources: The min_resources of this V2beta1SchedulingPolicy. # noqa: E501 + :type min_resources: dict(str, ResourceQuantity) + """ + + self._min_resources = min_resources + + @property + def priority_class(self): + """Gets the priority_class of this V2beta1SchedulingPolicy. # noqa: E501 + + + :return: The priority_class of this V2beta1SchedulingPolicy. # noqa: E501 + :rtype: str + """ + return self._priority_class + + @priority_class.setter + def priority_class(self, priority_class): + """Sets the priority_class of this V2beta1SchedulingPolicy. + + + :param priority_class: The priority_class of this V2beta1SchedulingPolicy. # noqa: E501 + :type priority_class: str + """ + + self._priority_class = priority_class + + @property + def queue(self): + """Gets the queue of this V2beta1SchedulingPolicy. # noqa: E501 + + + :return: The queue of this V2beta1SchedulingPolicy. # noqa: E501 + :rtype: str + """ + return self._queue + + @queue.setter + def queue(self, queue): + """Sets the queue of this V2beta1SchedulingPolicy. + + + :param queue: The queue of this V2beta1SchedulingPolicy. # noqa: E501 + :type queue: str + """ + + self._queue = queue + + @property + def schedule_timeout_seconds(self): + """Gets the schedule_timeout_seconds of this V2beta1SchedulingPolicy. # noqa: E501 + + + :return: The schedule_timeout_seconds of this V2beta1SchedulingPolicy. # noqa: E501 + :rtype: int + """ + return self._schedule_timeout_seconds + + @schedule_timeout_seconds.setter + def schedule_timeout_seconds(self, schedule_timeout_seconds): + """Sets the schedule_timeout_seconds of this V2beta1SchedulingPolicy. + + + :param schedule_timeout_seconds: The schedule_timeout_seconds of this V2beta1SchedulingPolicy. # noqa: E501 + :type schedule_timeout_seconds: int + """ + + self._schedule_timeout_seconds = schedule_timeout_seconds + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1SchedulingPolicy): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1SchedulingPolicy): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/test/test_v2beta1_scheduling_policy.py b/sdk/python/v2beta1/test/test_v2beta1_scheduling_policy.py new file mode 100644 index 000000000..82c13b59c --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_scheduling_policy.py @@ -0,0 +1,57 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1SchedulingPolicy(unittest.TestCase): + """V2beta1SchedulingPolicy unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1SchedulingPolicy + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_scheduling_policy.V2beta1SchedulingPolicy() # noqa: E501 + if include_optional : + return V2beta1SchedulingPolicy( + min_available = 56, + min_resources = { + 'key' : None + }, + priority_class = '', + queue = '', + schedule_timeout_seconds = 56 + ) + else : + return V2beta1SchedulingPolicy( + ) + + def testV2beta1SchedulingPolicy(self): + """Test V2beta1SchedulingPolicy""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index f3328e760..14c785ca6 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -57,7 +57,7 @@ func TestMPIJobSuccess(t *testing.T) { Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(1), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { @@ -183,7 +183,7 @@ func TestMPIJobFailure(t *testing.T) { Spec: kubeflow.MPIJobSpec{ SlotsPerWorker: newInt32(1), RunPolicy: kubeflow.RunPolicy{ - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { @@ -545,7 +545,7 @@ func newInt32(v int32) *int32 { return &v } -func newCleanPodPolicy(policy common.CleanPodPolicy) *common.CleanPodPolicy { +func newCleanPodPolicy(policy kubeflow.CleanPodPolicy) *kubeflow.CleanPodPolicy { return &policy }