Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/actions/setup/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Setup Project
description: Setup Node.js, restore build cache, and setup Yarn

inputs:
node-version:
description: 'Node.js version to use'
required: false
default: '22'

runs:
using: composite
steps:
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: ${{ inputs.node-version }}

- name: Restore build outputs
uses: actions/cache@v4
with:
path: |
.yarn
node_modules
**/node_modules
**/dist
key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }}

- name: Setup Latest Yarn
uses: threeal/setup-yarn-action@v2.0.0
with:
version: berry
85 changes: 65 additions & 20 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,47 @@ concurrency:
cancel-in-progress: true

jobs:
build-and-test:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Setup project
uses: ./.github/actions/setup

- name: Build project
run: yarn build

format-check:
runs-on: ubuntu-latest
needs: build
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Setup project
uses: ./.github/actions/setup

- name: Run format checker
run: yarn format:check

lint:
runs-on: ubuntu-latest
needs: build
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Setup project
uses: ./.github/actions/setup

- name: Run linter
run: yarn lint

test:
runs-on: ubuntu-latest
needs: build
services:
postgres:
image: postgres:latest
Expand Down Expand Up @@ -59,25 +97,8 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22"

- name: Setup Latest Yarn
uses: threeal/setup-yarn-action@v2.0.0
with:
version: berry
cache: false

- name: Build project
run: yarn build

- name: Run format checker
run: yarn format:check

- name: Run linter
run: yarn lint
- name: Setup project
uses: ./.github/actions/setup

- name: Run tests
env:
Expand All @@ -86,6 +107,30 @@ jobs:
MONGODB_URL: mongodb://127.0.0.1:27017/test
run: yarn test:all:ci

integration-test:
runs-on: ubuntu-latest
needs: build
services:
postgres:
image: postgres:latest
ports:
- 5432:5432
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: >-
--health-cmd="pg_isready -U postgres"
--health-interval=10s
--health-timeout=5s
--health-retries=5
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Setup project
uses: ./.github/actions/setup

- name: Run integration tests
env:
POSTGRES_URL: postgresql://postgres:postgres@localhost:5432/postgres
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transitions/snooze-transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { JobTransition } from "./transition";
* If the job is currently running, it will decrement the attempt count.
* This allows the job to be retried after the delay.
*
* Only jobs in "waiting" or "claimed" or "running" state can be snoozed.
* Only jobs in "waiting" or "running" state can be snoozed.
*/
export class SnoozeTransition extends JobTransition {
/** The delay in milliseconds. */
Expand Down Expand Up @@ -40,6 +40,6 @@ export class SnoozeTransition extends JobTransition {
}

shouldRun(job: JobData): boolean {
return ["waiting", "claimed", "running"].includes(job.state);
return ["waiting", "running"].includes(job.state);
}
}
44 changes: 44 additions & 0 deletions packages/engine/src/execution/dispatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,49 @@ describe("Dispatcher", () => {

await dispatcher.stop();
});

sidequestTest(
"claims min(availableSlots, globalSlots) jobs when queue has more slots than global",
async ({ backend }) => {
// Setup: Queue has concurrency of 10, but global has only 3 slots
const configWithHighQueueConcurrency: EngineConfig = {
backend: { driver: "@sidequest/sqlite-backend" },
queues: [{ name: "default", concurrency: 10 }],
maxConcurrentJobs: 3,
};

// Create 4 jobs to ensure there are enough jobs to claim
await createJob(backend, "default");
await createJob(backend, "default");
await createJob(backend, "default");
await createJob(backend, "default");

expect(await backend.listJobs({ state: "waiting" })).toHaveLength(5);

const mockClaim = vi.spyOn(backend, "claimPendingJob");

const dispatcher = new Dispatcher(
backend,
new QueueManager(backend, configWithHighQueueConcurrency.queues!),
new ExecutorManager(backend, configWithHighQueueConcurrency as NonNullableEngineConfig),
100,
);
dispatcher.start();

runMock.mockImplementation(() => {
return { type: "completed", result: "foo", __is_job_transition__: true } as CompletedResult;
});

// Wait for the first claim to happen
await vi.waitUntil(() => mockClaim.mock.calls.length > 0);

// Verify that claimPendingJob was called with Math.min(availableSlots, globalSlots)
// Queue has 10 slots available, global has 3 slots available
// So it should claim min(10, 3) = 3 jobs
expect(mockClaim).toHaveBeenCalledWith("default", 3);

await dispatcher.stop();
},
);
});
});
6 changes: 5 additions & 1 deletion packages/engine/src/execution/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ export class Dispatcher {
continue;
}

