160 changes: 160 additions & 0 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
@@ -10,6 +10,7 @@ import { createRunner, listEC2Runners } from './../aws/runners';
import { RunnerInputParameters } from './../aws/runners.d';
import * as scaleUpModule from './scale-up';
import { getParameter } from '@aws-github-runner/aws-ssm-util';
import { publishRetryMessage } from './job-retry';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import type { Octokit } from '@octokit/rest';

@@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
const mockListRunners = vi.mocked(listEC2Runners);
const mockSSMClient = mockClient(SSMClient);
const mockSSMgetParameter = vi.mocked(getParameter);
const mockPublishRetryMessage = vi.mocked(publishRetryMessage);

vi.mock('@octokit/rest', () => ({
Octokit: vi.fn().mockImplementation(function () {
@@ -63,6 +65,11 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
};
});

vi.mock('./job-retry', () => ({
publishRetryMessage: vi.fn(),
checkAndRetryJob: vi.fn(),
}));

export type RunnerType = 'ephemeral' | 'non-ephemeral';

// for ephemeral and non-ephemeral runners
@@ -1667,6 +1674,159 @@ describe('scaleUp with Github Data Residency', () => {
});
});

describe('Retry mechanism tests', () => {
beforeEach(() => {
process.env.ENABLE_ORGANIZATION_RUNNERS = 'true';
process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
process.env.RUNNERS_MAXIMUM_COUNT = '10';
expectedRunnerParams = { ...EXPECTED_RUNNER_PARAMS };
mockSSMClient.reset();
});

const createTestMessages = (
count: number,
overrides: Partial<scaleUpModule.ActionRequestMessageSQS>[] = [],
): scaleUpModule.ActionRequestMessageSQS[] => {
return Array.from({ length: count }, (_, i) => ({
...TEST_DATA_SINGLE,
id: i + 1,
messageId: `message-${i + 1}`,
...overrides[i],
}));
};

it('calls publishRetryMessage for each valid message when job is queued', async () => {
const messages = createTestMessages(3);

await scaleUpModule.scaleUp(messages);

expect(mockPublishRetryMessage).toHaveBeenCalledTimes(3);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
id: 1,
messageId: 'message-1',
}),
);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
id: 2,
messageId: 'message-2',
}),
);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
id: 3,
messageId: 'message-3',
}),
);
});

it('does not call publishRetryMessage when job is not queued', async () => {
mockOctokit.actions.getJobForWorkflowRun.mockImplementation((params) => {
const isQueued = params.job_id === 1; // Only job 1 is queued
return {
data: {
status: isQueued ? 'queued' : 'completed',
},
};
});

const messages = createTestMessages(3);

await scaleUpModule.scaleUp(messages);

// Only message with id 1 should trigger retry
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(1);
expect(mockPublishRetryMessage).toHaveBeenCalledWith(
expect.objectContaining({
id: 1,
messageId: 'message-1',
}),
);
});

it('calls publishRetryMessage even when maximum runners is reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created

const messages = createTestMessages(2);

await scaleUpModule.scaleUp(messages);

// publishRetryMessage should still be called even though no runners will be created
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
expect(createRunner).not.toHaveBeenCalled();
});

it('calls publishRetryMessage with correct message structure including retry counter', async () => {
const message = {
...TEST_DATA_SINGLE,
messageId: 'test-message-id',
retryCounter: 2,
};

await scaleUpModule.scaleUp([message]);

expect(mockPublishRetryMessage).toHaveBeenCalledWith(
expect.objectContaining({
id: message.id,
messageId: 'test-message-id',
retryCounter: 2,
}),
);
});

it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'false';

const messages = createTestMessages(2);

await scaleUpModule.scaleUp(messages);

// Should always call publishRetryMessage when queue check is disabled
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
expect(mockOctokit.actions.getJobForWorkflowRun).not.toHaveBeenCalled();
});

it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
const messages = createTestMessages(5);

await scaleUpModule.scaleUp(messages);

expect(mockPublishRetryMessage).toHaveBeenCalledTimes(5);
messages.forEach((msg, index) => {
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
index + 1,
expect.objectContaining({
id: msg.id,
messageId: msg.messageId,
}),
);
});
});

it('calls publishRetryMessage before runner creation', async () => {
const messages = createTestMessages(1);

const callOrder: string[] = [];
mockPublishRetryMessage.mockImplementation(() => {
callOrder.push('publishRetryMessage');
return Promise.resolve();
});
mockCreateRunner.mockImplementation(async () => {
callOrder.push('createRunner');
return ['i-12345'];
});

await scaleUpModule.scaleUp(messages);

expect(callOrder).toEqual(['publishRetryMessage', 'createRunner']);
});
});

function defaultOctokitMockImpl() {
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: {
lambdas/functions/control-plane/src/scale-runners/scale-up.ts
@@ -7,6 +7,7 @@ import { createGithubAppAuth, createGithubInstallationAuth, createOctokitClient
import { createRunner, listEC2Runners, tag } from './../aws/runners';
import { RunnerInputParameters } from './../aws/runners.d';
import { metricGitHubAppRateLimit } from '../github/rate-limit';
import { publishRetryMessage } from './job-retry';

const logger = createChildLogger('scale-up');

@@ -356,6 +357,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
}

scaleUp++;
await publishRetryMessage(message as ActionRequestMessageRetry);
Member:
Good catch, the call got lost in the PR that introduced the batches.

I doubt a bit whether we should publish the message here. At this point no runner is created, only the count is increased. On the other hand, we cannot match the created runners with the event.

I think it would be a bit safer to call publishRetryMessage at the end for each message that is not marked as invalid, since the invalid ones go back to the queue and will be retried anyway. Otherwise more retry messages are created for the same event.

Member:
@copilot what do you think?

Contributor:
I'm not sure it's worth adding all that additional logic (and an additional for-loop) to this function; it would make it considerably more complex as well.

If a message is marked as invalid, worst case it goes through the retry mechanism, ends up in the same scale-up function, and is still an invalid message, leading to minimal waste. IMO that is a good trade-off to avoid making this function even more complex.

What do you think @npalm?

Contributor Author:
@npalm, what is your feeling about what @Brend-Smits said?

Member:
No, I do not agree. The problem is that in case of failure, pushing all messages back only makes the pressure on the APIs increase quickly, so I think it is better to filter. I did a quick experiment on a branch with some Copilot refactoring, but have not had the time to dig in further.
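
A minimal sketch of the filtering approach discussed above, publishing retries at the end only for messages that were not marked invalid. The helper name, the invalidMessageIds parameter, and the import path for ActionRequestMessageRetry are assumptions for illustration, not part of this PR; only publishRetryMessage and ActionRequestMessageSQS come from the code in this diff.

// Hypothetical sketch, not part of this PR: publish retries once at the end
// of scaleUp instead of inside the per-message loop.
import { publishRetryMessage } from './job-retry';
import type { ActionRequestMessageRetry, ActionRequestMessageSQS } from './scale-up';

async function publishRetriesForValidMessages(
  payloads: ActionRequestMessageSQS[],
  invalidMessageIds: Set<string>, // assumed bookkeeping: ids of messages marked invalid during processing
): Promise<void> {
  // Invalid messages go back to the SQS queue and are retried there, so
  // publishing a retry for them would duplicate retry messages for the same event.
  const validMessages = payloads.filter((message) => !invalidMessageIds.has(message.messageId));
  await Promise.all(
    validMessages.map((message) => publishRetryMessage(message as ActionRequestMessageRetry)),
  );
}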

}

if (scaleUp === 0) {
@@ -395,7 +397,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
}

// No runners will be created, so skip calling the EC2 API.
if (missingInstanceCount === scaleUp) {
if (newRunners <= 0) {
continue;
}
}