Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions pkg/orka/vm-commnd-executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"go.uber.org/zap"
"golang.org/x/crypto/ssh"
)

Expand All @@ -18,13 +19,16 @@ type VMCommandExecutor struct {
VMName string
VMUsername string
VMPassword string
Logger *zap.SugaredLogger
}

const (
maxRetries = 20
)

func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands ...string) error {
executor.Logger.Infof("Starting execution on VM: %s (%s:%d)", executor.VMName, executor.VMIP, executor.VMPort)

sshConfig := &ssh.ClientConfig{
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
return nil
Expand All @@ -36,45 +40,57 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands

client, err := executor.connectWithRetries(ctx, sshConfig, fmt.Sprintf("%s:%d", executor.VMIP, executor.VMPort))
if err != nil {
executor.Logger.Errorf("Failed to establish SSH connection to VM %s: %v", executor.VMName, err)
return err
}
defer client.Close()

executor.Logger.Infof("SSH connection established to VM %s", executor.VMName)

session, err := client.NewSession()
if err != nil {
executor.Logger.Errorf("Failed to create SSH session on VM %s: %v", executor.VMName, err)
return err
}
defer session.Close()

executor.Logger.Infof("SSH session successfully created for VM %s", executor.VMName)

stdout, err := session.StdoutPipe()
if err != nil {
executor.Logger.Errorf("Failed to setup stdout pipe: %v", err)
return err
}

stderr, err := session.StderrPipe()
if err != nil {
executor.Logger.Errorf("Failed to setup stderr pipe: %v", err)
return err
}

format := func(out string) string {
return fmt.Sprintf("[VM] - %s - %s: %s\n", time.Now().Format(time.RFC3339), executor.VMName, out)
}

go printFormattedOutput(stdout, format)
go printFormattedOutput(stderr, format)
go printFormattedOutput(executor.Logger, "stdout", stdout, format)
go printFormattedOutput(executor.Logger, "stderr", stderr, format)

stdinBuf, err := session.StdinPipe()
if err != nil {
executor.Logger.Errorf("Failed to setup stdin pipe: %v", err)
return err
}

err = session.Shell()
if err != nil {
executor.Logger.Errorf("Failed to start remote shell: %v", err)
return err
}
executor.Logger.Infof("Remote shell started for VM %s", executor.VMName)

_, err = stdinBuf.Write([]byte(strings.Join(commands, "\n") + "\nexit\n"))
if err != nil {
executor.Logger.Errorf("Failed to write commands to stdin: %v", err)
return err
}

Expand All @@ -83,44 +99,63 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands
done <- session.Wait()
}()

executor.Logger.Infof("Waiting for commands to finish execution on VM %s...", executor.VMName)

select {
case <-ctx.Done():
executor.Logger.Warnf("Context canceled while waiting for execution on VM %s: %v", executor.VMName, ctx.Err())
_ = session.Close()
return ctx.Err()
case err := <-done:
if err != nil {
executor.Logger.Errorf("Execution finished with error on VM %s: %v", executor.VMName, err)
} else {
executor.Logger.Infof("Execution completed successfully on VM %s", executor.VMName)
}
return err
}
}

type FormatFunc func(string) string

func printFormattedOutput(reader io.Reader, format FormatFunc) {
func printFormattedOutput(logger *zap.SugaredLogger, streamName string, reader io.Reader, format FormatFunc) {
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
fmt.Print(format(scanner.Text()))
}

if err := scanner.Err(); err != nil {
logger.Errorf("Error reading from %s: %v", streamName, err)
} else {
logger.Infof("Reached EOF for %s", streamName)
}
}

func (executor *VMCommandExecutor) connectWithRetries(ctx context.Context, cfg *ssh.ClientConfig, addr string) (*ssh.Client, error) {
for attempt := 1; attempt <= maxRetries; attempt++ {
if ctx.Err() != nil {
executor.Logger.Warnf("Context canceled during connection retry loop: %v", ctx.Err())
return nil, ctx.Err()
}

client, err := ssh.Dial("tcp", addr, cfg)
if err == nil {
executor.Logger.Infof("Connected to %s on attempt %d", addr, attempt)
return client, nil
}

fmt.Printf("Failed to connect to VM (attempt %d/%d): %v\n", attempt, maxRetries, err)
executor.Logger.Warnf("Failed to connect to VM (attempt %d/%d): %v", attempt, maxRetries, err)

select {
case <-ctx.Done():
executor.Logger.Warnf("Context canceled while backing off before retry: %v", ctx.Err())
return nil, ctx.Err()
case <-time.After(3 * time.Second):
}
}

return nil, fmt.Errorf("failed to connect to VM after %d attempts", maxRetries)
err := fmt.Errorf("failed to connect to VM after %d attempts", maxRetries)
executor.Logger.Errorf("%v", err)
return nil, err
}
65 changes: 49 additions & 16 deletions pkg/runner-provisioner/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,40 @@ var commands_template = []string{
"echo 'Git Action Runner exited'",
}

func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) error {
p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name)
func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) {
p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name)
vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig)
if err != nil {
p.logger.Errorf("failed to deploy Orka VM: %v", err)
return err
}

runnerName := vmResponse.Name
p.logger.Infof("deployed Orka VM with name %s", runnerName)

defer p.cleanupResources(context.WithoutCancel(ctx), runnerName)
defer func() {
if err != nil {
if ctx.Err() != nil {
p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName)
} else {
p.logger.Errorf("provisioning failed with error, triggering cleanup for %s: %v", runnerName, err)
}
} else {
p.logger.Infof("provisioning and execution completed normally, triggering cleanup for %s", runnerName)
}
p.cleanupResources(context.WithoutCancel(ctx), runnerName)
}()

