diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 3a87fe4..5745700 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -7,6 +7,7 @@ package runners import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync" @@ -14,6 +15,8 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/github/types" "github.com/macstadium/orka-github-actions-integration/pkg/logging" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" + "golang.org/x/crypto/ssh" ) const ( @@ -25,15 +28,15 @@ const ( func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor { return &RunnerMessageProcessor{ - ctx: ctx, - runnerManager: runnerManager, - runnerProvisioner: runnerProvisioner, - logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)), - runnerScaleSetName: runnerScaleSet.Name, - upstreamCanceledJobs: map[string]bool{}, - upstreamCanceledJobsMutex: sync.RWMutex{}, - provisioningContextCancels: map[string]context.CancelFunc{}, - provisioningContextCancelsMutex: sync.Mutex{}, + ctx: ctx, + runnerManager: runnerManager, + runnerProvisioner: runnerProvisioner, + logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)), + runnerScaleSetName: runnerScaleSet.Name, + upstreamCanceledJobs: map[string]bool{}, + upstreamCanceledJobsMutex: sync.RWMutex{}, + jobContextCancels: map[string]context.CancelFunc{}, + jobContextCancelsMutex: sync.Mutex{}, } } @@ -121,31 +124,51 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale jobId = defaultJobId } - provisioningContext, cancel := context.WithCancel(p.ctx) - p.storeProvisioningContextCancel(jobId, cancel) + jobContext, cancel := context.WithCancel(p.ctx) + p.storeJobContextCancel(jobId, 
cancel) go func() { + var executionErr error + defer p.removeUpstreamCanceledJob(jobId) - defer p.cancelProvisioningContext(jobId) - for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ { - err := p.runnerProvisioner.ProvisionRunner(provisioningContext) - if provisioningContext.Err() != nil { - break - } + executor, commands, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) + if provisioningErr != nil { + p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) + p.cancelJobContext(jobId, "provisioning failed") + return + } - if err == nil { - break - } + context.AfterFunc(jobContext, func() { + p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) + }) - p.logger.Errorf("unable to provision Orka runner for %s (attempt %d). More information: %s", p.runnerScaleSetName, attempt, err.Error()) + defer func() { + var exitErr *ssh.ExitError - select { - case <-provisioningContext.Done(): + if isNetworkingFailure(executionErr) { + p.logger.Warnf("SSH connection dropped for JobId %s (%v). Skipping cleanup, relying on JobCompleted webhook.", jobId, executionErr) return - case <-time.After(15 * time.Second): } - } + + var cancelReason string + + if errors.Is(executionErr, context.Canceled) { + cancelReason = "job context was canceled" + p.logger.Infof("job context canceled for JobId %s. Cleaning up resources.", jobId) + } else if errors.As(executionErr, &exitErr) { + cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) + p.logger.Errorf("execution failed with exit code %d for JobId %s. Cleaning up resources.", exitErr.ExitStatus(), jobId) + } else { + cancelReason = "execution completed successfully" + p.logger.Infof("execution completed successfully for JobId %s. 
Cleaning up resources.", jobId) + } + + p.cancelJobContext(jobId, cancelReason) + }() + + executionErr = p.executeJobCommands(jobContext, jobId, executor, commands) }() } case "JobStarted": @@ -162,7 +185,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result) - p.cancelProvisioningContext(jobCompleted.JobId) + p.cancelJobContext(jobCompleted.JobId, "Job completed webhook received") if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) { p.setUpstreamCanceledJob(jobCompleted.JobId) @@ -180,6 +203,52 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } +func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, jobId string) (*orka.VMCommandExecutor, []string, error) { + for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ { + executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx) + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + + if err == nil { + return executor, commands, nil + } + + p.logger.Errorf( + "unable to provision Orka runner for %s (attempt %d). 
More information: %v", + p.runnerScaleSetName, + attempt, + err, + ) + + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-time.After(15 * time.Second): + } + } + + return nil, nil, fmt.Errorf("unable to provision Orka runner for %s", p.runnerScaleSetName) +} + +func (p *RunnerMessageProcessor) executeJobCommands(ctx context.Context, jobId string, executor *orka.VMCommandExecutor, commands []string) error { + p.logger.Infof("starting execution for JobId: %s on VM %s", jobId, executor.VMName) + + err := executor.ExecuteCommands(ctx, commands...) + + if ctx.Err() != nil { + return ctx.Err() + } + + if err != nil && !errors.Is(ctx.Err(), context.Canceled) { + p.logger.Errorf("execution failed for JobId: %s on VM %s: %v", jobId, executor.VMName, err) + return err + } + + p.logger.Infof("execution completed for JobId: %s on VM %s", jobId, executor.VMName) + return nil +} + func (p *RunnerMessageProcessor) isUpstreamCanceled(jobId string) bool { p.upstreamCanceledJobsMutex.RLock() defer p.upstreamCanceledJobsMutex.RUnlock() @@ -198,17 +267,30 @@ func (p *RunnerMessageProcessor) removeUpstreamCanceledJob(jobId string) { delete(p.upstreamCanceledJobs, jobId) } -func (p *RunnerMessageProcessor) storeProvisioningContextCancel(jobId string, cancel context.CancelFunc) { - p.provisioningContextCancelsMutex.Lock() - defer p.provisioningContextCancelsMutex.Unlock() - p.provisioningContextCancels[jobId] = cancel +func (p *RunnerMessageProcessor) storeJobContextCancel(jobId string, cancel context.CancelFunc) { + p.jobContextCancelsMutex.Lock() + defer p.jobContextCancelsMutex.Unlock() + p.jobContextCancels[jobId] = cancel } -func (p *RunnerMessageProcessor) cancelProvisioningContext(jobId string) { - p.provisioningContextCancelsMutex.Lock() - defer p.provisioningContextCancelsMutex.Unlock() - if cancel, exists := p.provisioningContextCancels[jobId]; exists { +func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) { + 
p.jobContextCancelsMutex.Lock() + defer p.jobContextCancelsMutex.Unlock() + + if cancel, exists := p.jobContextCancels[jobId]; exists { + p.logger.Infof("canceling job context for JobId: %s. Triggered by: %s", jobId, reason) cancel() - delete(p.provisioningContextCancels, jobId) + delete(p.jobContextCancels, jobId) + } else { + p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason) } } + +func isNetworkingFailure(err error) bool { + if err == nil { + return false + } + + var exitErr *ssh.ExitError + return !errors.As(err, &exitErr) && !errors.Is(err, context.Canceled) +} diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index 68c164f..1ade3e7 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -11,6 +11,7 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" "github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue" "github.com/macstadium/orka-github-actions-integration/pkg/github/types" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" "go.uber.org/zap" ) @@ -31,17 +32,18 @@ type RunnerManager struct { } type RunnerProvisionerInterface interface { - ProvisionRunner(ctx context.Context) error + ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error) + CleanupResources(ctx context.Context, runnerName string) } type RunnerMessageProcessor struct { - ctx context.Context - logger *zap.SugaredLogger - runnerManager RunnerManagerInterface - runnerProvisioner RunnerProvisionerInterface - runnerScaleSetName string - upstreamCanceledJobs map[string]bool - upstreamCanceledJobsMutex sync.RWMutex - provisioningContextCancels map[string]context.CancelFunc - provisioningContextCancelsMutex sync.Mutex + ctx context.Context + logger *zap.SugaredLogger + runnerManager RunnerManagerInterface + runnerProvisioner RunnerProvisionerInterface + runnerScaleSetName string + upstreamCanceledJobs 
map[string]bool + upstreamCanceledJobsMutex sync.RWMutex + jobContextCancels map[string]context.CancelFunc + jobContextCancelsMutex sync.Mutex } diff --git a/pkg/orka/vm-commnd-executor.go b/pkg/orka/vm-commnd-executor.go index 3cbb482..ddbd626 100644 --- a/pkg/orka/vm-commnd-executor.go +++ b/pkg/orka/vm-commnd-executor.go @@ -3,6 +3,7 @@ package orka import ( "bufio" "context" + "errors" "fmt" "io" "net" @@ -108,10 +109,17 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands return ctx.Err() case err := <-done: if err != nil { - executor.Logger.Errorf("Execution finished with error on VM %s: %v", executor.VMName, err) + var exitErr *ssh.ExitError + + if errors.As(err, &exitErr) { + executor.Logger.Errorf("Command execution failed on VM %s with exit code %d: %v", executor.VMName, exitErr.ExitStatus(), err) + } else { + executor.Logger.Errorf("SSH connection dropped or protocol error on VM %s: %v", executor.VMName, err) + } } else { executor.Logger.Infof("Execution completed successfully on VM %s", executor.VMName) } + return err } } diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index 04e1698..040432d 100644 --- a/pkg/runner-provisioner/provisioner.go +++ b/pkg/runner-provisioner/provisioner.go @@ -43,43 +43,39 @@ var commands_template = []string{ "echo 'Git Action Runner exited'", } -func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) { +func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error) { p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name) vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig) if err != nil { p.logger.Errorf("failed to deploy Orka VM: %v", err) - return err + return nil, nil, err } runnerName := vmResponse.Name p.logger.Infof("deployed Orka VM with name %s", runnerName) + provisioningSucceeded := false + defer func() { - if err != 
nil { - if ctx.Err() != nil { - p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName) - } else { - p.logger.Errorf("provisioning failed with error, triggering cleanup for %s: %v", runnerName, err) - } - } else { - p.logger.Infof("provisioning and execution completed normally, triggering cleanup for %s", runnerName) + if !provisioningSucceeded { + p.logger.Warnf("provisioning failed, cleaning up resources for VM %s", runnerName) + p.cleanupResources(context.WithoutCancel(ctx), runnerName) } - p.cleanupResources(context.WithoutCancel(ctx), runnerName) }() vmIP, err := p.getRealVMIP(vmResponse.IP) if err != nil { p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err) - return err + return nil, nil, err } - p.logger.Infof("creating runner with name %s", runnerName) + p.logger.Infof("creating runner config for name %s", runnerName) jitConfig, err := p.createRunner(ctx, runnerName) if err != nil { p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err) - return err + return nil, nil, err } - p.logger.Infof("created runner with name %s", runnerName) + p.logger.Infof("created runner config with name %s", runnerName) vmCommandExecutor := &orka.VMCommandExecutor{ VMIP: vmIP, @@ -90,13 +86,17 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) { Logger: p.logger, } - p.logger.Infof("starting command execution on VM %s", runnerName) - err = vmCommandExecutor.ExecuteCommands(ctx, buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)...) 
- if err != nil { - return err - } + commands := buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername) - return nil + provisioningSucceeded = true + + return vmCommandExecutor, commands, nil +} + +func (p *RunnerProvisioner) CleanupResources(ctx context.Context, runnerName string) { + p.logger.Infof("starting resource cleanup for %s", runnerName) + p.cleanupResources(ctx, runnerName) + p.logger.Infof("resource cleanup completed for %s", runnerName) } func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) {