const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, availableSlots);
const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, Math.min(availableSlots, globalSlots));

if (jobs.length > 0) {
// if a job was found on any queue do not sleep
shouldSleep = false;
}

for (const job of jobs) {
// adds jobs to active sets before execution to avoid race conditions
// because the execution is not awaited. This way we ensure that available slots
// are correctly calculated.
this.executorManager.queueJob(queue, job);
// does not await for job execution.
void this.executorManager.execute(queue, job);
}
Expand Down
40 changes: 0 additions & 40 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,46 +171,6 @@ describe("ExecutorManager", () => {

await executorManager.destroy();
});

sidequestTest("snoozes job when queue is full", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

vi.spyOn(executorManager, "availableSlotsByQueue").mockReturnValue(0);

// Set up job in claimed state (as it would be when passed to execute)
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

await executorManager.execute(queryConfig, jobData);

// Verify the job runner was NOT called since the job was snoozed
expect(runMock).not.toHaveBeenCalled();

// Verify slots remain unchanged (no job was actually executed)
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.totalActiveWorkers()).toEqual(0);
await executorManager.destroy();
});

sidequestTest("snoozes job when global slots are full", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 5 });
const executorManager = new ExecutorManager(backend, { ...config, maxConcurrentJobs: 1 });

vi.spyOn(executorManager, "availableSlotsGlobal").mockReturnValue(0);

// Set up job in claimed state
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

await executorManager.execute(queryConfig, jobData);

// Verify the job runner was NOT called
expect(runMock).not.toHaveBeenCalled();

// Verify global slots show as full
expect(executorManager.availableSlotsGlobal()).toEqual(0);
expect(executorManager.totalActiveWorkers()).toEqual(0);
await executorManager.destroy();
});
});

describe("availableSlotsByQueue", () => {
Expand Down
71 changes: 34 additions & 37 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
import { Backend } from "@sidequest/backend";
import {
JobData,
JobTransitionFactory,
logger,
QueueConfig,
RetryTransition,
RunTransition,
SnoozeTransition,
} from "@sidequest/core";
import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core";
import EventEmitter from "events";
import { inspect } from "util";
import { NonNullableEngineConfig } from "../engine";
Expand Down Expand Up @@ -77,44 +69,49 @@ export class ExecutorManager {
}

/**
* Executes a job in the given queue.
* Prepares a job for execution by marking it as active and adding it to a queue slot.
* @param queueConfig The queue configuration.
* @param job The job data to execute.
* @param job The job data.
*/
async execute(queueConfig: QueueConfig, job: JobData): Promise<void> {
logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`);
queueJob(queueConfig: QueueConfig, job: JobData) {
if (!this.activeByQueue[queueConfig.name]) {
this.activeByQueue[queueConfig.name] = new Set();
}

if (this.availableSlotsByQueue(queueConfig) <= 0 || this.availableSlotsGlobal() <= 0) {
logger("Executor Manager").debug(`No available slots for job ${job.id} in queue ${queueConfig.name}`);
await JobTransitioner.apply(this.backend, job, new SnoozeTransition(0));
return;
}

this.activeByQueue[queueConfig.name].add(job.id);
this.activeJobs.add(job.id);
}

job = await JobTransitioner.apply(this.backend, job, new RunTransition());

const signal = new EventEmitter();
let isRunning = true;
const cancellationCheck = async () => {
while (isRunning) {
const watchedJob = await this.backend.getJob(job.id);
if (watchedJob!.state === "canceled") {
logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`);
signal.emit("abort");
isRunning = false;
return;
/**
* Executes a job in the given queue.
* @param queueConfig The queue configuration.
* @param job The job data to execute.
*/
async execute(queueConfig: QueueConfig, job: JobData): Promise<void> {
let isRunning = false;
try {
logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`);
// We call prepareJob here again to make sure the jobs are in the queues.
// This might not be necessary, but for the sake of consistency we do it.
this.queueJob(queueConfig, job);

job = await JobTransitioner.apply(this.backend, job, new RunTransition());

isRunning = true;
const signal = new EventEmitter();
const cancellationCheck = async () => {
while (isRunning) {
const watchedJob = await this.backend.getJob(job.id);
if (watchedJob!.state === "canceled") {
logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`);
signal.emit("abort");
isRunning = false;
return;
}
await new Promise((r) => setTimeout(r, 1000));
}
await new Promise((r) => setTimeout(r, 1000));
}
};
void cancellationCheck();
};
void cancellationCheck();

try {
logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`);

const runPromise = this.runnerPool.run(job, signal);
Expand Down
Loading