Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 88 additions & 5 deletions packages/engine/src/routines/release-stale-jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,90 @@ describe("release-stale-jobs.ts", () => {
expect(updateJobSpy).not.toHaveBeenCalled();
});

sidequestTest("should release stale jobs by setting state to waiting", async ({ backend }) => {
sidequestTest("should release stale claimed jobs by setting state to waiting", async ({ backend }) => {
const mockStaleJob = {
id: 1,
queue: "default",
state: "claimed",
script: "/path/to/script.js",
class: "TestJob",
args: [],
constructor_args: [],
attempt: 1,
max_attempts: 3,
claimed_at: new Date(Date.now() - 60000),
} as unknown as JobData;

const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]);
const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => Promise.resolve(job as JobData));

await releaseStaleJobs(backend, 600_000, 60_000);

expect(staleJobsSpy).toHaveBeenCalledOnce();
expect(updateJobSpy).toHaveBeenCalledOnce();

// Claimed jobs should go back to waiting without using JobTransitioner
expect(mockStaleJob.state).toBe("waiting");
expect(updateJobSpy).toHaveBeenCalledWith(mockStaleJob);
});

sidequestTest("should retry stale running jobs using JobTransitioner", async ({ backend }) => {
const mockStaleJob = {
id: 2,
queue: "high",
state: "running",
script: "/path/to/another-script.js",
class: "AnotherTestJob",
args: ["arg1", "arg2"],
constructor_args: [],
attempt: 2,
max_attempts: 5,
claimed_at: new Date(Date.now() - 120000),
} as unknown as JobData;

const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]);
const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => {
return Promise.resolve(job as JobData);
});

await releaseStaleJobs(backend, 600_000, 60_000);

expect(staleJobsSpy).toHaveBeenCalledOnce();
expect(updateJobSpy).toHaveBeenCalledOnce();

// Running jobs should be retried via JobTransitioner, which sets state to waiting
expect(mockStaleJob.state).toBe("waiting");
});

sidequestTest("should fail stale running job at max attempts", async ({ backend }) => {
const mockStaleJob = {
id: 3,
queue: "critical",
state: "running",
script: "/path/to/critical-script.js",
class: "CriticalJob",
args: [],
constructor_args: [],
attempt: 3,
max_attempts: 3,
claimed_at: new Date(Date.now() - 180000),
} as unknown as JobData;

const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue([mockStaleJob]);
const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => {
return Promise.resolve(job as JobData);
});

await releaseStaleJobs(backend, 600_000, 60_000);

expect(staleJobsSpy).toHaveBeenCalledOnce();
expect(updateJobSpy).toHaveBeenCalledOnce();

// Job at max attempts should be marked as failed, not retried
expect(mockStaleJob.state).toBe("failed");
});

sidequestTest("should handle mixed stale jobs (claimed and running)", async ({ backend }) => {
const mockStaleJobs = [
{
id: 1,
Expand Down Expand Up @@ -42,18 +125,18 @@ describe("release-stale-jobs.ts", () => {
] as unknown as JobData[];

const staleJobsSpy = vi.spyOn(backend, "staleJobs").mockResolvedValue(mockStaleJobs);
const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => Promise.resolve(job as JobData));
const updateJobSpy = vi.spyOn(backend, "updateJob").mockImplementation((job) => {
return Promise.resolve(job as JobData);
});

await releaseStaleJobs(backend, 600_000, 60_000);

expect(staleJobsSpy).toHaveBeenCalledOnce();
expect(updateJobSpy).toHaveBeenCalledTimes(2);

// Both should end up in waiting state
expect(mockStaleJobs[0].state).toBe("waiting");
expect(mockStaleJobs[1].state).toBe("waiting");

expect(updateJobSpy).toHaveBeenNthCalledWith(1, mockStaleJobs[0]);
expect(updateJobSpy).toHaveBeenNthCalledWith(2, mockStaleJobs[1]);
});

sidequestTest("should handle single stale job", async ({ backend }) => {
Expand Down
15 changes: 12 additions & 3 deletions packages/engine/src/routines/release-stale-jobs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Backend } from "@sidequest/backend";
import { logger } from "@sidequest/core";
import { logger, RetryTransition } from "@sidequest/core";
import { inspect } from "util";
import { JobTransitioner } from "../job";

/**
* Finds and releases stale jobs, making them available for processing again.
Expand All @@ -16,8 +17,16 @@ export async function releaseStaleJobs(backend: Backend, maxStaleMs: number, max
logger("Engine").info(`Stale jobs found, making them available to process`);
logger("Engine").debug(`Stale jobs: ${inspect(staleJobs)}`);
for (const jobData of staleJobs) {
jobData.state = "waiting";
await backend.updateJob(jobData);
if (jobData.state === "running") {
// We need to use the JobTransitioner to properly handle retries and state transitions
// This fixes the issue where the release of a stale job incremented the retry count and
// did not respect the maxRetries setting.
await JobTransitioner.apply(backend, jobData, new RetryTransition("Stale job released for retry"));
} else {
// If it's "claimed", then the attempt count was not incremented, so we can just set it back to "waiting"
jobData.state = "waiting";
await backend.updateJob(jobData);
}
}
} else {
logger("Engine").debug(`No stale jobs found`);
Expand Down
9 changes: 0 additions & 9 deletions sidequest.jobs.js

This file was deleted.

3 changes: 0 additions & 3 deletions tests/integration/sidequest.jobs.js

This file was deleted.