diff --git a/crd/kubeflow.org_mpijobs.yaml b/crd/kubeflow.org_mpijobs.yaml index 21b0b469b..9998e2095 100644 --- a/crd/kubeflow.org_mpijobs.yaml +++ b/crd/kubeflow.org_mpijobs.yaml @@ -7898,7 +7898,7 @@ spec: format: date-time type: string conditions: - description: Conditions is an array of current observed job conditions. + description: conditions is a list of current observed job conditions. items: description: JobCondition describes the state of the job at a certain point. @@ -7913,23 +7913,30 @@ spec: format: date-time type: string message: - description: A human readable message indicating details about + description: A human-readable message indicating details about the transition. type: string reason: description: The reason for the condition's last transition. type: string status: - description: Status of the condition, one of True, False, Unknown. + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown type: string type: - description: Type of job condition. + description: type of job condition. type: string required: - status - type type: object type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map lastReconcileTime: description: Represents last time when the job was reconciled. It is not guaranteed to be set in happens-before order across separate @@ -7946,11 +7953,11 @@ spec: format: int32 type: integer failed: - description: The number of pods which reached phase Failed. + description: The number of pods which reached phase failed. format: int32 type: integer labelSelector: - description: 'Deprecated: Use Selector instead' + description: 'Deprecated: Use selector instead' properties: matchExpressions: description: matchExpressions is a list of label selector @@ -7995,17 +8002,17 @@ spec: type: object x-kubernetes-map-type: atomic selector: - description: A Selector is a label query over a set of resources. + description: A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. - An empty Selector matches all objects. A null Selector matches + An empty selector matches all objects. A null selector matches no objects. type: string succeeded: - description: The number of pods which reached phase Succeeded. + description: The number of pods which reached phase succeeded. format: int32 type: integer type: object - description: ReplicaStatuses is map of ReplicaType and ReplicaStatus, + description: replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica. type: object startTime: @@ -8015,9 +8022,6 @@ spec: and is in UTC. format: date-time type: string - required: - - conditions - - replicaStatuses type: object type: object served: true diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 7e67e4f2a..7588b8c0c 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -187,6 +187,81 @@ } } }, + "v2beta1.JobCondition": { + "description": "JobCondition describes the state of the job at a certain point.", + "type": "object", + "required": [ + "type", + "status" + ], + "properties": { + "lastTransitionTime": { + "description": "Last time the condition transitioned from one status to another.", + "default": {}, + "$ref": "#/definitions/v1.Time" + }, + "lastUpdateTime": { + "description": "The last time this condition was updated.", + "default": {}, + "$ref": "#/definitions/v1.Time" + }, + "message": { + "description": "A human-readable message indicating details about the transition.", + "type": "string" + }, + "reason": { + "description": "The reason for the condition's last transition.", + "type": "string" + }, + "status": { + "description": "status of the condition, one of True, False, Unknown.", + "type": "string", + "default": "" + }, + "type": { + "description": "type of job condition.", + "type": "string", + "default": "" + } + } + }, + "v2beta1.JobStatus": { + "description": "JobStatus represents the current observed state of the training Job.", + "type": "object", + "properties": { + "completionTime": { + "description": "Represents time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + "$ref": "#/definitions/v1.Time" + }, + "conditions": { + "description": "conditions is a list of current observed job conditions.", + "type": "array", + "items": { + "default": {}, + "$ref": "#/definitions/v2beta1.JobCondition" + }, + "x-kubernetes-list-map-keys": [ + "type" + ], + "x-kubernetes-list-type": "map" + }, + "lastReconcileTime": { + "description": "Represents last time when the job was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + "$ref": "#/definitions/v1.Time" + }, + "replicaStatuses": { + "description": "replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.", + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/v2beta1.ReplicaStatus" + } + }, + "startTime": { + "description": "Represents time when the job was acknowledged by the job controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + "$ref": "#/definitions/v1.Time" + } + } + }, "v2beta1.MPIJob": { "type": "object", "properties": { @@ -208,7 +283,7 @@ }, "status": { "default": {}, - "$ref": "#/definitions/v1.JobStatus" + "$ref": "#/definitions/v2beta1.JobStatus" } } }, @@ -273,6 +348,35 @@ } } }, + "v2beta1.ReplicaStatus": { + "description": "ReplicaStatus represents the current observed state of the replica.", + "type": "object", + "properties": { + "active": { + "description": "The number of actively running pods.", + "type": "integer", + "format": "int32" + }, + "failed": { + "description": "The number of pods which reached phase failed.", + "type": "integer", + "format": "int32" + }, + "labelSelector": { + "description": "Deprecated: Use selector instead", + "$ref": "#/definitions/v1.LabelSelector" + }, + "selector": { + "description": "A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty selector matches all objects. A null selector matches no objects.", + "type": "string" + }, + "succeeded": { + "description": "The number of pods which reached phase succeeded.", + "type": "integer", + "format": "int32" + } + } + }, "v2beta1.RunPolicy": { "description": "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.", "type": "object", diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index cbe1463fa..1fa23c246 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -26,8 +26,8 @@ import ( type MPIJob struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec MPIJobSpec `json:"spec,omitempty"` - Status common.JobStatus `json:"status,omitempty"` + Spec MPIJobSpec `json:"spec,omitempty"` + Status JobStatus `json:"status,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -114,7 +114,7 @@ type MPIJobSpec struct { } // MPIReplicaType is the type for MPIReplica. -type MPIReplicaType common.ReplicaType +type MPIReplicaType string const ( // MPIReplicaTypeLauncher is the type for launcher replica. @@ -130,3 +130,117 @@ const ( MPIImplementationOpenMPI MPIImplementation = "OpenMPI" MPIImplementationIntel MPIImplementation = "Intel" ) + +// JobStatus represents the current observed state of the training Job. +type JobStatus struct { + // conditions is a list of current observed job conditions. + // +optional + // +listType=map + // +listMapKey=type + Conditions []JobCondition `json:"conditions,omitempty"` + + // replicaStatuses is map of ReplicaType and ReplicaStatus, + // specifies the status of each replica. + // +optional + ReplicaStatuses map[MPIReplicaType]*ReplicaStatus `json:"replicaStatuses,omitempty"` + + // Represents time when the job was acknowledged by the job controller. + // It is not guaranteed to be set in happens-before order across separate operations. + // It is represented in RFC3339 form and is in UTC. + // +optional + StartTime *metav1.Time `json:"startTime,omitempty"` + + // Represents time when the job was completed. It is not guaranteed to + // be set in happens-before order across separate operations. + // It is represented in RFC3339 form and is in UTC. + // +optional + CompletionTime *metav1.Time `json:"completionTime,omitempty"` + + // Represents last time when the job was reconciled. It is not guaranteed to + // be set in happens-before order across separate operations. + // It is represented in RFC3339 form and is in UTC. + // +optional + LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"` +} + +// ReplicaStatus represents the current observed state of the replica. +type ReplicaStatus struct { + // The number of actively running pods. + // +optional + Active int32 `json:"active,omitempty"` + + // The number of pods which reached phase succeeded. + // +optional + Succeeded int32 `json:"succeeded,omitempty"` + + // The number of pods which reached phase failed. + // +optional + Failed int32 `json:"failed,omitempty"` + + // Deprecated: Use selector instead + // +optional + LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` + + // A selector is a label query over a set of resources. The result of matchLabels and + // matchExpressions are ANDed. An empty selector matches all objects. A null + // selector matches no objects. + // +optional + Selector string `json:"selector,omitempty"` +} + +// JobCondition describes the state of the job at a certain point. +type JobCondition struct { + // type of job condition. + Type JobConditionType `json:"type"` + + // status of the condition, one of True, False, Unknown. + // +kubebuilder:validation:Enum:=True;False;Unknown + Status v1.ConditionStatus `json:"status"` + + // The reason for the condition's last transition. + // +optional + Reason string `json:"reason,omitempty"` + + // A human-readable message indicating details about the transition. + // +optional + Message string `json:"message,omitempty"` + + // The last time this condition was updated. + // +optional + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + + // Last time the condition transitioned from one status to another. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` +} + +// JobConditionType defines all kinds of types of JobStatus. +type JobConditionType string + +const ( + // JobCreated means the job has been accepted by the system, + // but one or more of the pods/services has not been started. + // This includes time before pods being scheduled and launched. + JobCreated JobConditionType = "Created" + + // JobRunning means all sub-resources (e.g. services/pods) of this job + // have been successfully scheduled and launched. + // The training is running without error. + JobRunning JobConditionType = "Running" + + // JobRestarting means one or more sub-resources (e.g. services/pods) of this job + // reached phase failed but maybe restarted according to it's restart policy + // which specified by user in v1.PodTemplateSpec. + // The training is freezing/pending. + JobRestarting JobConditionType = "Restarting" + + // JobSucceeded means all sub-resources (e.g. services/pods) of this job + // reached phase have terminated in success. + // The training is complete without error. + JobSucceeded JobConditionType = "Succeeded" + + // JobFailed means one or more sub-resources (e.g. services/pods) of this job + // reached phase failed with no restarting. + // The training has failed its execution. + JobFailed JobConditionType = "Failed" +) diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 3cba3176a..6a74a66f0 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -23,9 +23,78 @@ import ( v1 "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobCondition) DeepCopyInto(out *JobCondition) { + *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobCondition. +func (in *JobCondition) DeepCopy() *JobCondition { + if in == nil { + return nil + } + out := new(JobCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobStatus) DeepCopyInto(out *JobStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]JobCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.ReplicaStatuses != nil { + in, out := &in.ReplicaStatuses, &out.ReplicaStatuses + *out = make(map[MPIReplicaType]*ReplicaStatus, len(*in)) + for key, val := range *in { + var outVal *ReplicaStatus + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(ReplicaStatus) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } + } + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + *out = (*in).DeepCopy() + } + if in.CompletionTime != nil { + in, out := &in.CompletionTime, &out.CompletionTime + *out = (*in).DeepCopy() + } + if in.LastReconcileTime != nil { + in, out := &in.LastReconcileTime, &out.LastReconcileTime + *out = (*in).DeepCopy() + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobStatus. +func (in *JobStatus) DeepCopy() *JobStatus { + if in == nil { + return nil + } + out := new(JobStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MPIJob) DeepCopyInto(out *MPIJob) { *out = *in @@ -124,6 +193,27 @@ func (in *MPIJobSpec) DeepCopy() *MPIJobSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReplicaStatus) DeepCopyInto(out *ReplicaStatus) { + *out = *in + if in.LabelSelector != nil { + in, out := &in.LabelSelector, &out.LabelSelector + *out = new(metav1.LabelSelector) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaStatus. +func (in *ReplicaStatus) DeepCopy() *ReplicaStatus { + if in == nil { + return nil + } + out := new(ReplicaStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = *in diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index eb8301c8b..e51a3ac52 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -493,7 +493,7 @@ func (c *MPIJobController) syncHandler(key string) error { if len(mpiJob.Status.Conditions) == 0 { msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, common.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobCreated, mpiJobCreatedReason, msg) c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg) mpiJobsCreatedCount.Inc() } @@ -513,7 +513,7 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0 + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0 return c.updateStatusHandler(mpiJob) } return nil @@ -910,7 +910,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher launcherPodsCnt := countRunningPods(launcherPods) if launcher != nil { initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeLauncher) - launcherStatus := mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeLauncher)] + launcherStatus := mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeLauncher] launcherStatus.Failed = launcher.Status.Failed if isJobSucceeded(launcher) { launcherStatus.Succeeded = 1 @@ -919,12 +919,12 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher if mpiJob.Status.CompletionTime == nil { mpiJob.Status.CompletionTime = launcher.Status.CompletionTime } - updateMPIJobConditions(mpiJob, common.JobSucceeded, mpiJobSucceededReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) mpiJobsSuccessCount.Inc() } else if isJobFailed(launcher) { c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods) } else { - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Active = int32(launcherPodsCnt) + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeLauncher].Active = int32(launcherPodsCnt) } mpiJobInfoGauge.WithLabelValues(launcher.Name, mpiJob.Namespace).Set(1) } @@ -939,27 +939,27 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher for i := 0; i < len(worker); i++ { switch worker[i].Status.Phase { case corev1.PodFailed: - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Failed += 1 + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Failed += 1 if worker[i].Status.Reason == "Evicted" { evict += 1 } case corev1.PodSucceeded: - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Succeeded += 1 + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Succeeded += 1 case corev1.PodRunning: running += 1 - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active += 1 + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active += 1 } } if evict > 0 { msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker)) klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg) - updateMPIJobConditions(mpiJob, common.JobFailed, mpiJobEvict, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobFailed, mpiJobEvict, msg) c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg) } if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) { msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, common.JobRunning, mpiJobRunningReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, mpiJobRunningReason, msg) c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name) } @@ -999,7 +999,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau now := metav1.Now() mpiJob.Status.CompletionTime = &now } - updateMPIJobConditions(mpiJob, common.JobFailed, reason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobFailed, reason, msg) mpiJobsFailureCount.Inc() } diff --git a/pkg/controller/mpi_job_controller_status.go b/pkg/controller/mpi_job_controller_status.go index 240cc2fed..123530853 100644 --- a/pkg/controller/mpi_job_controller_status.go +++ b/pkg/controller/mpi_job_controller_status.go @@ -18,7 +18,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - common "github.com/kubeflow/common/pkg/apis/common/v1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" ) @@ -37,23 +36,23 @@ const ( // initializeMPIJobStatuses initializes the ReplicaStatuses for MPIJob. func initializeMPIJobStatuses(mpiJob *kubeflow.MPIJob, mtype kubeflow.MPIReplicaType) { - replicaType := common.ReplicaType(mtype) + replicaType := kubeflow.MPIReplicaType(mtype) if mpiJob.Status.ReplicaStatuses == nil { - mpiJob.Status.ReplicaStatuses = make(map[common.ReplicaType]*common.ReplicaStatus) + mpiJob.Status.ReplicaStatuses = make(map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus) } - mpiJob.Status.ReplicaStatuses[replicaType] = &common.ReplicaStatus{} + mpiJob.Status.ReplicaStatuses[replicaType] = &kubeflow.ReplicaStatus{} } // updateMPIJobConditions updates the conditions of the given mpiJob. -func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType common.JobConditionType, reason, message string) { +func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType kubeflow.JobConditionType, reason, message string) { condition := newCondition(conditionType, reason, message) setCondition(&mpiJob.Status, condition) } // newCondition creates a new mpiJob condition. -func newCondition(conditionType common.JobConditionType, reason, message string) common.JobCondition { - return common.JobCondition{ +func newCondition(conditionType kubeflow.JobConditionType, reason, message string) kubeflow.JobCondition { + return kubeflow.JobCondition{ Type: conditionType, Status: v1.ConditionTrue, LastUpdateTime: metav1.Now(), @@ -64,7 +63,7 @@ func newCondition(conditionType common.JobConditionType, reason, message string) } // getCondition returns the condition with the provided type. -func getCondition(status common.JobStatus, condType common.JobConditionType) *common.JobCondition { +func getCondition(status kubeflow.JobStatus, condType kubeflow.JobConditionType) *kubeflow.JobCondition { for _, condition := range status.Conditions { if condition.Type == condType { return &condition @@ -73,7 +72,7 @@ func getCondition(status common.JobStatus, condType common.JobConditionType) *co return nil } -func hasCondition(status common.JobStatus, condType common.JobConditionType) bool { +func hasCondition(status kubeflow.JobStatus, condType kubeflow.JobConditionType) bool { for _, condition := range status.Conditions { if condition.Type == condType && condition.Status == v1.ConditionTrue { return true @@ -82,22 +81,22 @@ func hasCondition(status common.JobStatus, condType common.JobConditionType) boo return false } -func isFinished(status common.JobStatus) bool { +func isFinished(status kubeflow.JobStatus) bool { return isSucceeded(status) || isFailed(status) } -func isSucceeded(status common.JobStatus) bool { - return hasCondition(status, common.JobSucceeded) +func isSucceeded(status kubeflow.JobStatus) bool { + return hasCondition(status, kubeflow.JobSucceeded) } -func isFailed(status common.JobStatus) bool { - return hasCondition(status, common.JobFailed) +func isFailed(status kubeflow.JobStatus) bool { + return hasCondition(status, kubeflow.JobFailed) } // setCondition updates the mpiJob to include the provided condition. // If the condition that we are about to add already exists // and has the same status and reason then we are not going to update. -func setCondition(status *common.JobStatus, condition common.JobCondition) { +func setCondition(status *kubeflow.JobStatus, condition kubeflow.JobCondition) { currentCond := getCondition(*status, condition.Type) @@ -117,13 +116,13 @@ func setCondition(status *common.JobStatus, condition common.JobCondition) { } // filterOutCondition returns a new slice of mpiJob conditions without conditions with the provided type. -func filterOutCondition(conditions []common.JobCondition, condType common.JobConditionType) []common.JobCondition { - var newConditions []common.JobCondition +func filterOutCondition(conditions []kubeflow.JobCondition, condType kubeflow.JobConditionType) []kubeflow.JobCondition { + var newConditions []kubeflow.JobCondition for _, c := range conditions { - if condType == common.JobRestarting && c.Type == common.JobRunning { + if condType == kubeflow.JobRestarting && c.Type == kubeflow.JobRunning { continue } - if condType == common.JobRunning && c.Type == common.JobRestarting { + if condType == kubeflow.JobRunning && c.Type == kubeflow.JobRestarting { continue } @@ -132,7 +131,7 @@ func filterOutCondition(conditions []common.JobCondition, condType common.JobCon } // Set the running condition status to be false when current condition failed or succeeded - if (condType == common.JobFailed || condType == common.JobSucceeded) && (c.Type == common.JobRunning || c.Type == common.JobFailed) { + if (condType == kubeflow.JobFailed || condType == kubeflow.JobSucceeded) && (c.Type == kubeflow.JobRunning || c.Type == kubeflow.JobFailed) { c.Status = v1.ConditionFalse } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index ab4c1dce4..ff2cb1c12 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -50,7 +50,7 @@ var ( alwaysReady = func() bool { return true } noResyncPeriodFunc = func() time.Duration { return 0 } - ignoreConditionTimes = cmpopts.IgnoreFields(common.JobCondition{}, "LastUpdateTime", "LastTransitionTime") + ignoreConditionTimes = cmpopts.IgnoreFields(kubeflow.JobCondition{}, "LastUpdateTime", "LastTransitionTime") ignoreSecretEntries = cmpopts.IgnoreMapEntries(func(k string, v []uint8) bool { return true }) ) @@ -126,7 +126,7 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef }, }, }, - Status: common.JobStatus{}, + Status: kubeflow.JobStatus{}, } if startTime != nil { @@ -486,10 +486,10 @@ func TestAllResourcesCreated(t *testing.T) { } f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy)) - mpiJobCopy.Status.Conditions = []common.JobCondition{newCondition(common.JobCreated, mpiJobCreatedReason, "MPIJob default/foo is created.")} - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {}, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, mpiJobCreatedReason, "MPIJob default/foo is created.")} + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, } f.expectUpdateMPIJobStatusAction(mpiJobCopy) @@ -535,21 +535,21 @@ func TestLauncherSucceeded(t *testing.T) { }) f.setUpLauncher(launcher) - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 0, Succeeded: 1, Failed: 0, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + kubeflow.MPIReplicaTypeWorker: {}, } setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobSucceeded, mpiJobSucceededReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t)) @@ -590,20 +590,20 @@ func TestLauncherFailed(t *testing.T) { launcherPod2.CreationTimestamp = metav1.NewTime(now.Add(time.Second)) f.setUpPod(launcherPod2) - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 0, Succeeded: 0, Failed: 2, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + kubeflow.MPIReplicaTypeWorker: {}, } setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) msg = "Job has reached the specified backoff limit: second message" - updateMPIJobConditions(mpiJobCopy, common.JobFailed, jobBackoffLimitExceededReason+"/FailedReason2", msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobFailed, jobBackoffLimitExceededReason+"/FailedReason2", msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) @@ -715,7 +715,7 @@ func TestShutdownWorker(t *testing.T) { var replicas int32 = 8 mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, common.JobSucceeded, mpiJobSucceededReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() @@ -738,8 +738,8 @@ func TestShutdownWorker(t *testing.T) { f.kubeActions = append(f.kubeActions, core.NewDeleteAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, name)) } - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeWorker: { Active: 0, Succeeded: 0, Failed: 0, @@ -816,14 +816,14 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { f.setUpPod(worker) } msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 1, Succeeded: 0, Failed: 0, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + kubeflow.MPIReplicaTypeWorker: { Active: 0, Succeeded: 0, Failed: 0, @@ -872,13 +872,13 @@ func TestLauncherActiveWorkerReady(t *testing.T) { updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList) f.setUpConfigMap(configMap) - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 1, Succeeded: 0, Failed: 0, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + kubeflow.MPIReplicaTypeWorker: { Active: 8, Succeeded: 0, Failed: 0, @@ -886,9 +886,9 @@ func TestLauncherActiveWorkerReady(t *testing.T) { } setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobRunning, mpiJobRunningReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, mpiJobRunningReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t)) @@ -929,20 +929,20 @@ func TestWorkerReady(t *testing.T) { expLauncher := fmjc.newLauncherJob(mpiJobCopy) f.expectCreateJobAction(expLauncher) - mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 0, Succeeded: 0, Failed: 0, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + kubeflow.MPIReplicaTypeWorker: { Active: 16, Succeeded: 0, Failed: 0, }, } msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) diff --git a/sdk/python/v2beta1/README.md b/sdk/python/v2beta1/README.md index 70fe72ab7..a2e817fd1 100644 --- a/sdk/python/v2beta1/README.md +++ b/sdk/python/v2beta1/README.md @@ -70,9 +70,12 @@ Class | Method | HTTP request | Description - [V1ReplicaStatus](docs/V1ReplicaStatus.md) - [V1RunPolicy](docs/V1RunPolicy.md) - [V1SchedulingPolicy](docs/V1SchedulingPolicy.md) + - [V2beta1JobCondition](docs/V2beta1JobCondition.md) + - [V2beta1JobStatus](docs/V2beta1JobStatus.md) - [V2beta1MPIJob](docs/V2beta1MPIJob.md) - [V2beta1MPIJobList](docs/V2beta1MPIJobList.md) - [V2beta1MPIJobSpec](docs/V2beta1MPIJobSpec.md) + - [V2beta1ReplicaStatus](docs/V2beta1ReplicaStatus.md) - [V2beta1RunPolicy](docs/V2beta1RunPolicy.md) - [V2beta1SchedulingPolicy](docs/V2beta1SchedulingPolicy.md) diff --git a/sdk/python/v2beta1/docs/V2beta1JobCondition.md b/sdk/python/v2beta1/docs/V2beta1JobCondition.md new file mode 100644 index 000000000..7138b1668 --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1JobCondition.md @@ -0,0 +1,17 @@ +# V2beta1JobCondition + +JobCondition describes the state of the job at a certain point. + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**last_transition_time** | [**V1Time**](V1Time.md) | | [optional] +**last_update_time** | [**V1Time**](V1Time.md) | | [optional] +**message** | **str** | A human-readable message indicating details about the transition. | [optional] +**reason** | **str** | The reason for the condition's last transition. | [optional] +**status** | **str** | status of the condition, one of True, False, Unknown. | [default to ''] +**type** | **str** | type of job condition. | [default to ''] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/docs/V2beta1JobStatus.md b/sdk/python/v2beta1/docs/V2beta1JobStatus.md new file mode 100644 index 000000000..4b6e076bf --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1JobStatus.md @@ -0,0 +1,16 @@ +# V2beta1JobStatus + +JobStatus represents the current observed state of the training Job. + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**completion_time** | [**V1Time**](V1Time.md) | | [optional] +**conditions** | [**list[V2beta1JobCondition]**](V2beta1JobCondition.md) | conditions is a list of current observed job conditions. | [optional] +**last_reconcile_time** | [**V1Time**](V1Time.md) | | [optional] +**replica_statuses** | [**dict(str, V2beta1ReplicaStatus)**](V2beta1ReplicaStatus.md) | replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica. | [optional] +**start_time** | [**V1Time**](V1Time.md) | | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJob.md b/sdk/python/v2beta1/docs/V2beta1MPIJob.md index 6e45d7cdf..d6aac2842 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJob.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJob.md @@ -8,7 +8,7 @@ Name | Type | Description | Notes **kind** | **str** | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds | [optional] **metadata** | [**V1ObjectMeta**](V1ObjectMeta.md) | | [optional] **spec** | [**V2beta1MPIJobSpec**](V2beta1MPIJobSpec.md) | | [optional] -**status** | [**V1JobStatus**](V1JobStatus.md) | | [optional] +**status** | [**V2beta1JobStatus**](V2beta1JobStatus.md) | | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/v2beta1/docs/V2beta1ReplicaStatus.md b/sdk/python/v2beta1/docs/V2beta1ReplicaStatus.md new file mode 100644 index 000000000..09d83de72 --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1ReplicaStatus.md @@ -0,0 +1,16 @@ +# V2beta1ReplicaStatus + +ReplicaStatus represents the current observed state of the replica. + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**active** | **int** | The number of actively running pods. | [optional] +**failed** | **int** | The number of pods which reached phase failed. | [optional] +**label_selector** | [**V1LabelSelector**](V1LabelSelector.md) | | [optional] +**selector** | **str** | A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty selector matches all objects. A null selector matches no objects. | [optional] +**succeeded** | **int** | The number of pods which reached phase succeeded. | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/mpijob/__init__.py b/sdk/python/v2beta1/mpijob/__init__.py index 05e1f6604..1e7c84993 100644 --- a/sdk/python/v2beta1/mpijob/__init__.py +++ b/sdk/python/v2beta1/mpijob/__init__.py @@ -34,9 +34,12 @@ from mpijob.models.v1_replica_status import V1ReplicaStatus from mpijob.models.v1_run_policy import V1RunPolicy from mpijob.models.v1_scheduling_policy import V1SchedulingPolicy +from mpijob.models.v2beta1_job_condition import V2beta1JobCondition +from mpijob.models.v2beta1_job_status import V2beta1JobStatus from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_replica_status import V2beta1ReplicaStatus from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/__init__.py b/sdk/python/v2beta1/mpijob/models/__init__.py index 548a03870..f1d1b4dbb 100644 --- a/sdk/python/v2beta1/mpijob/models/__init__.py +++ b/sdk/python/v2beta1/mpijob/models/__init__.py @@ -20,8 +20,11 @@ from mpijob.models.v1_replica_status import V1ReplicaStatus from mpijob.models.v1_run_policy import V1RunPolicy from mpijob.models.v1_scheduling_policy import V1SchedulingPolicy +from mpijob.models.v2beta1_job_condition import V2beta1JobCondition +from mpijob.models.v2beta1_job_status import V2beta1JobStatus from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_replica_status import V2beta1ReplicaStatus from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_job_condition.py b/sdk/python/v2beta1/mpijob/models/v2beta1_job_condition.py new file mode 100644 index 000000000..2b5bb5ddf --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_job_condition.py @@ -0,0 +1,268 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1JobCondition(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'last_transition_time': 'V1Time', + 'last_update_time': 'V1Time', + 'message': 'str', + 'reason': 'str', + 'status': 'str', + 'type': 'str' + } + + attribute_map = { + 'last_transition_time': 'lastTransitionTime', + 'last_update_time': 'lastUpdateTime', + 'message': 'message', + 'reason': 'reason', + 'status': 'status', + 'type': 'type' + } + + def __init__(self, last_transition_time=None, last_update_time=None, message=None, reason=None, status='', type='', local_vars_configuration=None): # noqa: E501 + """V2beta1JobCondition - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._last_transition_time = None + self._last_update_time = None + self._message = None + self._reason = None + self._status = None + self._type = None + self.discriminator = None + + if last_transition_time is not None: + self.last_transition_time = last_transition_time + if last_update_time is not None: + self.last_update_time = last_update_time + if message is not None: + self.message = message + if reason is not None: + self.reason = reason + self.status = status + self.type = type + + @property + def last_transition_time(self): + """Gets the last_transition_time of this V2beta1JobCondition. # noqa: E501 + + + :return: The last_transition_time of this V2beta1JobCondition. # noqa: E501 + :rtype: V1Time + """ + return self._last_transition_time + + @last_transition_time.setter + def last_transition_time(self, last_transition_time): + """Sets the last_transition_time of this V2beta1JobCondition. + + + :param last_transition_time: The last_transition_time of this V2beta1JobCondition. # noqa: E501 + :type last_transition_time: V1Time + """ + + self._last_transition_time = last_transition_time + + @property + def last_update_time(self): + """Gets the last_update_time of this V2beta1JobCondition. # noqa: E501 + + + :return: The last_update_time of this V2beta1JobCondition. # noqa: E501 + :rtype: V1Time + """ + return self._last_update_time + + @last_update_time.setter + def last_update_time(self, last_update_time): + """Sets the last_update_time of this V2beta1JobCondition. + + + :param last_update_time: The last_update_time of this V2beta1JobCondition. # noqa: E501 + :type last_update_time: V1Time + """ + + self._last_update_time = last_update_time + + @property + def message(self): + """Gets the message of this V2beta1JobCondition. # noqa: E501 + + A human-readable message indicating details about the transition. # noqa: E501 + + :return: The message of this V2beta1JobCondition. # noqa: E501 + :rtype: str + """ + return self._message + + @message.setter + def message(self, message): + """Sets the message of this V2beta1JobCondition. + + A human-readable message indicating details about the transition. # noqa: E501 + + :param message: The message of this V2beta1JobCondition. # noqa: E501 + :type message: str + """ + + self._message = message + + @property + def reason(self): + """Gets the reason of this V2beta1JobCondition. # noqa: E501 + + The reason for the condition's last transition. # noqa: E501 + + :return: The reason of this V2beta1JobCondition. # noqa: E501 + :rtype: str + """ + return self._reason + + @reason.setter + def reason(self, reason): + """Sets the reason of this V2beta1JobCondition. + + The reason for the condition's last transition. # noqa: E501 + + :param reason: The reason of this V2beta1JobCondition. # noqa: E501 + :type reason: str + """ + + self._reason = reason + + @property + def status(self): + """Gets the status of this V2beta1JobCondition. # noqa: E501 + + status of the condition, one of True, False, Unknown. # noqa: E501 + + :return: The status of this V2beta1JobCondition. # noqa: E501 + :rtype: str + """ + return self._status + + @status.setter + def status(self, status): + """Sets the status of this V2beta1JobCondition. + + status of the condition, one of True, False, Unknown. # noqa: E501 + + :param status: The status of this V2beta1JobCondition. # noqa: E501 + :type status: str + """ + if self.local_vars_configuration.client_side_validation and status is None: # noqa: E501 + raise ValueError("Invalid value for `status`, must not be `None`") # noqa: E501 + + self._status = status + + @property + def type(self): + """Gets the type of this V2beta1JobCondition. # noqa: E501 + + type of job condition. # noqa: E501 + + :return: The type of this V2beta1JobCondition. # noqa: E501 + :rtype: str + """ + return self._type + + @type.setter + def type(self, type): + """Sets the type of this V2beta1JobCondition. + + type of job condition. # noqa: E501 + + :param type: The type of this V2beta1JobCondition. # noqa: E501 + :type type: str + """ + if self.local_vars_configuration.client_side_validation and type is None: # noqa: E501 + raise ValueError("Invalid value for `type`, must not be `None`") # noqa: E501 + + self._type = type + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1JobCondition): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1JobCondition): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_job_status.py b/sdk/python/v2beta1/mpijob/models/v2beta1_job_status.py new file mode 100644 index 000000000..b3dd4733f --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_job_status.py @@ -0,0 +1,236 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1JobStatus(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'completion_time': 'V1Time', + 'conditions': 'list[V2beta1JobCondition]', + 'last_reconcile_time': 'V1Time', + 'replica_statuses': 'dict(str, V2beta1ReplicaStatus)', + 'start_time': 'V1Time' + } + + attribute_map = { + 'completion_time': 'completionTime', + 'conditions': 'conditions', + 'last_reconcile_time': 'lastReconcileTime', + 'replica_statuses': 'replicaStatuses', + 'start_time': 'startTime' + } + + def __init__(self, completion_time=None, conditions=None, last_reconcile_time=None, replica_statuses=None, start_time=None, local_vars_configuration=None): # noqa: E501 + """V2beta1JobStatus - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._completion_time = None + self._conditions = None + self._last_reconcile_time = None + self._replica_statuses = None + self._start_time = None + self.discriminator = None + + if completion_time is not None: + self.completion_time = completion_time + if conditions is not None: + self.conditions = conditions + if last_reconcile_time is not None: + self.last_reconcile_time = last_reconcile_time + if replica_statuses is not None: + self.replica_statuses = replica_statuses + if start_time is not None: + self.start_time = start_time + + @property + def completion_time(self): + """Gets the completion_time of this V2beta1JobStatus. # noqa: E501 + + + :return: The completion_time of this V2beta1JobStatus. # noqa: E501 + :rtype: V1Time + """ + return self._completion_time + + @completion_time.setter + def completion_time(self, completion_time): + """Sets the completion_time of this V2beta1JobStatus. + + + :param completion_time: The completion_time of this V2beta1JobStatus. # noqa: E501 + :type completion_time: V1Time + """ + + self._completion_time = completion_time + + @property + def conditions(self): + """Gets the conditions of this V2beta1JobStatus. # noqa: E501 + + conditions is a list of current observed job conditions. # noqa: E501 + + :return: The conditions of this V2beta1JobStatus. # noqa: E501 + :rtype: list[V2beta1JobCondition] + """ + return self._conditions + + @conditions.setter + def conditions(self, conditions): + """Sets the conditions of this V2beta1JobStatus. + + conditions is a list of current observed job conditions. # noqa: E501 + + :param conditions: The conditions of this V2beta1JobStatus. # noqa: E501 + :type conditions: list[V2beta1JobCondition] + """ + + self._conditions = conditions + + @property + def last_reconcile_time(self): + """Gets the last_reconcile_time of this V2beta1JobStatus. # noqa: E501 + + + :return: The last_reconcile_time of this V2beta1JobStatus. # noqa: E501 + :rtype: V1Time + """ + return self._last_reconcile_time + + @last_reconcile_time.setter + def last_reconcile_time(self, last_reconcile_time): + """Sets the last_reconcile_time of this V2beta1JobStatus. + + + :param last_reconcile_time: The last_reconcile_time of this V2beta1JobStatus. # noqa: E501 + :type last_reconcile_time: V1Time + """ + + self._last_reconcile_time = last_reconcile_time + + @property + def replica_statuses(self): + """Gets the replica_statuses of this V2beta1JobStatus. # noqa: E501 + + replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica. # noqa: E501 + + :return: The replica_statuses of this V2beta1JobStatus. # noqa: E501 + :rtype: dict(str, V2beta1ReplicaStatus) + """ + return self._replica_statuses + + @replica_statuses.setter + def replica_statuses(self, replica_statuses): + """Sets the replica_statuses of this V2beta1JobStatus. + + replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica. # noqa: E501 + + :param replica_statuses: The replica_statuses of this V2beta1JobStatus. # noqa: E501 + :type replica_statuses: dict(str, V2beta1ReplicaStatus) + """ + + self._replica_statuses = replica_statuses + + @property + def start_time(self): + """Gets the start_time of this V2beta1JobStatus. # noqa: E501 + + + :return: The start_time of this V2beta1JobStatus. # noqa: E501 + :rtype: V1Time + """ + return self._start_time + + @start_time.setter + def start_time(self, start_time): + """Sets the start_time of this V2beta1JobStatus. + + + :param start_time: The start_time of this V2beta1JobStatus. # noqa: E501 + :type start_time: V1Time + """ + + self._start_time = start_time + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1JobStatus): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1JobStatus): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job.py index 530fb9ffd..310c27bf6 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job.py @@ -37,7 +37,7 @@ class V2beta1MPIJob(object): 'kind': 'str', 'metadata': 'V1ObjectMeta', 'spec': 'V2beta1MPIJobSpec', - 'status': 'V1JobStatus' + 'status': 'V2beta1JobStatus' } attribute_map = { @@ -166,7 +166,7 @@ def status(self): :return: The status of this V2beta1MPIJob. # noqa: E501 - :rtype: V1JobStatus + :rtype: V2beta1JobStatus """ return self._status @@ -176,7 +176,7 @@ def status(self, status): :param status: The status of this V2beta1MPIJob. # noqa: E501 - :type status: V1JobStatus + :type status: V2beta1JobStatus """ self._status = status diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_replica_status.py b/sdk/python/v2beta1/mpijob/models/v2beta1_replica_status.py new file mode 100644 index 000000000..6785f2a25 --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_replica_status.py @@ -0,0 +1,240 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1ReplicaStatus(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'active': 'int', + 'failed': 'int', + 'label_selector': 'V1LabelSelector', + 'selector': 'str', + 'succeeded': 'int' + } + + attribute_map = { + 'active': 'active', + 'failed': 'failed', + 'label_selector': 'labelSelector', + 'selector': 'selector', + 'succeeded': 'succeeded' + } + + def __init__(self, active=None, failed=None, label_selector=None, selector=None, succeeded=None, local_vars_configuration=None): # noqa: E501 + """V2beta1ReplicaStatus - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._active = None + self._failed = None + self._label_selector = None + self._selector = None + self._succeeded = None + self.discriminator = None + + if active is not None: + self.active = active + if failed is not None: + self.failed = failed + if label_selector is not None: + self.label_selector = label_selector + if selector is not None: + self.selector = selector + if succeeded is not None: + self.succeeded = succeeded + + @property + def active(self): + """Gets the active of this V2beta1ReplicaStatus. # noqa: E501 + + The number of actively running pods. # noqa: E501 + + :return: The active of this V2beta1ReplicaStatus. # noqa: E501 + :rtype: int + """ + return self._active + + @active.setter + def active(self, active): + """Sets the active of this V2beta1ReplicaStatus. + + The number of actively running pods. # noqa: E501 + + :param active: The active of this V2beta1ReplicaStatus. # noqa: E501 + :type active: int + """ + + self._active = active + + @property + def failed(self): + """Gets the failed of this V2beta1ReplicaStatus. # noqa: E501 + + The number of pods which reached phase failed. # noqa: E501 + + :return: The failed of this V2beta1ReplicaStatus. # noqa: E501 + :rtype: int + """ + return self._failed + + @failed.setter + def failed(self, failed): + """Sets the failed of this V2beta1ReplicaStatus. + + The number of pods which reached phase failed. # noqa: E501 + + :param failed: The failed of this V2beta1ReplicaStatus. # noqa: E501 + :type failed: int + """ + + self._failed = failed + + @property + def label_selector(self): + """Gets the label_selector of this V2beta1ReplicaStatus. # noqa: E501 + + + :return: The label_selector of this V2beta1ReplicaStatus. # noqa: E501 + :rtype: V1LabelSelector + """ + return self._label_selector + + @label_selector.setter + def label_selector(self, label_selector): + """Sets the label_selector of this V2beta1ReplicaStatus. + + + :param label_selector: The label_selector of this V2beta1ReplicaStatus. # noqa: E501 + :type label_selector: V1LabelSelector + """ + + self._label_selector = label_selector + + @property + def selector(self): + """Gets the selector of this V2beta1ReplicaStatus. # noqa: E501 + + A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty selector matches all objects. A null selector matches no objects. # noqa: E501 + + :return: The selector of this V2beta1ReplicaStatus. # noqa: E501 + :rtype: str + """ + return self._selector + + @selector.setter + def selector(self, selector): + """Sets the selector of this V2beta1ReplicaStatus. + + A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty selector matches all objects. A null selector matches no objects. # noqa: E501 + + :param selector: The selector of this V2beta1ReplicaStatus. # noqa: E501 + :type selector: str + """ + + self._selector = selector + + @property + def succeeded(self): + """Gets the succeeded of this V2beta1ReplicaStatus. # noqa: E501 + + The number of pods which reached phase succeeded. # noqa: E501 + + :return: The succeeded of this V2beta1ReplicaStatus. # noqa: E501 + :rtype: int + """ + return self._succeeded + + @succeeded.setter + def succeeded(self, succeeded): + """Sets the succeeded of this V2beta1ReplicaStatus. + + The number of pods which reached phase succeeded. # noqa: E501 + + :param succeeded: The succeeded of this V2beta1ReplicaStatus. # noqa: E501 + :type succeeded: int + """ + + self._succeeded = succeeded + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1ReplicaStatus): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1ReplicaStatus): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/test/test_v2beta1_job_condition.py b/sdk/python/v2beta1/test/test_v2beta1_job_condition.py new file mode 100644 index 000000000..4c975243f --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_job_condition.py @@ -0,0 +1,56 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_job_condition import V2beta1JobCondition # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1JobCondition(unittest.TestCase): + """V2beta1JobCondition unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1JobCondition + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_job_condition.V2beta1JobCondition() # noqa: E501 + if include_optional : + return V2beta1JobCondition( + last_transition_time = None, + last_update_time = None, + message = '', + reason = '', + status = '', + type = '' + ) + else : + return V2beta1JobCondition( + ) + + def testV2beta1JobCondition(self): + """Test V2beta1JobCondition""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/v2beta1/test/test_v2beta1_job_status.py b/sdk/python/v2beta1/test/test_v2beta1_job_status.py new file mode 100644 index 000000000..0b8a62c10 --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_job_status.py @@ -0,0 +1,70 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_job_status import V2beta1JobStatus # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1JobStatus(unittest.TestCase): + """V2beta1JobStatus unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1JobStatus + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_job_status.V2beta1JobStatus() # noqa: E501 + if include_optional : + return V2beta1JobStatus( + completion_time = None, + conditions = [ + mpijob.models.v2beta1/job_condition.v2beta1.JobCondition( + last_transition_time = None, + last_update_time = None, + message = '', + reason = '', + status = '', + type = '', ) + ], + last_reconcile_time = None, + replica_statuses = { + 'key' : mpijob.models.v2beta1/replica_status.v2beta1.ReplicaStatus( + active = 56, + failed = 56, + label_selector = None, + selector = '', + succeeded = 56, ) + }, + start_time = None + ) + else : + return V2beta1JobStatus( + ) + + def testV2beta1JobStatus(self): + """Test V2beta1JobStatus""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/v2beta1/test/test_v2beta1_replica_status.py b/sdk/python/v2beta1/test/test_v2beta1_replica_status.py new file mode 100644 index 000000000..d4f097d12 --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_replica_status.py @@ -0,0 +1,55 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_replica_status import V2beta1ReplicaStatus # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1ReplicaStatus(unittest.TestCase): + """V2beta1ReplicaStatus unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1ReplicaStatus + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_replica_status.V2beta1ReplicaStatus() # noqa: E501 + if include_optional : + return V2beta1ReplicaStatus( + active = 56, + failed = 56, + label_selector = None, + selector = '', + succeeded = 56 + ) + else : + return V2beta1ReplicaStatus( + ) + + def testV2beta1ReplicaStatus(self): + """Test V2beta1ReplicaStatus""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index f8e00a437..789739726 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -107,7 +107,7 @@ var _ = ginkgo.Describe("MPIJob", func() { }) ginkgo.It("should fail", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) - expectConditionToBeTrue(mpiJob, common.JobFailed) + expectConditionToBeTrue(mpiJob, kubeflow.JobFailed) }) }) @@ -119,7 +119,7 @@ var _ = ginkgo.Describe("MPIJob", func() { ginkgo.It("should succeed", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) - expectConditionToBeTrue(mpiJob, common.JobSucceeded) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) ginkgo.When("running with host network", func() { @@ -134,7 +134,7 @@ var _ = ginkgo.Describe("MPIJob", func() { ginkgo.It("should succeed", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) - expectConditionToBeTrue(mpiJob, common.JobSucceeded) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) }) }) @@ -158,7 +158,7 @@ var _ = ginkgo.Describe("MPIJob", func() { ginkgo.It("should succeed", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) - expectConditionToBeTrue(mpiJob, common.JobSucceeded) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) }) @@ -204,7 +204,7 @@ var _ = ginkgo.Describe("MPIJob", func() { ginkgo.It("should succeed", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) - expectConditionToBeTrue(mpiJob, common.JobSucceeded) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) }) @@ -297,8 +297,8 @@ func podLogs(ctx context.Context, p *corev1.Pod) error { return nil } -func expectConditionToBeTrue(mpiJob *kubeflow.MPIJob, condType common.JobConditionType) { - var condition *common.JobCondition +func expectConditionToBeTrue(mpiJob *kubeflow.MPIJob, condType kubeflow.JobConditionType) { + var condition *kubeflow.JobCondition for _, cond := range mpiJob.Status.Conditions { if cond.Type == condType { condition = &cond diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 14c785ca6..2cb961c1e 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -100,11 +100,11 @@ func TestMPIJobSuccess(t *testing.T) { }, mpiJob)) workerPods, launcherJob := validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 2) - mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {}, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, }) - if !mpiJobHasCondition(mpiJob, common.JobCreated) { + if !mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { t.Errorf("MPIJob missing Created condition") } s.events.verify(t) @@ -113,9 +113,9 @@ func TestMPIJobSuccess(t *testing.T) { if err != nil { t.Fatalf("Updating worker Pods to Running phase: %v", err) } - validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {}, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: { Active: 2, }, }) @@ -132,11 +132,11 @@ func TestMPIJobSuccess(t *testing.T) { if err != nil { t.Fatalf("Updating launcher Pods to Running phase: %v", err) } - validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Active: 1, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + kubeflow.MPIReplicaTypeWorker: { Active: 2, }, }) @@ -157,14 +157,14 @@ func TestMPIJobSuccess(t *testing.T) { t.Fatalf("Updating launcher Job Complete condition: %v", err) } validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 0) - mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Succeeded: 1, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + kubeflow.MPIReplicaTypeWorker: {}, }) s.events.verify(t) - if !mpiJobHasCondition(mpiJob, common.JobSucceeded) { + if !mpiJobHasCondition(mpiJob, kubeflow.JobSucceeded) { t.Errorf("MPIJob doesn't have Succeeded condition after launcher Job succeeded") } } @@ -255,15 +255,15 @@ func TestMPIJobFailure(t *testing.T) { if err != nil { t.Fatalf("Failed to update launcher Job failed pods: %v", err) } - mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Failed: 1, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { + kubeflow.MPIReplicaTypeWorker: { Active: 2, }, }) - if mpiJobHasCondition(mpiJob, common.JobFailed) { + if mpiJobHasCondition(mpiJob, kubeflow.JobFailed) { t.Errorf("MPIJob has Failed condition when a launcher Pod fails") } @@ -281,14 +281,14 @@ func TestMPIJobFailure(t *testing.T) { t.Fatalf("Updating launcher Job Failed condition: %v", err) } validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 0) - mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[common.ReplicaType]*common.ReplicaStatus{ - common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { Failed: 2, }, - common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {}, + kubeflow.MPIReplicaTypeWorker: {}, }) s.events.verify(t) - if !mpiJobHasCondition(mpiJob, common.JobFailed) { + if !mpiJobHasCondition(mpiJob, kubeflow.JobFailed) { t.Errorf("MPIJob doesn't have Failed condition after launcher Job fails") } } @@ -399,12 +399,12 @@ func validateMPIJobDependencies(ctx context.Context, t *testing.T, kubeClient ku return workerPods, launcherJob } -func validateMPIJobStatus(ctx context.Context, t *testing.T, client clientset.Interface, job *kubeflow.MPIJob, want map[common.ReplicaType]*common.ReplicaStatus) *kubeflow.MPIJob { +func validateMPIJobStatus(ctx context.Context, t *testing.T, client clientset.Interface, job *kubeflow.MPIJob, want map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus) *kubeflow.MPIJob { t.Helper() var ( newJob *kubeflow.MPIJob err error - got map[common.ReplicaType]*common.ReplicaStatus + got map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus ) if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { newJob, err = client.KubeflowV2beta1().MPIJobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{}) @@ -532,7 +532,7 @@ func hasVolumeForConfigMap(podSpec *corev1.PodSpec, cm *corev1.ConfigMap) bool { return false } -func mpiJobHasCondition(job *kubeflow.MPIJob, cond common.JobConditionType) bool { +func mpiJobHasCondition(job *kubeflow.MPIJob, cond kubeflow.JobConditionType) bool { for _, c := range job.Status.Conditions { if c.Type == cond { return c.Status == corev1.ConditionTrue