Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/.env
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ LOG_LEVEL="debug"
# [Optional] ORKA_VM_METADATA specifies the custom metadata passed to the VM.
# Should be formatted as "key=value" comma separated list.
ORKA_VM_METADATA="key1=value1,key2=value2"

# [Optional] VM_TRACKER_INTERVAL specifies the interval at which the VM tracker will check for orphaned VMs.
# VMs are deleted if they do not have a corresponding GitHub runner for 2 consecutive checks.
# If not provided, it defaults to 300 seconds.
VM_TRACKER_INTERVAL="300s"
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *

runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData)

runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, runnerScaleSet)
vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger)
go vmTracker.Start(ctx, envData.VMTrackerInterval)

runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet)

if err = runnerMessageProcessor.StartProcessingMessages(); err != nil {
logger.Errorf("failed to start processing messages for runnerScaleSet %s: %w", runnerScaleSet.Name, err.Error())
Expand Down
2 changes: 2 additions & 0 deletions pkg/env/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ const (
RunnerDeregistrationTimeoutEnvName = "RUNNER_DEREGISTRATION_TIMEOUT"
RunnerDeregistrationPollIntervalEnvName = "RUNNER_DEREGISTRATION_POLL_INTERVAL"

VMTrackerIntervalEnvName = "VM_TRACKER_INTERVAL"

LogLevelEnvName = "LOG_LEVEL"
)
4 changes: 4 additions & 0 deletions pkg/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Data struct {
RunnerDeregistrationTimeout time.Duration
RunnerDeregistrationPollInterval time.Duration

VMTrackerInterval time.Duration

LogLevel string
}

Expand Down Expand Up @@ -77,6 +79,8 @@ func ParseEnv() *Data {
RunnerDeregistrationTimeout: getDurationEnv(RunnerDeregistrationTimeoutEnvName, 30*time.Second),
RunnerDeregistrationPollInterval: getDurationEnv(RunnerDeregistrationPollIntervalEnvName, 2*time.Second),

VMTrackerInterval: getDurationEnv(VMTrackerIntervalEnvName, 300*time.Second),

LogLevel: getEnvWithDefault(LogLevelEnvName, logging.LogLevelInfo),
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/github/runners/message-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
defaultJobId = "missing-job-id"
)

func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor {
func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, vmTracker *VMTracker, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor {
return &RunnerMessageProcessor{
ctx: ctx,
runnerManager: runnerManager,
Expand All @@ -37,6 +37,7 @@ func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerI
upstreamCanceledJobsMutex: sync.RWMutex{},
jobContextCancels: map[string]context.CancelFunc{},
jobContextCancelsMutex: sync.Mutex{},
vmTracker: vmTracker,
}
}

Expand Down Expand Up @@ -142,6 +143,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
context.AfterFunc(jobContext, func() {
p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName)
p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName)
p.vmTracker.Untrack(executor.VMName)
})

defer func() {
Expand All @@ -168,6 +170,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
p.cancelJobContext(jobId, cancelReason)
}()

