Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 117 additions & 35 deletions pkg/github/runners/message-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package runners
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/macstadium/orka-github-actions-integration/pkg/github/types"
"github.com/macstadium/orka-github-actions-integration/pkg/logging"
"github.com/macstadium/orka-github-actions-integration/pkg/orka"
"golang.org/x/crypto/ssh"
)

const (
Expand All @@ -25,15 +28,15 @@ const (

func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor {
return &RunnerMessageProcessor{
ctx: ctx,
runnerManager: runnerManager,
runnerProvisioner: runnerProvisioner,
logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)),
runnerScaleSetName: runnerScaleSet.Name,
upstreamCanceledJobs: map[string]bool{},
upstreamCanceledJobsMutex: sync.RWMutex{},
provisioningContextCancels: map[string]context.CancelFunc{},
provisioningContextCancelsMutex: sync.Mutex{},
ctx: ctx,
runnerManager: runnerManager,
runnerProvisioner: runnerProvisioner,
logger: logging.Logger.Named(fmt.Sprintf("runner-message-processor-%d", runnerScaleSet.Id)),
runnerScaleSetName: runnerScaleSet.Name,
upstreamCanceledJobs: map[string]bool{},
upstreamCanceledJobsMutex: sync.RWMutex{},
jobContextCancels: map[string]context.CancelFunc{},
jobContextCancelsMutex: sync.Mutex{},
}
}

Expand Down Expand Up @@ -121,31 +124,51 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
jobId = defaultJobId
}

provisioningContext, cancel := context.WithCancel(p.ctx)
p.storeProvisioningContextCancel(jobId, cancel)
jobContext, cancel := context.WithCancel(p.ctx)
p.storeJobContextCancel(jobId, cancel)

go func() {
var executionErr error

defer p.removeUpstreamCanceledJob(jobId)
defer p.cancelProvisioningContext(jobId)

for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ {
err := p.runnerProvisioner.ProvisionRunner(provisioningContext)
if provisioningContext.Err() != nil {
break
}
executor, commands, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId)
if provisioningErr != nil {
p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr)
p.cancelJobContext(jobId, "provisioning failed")
return
}

if err == nil {
break
}
context.AfterFunc(jobContext, func() {
p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName)
p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName)
})

p.logger.Errorf("unable to provision Orka runner for %s (attempt %d). More information: %s", p.runnerScaleSetName, attempt, err.Error())
defer func() {
var exitErr *ssh.ExitError

select {
case <-provisioningContext.Done():
if isNetworkingFailure(executionErr) {
p.logger.Warnf("SSH connection dropped for JobId %s (%v). Skipping cleanup, relying on JobCompleted webhook.", jobId, executionErr)
return
case <-time.After(15 * time.Second):
}
}

var cancelReason string

if errors.Is(executionErr, context.Canceled) {
cancelReason = "job context was canceled"
p.logger.Infof("job context canceled for JobId %s. Cleaning up resources.", jobId)
} else if executionErr != nil {
cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus())
p.logger.Errorf("execution failed with exit code %d for JobId %s. Cleaning up resources.", exitErr.ExitStatus(), jobId)
} else {
cancelReason = "execution completed successfully"
p.logger.Infof("execution completed successfully for JobId %s. Cleaning up resources.", jobId)
}

p.cancelJobContext(jobId, cancelReason)
}()

executionErr = p.executeJobCommands(jobContext, jobId, executor, commands)
}()
}
case "JobStarted":
Expand All @@ -162,7 +185,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale

p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result)

p.cancelProvisioningContext(jobCompleted.JobId)
p.cancelJobContext(jobCompleted.JobId, "Job completed webhook received")

if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) {
p.setUpstreamCanceledJob(jobCompleted.JobId)
Expand All @@ -180,6 +203,52 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
return nil
}

// provisionRunnerWithRetry repeatedly attempts to provision an Orka runner for the
// given job until provisioning succeeds, the context is canceled, or the job is
// canceled upstream. On success it returns the VM command executor and the commands
// to run on the VM. Failed attempts are retried every 15 seconds.
func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, jobId string) (*orka.VMCommandExecutor, []string, error) {
	for attempt := 1; !p.isUpstreamCanceled(jobId); attempt++ {
		executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx)
		// A canceled context takes precedence over whatever ProvisionRunner returned.
		if ctx.Err() != nil {
			return nil, nil, ctx.Err()
		}

		if err == nil {
			return executor, commands, nil
		}

		p.logger.Errorf(
			"unable to provision Orka runner for %s (attempt %d). More information: %v",
			p.runnerScaleSetName,
			attempt,
			err,
		)

		// Wait before retrying; abort immediately if the job context is canceled.
		select {
		case <-ctx.Done():
			// BUG FIX: this previously returned a nil error, which made the caller
			// treat a canceled provisioning as success and dereference a nil executor.
			return nil, nil, ctx.Err()
		case <-time.After(15 * time.Second):
		}
	}

	// The job was canceled upstream before provisioning could succeed.
	return nil, nil, fmt.Errorf("unable to provision Orka runner for %s", p.runnerScaleSetName)
}

