diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml new file mode 100644 index 0000000..a0b1b81 --- /dev/null +++ b/.github/actions/setup/action.yml @@ -0,0 +1,31 @@ +name: Setup Project +description: Setup Node.js, restore build cache, and setup Yarn + +inputs: + node-version: + description: 'Node.js version to use' + required: false + default: '22' + +runs: + using: composite + steps: + - name: Set up Node.js + uses: actions/setup-node@v4 + with: + node-version: ${{ inputs.node-version }} + + - name: Restore build outputs + uses: actions/cache@v4 + with: + path: | + .yarn + node_modules + **/node_modules + **/dist + key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }} + + - name: Setup Latest Yarn + uses: threeal/setup-yarn-action@v2.0.0 + with: + version: berry diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index ba82478..99f5d27 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -15,9 +15,47 @@ concurrency: cancel-in-progress: true jobs: - build-and-test: + build: runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup project + uses: ./.github/actions/setup + + - name: Build project + run: yarn build + + format-check: + runs-on: ubuntu-latest + needs: build + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup project + uses: ./.github/actions/setup + + - name: Run format checker + run: yarn format:check + + lint: + runs-on: ubuntu-latest + needs: build + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project + uses: ./.github/actions/setup + + - name: Run linter + run: yarn lint + + test: + runs-on: ubuntu-latest + needs: build services: postgres: image: postgres:latest @@ -59,25 +97,8 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Set up Node.js - uses: actions/setup-node@v4 - with: - 
node-version: "22" - - - name: Setup Latest Yarn - uses: threeal/setup-yarn-action@v2.0.0 - with: - version: berry - cache: false - - - name: Build project - run: yarn build - - - name: Run format checker - run: yarn format:check - - - name: Run linter - run: yarn lint + - name: Setup project + uses: ./.github/actions/setup - name: Run tests env: @@ -86,6 +107,30 @@ jobs: MONGODB_URL: mongodb://127.0.0.1:27017/test run: yarn test:all:ci + integration-test: + runs-on: ubuntu-latest + needs: build + services: + postgres: + image: postgres:latest + ports: + - 5432:5432 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup project + uses: ./.github/actions/setup + - name: Run integration tests env: POSTGRES_URL: postgresql://postgres:postgres@localhost:5432/postgres diff --git a/packages/core/src/transitions/snooze-transition.ts b/packages/core/src/transitions/snooze-transition.ts index 639a03f..8bcd5c5 100644 --- a/packages/core/src/transitions/snooze-transition.ts +++ b/packages/core/src/transitions/snooze-transition.ts @@ -10,7 +10,7 @@ import { JobTransition } from "./transition"; * If the job is currently running, it will decrement the attempt count. * This allows the job to be retried after the delay. * - * Only jobs in "waiting" or "claimed" or "running" state can be snoozed. + * Only jobs in "waiting" or "running" state can be snoozed. */ export class SnoozeTransition extends JobTransition { /** The delay in milliseconds. 
*/ @@ -40,6 +40,6 @@ export class SnoozeTransition extends JobTransition { } shouldRun(job: JobData): boolean { - return ["waiting", "claimed", "running"].includes(job.state); + return ["waiting", "running"].includes(job.state); } } diff --git a/packages/engine/src/execution/dispatcher.test.ts b/packages/engine/src/execution/dispatcher.test.ts index c13276d..528cd9f 100644 --- a/packages/engine/src/execution/dispatcher.test.ts +++ b/packages/engine/src/execution/dispatcher.test.ts @@ -120,5 +120,49 @@ describe("Dispatcher", () => { await dispatcher.stop(); }); + + sidequestTest( + "claims min(availableSlots, globalSlots) jobs when queue has more slots than global", + async ({ backend }) => { + // Setup: Queue has concurrency of 10, but global has only 3 slots + const configWithHighQueueConcurrency: EngineConfig = { + backend: { driver: "@sidequest/sqlite-backend" }, + queues: [{ name: "default", concurrency: 10 }], + maxConcurrentJobs: 3, + }; + + // Create 4 jobs to ensure there are enough jobs to claim + await createJob(backend, "default"); + await createJob(backend, "default"); + await createJob(backend, "default"); + await createJob(backend, "default"); + + expect(await backend.listJobs({ state: "waiting" })).toHaveLength(5); + + const mockClaim = vi.spyOn(backend, "claimPendingJob"); + + const dispatcher = new Dispatcher( + backend, + new QueueManager(backend, configWithHighQueueConcurrency.queues!), + new ExecutorManager(backend, configWithHighQueueConcurrency as NonNullableEngineConfig), + 100, + ); + dispatcher.start(); + + runMock.mockImplementation(() => { + return { type: "completed", result: "foo", __is_job_transition__: true } as CompletedResult; + }); + + // Wait for the first claim to happen + await vi.waitUntil(() => mockClaim.mock.calls.length > 0); + + // Verify that claimPendingJob was called with Math.min(availableSlots, globalSlots) + // Queue has 10 slots available, global has 3 slots available + // So it should claim min(10, 3) = 3 jobs + 
expect(mockClaim).toHaveBeenCalledWith("default", 3); + + await dispatcher.stop(); + }, + ); }); }); diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index f364c37..493fe82 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -48,7 +48,7 @@ export class Dispatcher { continue; } - const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, availableSlots); + const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, Math.min(availableSlots, globalSlots)); if (jobs.length > 0) { // if a job was found on any queue do not sleep @@ -56,6 +56,10 @@ export class Dispatcher { } for (const job of jobs) { + // adds jobs to active sets before execution to avoid race conditions + // because the execution is not awaited. This way we ensure that available slots + // are correctly calculated. + this.executorManager.queueJob(queue, job); // does not await for job execution. 
void this.executorManager.execute(queue, job); } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index b016838..f3cf691 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -171,46 +171,6 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - - sidequestTest("snoozes job when queue is full", async ({ backend, config }) => { - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config); - - vi.spyOn(executorManager, "availableSlotsByQueue").mockReturnValue(0); - - // Set up job in claimed state (as it would be when passed to execute) - jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - await executorManager.execute(queryConfig, jobData); - - // Verify the job runner was NOT called since the job was snoozed - expect(runMock).not.toHaveBeenCalled(); - - // Verify slots remain unchanged (no job was actually executed) - expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); - expect(executorManager.totalActiveWorkers()).toEqual(0); - await executorManager.destroy(); - }); - - sidequestTest("snoozes job when global slots are full", async ({ backend, config }) => { - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 5 }); - const executorManager = new ExecutorManager(backend, { ...config, maxConcurrentJobs: 1 }); - - vi.spyOn(executorManager, "availableSlotsGlobal").mockReturnValue(0); - - // Set up job in claimed state - jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - await executorManager.execute(queryConfig, jobData); - - // Verify the job runner was NOT called - expect(runMock).not.toHaveBeenCalled(); - - // Verify global slots show as full - 
expect(executorManager.availableSlotsGlobal()).toEqual(0); - expect(executorManager.totalActiveWorkers()).toEqual(0); - await executorManager.destroy(); - }); }); describe("availableSlotsByQueue", () => { diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 0c7553d..f97b357 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,13 +1,5 @@ import { Backend } from "@sidequest/backend"; -import { - JobData, - JobTransitionFactory, - logger, - QueueConfig, - RetryTransition, - RunTransition, - SnoozeTransition, -} from "@sidequest/core"; +import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core"; import EventEmitter from "events"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; @@ -77,44 +69,49 @@ export class ExecutorManager { } /** - * Executes a job in the given queue. + * Prepares a job for execution by marking it as active and adding it to a queue slot. * @param queueConfig The queue configuration. - * @param job The job data to execute. + * @param job The job data. 
*/ - async execute(queueConfig: QueueConfig, job: JobData): Promise { - logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); + queueJob(queueConfig: QueueConfig, job: JobData) { if (!this.activeByQueue[queueConfig.name]) { this.activeByQueue[queueConfig.name] = new Set(); } - - if (this.availableSlotsByQueue(queueConfig) <= 0 || this.availableSlotsGlobal() <= 0) { - logger("Executor Manager").debug(`No available slots for job ${job.id} in queue ${queueConfig.name}`); - await JobTransitioner.apply(this.backend, job, new SnoozeTransition(0)); - return; - } - this.activeByQueue[queueConfig.name].add(job.id); this.activeJobs.add(job.id); + } - job = await JobTransitioner.apply(this.backend, job, new RunTransition()); - - const signal = new EventEmitter(); - let isRunning = true; - const cancellationCheck = async () => { - while (isRunning) { - const watchedJob = await this.backend.getJob(job.id); - if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); - isRunning = false; - return; + /** + * Executes a job in the given queue. + * @param queueConfig The queue configuration. + * @param job The job data to execute. + */ + async execute(queueConfig: QueueConfig, job: JobData): Promise { + let isRunning = false; + try { + logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); + // We call queueJob here again to make sure the jobs are in the queues. + // This might not be necessary, but for the sake of consistency we do it. 
+ this.queueJob(queueConfig, job); + + job = await JobTransitioner.apply(this.backend, job, new RunTransition()); + + isRunning = true; + const signal = new EventEmitter(); + const cancellationCheck = async () => { + while (isRunning) { + const watchedJob = await this.backend.getJob(job.id); + if (watchedJob!.state === "canceled") { + logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); + signal.emit("abort"); + isRunning = false; + return; + } + await new Promise((r) => setTimeout(r, 1000)); } - await new Promise((r) => setTimeout(r, 1000)); - } - }; - void cancellationCheck(); + }; + void cancellationCheck(); - try { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); const runPromise = this.runnerPool.run(job, signal);