p.vmTracker.Track(executor.VMName)
executionErr = p.executeJobCommands(jobContext, jobId, executor, commands)
}()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/github/runners/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type RunnerMessageProcessor struct {
logger *zap.SugaredLogger
runnerManager RunnerManagerInterface
runnerProvisioner RunnerProvisionerInterface
vmTracker *VMTracker
runnerScaleSetName string
upstreamCanceledJobs map[string]bool
upstreamCanceledJobsMutex sync.RWMutex
Expand Down
110 changes: 110 additions & 0 deletions pkg/github/runners/vm_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package runners

import (
"context"
"strings"
"sync"
"time"

"github.com/macstadium/orka-github-actions-integration/pkg/github/actions"
"github.com/macstadium/orka-github-actions-integration/pkg/orka"
"go.uber.org/zap"
)

// VMTracker periodically cross-checks tracked Orka VMs against their GitHub
// runner registrations and force-deletes VMs that remain unregistered for
// two consecutive checks (see checkaForOrphanedVMs).
type VMTracker struct {
	orkaClient    orka.OrkaService       // deletes orphaned VMs
	actionsClient actions.ActionsService // looks up GitHub runner registrations by VM name
	logger        *zap.SugaredLogger

	// mu guards trackedVMs; Track/Untrack run on job goroutines while the
	// check loop reads and updates strike counts from its own goroutine.
	mu sync.Mutex
	// trackedVMs maps VM name -> consecutive checks with no GitHub runner
	// ("strikes"); a VM is considered orphaned at 2 strikes.
	trackedVMs map[string]int
}

// NewVMTracker builds a VMTracker that uses orkaClient to delete VMs and
// actionsClient to look up runner registrations. Log output goes through a
// "vm-tracker"-named child of logger. Call Start to begin checking.
func NewVMTracker(orkaClient orka.OrkaService, actionsClient actions.ActionsService, logger *zap.SugaredLogger) *VMTracker {
	tracker := &VMTracker{
		orkaClient:    orkaClient,
		actionsClient: actionsClient,
		logger:        logger.Named("vm-tracker"),
		trackedVMs:    map[string]int{},
	}
	return tracker
}

// Track registers vmName for orphaned-VM detection, resetting any previous
// strike count for that name to zero.
func (tracker *VMTracker) Track(vmName string) {
	tracker.mu.Lock()
	tracker.trackedVMs[vmName] = 0
	tracker.mu.Unlock()
	tracker.logger.Debugf("Now tracking VM %s for orphaned VM detection", vmName)
}

// Untrack removes vmName from orphaned-VM detection, e.g. once the job's
// cleanup has run or the VM has been deleted.
func (tracker *VMTracker) Untrack(vmName string) {
	tracker.logger.Debugf("Stopping tracking VM %s for orphaned VM detection", vmName)
	tracker.mu.Lock()
	delete(tracker.trackedVMs, vmName)
	tracker.mu.Unlock()
}

// Start runs the orphaned-VM check loop every interval until ctx is canceled.
// It blocks, so it is intended to be launched in its own goroutine.
func (tracker *VMTracker) Start(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Fixed typo: was checkaForOrphanedVMs.
			tracker.checkForOrphanedVMs(ctx)
		}
	}
}

// checkForOrphanedVMs snapshots the tracked VM names and queries GitHub for
// each one. A VM with no corresponding runner accumulates a strike; after two
// consecutive strikes it is treated as orphaned and force-deleted. A VM whose
// runner is found has its strike count reset to zero.
func (tracker *VMTracker) checkForOrphanedVMs(ctx context.Context) {
	tracker.logger.Debugf("Checking for orphaned VMs")

	// Snapshot the names so the GetRunner network calls happen outside the lock.
	tracker.mu.Lock()
	vmNames := make([]string, 0, len(tracker.trackedVMs))
	for name := range tracker.trackedVMs {
		vmNames = append(vmNames, name)
	}
	tracker.mu.Unlock()

	if len(vmNames) == 0 {
		tracker.logger.Debugf("No VMs to check for orphaned VMs")
		return
	}

	for _, name := range vmNames {
		runner, err := tracker.actionsClient.GetRunner(ctx, name)
		if err != nil {
			tracker.logger.Warnf("failed to check GitHub for %s: %v", name, err)
			continue
		}

		tracker.mu.Lock()
		// The VM may have been untracked (job finished, cleanup ran) while
		// GetRunner was in flight. Writing to the map here would silently
		// resurrect the deleted entry and could later force-delete a VM that
		// belongs to a new job reusing the same name — skip it instead.
		if _, stillTracked := tracker.trackedVMs[name]; !stillTracked {
			tracker.mu.Unlock()
			continue
		}
		if runner == nil {
			tracker.trackedVMs[name]++
			strikes := tracker.trackedVMs[name]
			tracker.mu.Unlock()

			tracker.logger.Warnf("VM %s has no GitHub runner (Strike %d/2)", name, strikes)

			if strikes >= 2 {
				tracker.logger.Errorf("VM %s is orphaned. Forcing deletion.", name)
				tracker.cleanupOrphanedVM(ctx, name)
			}
		} else {
			tracker.trackedVMs[name] = 0
			tracker.mu.Unlock()
			tracker.logger.Debugf("VM %s is healthy and registered", name)
		}
	}
}

// cleanupOrphanedVM force-deletes vmName via the Orka API and stops tracking
// it. An error whose text contains "not found" is treated as success — the VM
// is already gone. NOTE(review): this relies on string matching of the error
// message; confirm whether orka.OrkaService exposes a typed/sentinel error
// usable with errors.Is instead.
func (tracker *VMTracker) cleanupOrphanedVM(ctx context.Context, vmName string) {
	if err := tracker.orkaClient.DeleteVM(ctx, vmName); err != nil {
		if !strings.Contains(err.Error(), "not found") {
			tracker.logger.Errorf("Failed to delete orphaned VM %s: %v", vmName, err)
			return
		}
	}

	tracker.Untrack(vmName)
	tracker.logger.Infof("Successfully deleted orphaned VM %s", vmName)
}
Loading