// executeJobCommands runs the runner's commands on the provisioned VM and logs the
// outcome. It returns ctx.Err() when the job context was canceled during execution,
// the execution error when the commands failed, and nil on success.
func (p *RunnerMessageProcessor) executeJobCommands(ctx context.Context, jobId string, executor *orka.VMCommandExecutor, commands []string) error {
	p.logger.Infof("starting execution for JobId: %s on VM %s", jobId, executor.VMName)

	err := executor.ExecuteCommands(ctx, commands...)

	// Cancellation takes precedence over any execution error.
	if ctx.Err() != nil {
		return ctx.Err()
	}

	// NOTE: the previous condition also required !errors.Is(ctx.Err(), context.Canceled),
	// which is always true here because ctx.Err() is nil past the early return above.
	if err != nil {
		p.logger.Errorf("execution failed for JobId: %s on VM %s: %v", jobId, executor.VMName, err)
		return err
	}

	p.logger.Infof("execution completed for JobId: %s on VM %s", jobId, executor.VMName)
	return nil
}

func (p *RunnerMessageProcessor) isUpstreamCanceled(jobId string) bool {
p.upstreamCanceledJobsMutex.RLock()
defer p.upstreamCanceledJobsMutex.RUnlock()
Expand All @@ -198,17 +267,30 @@ func (p *RunnerMessageProcessor) removeUpstreamCanceledJob(jobId string) {
delete(p.upstreamCanceledJobs, jobId)
}

func (p *RunnerMessageProcessor) storeProvisioningContextCancel(jobId string, cancel context.CancelFunc) {
p.provisioningContextCancelsMutex.Lock()
defer p.provisioningContextCancelsMutex.Unlock()
p.provisioningContextCancels[jobId] = cancel
// storeJobContextCancel records the cancel function for jobId so the job's
// context can later be canceled by id (e.g. from cancelJobContext). Guarded by
// jobContextCancelsMutex; a later store for the same jobId overwrites the entry.
func (p *RunnerMessageProcessor) storeJobContextCancel(jobId string, cancel context.CancelFunc) {
	p.jobContextCancelsMutex.Lock()
	defer p.jobContextCancelsMutex.Unlock()
	p.jobContextCancels[jobId] = cancel
}

func (p *RunnerMessageProcessor) cancelProvisioningContext(jobId string) {
p.provisioningContextCancelsMutex.Lock()
defer p.provisioningContextCancelsMutex.Unlock()
if cancel, exists := p.provisioningContextCancels[jobId]; exists {
// cancelJobContext cancels and removes the stored context cancel function for
// jobId, logging the triggering reason. When no cancel function is registered
// (already canceled or never stored), it only emits a debug log.
func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) {
	p.jobContextCancelsMutex.Lock()
	defer p.jobContextCancelsMutex.Unlock()

	cancel, exists := p.jobContextCancels[jobId]
	if !exists {
		p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason)
		return
	}

	p.logger.Infof("canceling job context for JobId: %s. Triggered by: %s", jobId, reason)
	cancel()
	delete(p.jobContextCancels, jobId)
}

// isNetworkingFailure reports whether err looks like a transport-level failure
// (e.g. a dropped SSH connection): any non-nil error that is neither a remote
// command exit status (*ssh.ExitError) nor a context cancellation.
func isNetworkingFailure(err error) bool {
	if err == nil {
		return false
	}

	var exitErr *ssh.ExitError
	if errors.As(err, &exitErr) {
		// The remote command ran and exited non-zero: not a network problem.
		return false
	}

	return !errors.Is(err, context.Canceled)
}
22 changes: 12 additions & 10 deletions pkg/github/runners/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/macstadium/orka-github-actions-integration/pkg/github/actions"
"github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue"
"github.com/macstadium/orka-github-actions-integration/pkg/github/types"
"github.com/macstadium/orka-github-actions-integration/pkg/orka"
"go.uber.org/zap"
)

Expand All @@ -31,17 +32,18 @@ type RunnerManager struct {
}

type RunnerProvisionerInterface interface {
ProvisionRunner(ctx context.Context) error
ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error)
CleanupResources(ctx context.Context, runnerName string)
}

