diff --git a/api/apiv1/design/host.go b/api/apiv1/design/host.go index 7a5a351d..d5857924 100644 --- a/api/apiv1/design/host.go +++ b/api/apiv1/design/host.go @@ -253,9 +253,12 @@ var ListHostsResponseExample = map[string]any{ } var RemoveHostResponse = g.Type("RemoveHostResponse", func() { + g.Attribute("task", Task, func() { + g.Description("The task that tracks the overall host removal operation.") + }) g.Attribute("update_database_tasks", g.ArrayOf(Task), func() { g.Description("The tasks that will update databases affected by the host removal.") }) - g.Required("update_database_tasks") + g.Required("task", "update_database_tasks") }) diff --git a/api/apiv1/gen/control_plane/service.go b/api/apiv1/gen/control_plane/service.go index fd87dd19..70001c71 100644 --- a/api/apiv1/gen/control_plane/service.go +++ b/api/apiv1/gen/control_plane/service.go @@ -759,6 +759,8 @@ type RemoveHostPayload struct { // RemoveHostResponse is the result type of the control-plane service // remove-host method. type RemoveHostResponse struct { + // The task that tracks the overall host removal operation. + Task *Task // The tasks that will update databases affected by the host removal. UpdateDatabaseTasks []*Task } diff --git a/api/apiv1/gen/http/control_plane/client/types.go b/api/apiv1/gen/http/control_plane/client/types.go index 902ae7cc..198fedff 100644 --- a/api/apiv1/gen/http/control_plane/client/types.go +++ b/api/apiv1/gen/http/control_plane/client/types.go @@ -175,6 +175,8 @@ type GetHostResponseBody struct { // RemoveHostResponseBody is the type of the "control-plane" service // "remove-host" endpoint HTTP response body. type RemoveHostResponseBody struct { + // The task that tracks the overall host removal operation. + Task *TaskResponseBody `form:"task,omitempty" json:"task,omitempty" xml:"task,omitempty"` // The tasks that will update databases affected by the host removal. UpdateDatabaseTasks []*TaskResponseBody `form:"update_database_tasks,omitempty" json:"update_database_tasks,omitempty" xml:"update_database_tasks,omitempty"` } @@ -3090,6 +3092,7 @@ func NewGetHostServerError(body *GetHostServerErrorResponseBody) *controlplane.A // endpoint result from a HTTP "OK" response. func NewRemoveHostResponseOK(body *RemoveHostResponseBody) *controlplane.RemoveHostResponse { v := &controlplane.RemoveHostResponse{} + v.Task = unmarshalTaskResponseBodyToControlplaneTask(body.Task) v.UpdateDatabaseTasks = make([]*controlplane.Task, len(body.UpdateDatabaseTasks)) for i, val := range body.UpdateDatabaseTasks { if val == nil { diff --git a/api/apiv1/gen/http/control_plane/server/types.go b/api/apiv1/gen/http/control_plane/server/types.go index 9de1fe63..377d1d40 100644 --- a/api/apiv1/gen/http/control_plane/server/types.go +++ b/api/apiv1/gen/http/control_plane/server/types.go @@ -175,6 +175,8 @@ type GetHostResponseBody struct { // RemoveHostResponseBody is the type of the "control-plane" service // "remove-host" endpoint HTTP response body. type RemoveHostResponseBody struct { + // The task that tracks the overall host removal operation. + Task *TaskResponseBody `form:"task" json:"task" xml:"task"` // The tasks that will update databases affected by the host removal. UpdateDatabaseTasks []*TaskResponseBody `form:"update_database_tasks" json:"update_database_tasks" xml:"update_database_tasks"` } @@ -2777,6 +2779,9 @@ func NewGetHostResponseBody(res *controlplane.Host) *GetHostResponseBody { // the "remove-host" endpoint of the "control-plane" service. func NewRemoveHostResponseBody(res *controlplane.RemoveHostResponse) *RemoveHostResponseBody { body := &RemoveHostResponseBody{} + if res.Task != nil { + body.Task = marshalControlplaneTaskToTaskResponseBody(res.Task) + } if res.UpdateDatabaseTasks != nil { body.UpdateDatabaseTasks = make([]*TaskResponseBody, len(res.UpdateDatabaseTasks)) for i, val := range res.UpdateDatabaseTasks { diff --git a/api/apiv1/gen/http/openapi.json b/api/apiv1/gen/http/openapi.json index a21a7f53..76376059 100644 --- a/api/apiv1/gen/http/openapi.json +++ b/api/apiv1/gen/http/openapi.json @@ -1738,6 +1738,7 @@ "schema": { "$ref": "#/definitions/RemoveHostResponse", "required": [ + "task", "update_database_tasks" ] } @@ -6976,6 +6977,9 @@ "title": "RemoveHostResponse", "type": "object", "properties": { + "task": { + "$ref": "#/definitions/Task" + }, "update_database_tasks": { "type": "array", "items": { @@ -7017,6 +7021,16 @@ } }, "example": { + "task": { + "completed_at": "2025-06-18T16:52:35Z", + "created_at": "2025-06-18T16:52:05Z", + "database_id": "storefront", + "entity_id": "storefront", + "scope": "database", + "status": "completed", + "task_id": "019783f4-75f4-71e7-85a3-c9b96b345d77", + "type": "create" + }, "update_database_tasks": [ { "completed_at": "2025-06-18T16:52:35Z", @@ -7041,6 +7055,7 @@ ] }, "required": [ + "task", "update_database_tasks" ] }, diff --git a/api/apiv1/gen/http/openapi.yaml b/api/apiv1/gen/http/openapi.yaml index 457b3305..8d8aa32c 100644 --- a/api/apiv1/gen/http/openapi.yaml +++ b/api/apiv1/gen/http/openapi.yaml @@ -1211,6 +1211,7 @@ paths: schema: $ref: '#/definitions/RemoveHostResponse' required: + - task - update_database_tasks "400": description: Bad Request response. @@ -5008,6 +5009,8 @@ definitions: title: RemoveHostResponse type: object properties: + task: + $ref: '#/definitions/Task' update_database_tasks: type: array items: @@ -5039,6 +5042,15 @@ definitions: task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 type: create example: + task: + completed_at: "2025-06-18T16:52:35Z" + created_at: "2025-06-18T16:52:05Z" + database_id: storefront + entity_id: storefront + scope: database + status: completed + task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 + type: create update_database_tasks: - completed_at: "2025-06-18T16:52:35Z" created_at: "2025-06-18T16:52:05Z" @@ -5057,6 +5069,7 @@ definitions: task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 type: create required: + - task - update_database_tasks RestartInstanceResponse: title: RestartInstanceResponse diff --git a/api/apiv1/gen/http/openapi3.json b/api/apiv1/gen/http/openapi3.json index c2671aa7..8c6f9f9a 100644 --- a/api/apiv1/gen/http/openapi3.json +++ b/api/apiv1/gen/http/openapi3.json @@ -3841,6 +3841,16 @@ "$ref": "#/components/schemas/RemoveHostResponse" }, "example": { + "task": { + "completed_at": "2025-06-18T16:52:35Z", + "created_at": "2025-06-18T16:52:05Z", + "database_id": "storefront", + "entity_id": "storefront", + "scope": "database", + "status": "completed", + "task_id": "019783f4-75f4-71e7-85a3-c9b96b345d77", + "type": "create" + }, "update_database_tasks": [ { "completed_at": "2025-06-18T16:52:35Z", @@ -14899,6 +14909,9 @@ "RemoveHostResponse": { "type": "object", "properties": { + "task": { + "$ref": "#/components/schemas/Task" + }, "update_database_tasks": { "type": "array", "items": { @@ -14940,6 +14953,16 @@ } }, "example": { + "task": { + "completed_at": "2025-06-18T16:52:35Z", + "created_at": "2025-06-18T16:52:05Z", + "database_id": "storefront", + "entity_id": "storefront", + "scope": "database", + "status": "completed", + "task_id": "019783f4-75f4-71e7-85a3-c9b96b345d77", + "type": "create" + }, "update_database_tasks": [ { "completed_at": "2025-06-18T16:52:35Z", @@ -14974,6 +14997,7 @@ ] }, "required": [ + "task", "update_database_tasks" ] }, diff --git a/api/apiv1/gen/http/openapi3.yaml b/api/apiv1/gen/http/openapi3.yaml index 51a6c028..ee709c6e 100644 --- a/api/apiv1/gen/http/openapi3.yaml +++ b/api/apiv1/gen/http/openapi3.yaml @@ -2560,6 +2560,15 @@ paths: schema: $ref: '#/components/schemas/RemoveHostResponse' example: + task: + completed_at: "2025-06-18T16:52:35Z" + created_at: "2025-06-18T16:52:05Z" + database_id: storefront + entity_id: storefront + scope: database + status: completed + task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 + type: create update_database_tasks: - completed_at: "2025-06-18T16:52:35Z" created_at: "2025-06-18T16:52:05Z" @@ -10523,6 +10532,8 @@ components: RemoveHostResponse: type: object properties: + task: + $ref: '#/components/schemas/Task' update_database_tasks: type: array items: @@ -10554,6 +10565,15 @@ components: task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 type: create example: + task: + completed_at: "2025-06-18T16:52:35Z" + created_at: "2025-06-18T16:52:05Z" + database_id: storefront + entity_id: storefront + scope: database + status: completed + task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 + type: create update_database_tasks: - completed_at: "2025-06-18T16:52:35Z" created_at: "2025-06-18T16:52:05Z" @@ -10580,6 +10600,7 @@ components: task_id: 019783f4-75f4-71e7-85a3-c9b96b345d77 type: create required: + - task - update_database_tasks RestartInstanceResponse: type: object diff --git a/changes/unreleased/Changed-20260114-163406.yaml b/changes/unreleased/Changed-20260114-163406.yaml new file mode 100644 index 00000000..d1f4601e --- /dev/null +++ b/changes/unreleased/Changed-20260114-163406.yaml @@ -0,0 +1,3 @@ +kind: Changed +body: The `remove-host` API response now includes a `task` that tracks the progress of the entire remove host operation. +time: 2026-01-14T16:34:06.805619-05:00 diff --git a/clustertest/cluster_test.go b/clustertest/cluster_test.go index d829292c..f161c8eb 100644 --- a/clustertest/cluster_test.go +++ b/clustertest/cluster_test.go @@ -101,8 +101,15 @@ func (c *Cluster) Remove(t testing.TB, hostID string) { HostID: controlplane.Identifier(hostID), }) require.NoError(t, err) - require.NotNil(t, resp, "RemoveHost response should not be nil") - require.NotNil(t, resp.UpdateDatabaseTasks, "UpdateDatabaseTasks should always be present (empty slice if no databases)") + require.NotNil(t, resp) + require.NotNil(t, resp.Task) + require.NotNil(t, resp.UpdateDatabaseTasks) + + _, err = c.client.WaitForHostTask(t.Context(), &controlplane.GetHostTaskPayload{ + HostID: controlplane.Identifier(hostID), + TaskID: resp.Task.TaskID, + }) + require.NoError(t, err) delete(c.hosts, hostID) c.client = hostsClient(t, c.hosts) diff --git a/server/internal/api/apiv1/post_init_handlers.go b/server/internal/api/apiv1/post_init_handlers.go index abf4da35..b6779a58 100644 --- a/server/internal/api/apiv1/post_init_handlers.go +++ b/server/internal/api/apiv1/post_init_handlers.go @@ -200,23 +200,19 @@ func (s *PostInitHandlers) RemoveHost(ctx context.Context, req *api.RemoveHostPa return nil, apiErr(err) } - updateDatabaseInputs, err := s.prepareDatabaseUpdates(ctx, hostID, dbs) + specs, err := s.prepareDatabaseUpdates(ctx, hostID, dbs) if err != nil { - return nil, err - } - - input := &workflows.RemoveHostInput{ - HostID: hostID, - UpdateDatabaseInputs: updateDatabaseInputs, + return nil, apiErr(err) } - dbTasks, err := s.workflowSvc.RemoveHost(ctx, input) + output, err := s.workflowSvc.RemoveHost(ctx, hostID, specs...) if err != nil { return nil, apiErr(err) } return &api.RemoveHostResponse{ - UpdateDatabaseTasks: tasksToAPI(dbTasks), + Task: taskToAPI(output.Task), + UpdateDatabaseTasks: tasksToAPI(output.DatabaseTasks), }, nil } @@ -248,36 +244,22 @@ func (s *PostInitHandlers) validateHostRemoval(ctx context.Context, hostID strin } // prepareDatabaseUpdates prepares workflow inputs for all databases affected by host removal. -func (s *PostInitHandlers) prepareDatabaseUpdates(ctx context.Context, hostID string, dbs []*database.Database) ([]*workflows.UpdateDatabaseInput, error) { - updateInputs := make([]*workflows.UpdateDatabaseInput, 0, len(dbs)) - +func (s *PostInitHandlers) prepareDatabaseUpdates(ctx context.Context, hostID string, dbs []*database.Database) ([]*database.Spec, error) { + specs := make([]*database.Spec, 0, len(dbs)) for _, db := range dbs { - if err := s.updateDatabaseForHostRemoval(ctx, db, hostID); err != nil { - return nil, err + if ok := db.Spec.RemoveHost(hostID); !ok { + return nil, fmt.Errorf("%s host id not found/removed", hostID) } - updateInputs = append(updateInputs, &workflows.UpdateDatabaseInput{ - Spec: db.Spec, - ForceUpdate: false, - RemoveHosts: []string{hostID}, - }) - } - - return updateInputs, nil -} + _, err := s.dbSvc.UpdateDatabase(ctx, database.DatabaseStateModifying, db.Spec) + if err != nil { + return nil, fmt.Errorf("failed to update database: %w", err) + } -// updateDatabaseForHostRemoval removes the host from a database spec and updates the database state. -func (s *PostInitHandlers) updateDatabaseForHostRemoval(ctx context.Context, db *database.Database, hostID string) error { - if ok := db.Spec.RemoveHost(hostID); !ok { - return apiErr(fmt.Errorf("%s host id not found/removed", hostID)) + specs = append(specs, db.Spec) } - _, err := s.dbSvc.UpdateDatabase(ctx, database.DatabaseStateModifying, db.Spec) - if err != nil { - return apiErr(err) - } - - return nil + return specs, nil } // ListDatabases fetches all databases from the database service and converts them to API format. diff --git a/server/internal/workflows/remove_host.go b/server/internal/workflows/remove_host.go index 8a4920d2..eb1df346 100644 --- a/server/internal/workflows/remove_host.go +++ b/server/internal/workflows/remove_host.go @@ -1,15 +1,19 @@ package workflows import ( + "errors" + "fmt" + "log/slog" + "github.com/cschleiden/go-workflows/workflow" "github.com/google/uuid" + "github.com/pgEdge/control-plane/server/internal/task" "github.com/pgEdge/control-plane/server/internal/workflows/activities" ) type RemoveHostInput struct { HostID string `json:"host_id"` UpdateDatabaseInputs []*UpdateDatabaseInput `json:"update_database_inputs,omitempty"` - DatabaseTaskIDs map[string]uuid.UUID `json:"database_task_ids,omitempty"` TaskID uuid.UUID `json:"task_id"` } @@ -21,40 +25,135 @@ func (w *Workflows) RemoveHost(ctx workflow.Context, input *RemoveHostInput) (*R ) logger.Info("removing host") - if len(input.UpdateDatabaseInputs) > 0 { - logger.Info("starting database update workflows", "count", len(input.UpdateDatabaseInputs)) - - futures := make([]workflow.Future[*UpdateDatabaseOutput], len(input.UpdateDatabaseInputs)) - for i, dbInput := range input.UpdateDatabaseInputs { - logger.Info("creating update database sub-workflow", "database_id", dbInput.Spec.DatabaseID) - futures[i] = workflow.CreateSubWorkflowInstance[*UpdateDatabaseOutput]( - ctx, - workflow.SubWorkflowOptions{}, - w.UpdateDatabase, - dbInput, - ) + defer func() { + if errors.Is(ctx.Err(), workflow.Canceled) { + logger.Warn("workflow was canceled") + cleanupCtx := workflow.NewDisconnectedContext(ctx) + + w.cancelTask(cleanupCtx, task.ScopeHost, input.HostID, input.TaskID, logger) } + }() - for i, future := range futures { - _, err := future.Get(ctx) - if err != nil { - dbID := input.UpdateDatabaseInputs[i].Spec.DatabaseID - logger.With("error", err, "database_id", dbID).Error("database update sub-workflow failed") - return nil, err - } + handleError := func(cause error) error { + logger.With("error", cause).Error("failed to remove host") + + updateTaskInput := &activities.UpdateTaskInput{ + Scope: task.ScopeHost, + EntityID: input.HostID, + TaskID: input.TaskID, + UpdateOptions: task.UpdateFail(cause), } + _ = w.updateTask(ctx, logger, updateTaskInput) - logger.Info("all database update workflows completed successfully") + return cause } + // Curry w.logTaskEvent and treat logging errors as non-fatal + logTaskEvent := func(entry task.LogEntry) { + err := w.logTaskEvent(ctx, + task.ScopeHost, + input.HostID, + input.TaskID, + entry, + ) + if err != nil { + // These log messages are not critical to this process, so it's safe + // to treat this error as non-fatal. + logger.With("error", err).Error("failed to log task event") + } + } + + updateTaskInput := &activities.UpdateTaskInput{ + Scope: task.ScopeHost, + EntityID: input.HostID, + TaskID: input.TaskID, + UpdateOptions: task.UpdateStart(), + } + if err := w.updateTask(ctx, logger, updateTaskInput); err != nil { + return nil, handleError(err) + } + + if len(input.UpdateDatabaseInputs) > 0 { + err := w.removeHostFromDatabases(ctx, logger, logTaskEvent, input.UpdateDatabaseInputs) + if err != nil { + return nil, handleError(err) + } + } + + logTaskEvent(task.LogEntry{ + Message: fmt.Sprintf("removing host '%s'", input.HostID), + }) + req := activities.RemoveHostInput{ HostID: input.HostID, } _, err := w.Activities.ExecuteRemoveHost(ctx, &req).Get(ctx) if err != nil { - return nil, err + return nil, handleError(err) + } + + // Needs to come before the UpdateTaskInput or else clients will see the + // task complete before the log entry is added. + logTaskEvent(task.LogEntry{ + Message: fmt.Sprintf("successfully removed host '%s'", input.HostID), + }) + + updateTaskInput = &activities.UpdateTaskInput{ + Scope: task.ScopeHost, + EntityID: input.HostID, + TaskID: input.TaskID, + UpdateOptions: task.UpdateComplete(), + } + if err := w.updateTask(ctx, logger, updateTaskInput); err != nil { + return nil, handleError(err) } logger.Info("successfully removed host") + return &RemoveHostOutput{}, nil } + +func (w *Workflows) removeHostFromDatabases(ctx workflow.Context, logger *slog.Logger, logTaskEvent func(task.LogEntry), inputs []*UpdateDatabaseInput) error { + logger.Info("starting database update workflows", "count", len(inputs)) + + futures := make([]workflow.Future[*UpdateDatabaseOutput], len(inputs)) + for i, dbInput := range inputs { + logger.Info("creating update database sub-workflow", "database_id", dbInput.Spec.DatabaseID) + + logTaskEvent(task.LogEntry{ + Message: fmt.Sprintf("updating database '%s'", dbInput.Spec.DatabaseID), + }) + + futures[i] = workflow.CreateSubWorkflowInstance[*UpdateDatabaseOutput]( + ctx, + workflow.SubWorkflowOptions{}, + w.UpdateDatabase, + dbInput, + ) + } + + for i, future := range futures { + _, err := future.Get(ctx) + if err != nil { + dbID := inputs[i].Spec.DatabaseID + + logTaskEvent(task.LogEntry{ + Message: fmt.Sprintf("failed to update database '%s'", inputs[i].Spec.DatabaseID), + Fields: map[string]any{ + "error": err.Error(), + }, + }) + + logger.With("error", err, "database_id", dbID).Error("database update sub-workflow failed") + return err + } + + logTaskEvent(task.LogEntry{ + Message: fmt.Sprintf("successfully updated database '%s'", inputs[i].Spec.DatabaseID), + }) + } + + logger.Info("all database update workflows completed successfully") + + return nil +} diff --git a/server/internal/workflows/service.go b/server/internal/workflows/service.go index 4964db29..bcc7ee73 100644 --- a/server/internal/workflows/service.go +++ b/server/internal/workflows/service.go @@ -401,37 +401,62 @@ func (s *Service) FailoverDatabaseNode(ctx context.Context, input *FailoverInput return t, nil } -func (s *Service) RemoveHost(ctx context.Context, input *RemoveHostInput) ([]*task.Task, error) { - databaseTaskIDs := map[string]uuid.UUID{} - var allTasks []*task.Task +type RemoveHostTasks struct { + Task *task.Task + DatabaseTasks []*task.Task +} + +func (s *Service) RemoveHost(ctx context.Context, hostID string, specs ...*database.Spec) (*RemoveHostTasks, error) { + hostTask, err := s.taskSvc.CreateTask(ctx, task.Options{ + Scope: task.ScopeHost, + HostID: hostID, + Type: task.TypeRemoveHost, + }) + if err != nil { + return nil, fmt.Errorf("failed to create remove host task: %w", err) + } + var databaseTasks []*task.Task + var databaseInputs []*UpdateDatabaseInput - for _, dbInput := range input.UpdateDatabaseInputs { - dt, err := s.taskSvc.CreateTask(ctx, task.Options{ + for _, spec := range specs { + databaseTask, err := s.taskSvc.CreateTask(ctx, task.Options{ Scope: task.ScopeDatabase, - DatabaseID: dbInput.Spec.DatabaseID, + DatabaseID: spec.DatabaseID, Type: task.TypeUpdate, + ParentID: hostTask.TaskID, }) if err != nil { - s.abortTasks(ctx, allTasks...) + s.abortTasks(ctx, hostTask) + s.abortTasks(ctx, databaseTasks...) return nil, fmt.Errorf("failed to create database update task: %w", err) } - allTasks = append(allTasks, dt) - databaseTasks = append(databaseTasks, dt) - databaseTaskIDs[dbInput.Spec.DatabaseID] = dt.TaskID - dbInput.TaskID = dt.TaskID - } - input.DatabaseTaskIDs = databaseTaskIDs + databaseTasks = append(databaseTasks, databaseTask) + databaseInputs = append(databaseInputs, &UpdateDatabaseInput{ + TaskID: databaseTask.TaskID, + Spec: spec, + RemoveHosts: []string{hostID}, + }) + } opts := client.WorkflowInstanceOptions{ Queue: utils.HostQueue(s.cfg.HostID), InstanceID: uuid.NewString(), } - _, err := s.client.CreateWorkflowInstance(ctx, opts, s.workflows.RemoveHost, input) + _, err = s.client.CreateWorkflowInstance(ctx, opts, s.workflows.RemoveHost, &RemoveHostInput{ + HostID: hostID, + UpdateDatabaseInputs: databaseInputs, + TaskID: hostTask.TaskID, + }) if err != nil { - return nil, err + s.abortTasks(ctx, hostTask) + s.abortTasks(ctx, databaseTasks...) + return nil, fmt.Errorf("failed to create remove host workflow: %w", err) } - return databaseTasks, nil + return &RemoveHostTasks{ + Task: hostTask, + DatabaseTasks: databaseTasks, + }, nil }