vmIP, err := p.getRealVMIP(vmResponse.IP)
if err != nil {
p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err)
return err
}

p.logger.Infof("creating runner with name %s", runnerName)
jitConfig, err := p.createRunner(ctx, runnerName)
if err != nil {
p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err)
return err
}
p.logger.Infof("created runner with name %s", runnerName)
Expand All @@ -73,8 +87,10 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) error {
VMName: runnerName,
VMUsername: p.envData.OrkaVMUsername,
VMPassword: p.envData.OrkaVMPassword,
Logger: p.logger,
}

p.logger.Infof("starting command execution on VM %s", runnerName)
err = vmCommandExecutor.ExecuteCommands(ctx, buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)...)
if err != nil {
return err
Expand All @@ -96,6 +112,8 @@ func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) {
}

func (p *RunnerProvisioner) cleanupResources(ctx context.Context, runnerName string) {
p.logger.Infof("starting resource cleanup for %s", runnerName)

for {
err := p.ensureRunnerDeregistered(ctx, runnerName)
if err != nil {
Expand All @@ -115,20 +133,28 @@ func (p *RunnerProvisioner) cleanupResources(ctx context.Context, runnerName str
}

func (p *RunnerProvisioner) deleteVM(ctx context.Context, runnerName string) {
p.logger.Infof("deleting Orka VM with name %s", runnerName)
p.logger.Infof("initiating deletion of Orka VM %s", runnerName)

attempts := 0
operation := func() error {
attempts++
err := p.orkaClient.DeleteVM(ctx, runnerName)
if err != nil && strings.Contains(err.Error(), "not found") {
p.logger.Warnf("vm not found for %s", runnerName)
return nil
if err != nil {
if strings.Contains(err.Error(), "not found") {
p.logger.Warnf("Orka VM %s not found (it may have already been deleted)", runnerName)
return nil
}
p.logger.Warnf("attempt %d: failed to delete Orka VM %s: %v", attempts, runnerName, err)
return err
}
return err
return nil
}

err := backoff.Retry(operation, backoff.NewExponentialBackOff())
if err != nil {
p.logger.Infof("error while deleting Orka VM %s. More information: %s", runnerName, err.Error())
p.logger.Errorf("error while deleting Orka VM %s. More information: %s", runnerName, err.Error())
} else {
p.logger.Infof("deleted Orka VM with name %s", runnerName)
p.logger.Infof("successfully deleted Orka VM %s", runnerName)
}
}

Expand All @@ -142,13 +168,14 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner
defer ticker.Stop()

if runner, err := p.actionsClient.GetRunner(ctx, runnerName); err == nil && runner == nil {
p.logger.Infof("runner %s has de-registered from GitHub", runnerName)
p.logger.Infof("runner %s has cleanly de-registered from GitHub", runnerName)
return nil
}

for {
select {
case <-ctx.Done():
p.logger.Warnf("context cancelled while waiting for runner %s to deregister: %v", runnerName, ctx.Err())
return ctx.Err()

case <-timeoutCtx.Done():
Expand All @@ -159,12 +186,12 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner
case <-ticker.C:
runner, err := p.actionsClient.GetRunner(ctx, runnerName)
if err != nil {
p.logger.Warnf("error checking runner %s registration status: %s", runnerName, err.Error())
p.logger.Warnf("error checking registration status for runner %s: %v", runnerName, err)
continue
}

if runner == nil {
p.logger.Infof("runner %s has de-registered from GitHub", runnerName)
p.logger.Infof("runner %s has cleanly de-registered from GitHub", runnerName)
return nil
}
}
Expand All @@ -174,7 +201,7 @@ func (p *RunnerProvisioner) ensureRunnerDeregistered(ctx context.Context, runner
func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName string) error {
runner, err := p.actionsClient.GetRunner(ctx, runnerName)
if err != nil {
p.logger.Errorf("error getting runner %s for force-deletion: %s", runnerName, err.Error())
p.logger.Errorf("failed to fetch runner %s for force-deletion: %v", runnerName, err)
return err
}

Expand All @@ -185,7 +212,7 @@ func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName st

err = p.actionsClient.DeleteRunner(ctx, runner.Id)
if err != nil {
p.logger.Errorf("error force-deleting runner %s (ID: %d) from GitHub: %s", runnerName, runner.Id, err.Error())
p.logger.Errorf("failed to force-delete runner %s (ID: %d) from GitHub: %v", runnerName, runner.Id, err)
return err
}

Expand All @@ -194,8 +221,14 @@ func (p *RunnerProvisioner) forceDeleteRunner(ctx context.Context, runnerName st
}

func (p *RunnerProvisioner) createRunner(ctx context.Context, runnerName string) (*types.RunnerScaleSetJitRunnerConfig, error) {
p.logger.Debugf("waiting for lock to create runner %s", runnerName)
p.mu.Lock()
defer p.mu.Unlock()
p.logger.Debugf("acquired lock for runner %s", runnerName)

defer func() {
p.mu.Unlock()
p.logger.Debugf("released lock for runner %s", runnerName)
}()

jitConfig, err := p.actionsClient.CreateRunner(ctx, p.runnerScaleSet.Id, runnerName)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/runner-provisioner/provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ var _ = Describe("RunnerProvisioner", func() {
return nil
}

err := provisioner.ensureRunnerDeregistered(cancelCtx, testRunnerName)
Expect(err).To(BeNil())
_ = provisioner.ensureRunnerDeregistered(cancelCtx, testRunnerName)

// Should have attempted at least one check and force-deleted
Expect(mockActions.GetRunnerCalls).To(BeNumerically(">=", 1))
Expand Down