diff --git a/pkg/apis/common/v1/interface.go b/pkg/apis/common/v1/interface.go index 39e95d31..06172a52 100644 --- a/pkg/apis/common/v1/interface.go +++ b/pkg/apis/common/v1/interface.go @@ -64,7 +64,7 @@ type ControllerInterface interface { // It will requeue the job in case of an error while creating/deleting pods. // Common implementation will be provided and User can still override this to implement their own reconcile logic ReconcilePods(job interface{}, jobStatus *JobStatus, pods []*v1.Pod, rtype ReplicaType, spec *ReplicaSpec, - replicas map[ReplicaType]*ReplicaSpec) error + replicas map[ReplicaType]*ReplicaSpec, runPolicy *RunPolicy) error // ReconcileServices checks and updates services for each given ReplicaSpec. // It will requeue the job in case of an error while creating/deleting services. diff --git a/pkg/apis/common/v1/openapi_generated.go b/pkg/apis/common/v1/openapi_generated.go index d0af1676..5ffa27ad 100644 --- a/pkg/apis/common/v1/openapi_generated.go +++ b/pkg/apis/common/v1/openapi_generated.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright 2021 The Kubeflow Authors +// Copyright 2022 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -536,6 +536,13 @@ func schema_pkg_apis_common_v1_RunPolicy(ref common.ReferenceCallback) common.Op Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"), }, }, + "suspend": { + SchemaProps: spec.SchemaProps{ + Description: "Suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. 
Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/common/v1/types.go b/pkg/apis/common/v1/types.go index e4eb7eb2..2487af39 100644 --- a/pkg/apis/common/v1/types.go +++ b/pkg/apis/common/v1/types.go @@ -135,6 +135,17 @@ const ( // reached phase failed with no restarting. // The training has failed its execution. JobFailed JobConditionType = "Failed" + + // JobSuspended means sub-resources (e.g. services/pods) of this job + // have been terminated. + JobSuspended JobConditionType = "Suspended" + + // JobResumed means the job has been resumed after being suspended. + JobResumed JobConditionType = "Resumed" + + // JobPartialSucceeded means all sub-resources (e.g. services/pods) of one worker of this job + // have terminated in success. + JobPartialSucceeded JobConditionType = "PartialSucceeded" ) // +k8s:openapi-gen=true @@ -196,6 +207,17 @@ type RunPolicy struct { // SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling // +optional SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"` + + // Suspend specifies whether the Job controller should create Pods or not. If + // a Job is created with suspend set to true, no Pods are created by the Job + // controller. If a Job is suspended after creation (i.e. the flag goes from + // false to true), the Job controller will delete all active Pods associated + // with this Job. Users must design their workload to gracefully handle this. + // Suspending a Job will reset the StartTime field of the Job, effectively + // resetting the ActiveDeadlineSeconds timer too. + // Defaults to false.
+ // +optional + Suspend *bool `json:"suspend,omitempty"` } // +k8s:openapi-gen=true diff --git a/pkg/apis/common/v1/zz_generated.deepcopy.go b/pkg/apis/common/v1/zz_generated.deepcopy.go index d80060be..4e861a6d 100644 --- a/pkg/apis/common/v1/zz_generated.deepcopy.go +++ b/pkg/apis/common/v1/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright 2021 The Kubeflow Authors +// Copyright 2022 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -164,6 +164,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(SchedulingPolicy) (*in).DeepCopyInto(*out) } + if in.Suspend != nil { + in, out := &in.Suspend, &out.Suspend + *out = new(bool) + **out = **in + } return } diff --git a/pkg/apis/common/v1/zz_generated.defaults.go b/pkg/apis/common/v1/zz_generated.defaults.go index 1c9796c7..e9b4dec4 100644 --- a/pkg/apis/common/v1/zz_generated.defaults.go +++ b/pkg/apis/common/v1/zz_generated.defaults.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright 2021 The Kubeflow Authors +// Copyright 2022 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index fe2b836a..e8d8ac89 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -270,7 +270,7 @@ func (jc *JobController) ReconcileJobs( // Diff current active pods/services with replicas. 
for rtype, spec := range replicas { - err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas) + err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas, runPolicy) if err != nil { log.Warnf("ReconcilePods error %v", err) return err diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index c878c71b..fda9080d 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -274,7 +274,8 @@ func (jc *JobController) ReconcilePods( pods []*v1.Pod, rType apiv1.ReplicaType, spec *apiv1.ReplicaSpec, - replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error { + replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, + runPolicy *apiv1.RunPolicy) error { rt := strings.ToLower(string(rType)) metaObject, ok := job.(metav1.Object) @@ -317,6 +318,10 @@ func (jc *JobController) ReconcilePods( } else if len(podSlice) == 0 { logger.Infof("Need to create new pod: %s-%d", rt, index) + if JobSuspended(runPolicy) || commonutil.IsSuspended(*jobStatus) { + logger.Warningf("job is Suspended %s/%s", metaObject.GetNamespace(), metaObject.GetName()) + continue + } // check if this replica is the master role masterRole = jc.Controller.IsMasterRole(replicas, rType, index) err = jc.createNewPod(job, rt, index, spec, masterRole, replicas) @@ -328,7 +333,7 @@ func (jc *JobController) ReconcilePods( pod := podSlice[0] // check if the index is in the valid range, if not, we should kill the pod - if index < 0 || index >= numReplicas { + if index < 0 || index >= numReplicas || JobSuspended(runPolicy) { err = jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject) if err != nil { return err diff --git a/pkg/controller.v1/common/util.go b/pkg/controller.v1/common/util.go index 99229034..f726bb80 100644 --- a/pkg/controller.v1/common/util.go +++ b/pkg/controller.v1/common/util.go @@ -143,3 +143,9 @@ func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.R return 
&minAvailableTasksRes } + +// JobSuspended reports whether runPolicy requests that the Job be suspended. +// A nil runPolicy or a nil Suspend field is treated as not suspended. +func JobSuspended(runPolicy *apiv1.RunPolicy) bool { + return runPolicy != nil && runPolicy.Suspend != nil && *runPolicy.Suspend +} diff --git a/pkg/core/job.go b/pkg/core/job.go index 45d932a0..c227b621 100644 --- a/pkg/core/job.go +++ b/pkg/core/job.go @@ -64,7 +64,7 @@ func RecordAbnormalPods(activePods []*v1.Pod, object runtime.Object, recorder re // PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool { - if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil { + if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil || (runPolicy.Suspend != nil && *runPolicy.Suspend) { return false } now := metav1.Now() diff --git a/pkg/util/status.go b/pkg/util/status.go index 372354c3..9fca7359 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -17,7 +17,12 @@ const ( JobFailedReason = "JobFailed" // JobRestarting is added in a job when it is restarting. JobRestartingReason = "JobRestarting" - + // JobSuspendedReason is added in a job when it has been suspended. + JobSuspendedReason = "JobSuspended" + // JobResumedReason is added in a job when it has been resumed. + JobResumedReason = "JobResumed" + // JobPartialSucceededReason is added in a job when it has partially succeeded. + JobPartialSucceededReason = "JobPartialSucceeded" // labels for pods and servers.
) @@ -32,6 +37,16 @@ func IsFailed(status apiv1.JobStatus) bool { return hasCondition(status, apiv1.JobFailed) } +// IsSuspended checks if the job has the JobSuspended condition. +func IsSuspended(status apiv1.JobStatus) bool { + return hasCondition(status, apiv1.JobSuspended) +} + +// IsPartialSucceeded checks if the job has the JobPartialSucceeded condition. +func IsPartialSucceeded(status apiv1.JobStatus) bool { + return hasCondition(status, apiv1.JobPartialSucceeded) +} + // UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType, reason, message string) error { condition := newCondition(conditionType, reason, message) @@ -103,7 +118,19 @@ func filterOutCondition(conditions []apiv1.JobCondition, condType apiv1.JobCondi if condType == apiv1.JobRestarting && c.Type == apiv1.JobRunning { continue } - if condType == apiv1.JobRunning && c.Type == apiv1.JobRestarting { + if condType == apiv1.JobRunning && (c.Type == apiv1.JobRestarting || c.Type == apiv1.JobResumed) { + continue + } + + if condType == apiv1.JobSuspended && c.Type == apiv1.JobResumed { + continue + } + + if condType == apiv1.JobResumed && c.Type == apiv1.JobSuspended { + continue + } + + if condType == apiv1.JobRestarting && c.Type == apiv1.JobPartialSucceeded { continue } @@ -111,8 +138,9 @@ func filterOutCondition(conditions []apiv1.JobCondition, condType apiv1.JobCondi continue } - // Set the running condition status to be false when current condition failed or succeeded - if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded) && c.Type == apiv1.JobRunning { + // Set the running condition status to be false when current condition failed, succeeded or suspended + if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded || + condType == apiv1.JobSuspended) && c.Type == apiv1.JobRunning { c.Status = v1.ConditionFalse }