Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12060,6 +12060,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fai
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance If a job is appl
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
Expand Down
7 changes: 7 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
JobDeploymentStatusRunning JobDeploymentStatus = "Running"
JobDeploymentStatusFailedToGetJobStatus JobDeploymentStatus = "FailedToGetJobStatus"
JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
)

// RayJobSpec defines the desired state of RayJob
Expand All @@ -61,6 +62,12 @@ type RayJobSpec struct {
RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
// clusterSelector is used to select running rayclusters by labels
ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
// suspend specifies whether the RayJob controller should create a RayCluster instance

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth explaining what happens with the RayCluster on transitions from false to true and true to false.
Does it affect .status.startTime?

Copy link

@mcariatm mcariatm Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done .status.startTime is updating for each new start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcariatm Could you also reset .status.endTime when a job is resumed (transitions from suspend true -> false)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcariatm Could you also reset .status.endTime when a job is resumed (transitions from suspend true -> false)

Done, line 400 in rayjob_controller.go

// If a job is applied with the suspend field set to true,
// the RayCluster will not be created and will wait for the transition to false.
// If the RayCluster is already created, it will be deleted.
// In case of transition to false a new RayCluster will be created.
Suspend bool `json:"suspend,omitempty"`
}

// RayJobStatus defines the observed state of RayJob
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12060,6 +12060,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fai
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance If a job is appl
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
Expand Down
1 change: 1 addition & 0 deletions ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kind: RayJob
metadata:
name: rayjob-sample
spec:
suspend: false
entrypoint: python /home/ray/samples/sample_code.py
# runtimeEnv decoded to '{
# "pip": [
Expand Down
128 changes: 97 additions & 31 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
rayDashboardClient := utils.GetRayDashboardClientFunc()
rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL)
err := rayDashboardClient.StopJob(rayJobInstance.Status.JobId, &r.Log)
err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
if err != nil {
r.Log.Info("Failed to stop job", "error", err)
}
Expand Down Expand Up @@ -150,6 +151,20 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// If there is no cluster instance and no error, suspend the job deployment.
if rayClusterInstance == nil {
// Already suspended?
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to requeue in this case? Isn't this job fully synced at this point?

}
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusSuspended, err)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here

}

// Always update RayClusterStatus along with jobStatus and jobDeploymentStatus updates.
rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
Expand Down Expand Up @@ -178,7 +193,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

// Check the current status of ray jobs before submitting.
jobInfo, err := rayDashboardClient.GetJobInfo(rayJobInstance.Status.JobId)
jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, err)
// Dashboard service in head pod takes time to start, it's possible we get connection refused error.
Expand All @@ -189,7 +204,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
r.Log.V(1).Info("RayJob information", "RayJob", rayJobInstance.Name, "jobInfo", jobInfo, "rayJobInstance", rayJobInstance.Status.JobStatus)
if jobInfo == nil {
// Submit the job if no id set
jobId, err := rayDashboardClient.SubmitJob(rayJobInstance, &r.Log)
jobId, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance, &r.Log)
if err != nil {
r.Log.Error(err, "failed to submit job")
err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, err)
Expand All @@ -213,9 +228,48 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{}, err
}

