From 100f20ed51ddb465dcce58b65daf238759974f49 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Mon, 5 Jan 2026 20:10:25 -0300 Subject: [PATCH 1/5] fix: added Math.min to the dispatcher and removed redundant checks --- .../core/src/transitions/snooze-transition.ts | 4 +- .../engine/src/execution/dispatcher.test.ts | 44 ++++++++++++ packages/engine/src/execution/dispatcher.ts | 6 +- .../src/execution/executor-manager.test.ts | 40 ----------- .../engine/src/execution/executor-manager.ts | 71 +++++++++---------- 5 files changed, 85 insertions(+), 80 deletions(-) diff --git a/packages/core/src/transitions/snooze-transition.ts b/packages/core/src/transitions/snooze-transition.ts index 639a03fb..8bcd5c54 100644 --- a/packages/core/src/transitions/snooze-transition.ts +++ b/packages/core/src/transitions/snooze-transition.ts @@ -10,7 +10,7 @@ import { JobTransition } from "./transition"; * If the job is currently running, it will decrement the attempt count. * This allows the job to be retried after the delay. * - * Only jobs in "waiting" or "claimed" or "running" state can be snoozed. + * Only jobs in "waiting" or "running" state can be snoozed. */ export class SnoozeTransition extends JobTransition { /** The delay in milliseconds. */ @@ -40,6 +40,6 @@ export class SnoozeTransition extends JobTransition { } shouldRun(job: JobData): boolean { - return ["waiting", "claimed", "running"].includes(job.state); + return ["waiting", "running"].includes(job.state); } } diff --git a/packages/engine/src/execution/dispatcher.test.ts b/packages/engine/src/execution/dispatcher.test.ts index c13276dd..ff1a9aea 100644 --- a/packages/engine/src/execution/dispatcher.test.ts +++ b/packages/engine/src/execution/dispatcher.test.ts @@ -120,5 +120,49 @@ describe("Dispatcher", () => { await dispatcher.stop(); }); + + sidequestTest( + "claims min(availableSlots, globalSlots) jobs when queue has more slots than global", + async ({ backend }) => { + // Setup: Queue has concurrency of 10, but global has only 3 slots + const configWithHighQueueConcurrency: EngineConfig = { + backend: { driver: "@sidequest/sqlite-backend" }, + queues: [{ name: "default", concurrency: 10 }], + maxConcurrentJobs: 3, + }; + + // Create 5 jobs to ensure there are enough jobs to claim + await createJob(backend, "default"); + await createJob(backend, "default"); + await createJob(backend, "default"); + await createJob(backend, "default"); + + expect(await backend.listJobs({ state: "waiting" })).toHaveLength(5); + + const mockClaim = vi.spyOn(backend, "claimPendingJob"); + + const dispatcher = new Dispatcher( + backend, + new QueueManager(backend, configWithHighQueueConcurrency.queues!), + new ExecutorManager(backend, configWithHighQueueConcurrency as NonNullableEngineConfig), + 100, + ); + dispatcher.start(); + + runMock.mockImplementation(() => { + return { type: "completed", result: "foo", __is_job_transition__: true } as CompletedResult; + }); + + // Wait for the first claim to happen + await vi.waitUntil(() => mockClaim.mock.calls.length > 0); + + // Verify that claimPendingJob was called with Math.min(availableSlots, globalSlots) + // Queue has 10 slots available, global has 3 slots available + // So it should claim min(10, 3) = 3 jobs + expect(mockClaim).toHaveBeenCalledWith("default", 3); + + await dispatcher.stop(); + }, + ); }); }); diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index f364c371..493fe82c 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -48,7 +48,7 @@ export class Dispatcher { continue; } - const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, availableSlots); + const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, Math.min(availableSlots, globalSlots)); if (jobs.length > 0) { // if a job was found on any queue do not sleep @@ -56,6 +56,10 @@ export class Dispatcher { } for (const job of jobs) { + // adds jobs to active sets before execution to avoid race conditions + // because the execution is not awaited. This way we ensure that available slots + // are correctly calculated. + this.executorManager.queueJob(queue, job); // does not await for job execution. void this.executorManager.execute(queue, job); } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index b0168384..f3cf691c 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -171,46 +171,6 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - - sidequestTest("snoozes job when queue is full", async ({ backend, config }) => { - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config); - - vi.spyOn(executorManager, "availableSlotsByQueue").mockReturnValue(0); - - // Set up job in claimed state (as it would be when passed to execute) - jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - await executorManager.execute(queryConfig, jobData); - - // Verify the job runner was NOT called since the job was snoozed - expect(runMock).not.toHaveBeenCalled(); - - // Verify slots remain unchanged (no job was actually executed) - expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); - expect(executorManager.totalActiveWorkers()).toEqual(0); - await executorManager.destroy(); - }); - - sidequestTest("snoozes job when global slots are full", async ({ backend, config }) => { - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 5 }); - const executorManager = new ExecutorManager(backend, { ...config, maxConcurrentJobs: 1 }); - - vi.spyOn(executorManager, "availableSlotsGlobal").mockReturnValue(0); - - // Set up job in claimed state - jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - await executorManager.execute(queryConfig, jobData); - - // Verify the job runner was NOT called - expect(runMock).not.toHaveBeenCalled(); - - // Verify global slots show as full - expect(executorManager.availableSlotsGlobal()).toEqual(0); - expect(executorManager.totalActiveWorkers()).toEqual(0); - await executorManager.destroy(); - }); }); describe("availableSlotsByQueue", () => { diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 0c7553d7..f97b357d 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,13 +1,5 @@ import { Backend } from "@sidequest/backend"; -import { - JobData, - JobTransitionFactory, - logger, - QueueConfig, - RetryTransition, - RunTransition, - SnoozeTransition, -} from "@sidequest/core"; +import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core"; import EventEmitter from "events"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; @@ -77,44 +69,49 @@ export class ExecutorManager { } /** - * Executes a job in the given queue. + * Prepares a job for execution by marking it as active and adding it to a queue slot. * @param queueConfig The queue configuration. - * @param job The job data to execute. + * @param job The job data. */ - async execute(queueConfig: QueueConfig, job: JobData): Promise { - logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); + queueJob(queueConfig: QueueConfig, job: JobData) { if (!this.activeByQueue[queueConfig.name]) { this.activeByQueue[queueConfig.name] = new Set(); } - - if (this.availableSlotsByQueue(queueConfig) <= 0 || this.availableSlotsGlobal() <= 0) { - logger("Executor Manager").debug(`No available slots for job ${job.id} in queue ${queueConfig.name}`); - await JobTransitioner.apply(this.backend, job, new SnoozeTransition(0)); - return; - } - this.activeByQueue[queueConfig.name].add(job.id); this.activeJobs.add(job.id); + } - job = await JobTransitioner.apply(this.backend, job, new RunTransition()); - - const signal = new EventEmitter(); - let isRunning = true; - const cancellationCheck = async () => { - while (isRunning) { - const watchedJob = await this.backend.getJob(job.id); - if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); - isRunning = false; - return; + /** + * Executes a job in the given queue. + * @param queueConfig The queue configuration. + * @param job The job data to execute. + */ + async execute(queueConfig: QueueConfig, job: JobData): Promise { + let isRunning = false; + try { + logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); + // We call prepareJob here again to make sure the jobs are in the queues. + // This might not be necessary, but for the sake of consistency we do it. + this.queueJob(queueConfig, job); + + job = await JobTransitioner.apply(this.backend, job, new RunTransition()); + + isRunning = true; + const signal = new EventEmitter(); + const cancellationCheck = async () => { + while (isRunning) { + const watchedJob = await this.backend.getJob(job.id); + if (watchedJob!.state === "canceled") { + logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); + signal.emit("abort"); + isRunning = false; + return; + } + await new Promise((r) => setTimeout(r, 1000)); } - await new Promise((r) => setTimeout(r, 1000)); - } - }; - void cancellationCheck(); + }; + void cancellationCheck(); - try { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); const runPromise = this.runnerPool.run(job, signal); From b56f57e340616d65312b96d925d16e11f62025e5 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Mon, 5 Jan 2026 20:13:52 -0300 Subject: [PATCH 2/5] fix: update test to create 4 jobs for claiming logic --- packages/engine/src/execution/dispatcher.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/engine/src/execution/dispatcher.test.ts b/packages/engine/src/execution/dispatcher.test.ts index ff1a9aea..528cd9f0 100644 --- a/packages/engine/src/execution/dispatcher.test.ts +++ b/packages/engine/src/execution/dispatcher.test.ts @@ -131,7 +131,7 @@ describe("Dispatcher", () => { maxConcurrentJobs: 3, }; - // Create 5 jobs to ensure there are enough jobs to claim + // Create 4 jobs to ensure there are enough jobs to claim await createJob(backend, "default"); await createJob(backend, "default"); await createJob(backend, "default"); From 9aef0ceaf4cc96906c07b9cfb79efc74b0e297d1 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 6 Jan 2026 11:31:37 -0300 Subject: [PATCH 3/5] fix: refactor CI workflow to use setup action and streamline job steps --- .github/actions/setup/action.yml | 31 ++++++++++++ .github/workflows/pull-request.yml | 76 +++++++++++++++++++++--------- 2 files changed, 84 insertions(+), 23 deletions(-) create mode 100644 .github/actions/setup/action.yml diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml new file mode 100644 index 00000000..936e5522 --- /dev/null +++ b/.github/actions/setup/action.yml @@ -0,0 +1,31 @@ +name: Setup Project +description: Checkout code, setup Node.js, restore build cache, and setup Yarn + +inputs: + node-version: + description: 'Node.js version to use' + required: false + default: '22' + +runs: + using: composite + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Node.js + uses: actions/setup-node@v4 + with: + node-version: ${{ inputs.node-version }} + + - name: Restore build outputs + uses: actions/cache@v4 + with: + path: | + **/dist + key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }} + + - name: Setup Latest Yarn + uses: threeal/setup-yarn-action@v2.0.0 + with: + version: berry diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index ba824784..3ca74ae1 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -15,9 +15,38 @@ concurrency: cancel-in-progress: true jobs: - build-and-test: + build: runs-on: ubuntu-latest + steps: + - name: Setup project + uses: ./.github/actions/setup + + - name: Build project + run: yarn build + + format-check: + runs-on: ubuntu-latest + needs: build + steps: + - name: Setup project + uses: ./.github/actions/setup + + - name: Run format checker + run: yarn format:check + lint: + runs-on: ubuntu-latest + needs: build + steps: + - name: Setup project + uses: ./.github/actions/setup + + - name: Run linter + run: yarn lint + + test: + runs-on: ubuntu-latest + needs: build services: postgres: image: postgres:latest @@ -56,28 +85,8 @@ jobs: --health-timeout=5s --health-retries=5 steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Node.js - uses: actions/setup-node@v4 - with: - node-version: "22" - - - name: Setup Latest Yarn - uses: threeal/setup-yarn-action@v2.0.0 - with: - version: berry - cache: false - - - name: Build project - run: yarn build - - - name: Run format checker - run: yarn format:check - - - name: Run linter - run: yarn lint + - name: Setup project + uses: ./.github/actions/setup - name: Run tests env: @@ -86,6 +95,27 @@ jobs: MONGODB_URL: mongodb://127.0.0.1:27017/test run: yarn test:all:ci + integration-test: + runs-on: ubuntu-latest + needs: build + services: + postgres: + image: postgres:latest + ports: + - 5432:5432 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + steps: + - name: Setup project + uses: ./.github/actions/setup + - name: Run integration tests env: POSTGRES_URL: postgresql://postgres:postgres@localhost:5432/postgres From 0000e2a4babbf7ad13000ec763a543b542dda295 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 6 Jan 2026 11:34:16 -0300 Subject: [PATCH 4/5] fix: add checkout step to pull request workflow and update action description --- .github/actions/setup/action.yml | 5 +---- .github/workflows/pull-request.yml | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index 936e5522..7fcfab5f 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -1,5 +1,5 @@ name: Setup Project -description: Checkout code, setup Node.js, restore build cache, and setup Yarn +description: Setup Node.js, restore build cache, and setup Yarn inputs: node-version: @@ -10,9 +10,6 @@ inputs: runs: using: composite steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: Set up Node.js uses: actions/setup-node@v4 with: diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 3ca74ae1..99f5d27a 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -18,6 +18,9 @@ jobs: build: runs-on: ubuntu-latest steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project uses: ./.github/actions/setup @@ -28,6 +31,9 @@ jobs: runs-on: ubuntu-latest needs: build steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project uses: ./.github/actions/setup @@ -38,6 +44,9 @@ jobs: runs-on: ubuntu-latest needs: build steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project uses: ./.github/actions/setup @@ -85,6 +94,9 @@ jobs: --health-timeout=5s --health-retries=5 steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project uses: ./.github/actions/setup @@ -113,6 +125,9 @@ jobs: --health-timeout=5s --health-retries=5 steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup project uses: ./.github/actions/setup From 32f0416b3f6c54c448bcdce2815f6915fa55692f Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 6 Jan 2026 11:37:54 -0300 Subject: [PATCH 5/5] fix: add missing cache paths for Yarn and Node.js modules --- .github/actions/setup/action.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index 7fcfab5f..a0b1b81b 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -19,6 +19,9 @@ runs: uses: actions/cache@v4 with: path: | + .yarn + node_modules + **/node_modules **/dist key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }}