diff --git a/pkg/orka/vm-commnd-executor.go b/pkg/orka/vm-commnd-executor.go index 3050684..3cbb482 100644 --- a/pkg/orka/vm-commnd-executor.go +++ b/pkg/orka/vm-commnd-executor.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "go.uber.org/zap" "golang.org/x/crypto/ssh" ) @@ -18,6 +19,7 @@ type VMCommandExecutor struct { VMName string VMUsername string VMPassword string + Logger *zap.SugaredLogger } const ( @@ -25,6 +27,8 @@ const ( ) func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands ...string) error { + executor.Logger.Infof("Starting execution on VM: %s (%s:%d)", executor.VMName, executor.VMIP, executor.VMPort) + sshConfig := &ssh.ClientConfig{ HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error { return nil @@ -36,23 +40,31 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands client, err := executor.connectWithRetries(ctx, sshConfig, fmt.Sprintf("%s:%d", executor.VMIP, executor.VMPort)) if err != nil { + executor.Logger.Errorf("Failed to establish SSH connection to VM %s: %v", executor.VMName, err) return err } defer client.Close() + executor.Logger.Infof("SSH connection established to VM %s", executor.VMName) + session, err := client.NewSession() if err != nil { + executor.Logger.Errorf("Failed to create SSH session on VM %s: %v", executor.VMName, err) return err } defer session.Close() + executor.Logger.Infof("SSH session successfully created for VM %s", executor.VMName) + stdout, err := session.StdoutPipe() if err != nil { + executor.Logger.Errorf("Failed to setup stdout pipe: %v", err) return err } stderr, err := session.StderrPipe() if err != nil { + executor.Logger.Errorf("Failed to setup stderr pipe: %v", err) return err } @@ -60,21 +72,25 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands return fmt.Sprintf("[VM] - %s - %s: %s\n", time.Now().Format(time.RFC3339), executor.VMName, out) } - go printFormattedOutput(stdout, format) - go printFormattedOutput(stderr, format) + go printFormattedOutput(executor.Logger, "stdout", stdout, format) + go printFormattedOutput(executor.Logger, "stderr", stderr, format) stdinBuf, err := session.StdinPipe() if err != nil { + executor.Logger.Errorf("Failed to setup stdin pipe: %v", err) return err } err = session.Shell() if err != nil { + executor.Logger.Errorf("Failed to start remote shell: %v", err) return err } + executor.Logger.Infof("Remote shell started for VM %s", executor.VMName) _, err = stdinBuf.Write([]byte(strings.Join(commands, "\n") + "\nexit\n")) if err != nil { + executor.Logger.Errorf("Failed to write commands to stdin: %v", err) return err } @@ -83,44 +99,63 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands done <- session.Wait() }() + executor.Logger.Infof("Waiting for commands to finish execution on VM %s...", executor.VMName) + select { case <-ctx.Done(): + executor.Logger.Warnf("Context canceled while waiting for execution on VM %s: %v", executor.VMName, ctx.Err()) _ = session.Close() return ctx.Err() case err := <-done: + if err != nil { + executor.Logger.Errorf("Execution finished with error on VM %s: %v", executor.VMName, err) + } else { + executor.Logger.Infof("Execution completed successfully on VM %s", executor.VMName) + } return err } } type FormatFunc func(string) string -func printFormattedOutput(reader io.Reader, format FormatFunc) { +func printFormattedOutput(logger *zap.SugaredLogger, streamName string, reader io.Reader, format FormatFunc) { scanner := bufio.NewScanner(reader) for scanner.Scan() { fmt.Print(format(scanner.Text())) } + + if err := scanner.Err(); err != nil { + logger.Errorf("Error reading from %s: %v", streamName, err) + } else { + logger.Infof("Reached EOF for %s", streamName) + } } func (executor *VMCommandExecutor) connectWithRetries(ctx context.Context, cfg *ssh.ClientConfig, addr string) (*ssh.Client, error) { for attempt := 1; attempt <= maxRetries; attempt++ { if ctx.Err() != nil { + executor.Logger.Warnf("Context canceled during connection retry loop: %v", ctx.Err()) return nil, ctx.Err() } client, err := ssh.Dial("tcp", addr, cfg) if err == nil { + executor.Logger.Infof("Connected to %s on attempt %d", addr, attempt) return client, nil } - fmt.Printf("Failed to connect to VM (attempt %d/%d): %v\n", attempt, maxRetries, err) + executor.Logger.Warnf("Failed to connect to VM (attempt %d/%d): %v", attempt, maxRetries, err) select { case <-ctx.Done(): + executor.Logger.Warnf("Context canceled while backing off before retry: %v", ctx.Err()) return nil, ctx.Err() case <-time.After(3 * time.Second): } } - return nil, fmt.Errorf("failed to connect to VM after %d attempts", maxRetries) + err := fmt.Errorf("failed to connect to VM after %d attempts", maxRetries) + executor.Logger.Errorf("%v", err) + return nil, err } diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index b00df40..04e1698 100644 --- a/pkg/runner-provisioner/provisioner.go +++ b/pkg/runner-provisioner/provisioner.go @@ -43,26 +43,40 @@ var commands_template = []string{ "echo 'Git Action Runner exited'", } -func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) error { - p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name) +func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) { + p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name) vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig) if err != nil { + p.logger.Errorf("failed to deploy Orka VM: %v", err) return err } runnerName := vmResponse.Name p.logger.Infof("deployed Orka VM with name %s", runnerName) - defer p.cleanupResources(context.WithoutCancel(ctx), runnerName) + defer func() { + if err != nil { + if ctx.Err() != nil { + p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName) + } else { + p.logger.Errorf("provisioning failed with error, triggering cleanup for %s: %v", runnerName, err) + } + } else { + p.logger.Infof("provisioning and execution completed normally, triggering cleanup for %s", runnerName) + } + p.cleanupResources(context.WithoutCancel(ctx), runnerName) + }() vmIP, err := p.getRealVMIP(vmResponse.IP) if err != nil { + p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err) return err } p.logger.Infof("creating runner with name %s", runnerName) jitConfig, err := p.createRunner(ctx, runnerName) if err != nil { + p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err) return err } p.logger.Infof("created runner with name %s", runnerName) @@ -73,8 +87,10 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) error { VMName: runnerName, VMUsername: p.envData.OrkaVMUsername, VMPassword: p.envData.OrkaVMPassword, + Logger: p.logger, } + p.logger.Infof("starting command execution on VM %s", runnerName) err = vmCommandExecutor.ExecuteCommands(ctx, buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)...) if err != nil { return err @@ -96,6 +112,8 @@ func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) { } func (p *RunnerProvisioner) cleanupResources(ctx context.Context, runnerName string) { + p.logger.Infof("starting resource cleanup for %s", runnerName) + for { err := p.ensureRunnerDeregistered(ctx, runnerName) if err != nil { @@ -115,20 +133,28 @@ func (p *RunnerProvisioner) cleanupResources(ctx context.Context, runnerName str } func (p *RunnerProvisioner) deleteVM(ctx context.Context, runnerName string) { - p.logger.Infof("deleting Orka VM with name %s", runnerName) + p.logger.Infof("initiating deletion of Orka VM %s", runnerName) + + attempts := 0 operation := func() error { + attempts++ err := p.orkaClient.DeleteVM(ctx, runnerName) - if err != nil && strings.Contains(err.Error(), "not found") { - p.logger.Warnf("vm not found for %s", runnerName) - return nil + if err != nil { + if strings.Contains(err.Error(), "not found") { + p.logger.Warnf("Orka VM %s not found (it may have already been deleted)", runnerName) + return nil + } + p.logger.Warnf("attempt %d: failed to delete Orka VM %s: %v", attempts, runnerName, err) + return err } - return err + return nil } + err := backoff.Retry(operation, backoff.NewExponentialBackOff()) if err != nil { - p.logger.Infof("error while deleting Orka VM %s. More information: %s", runnerName, err.Error()) + p.logger.Errorf("error while deleting Orka VM %s. More information: %s", runnerName, err.Error()) } else { - p.logger.Infof("deleted Orka VM with name %s", runnerName) + p.logger.Infof("successfully deleted Orka VM %s", runnerName) } } @@ -142,13 +168,14 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner defer ticker.Stop() if runner, err := p.actionsClient.GetRunner(ctx, runnerName); err == nil && runner == nil { - p.logger.Infof("runner %s has de-registered from GitHub", runnerName) + p.logger.Infof("runner %s has cleanly de-registered from GitHub", runnerName) return nil } for { select { case <-ctx.Done(): + p.logger.Warnf("context cancelled while waiting for runner %s to deregister: %v", runnerName, ctx.Err()) return ctx.Err() case <-timeoutCtx.Done(): @@ -159,12 +186,12 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner case <-ticker.C: runner, err := p.actionsClient.GetRunner(ctx, runnerName) if err != nil { - p.logger.Warnf("error checking runner %s registration status: %s", runnerName, err.Error()) + p.logger.Warnf("error checking registration status for runner %s: %v", runnerName, err) continue } if runner == nil { - p.logger.Infof("runner %s has de-registered from GitHub", runnerName) + p.logger.Infof("runner %s has cleanly de-registered from GitHub", runnerName) return nil } } @@ -174,7 +201,7 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName string) error { runner, err := p.actionsClient.GetRunner(ctx, runnerName) if err != nil { - p.logger.Errorf("error getting runner %s for force-deletion: %s", runnerName, err.Error()) + p.logger.Errorf("failed to fetch runner %s for force-deletion: %v", runnerName, err) return err } @@ -185,7 +212,7 @@ func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName st err = p.actionsClient.DeleteRunner(ctx, runner.Id) if err != nil { - p.logger.Errorf("error force-deleting runner %s (ID: %d) from GitHub: %s", runnerName, runner.Id, err.Error()) + p.logger.Errorf("failed to force-delete runner %s (ID: %d) from GitHub: %v", runnerName, runner.Id, err) return err } @@ -194,8 +221,14 @@ func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName st } func (p *RunnerProvisioner) createRunner(ctx context.Context, runnerName string) (*types.RunnerScaleSetJitRunnerConfig, error) { + p.logger.Debugf("waiting for lock to create runner %s", runnerName) p.mu.Lock() - defer p.mu.Unlock() + p.logger.Debugf("acquired lock for runner %s", runnerName) + + defer func() { + p.mu.Unlock() + p.logger.Debugf("released lock for runner %s", runnerName) + }() jitConfig, err := p.actionsClient.CreateRunner(ctx, p.runnerScaleSet.Id, runnerName) if err != nil { diff --git a/pkg/runner-provisioner/provisioner_test.go b/pkg/runner-provisioner/provisioner_test.go index bece08a..6181503 100644 --- a/pkg/runner-provisioner/provisioner_test.go +++ b/pkg/runner-provisioner/provisioner_test.go @@ -208,8 +208,7 @@ var _ = Describe("RunnerProvisioner", func() { return nil } - err := provisioner.ensureRunnerDeregistered(cancelCtx, testRunnerName) - Expect(err).To(BeNil()) + _ = provisioner.ensureRunnerDeregistered(cancelCtx, testRunnerName) // Should have attempted at least one check and force-deleted Expect(mockActions.GetRunnerCalls).To(BeNumerically(">=", 1))