// The job may take a long time to start and finish, so just periodically requeue the job and check its status.
if isJobPendingOrRunning(jobInfo.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
// If suspend flag is set AND
// the RayJob is submitted against the RayCluster created by THIS job, then
// try to gracefully stop the Ray job and delete (suspend) the cluster
if rayJobInstance.Spec.Suspend && len(rayJobInstance.Spec.ClusterSelector) == 0 {
info, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if !rayv1alpha1.IsJobTerminal(info.JobStatus) {
err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StopJob command is async. It sends SIGTERM to all the Ray processes related to the job, and SIGKILL after a 3 second grace period.

Is it possible to wait for a few seconds after the StopJob before deleting the cluster? Would that approach even be reasonable?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about just requeuing and letting the next sync verify if the Job was fully stopped? Is there any way to obtain that information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds reasonable. According to the source https://github.com/ray-project/ray/blob/74325efb20d525339f482b764322ef55c9e29849/dashboard/modules/job/job_manager.py#L436-L460 we only put the JobInfo status as STOPPED after all the processes have been killed. So we can requeue and check for the STOPPED status

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like this was addressed

if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
if info.JobStatus != rayv1alpha1.JobStatusStopped {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

_, err = r.deleteCluster(ctx, rayJobInstance)
if err != nil && !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// Since the RayCluster instance is gone, remove its status
// from the RayJob resource as well.
rayJobInstance.Status.RayClusterStatus = rayv1alpha1.RayClusterStatus{}
rayJobInstance.Status.RayClusterName = ""
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this in a separate API call? can it be one with line 246?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seconding this question

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this was addressed.

if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
// The job may take a long time to start and finish, so just periodically requeue the job and check its status.
}
if isJobPendingOrRunning(jobInfo.JobStatus) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why requeue in this case? Is there no mechanism to reconcile when there are changes to jobInfo? In other words, an approach that wouldn't need polling.

It looks like a requeue existed before this PR, so this is a question to the maintainers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: queueing, currently we don't monitor changes to the jobinfo, but it may make sense to do that in the future.

As for the unconditional requeueing, here's some of the discussion: #850 it sounds like it's a best practice to requeue every few minutes, but it might be something that's still up for discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"currently we don't monitor changes to the jobinfo" actually I'm not sure about this, let me take a closer look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a change to jobInfo triggers anything automatically, but on each reconciliation loop we check it and do the appropriate operations (e.g. delete the cluster if the job is complete and we have the flag set to delete the cluster)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every 60s sounds both too fast and too slow.

  • too fast in the sense that if you are running thousands of clusters at a time, the operator could be overloaded.
  • too slow in the sense that it takes too long to notice a change in the jobInfo.

Certainly having some kind of trigger and forced reconciliation with a longer period would be better. But this is beyond the scope of this PR.

}
}

// Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
Expand All @@ -231,34 +285,38 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
}

r.Log.Info("shutdownAfterJobFinishes set to true, we will delete cluster",
"RayJob", rayJobInstance.Name, "clusterName", fmt.Sprintf("%s/%s", rayJobInstance.Namespace, rayJobInstance.Status.RayClusterName))
clusterIdentifier := types.NamespacedName{
Name: rayJobInstance.Status.RayClusterName,
Namespace: rayJobInstance.Namespace,
}
cluster := rayv1alpha1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
} else {
if cluster.DeletionTimestamp != nil {
r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
} else {
if err := r.Delete(ctx, &cluster); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
return ctrl.Result{Requeue: true}, nil
}
}
return r.deleteCluster(ctx, rayJobInstance)
}
}
return ctrl.Result{}, nil
}

// deleteCluster deletes the RayCluster associated with the given RayJob and
// returns a requeue result so the caller keeps re-syncing until the deletion
// has completed. A cluster that is already gone (NotFound) is treated as
// success rather than an error, so the controller does not log spurious
// reconcile failures for work that is effectively done.
func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1alpha1.RayJob) (reconcile.Result, error) {
	clusterIdentifier := types.NamespacedName{
		Name:      rayJobInstance.Status.RayClusterName,
		Namespace: rayJobInstance.Namespace,
	}
	cluster := rayv1alpha1.RayCluster{}
	if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
		if !errors.IsNotFound(err) {
			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
		}
		// Already deleted: do not propagate the NotFound error, otherwise
		// controller-runtime records a reconcile failure for a no-op.
		r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
	}
	if cluster.DeletionTimestamp != nil {
		// Deletion was requested on a previous sync and is still in progress;
		// nothing more to do this round.
		r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
		return ctrl.Result{}, nil
	}
	if err := r.Delete(ctx, &cluster); err != nil {
		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
	}
	r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
	r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
	return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

Expand Down Expand Up @@ -343,7 +401,11 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
if jobInfo != nil {
rayJob.Status.Message = jobInfo.Message
rayJob.Status.StartTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.StartTime)
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
if jobInfo.StartTime >= jobInfo.EndTime {
rayJob.Status.EndTime = nil
} else {
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
}
}

// TODO (kevin85421): ObservedGeneration should be used to determine whether update this CR or not.
Expand Down Expand Up @@ -391,11 +453,15 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
return nil, err
}

// one special case is the job is complete status and cluster has been recycled.
// special case: the job is in complete status and the cluster has been recycled.
if isJobSucceedOrFailed(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusComplete {
r.Log.Info("The cluster has been recycled for the job, skip duplicate creation", "rayjob", rayJobInstance.Name)
return nil, err
}
// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
if rayJobInstance.Spec.Suspend {
return nil, nil
}

r.Log.Info("RayCluster not found, creating rayCluster!", "raycluster", rayClusterNamespacedName)
rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
Expand Down
Loading