From e26edf69660eafb58c1e2c635b94610671739874 Mon Sep 17 00:00:00 2001 From: ispasov Date: Tue, 17 Feb 2026 14:29:48 +0200 Subject: [PATCH 1/3] Split provisioning and execution Previously the provisioning and execution were bundled together in the same func. This leads to several issues: 1. When the execution fails, the provisioning tried to deploy a new VM, thinking the provisioning of the VM failed. 2. When the execution fails due to ssh connection, a new VM was either deployed or the running one was deleted, although it was running an actual active job. Now the provisioning retries until a VM and a runner are present. Then the execution is performed. The resources are cleaned if the execution completes. If there is an SSH error then the VM is not deleted and it is relied on the JobCompleted event to clean up the resources. This ensures active VMs are not deleted and the jobs are not lost. Finally, add more logging --- pkg/github/runners/message-processor.go | 139 ++++++++++++++++++------ pkg/github/runners/types.go | 21 ++-- pkg/orka/vm-commnd-executor.go | 10 +- pkg/runner-provisioner/provisioner.go | 47 ++++---- 4 files changed, 151 insertions(+), 66 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 3a87fe4..c62f93f 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -7,6 +7,7 @@ package runners import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync" @@ -14,6 +15,8 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/github/types" "github.com/macstadium/orka-github-actions-integration/pkg/logging" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" + "golang.org/x/crypto/ssh" ) const ( @@ -25,15 +28,15 @@ const ( func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor { return &RunnerMessageProcessor{ - ctx: ctx, - runnerManager: runnerManager, - runnerProvisioner: runnerProvisioner, - logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)), - runnerScaleSetName: runnerScaleSet.Name, - upstreamCanceledJobs: map[string]bool{}, - upstreamCanceledJobsMutex: sync.RWMutex{}, - provisioningContextCancels: map[string]context.CancelFunc{}, - provisioningContextCancelsMutex: sync.Mutex{}, + ctx: ctx, + runnerManager: runnerManager, + runnerProvisioner: runnerProvisioner, + logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)), + runnerScaleSetName: runnerScaleSet.Name, + upstreamCanceledJobs: map[string]bool{}, + upstreamCanceledJobsMutex: sync.RWMutex{}, + jobContextCancels: map[string]context.CancelFunc{}, + jobContextCancelsMutex: sync.Mutex{}, } } @@ -121,31 +124,47 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale jobId = defaultJobId } - provisioningContext, cancel := context.WithCancel(p.ctx) - p.storeProvisioningContextCancel(jobId, cancel) + jobContext, cancel := context.WithCancel(p.ctx) + p.storeJobContextCancel(jobId, cancel) go func() { + var executionErr error + defer p.removeUpstreamCanceledJob(jobId) - defer p.cancelProvisioningContext(jobId) - for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ { - err := p.runnerProvisioner.ProvisionRunner(provisioningContext) - if provisioningContext.Err() != nil { - break + executor, commands, cleanup, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) + if provisioningErr != nil { + p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) + p.cancelJobContext(jobId, "provisioning failed") + return + } + + defer func() { + var exitErr *ssh.ExitError + + if executionErr != nil && !errors.As(executionErr, &exitErr) && !errors.Is(executionErr, context.Canceled) { + p.logger.Warnf("SSH connection dropped for JobId %s (%v). Skipping cleanup, relying on JobCompleted webhook.", jobId, executionErr) + return } - if err == nil { - break + var cancelReason string + + if errors.Is(executionErr, context.Canceled) { + cancelReason = "job context was canceled" + p.logger.Infof("job context canceled for JobId %s. Cleaning up resources.", jobId) + } else if executionErr != nil { + cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) + p.logger.Errorf("execution failed with exit code %d for JobId %s. Cleaning up resources.", exitErr.ExitStatus(), jobId) + } else { + cancelReason = "execution completed successfully" + p.logger.Infof("execution completed successfully for JobId %s. Cleaning up resources.", jobId) } - p.logger.Errorf("unable to provision Orka runner for %s (attempt %d). More information: %s", p.runnerScaleSetName, attempt, err.Error()) + cleanup(executionErr) + p.cancelJobContext(jobId, cancelReason) + }() - select { - case <-provisioningContext.Done(): - return - case <-time.After(15 * time.Second): - } - } + executionErr = p.executeJobCommands(jobContext, jobId, executor, commands) }() } case "JobStarted": @@ -162,7 +181,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result) - p.cancelProvisioningContext(jobCompleted.JobId) + p.cancelJobContext(jobCompleted.JobId, "Job completed webhook received") if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) { p.setUpstreamCanceledJob(jobCompleted.JobId) @@ -180,6 +199,52 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } +func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, jobId string) (*orka.VMCommandExecutor, []string, func(error), error) { + for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ { + executor, commands, cleanup, err := p.runnerProvisioner.ProvisionRunner(ctx) + if ctx.Err() != nil { + return nil, nil, nil, ctx.Err() + } + + if err == nil { + return executor, commands, cleanup, nil + } + + p.logger.Errorf( + "unable to provision Orka runner for %s (attempt %d). More information: %v", + p.runnerScaleSetName, + attempt, + err, + ) + + select { + case <-ctx.Done(): + return nil, nil, nil, nil + case <-time.After(15 * time.Second): + } + } + + return nil, nil, nil, fmt.Errorf("unable to provision Orka runner for %s", p.runnerScaleSetName) +} + +func (p *RunnerMessageProcessor) executeJobCommands(ctx context.Context, jobId string, executor *orka.VMCommandExecutor, commands []string) error { + p.logger.Infof("starting execution for JobId: %s on VM %s", jobId, executor.VMName) + + err := executor.ExecuteCommands(ctx, commands...) + + if ctx.Err() != nil { + return ctx.Err() + } + + if err != nil && !errors.Is(ctx.Err(), context.Canceled) { + p.logger.Errorf("execution failed for JobId: %s on VM %s: %v", jobId, executor.VMName, err) + return err + } + + p.logger.Infof("execution completed for JobId: %s on VM %s", jobId, executor.VMName) + return nil +} + func (p *RunnerMessageProcessor) isUpstreamCanceled(jobId string) bool { p.upstreamCanceledJobsMutex.RLock() defer p.upstreamCanceledJobsMutex.RUnlock() @@ -198,17 +263,21 @@ func (p *RunnerMessageProcessor) removeUpstreamCanceledJob(jobId string) { delete(p.upstreamCanceledJobs, jobId) } -func (p *RunnerMessageProcessor) storeProvisioningContextCancel(jobId string, cancel context.CancelFunc) { - p.provisioningContextCancelsMutex.Lock() - defer p.provisioningContextCancelsMutex.Unlock() - p.provisioningContextCancels[jobId] = cancel +func (p *RunnerMessageProcessor) storeJobContextCancel(jobId string, cancel context.CancelFunc) { + p.jobContextCancelsMutex.Lock() + defer p.jobContextCancelsMutex.Unlock() + p.jobContextCancels[jobId] = cancel } -func (p *RunnerMessageProcessor) cancelProvisioningContext(jobId string) { - p.provisioningContextCancelsMutex.Lock() - defer p.provisioningContextCancelsMutex.Unlock() - if cancel, exists := p.provisioningContextCancels[jobId]; exists { +func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) { + p.jobContextCancelsMutex.Lock() + defer p.jobContextCancelsMutex.Unlock() + + if cancel, exists := p.jobContextCancels[jobId]; exists { + p.logger.Infof("canceling job context for JobId: %s. Triggered by: %s", jobId, reason) cancel() - delete(p.provisioningContextCancels, jobId) + delete(p.jobContextCancels, jobId) + } else { + p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason) } } diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index 68c164f..a35f10e 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -11,6 +11,7 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" "github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue" "github.com/macstadium/orka-github-actions-integration/pkg/github/types" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" "go.uber.org/zap" ) @@ -31,17 +32,17 @@ type RunnerManager struct { } type RunnerProvisionerInterface interface { - ProvisionRunner(ctx context.Context) error + ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, func(error), error) } type RunnerMessageProcessor struct { - ctx context.Context - logger *zap.SugaredLogger - runnerManager RunnerManagerInterface - runnerProvisioner RunnerProvisionerInterface - runnerScaleSetName string - upstreamCanceledJobs map[string]bool - upstreamCanceledJobsMutex sync.RWMutex - provisioningContextCancels map[string]context.CancelFunc - provisioningContextCancelsMutex sync.Mutex + ctx context.Context + logger *zap.SugaredLogger + runnerManager RunnerManagerInterface + runnerProvisioner RunnerProvisionerInterface + runnerScaleSetName string + upstreamCanceledJobs map[string]bool + upstreamCanceledJobsMutex sync.RWMutex + jobContextCancels map[string]context.CancelFunc + jobContextCancelsMutex sync.Mutex } diff --git a/pkg/orka/vm-commnd-executor.go b/pkg/orka/vm-commnd-executor.go index 3cbb482..ddbd626 100644 --- a/pkg/orka/vm-commnd-executor.go +++ b/pkg/orka/vm-commnd-executor.go @@ -3,6 +3,7 @@ package orka import ( "bufio" "context" + "errors" "fmt" "io" "net" @@ -108,10 +109,17 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands return ctx.Err() case err := <-done: if err != nil { - executor.Logger.Errorf("Execution finished with error on VM %s: %v", executor.VMName, err) + var exitErr *ssh.ExitError + + if errors.As(err, &exitErr) { + executor.Logger.Errorf("Command execution failed on VM %s with exit code %d: %v", executor.VMName, exitErr.ExitStatus(), err) + } else { + executor.Logger.Errorf("SSH connection dropped or protocol error on VM %s: %v", executor.VMName, err) + } } else { executor.Logger.Infof("Execution completed successfully on VM %s", executor.VMName) } + return err } } diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index 04e1698..70f48cc 100644 --- a/pkg/runner-provisioner/provisioner.go +++ b/pkg/runner-provisioner/provisioner.go @@ -43,43 +43,39 @@ var commands_template = []string{ "echo 'Git Action Runner exited'", } -func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) { +func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, func(error), error) { p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name) vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig) if err != nil { p.logger.Errorf("failed to deploy Orka VM: %v", err) - return err + return nil, nil, nil, err } runnerName := vmResponse.Name p.logger.Infof("deployed Orka VM with name %s", runnerName) + provisioningSucceeded := false + defer func() { - if err != nil { - if ctx.Err() != nil { - p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName) - } else { - p.logger.Errorf("provisioning failed with error, triggering cleanup for %s: %v", runnerName, err) - } - } else { - p.logger.Infof("provisioning and execution completed normally, triggering cleanup for %s", runnerName) + if !provisioningSucceeded { + p.logger.Warnf("provisioning failed, cleaning up resources for VM %s", runnerName) + p.cleanupResources(context.WithoutCancel(ctx), runnerName) } - p.cleanupResources(context.WithoutCancel(ctx), runnerName) }() vmIP, err := p.getRealVMIP(vmResponse.IP) if err != nil { p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err) - return err + return nil, nil, nil, err } - p.logger.Infof("creating runner with name %s", runnerName) + p.logger.Infof("creating runner config for name %s", runnerName) jitConfig, err := p.createRunner(ctx, runnerName) if err != nil { p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err) - return err + return nil, nil, nil, err } - p.logger.Infof("created runner with name %s", runnerName) + p.logger.Infof("created runner config with name %s", runnerName) vmCommandExecutor := &orka.VMCommandExecutor{ VMIP: vmIP, @@ -90,13 +86,24 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) { Logger: p.logger, } - p.logger.Infof("starting command execution on VM %s", runnerName) - err = vmCommandExecutor.ExecuteCommands(ctx, buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)...) - if err != nil { - return err + commands := buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername) + + cleanup := func(execErr error) { + if execErr != nil { + if ctx.Err() != nil { + p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName) + } else { + p.logger.Errorf("execution failed, triggering cleanup for %s: %v", runnerName, execErr) + } + } else { + p.logger.Infof("execution completed normally, triggering cleanup for %s", runnerName) + } + p.cleanupResources(context.WithoutCancel(ctx), runnerName) } - return nil + provisioningSucceeded = true + + return vmCommandExecutor, commands, cleanup, nil } func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) { From 599417b58c4e5c50689022acb4340119f6a8ee9d Mon Sep 17 00:00:00 2001 From: ispasov Date: Tue, 17 Feb 2026 16:58:21 +0200 Subject: [PATCH 2/3] Simplify VM cleanup --- pkg/github/runners/message-processor.go | 20 ++++++++++------- pkg/github/runners/types.go | 3 ++- pkg/runner-provisioner/provisioner.go | 29 ++++++++++--------------- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index c62f93f..c02d727 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -132,13 +132,18 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale defer p.removeUpstreamCanceledJob(jobId) - executor, commands, cleanup, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) + executor, commands, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) if provisioningErr != nil { p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) p.cancelJobContext(jobId, "provisioning failed") return } + context.AfterFunc(jobContext, func() { + p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) + }) + defer func() { var exitErr *ssh.ExitError @@ -160,7 +165,6 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("execution completed successfully for JobId %s. Cleaning up resources.", jobId) } - cleanup(executionErr) p.cancelJobContext(jobId, cancelReason) }() @@ -199,15 +203,15 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } -func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, jobId string) (*orka.VMCommandExecutor, []string, func(error), error) { +func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, jobId string) (*orka.VMCommandExecutor, []string, error) { for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ { - executor, commands, cleanup, err := p.runnerProvisioner.ProvisionRunner(ctx) + executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx) if ctx.Err() != nil { - return nil, nil, nil, ctx.Err() + return nil, nil, ctx.Err() } if err == nil { - return executor, commands, cleanup, nil + return executor, commands, nil } p.logger.Errorf( @@ -219,12 +223,12 @@ func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, j select { case <-ctx.Done(): - return nil, nil, nil, nil + return nil, nil, nil case <-time.After(15 * time.Second): } } - return nil, nil, nil, fmt.Errorf("unable to provision Orka runner for %s", p.runnerScaleSetName) + return nil, nil, fmt.Errorf("unable to provision Orka runner for %s", p.runnerScaleSetName) } func (p *RunnerMessageProcessor) executeJobCommands(ctx context.Context, jobId string, executor *orka.VMCommandExecutor, commands []string) error { diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index a35f10e..1ade3e7 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -32,7 +32,8 @@ type RunnerManager struct { } type RunnerProvisionerInterface interface { - ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, func(error), error) + ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error) + CleanupResources(ctx context.Context, runnerName string) } type RunnerMessageProcessor struct { diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index 70f48cc..040432d 100644 --- a/pkg/runner-provisioner/provisioner.go +++ b/pkg/runner-provisioner/provisioner.go @@ -43,12 +43,12 @@ var commands_template = []string{ "echo 'Git Action Runner exited'", } -func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, func(error), error) { +func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error) { p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name) vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig) if err != nil { p.logger.Errorf("failed to deploy Orka VM: %v", err) - return nil, nil, nil, err + return nil, nil, err } runnerName := vmResponse.Name @@ -66,14 +66,14 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMComman vmIP, err := p.getRealVMIP(vmResponse.IP) if err != nil { p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err) - return nil, nil, nil, err + return nil, nil, err } p.logger.Infof("creating runner config for name %s", runnerName) jitConfig, err := p.createRunner(ctx, runnerName) if err != nil { p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err) - return nil, nil, nil, err + return nil, nil, err } p.logger.Infof("created runner config with name %s", runnerName) @@ -88,22 +88,15 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMComman commands := buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername) - cleanup := func(execErr error) { - if execErr != nil { - if ctx.Err() != nil { - p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName) - } else { - p.logger.Errorf("execution failed, triggering cleanup for %s: %v", runnerName, execErr) - } - } else { - p.logger.Infof("execution completed normally, triggering cleanup for %s", runnerName) - } - p.cleanupResources(context.WithoutCancel(ctx), runnerName) - } - provisioningSucceeded = true - return vmCommandExecutor, commands, cleanup, nil + return vmCommandExecutor, commands, nil +} + +func (p *RunnerProvisioner) CleanupResources(ctx context.Context, runnerName string) { + p.logger.Infof("starting resource cleanup for %s", runnerName) + p.cleanupResources(ctx, runnerName) + p.logger.Infof("resource cleanup completed for %s", runnerName) } func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) { From b3ecee8c176e04e7ff710c830b7d00c2a49f19dd Mon Sep 17 00:00:00 2001 From: ispasov Date: Tue, 17 Feb 2026 18:52:59 +0200 Subject: [PATCH 3/3] Pr review --- pkg/github/runners/message-processor.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index c02d727..5745700 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -147,7 +147,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale defer func() { var exitErr *ssh.ExitError - if executionErr != nil && !errors.As(executionErr, &exitErr) && !errors.Is(executionErr, context.Canceled) { + if isNetworkingFailure(executionErr) { p.logger.Warnf("SSH connection dropped for JobId %s (%v). Skipping cleanup, relying on JobCompleted webhook.", jobId, executionErr) return } @@ -285,3 +285,12 @@ func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) { p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason) } } + +func isNetworkingFailure(err error) bool { + if err == nil { + return false + } + + var exitErr *ssh.ExitError + return !errors.As(err, &exitErr) && !errors.Is(err, context.Canceled) +}