type RunnerMessageProcessor struct {
ctx context.Context
logger *zap.SugaredLogger
runnerManager RunnerManagerInterface
runnerProvisioner RunnerProvisionerInterface
runnerScaleSetName string
upstreamCanceledJobs map[string]bool
upstreamCanceledJobsMutex sync.RWMutex
provisioningContextCancels map[string]context.CancelFunc
provisioningContextCancelsMutex sync.Mutex
ctx context.Context
logger *zap.SugaredLogger
runnerManager RunnerManagerInterface
runnerProvisioner RunnerProvisionerInterface
runnerScaleSetName string
upstreamCanceledJobs map[string]bool
upstreamCanceledJobsMutex sync.RWMutex
jobContextCancels map[string]context.CancelFunc
jobContextCancelsMutex sync.Mutex
}
10 changes: 9 additions & 1 deletion pkg/orka/vm-commnd-executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package orka
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -108,10 +109,17 @@ func (executor *VMCommandExecutor) ExecuteCommands(ctx context.Context, commands
return ctx.Err()
case err := <-done:
if err != nil {
executor.Logger.Errorf("Execution finished with error on VM %s: %v", executor.VMName, err)
var exitErr *ssh.ExitError

if errors.As(err, &exitErr) {
executor.Logger.Errorf("Command execution failed on VM %s with exit code %d: %v", executor.VMName, exitErr.ExitStatus(), err)
} else {
executor.Logger.Errorf("SSH connection dropped or protocol error on VM %s: %v", executor.VMName, err)
}
} else {
executor.Logger.Infof("Execution completed successfully on VM %s", executor.VMName)
}

return err
}
}
Expand Down
42 changes: 21 additions & 21 deletions pkg/runner-provisioner/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,43 +43,39 @@ var commands_template = []string{
"echo 'Git Action Runner exited'",
}

func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) {
func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (*orka.VMCommandExecutor, []string, error) {
p.logger.Infof("deploying Orka VM with prefix %s", p.runnerScaleSet.Name)
vmResponse, err := p.orkaClient.DeployVM(ctx, p.runnerScaleSet.Name, p.envData.OrkaVMConfig)
if err != nil {
p.logger.Errorf("failed to deploy Orka VM: %v", err)
return err
return nil, nil, err
}

runnerName := vmResponse.Name
p.logger.Infof("deployed Orka VM with name %s", runnerName)

provisioningSucceeded := false

defer func() {
if err != nil {
if ctx.Err() != nil {
p.logger.Warnf("context cancelled/timed out, triggering cleanup for %s", runnerName)
} else {
p.logger.Errorf("provisioning failed with error, triggering cleanup for %s: %v", runnerName, err)
}
} else {
p.logger.Infof("provisioning and execution completed normally, triggering cleanup for %s", runnerName)
if !provisioningSucceeded {
p.logger.Warnf("provisioning failed, cleaning up resources for VM %s", runnerName)
p.cleanupResources(context.WithoutCancel(ctx), runnerName)
}
p.cleanupResources(context.WithoutCancel(ctx), runnerName)
}()

vmIP, err := p.getRealVMIP(vmResponse.IP)
if err != nil {
p.logger.Errorf("failed to get real VM IP for %s: %v", runnerName, err)
return err
return nil, nil, err
}

p.logger.Infof("creating runner with name %s", runnerName)
p.logger.Infof("creating runner config for name %s", runnerName)
jitConfig, err := p.createRunner(ctx, runnerName)
if err != nil {
p.logger.Errorf("failed to create runner config for %s: %v", runnerName, err)
return err
return nil, nil, err
}
p.logger.Infof("created runner with name %s", runnerName)
p.logger.Infof("created runner config with name %s", runnerName)

vmCommandExecutor := &orka.VMCommandExecutor{
VMIP: vmIP,
Expand All @@ -90,13 +86,17 @@ func (p *RunnerProvisioner) ProvisionRunner(ctx context.Context) (err error) {
Logger: p.logger,
}

p.logger.Infof("starting command execution on VM %s", runnerName)
err = vmCommandExecutor.ExecuteCommands(ctx, buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)...)
if err != nil {
return err
}
commands := buildCommands(jitConfig.EncodedJITConfig, p.envData.GitHubRunnerVersion, p.envData.OrkaVMUsername)

return nil
provisioningSucceeded = true

return vmCommandExecutor, commands, nil
}

// CleanupResources tears down everything provisioned for the named runner by
// delegating to the internal cleanupResources, logging before and after so the
// cleanup window is visible in the logs. Exported so the message processor can
// trigger cleanup when a job context is canceled.
func (p *RunnerProvisioner) CleanupResources(ctx context.Context, runnerName string) {
	p.logger.Infof("starting resource cleanup for %s", runnerName)
	p.cleanupResources(ctx, runnerName)
	p.logger.Infof("resource cleanup completed for %s", runnerName)
}

func (p *RunnerProvisioner) getRealVMIP(vmIP string) (string, error) {
Expand Down