Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
100f20e
fix: added Math.min to the dispatcher and removed redundant checks
GiovaniGuizzo Jan 5, 2026
b56f57e
fix: update test to create 4 jobs for claiming logic
GiovaniGuizzo Jan 5, 2026
9aef0ce
fix: refactor CI workflow to use setup action and streamline job steps
GiovaniGuizzo Jan 6, 2026
0000e2a
fix: add checkout step to pull request workflow and update action des…
GiovaniGuizzo Jan 6, 2026
32f0416
fix: add missing cache paths for Yarn and Node.js modules
GiovaniGuizzo Jan 6, 2026
d761869
fix: implement dependency registry for managing engine dependencies
GiovaniGuizzo Jan 7, 2026
a98b705
Merge branch 'master' of github.com:sidequestjs/sidequest into refact…
GiovaniGuizzo Jan 7, 2026
b64b7e5
fix: update import path for NonNullableEngineConfig in dependency reg…
GiovaniGuizzo Jan 7, 2026
b6fcf17
refactor: simplify dependency registry key handling by removing union…
GiovaniGuizzo Jan 7, 2026
286ea54
fix: disable cache for Yarn setup action
GiovaniGuizzo Jan 7, 2026
d108fc6
fix: add conditional check for Yarn setup based on cache hit
GiovaniGuizzo Jan 7, 2026
819f764
fix: add id to cache step in action.yml for better reference
GiovaniGuizzo Jan 7, 2026
d917a4b
refactor: streamline Yarn setup by enabling Corepack and installing Y…
GiovaniGuizzo Jan 7, 2026
835c4f7
refactor: reorder Corepack and Yarn Berry setup steps for improved cl…
GiovaniGuizzo Jan 7, 2026
3388dfb
refactor: update job state checks to allow for more flexible job comp…
GiovaniGuizzo Jan 7, 2026
dbca378
refactor: standardize import and export order for job modules
GiovaniGuizzo Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/actions/setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ runs:
node-version: ${{ inputs.node-version }}

- name: Restore build outputs
id: cache
uses: actions/cache@v4
with:
path: |
Expand All @@ -25,7 +26,15 @@ runs:
**/dist
key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }}

- name: Setup Latest Yarn
uses: threeal/setup-yarn-action@v2.0.0
with:
version: berry
- name: Enable Corepack
shell: bash
run: corepack enable

- name: Install Yarn Berry
shell: bash
run: yarn set version berry

- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
run: yarn install --frozen-lockfile
67 changes: 67 additions & 0 deletions packages/engine/src/dependency-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Backend } from "@sidequest/backend";
import { NonNullableEngineConfig } from "./engine";

/**
* Enumeration of available dependency tokens for the dependency registry.
* Used as keys to register and retrieve dependencies throughout the engine.
*/
export enum Dependency {
/** Engine configuration */
Config = "config",
/** Backend instance */
Backend = "backend",
}

/**
* Type mapping interface that associates each dependency token with its corresponding type.
* This ensures type safety when registering and retrieving dependencies.
*/
interface DependencyRegistryTypes {
[Dependency.Config]: NonNullableEngineConfig;
[Dependency.Backend]: Backend;
}

/**
* A type-safe dependency injection container for managing core engine dependencies.
* Provides methods to register, retrieve, and clear dependencies used throughout the engine lifecycle.
*/
class DependencyRegistry {
/**
* Internal storage for registered dependencies.
*/
private registry = new Map<Dependency, unknown>();

/**
* Retrieves a registered dependency by its token.
* @param token - The dependency token to look up
* @returns The registered dependency instance, or undefined if not found
*/
get<T extends Dependency>(token: T): DependencyRegistryTypes[T] | undefined {
return this.registry.get(token) as DependencyRegistryTypes[T] | undefined;
}

/**
* Registers a dependency instance with the specified token.
* @param token - The dependency token to register under
* @param instance - The dependency instance to register
* @returns The registered instance
*/
register<T extends Dependency>(token: T, instance: DependencyRegistryTypes[T]) {
this.registry.set(token, instance);
return instance;
}

/**
* Clears all registered dependencies from the registry.
* Useful for cleanup and testing scenarios.
*/
clear() {
this.registry.clear();
}
}

/**
* Singleton instance of the dependency registry.
* This is the main container used throughout the engine to manage dependencies.
*/
export const dependencyRegistry = new DependencyRegistry();
86 changes: 39 additions & 47 deletions packages/engine/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Backend, BackendConfig, LazyBackend, MISC_FALLBACK, NewQueueData, QUEUE_FALLBACK } from "@sidequest/backend";
import { BackendConfig, LazyBackend, MISC_FALLBACK, NewQueueData, QUEUE_FALLBACK } from "@sidequest/backend";
import { configureLogger, JobClassType, logger, LoggerOptions } from "@sidequest/core";
import { ChildProcess, fork } from "child_process";
import { existsSync } from "fs";
import { cpus } from "os";
import { fileURLToPath } from "url";
import { inspect } from "util";
import { DEFAULT_WORKER_PATH } from "./constants";
import { Dependency, dependencyRegistry } from "./dependency-registry";
import { JOB_BUILDER_FALLBACK } from "./job/constants";
import { ScheduledJobRegistry } from "./job/cron-registry";
import { JobBuilder, JobBuilderDefaults } from "./job/job-builder";
Expand Down Expand Up @@ -116,19 +117,6 @@ export type NonNullableEngineConfig = {
* The main engine for managing job queues and workers in Sidequest.
*/
export class Engine {
/**
* Backend instance used by the engine.
* This is initialized when the engine is configured or started.
*/
private backend?: Backend;

/**
* Current configuration of the engine.
* This is set when the engine is configured or started.
* It contains all the necessary settings for the engine to operate, such as backend, queues, logger options, and job defaults.
*/
private config?: NonNullableEngineConfig;

/**
* Main worker process that runs the Sidequest engine.
* This is created when the engine is started and handles job processing.
Expand All @@ -147,11 +135,11 @@ export class Engine {
* @returns The resolved configuration.
*/
async configure(config?: EngineConfig): Promise<NonNullableEngineConfig> {
if (this.config) {
if (this.getConfig()) {
logger("Engine").debug("Sidequest already configured");
return this.config;
return this.getConfig()!;
}
this.config = {
const nonNullConfig: NonNullableEngineConfig = {
queues: config?.queues ?? [],
backend: {
driver: config?.backend?.driver ?? "@sidequest/sqlite-backend",
Expand Down Expand Up @@ -190,21 +178,22 @@ export class Engine {
jobsFilePath: config?.jobsFilePath?.trim() ?? "",
jobPollingInterval: config?.jobPollingInterval ?? 100,
};
dependencyRegistry.register(Dependency.Config, nonNullConfig);

this.validateConfig();

logger("Engine").debug(`Configuring Sidequest engine: ${inspect(this.config)}`);
logger("Engine").debug(`Configuring Sidequest engine: ${inspect(nonNullConfig)}`);

if (this.config.logger) {
configureLogger(this.config.logger);
if (nonNullConfig.logger) {
configureLogger(nonNullConfig.logger);
}

this.backend = new LazyBackend(this.config.backend);
if (!this.config.skipMigration) {
await this.backend.migrate();
const backend = dependencyRegistry.register(Dependency.Backend, new LazyBackend(nonNullConfig.backend));
if (!nonNullConfig.skipMigration) {
await backend.migrate();
}

return this.config;
return nonNullConfig;
}

/**
Expand All @@ -224,18 +213,19 @@ export class Engine {
* - Logs the resolved jobs file path when using manual job resolution
*/
validateConfig() {
if (this.config!.maxConcurrentJobs !== undefined && this.config!.maxConcurrentJobs < 1) {
const config = this.getConfig();
if (config!.maxConcurrentJobs !== undefined && config!.maxConcurrentJobs < 1) {
throw new Error(`Invalid "maxConcurrentJobs" value: must be at least 1.`);
}

if (this.config!.manualJobResolution) {
if (this.config!.jobsFilePath) {
const scriptUrl = resolveScriptPath(this.config!.jobsFilePath);
if (config!.manualJobResolution) {
if (config!.jobsFilePath) {
const scriptUrl = resolveScriptPath(config!.jobsFilePath);
if (!existsSync(fileURLToPath(scriptUrl))) {
throw new Error(`The specified jobsFilePath does not exist. Resolved to: ${scriptUrl}`);
}
logger("Engine").info(`Using manual jobs file at: ${this.config!.jobsFilePath}`);
this.config!.jobsFilePath = scriptUrl;
logger("Engine").info(`Using manual jobs file at: ${config!.jobsFilePath}`);
config!.jobsFilePath = scriptUrl;
} else {
// This should throw an error if not found
findSidequestJobsScriptInParentDirs();
Expand All @@ -253,13 +243,13 @@ export class Engine {
return;
}

await this.configure(config);
const nonNullConfig = await this.configure(config);

logger("Engine").info(`Starting Sidequest using backend ${this.config!.backend.driver}`);
logger("Engine").info(`Starting Sidequest using backend ${nonNullConfig.backend.driver}`);

if (this.config!.queues) {
for (const queue of this.config!.queues) {
await grantQueueConfig(this.backend!, queue, this.config!.queueDefaults, true);
if (nonNullConfig.queues) {
for (const queue of nonNullConfig.queues) {
await grantQueueConfig(dependencyRegistry.get(Dependency.Backend)!, queue, nonNullConfig.queueDefaults, true);
}
}

Expand All @@ -276,7 +266,7 @@ export class Engine {
this.mainWorker.on("message", (msg) => {
if (msg === "ready") {
logger("Engine").debug("Main worker is ready");
this.mainWorker?.send({ type: "start", sidequestConfig: this.config! });
this.mainWorker?.send({ type: "start", sidequestConfig: nonNullConfig });
clearTimeout(timeout);
resolve();
}
Expand All @@ -291,7 +281,7 @@ export class Engine {
};

runWorker();
gracefulShutdown(this.close.bind(this), "Engine", this.config!.gracefulShutdown);
gracefulShutdown(this.close.bind(this), "Engine", nonNullConfig.gracefulShutdown);
}
});
}
Expand All @@ -301,15 +291,15 @@ export class Engine {
* @returns The current configuration, if set.
*/
getConfig() {
return this.config;
return dependencyRegistry.get(Dependency.Config);
}

/**
* Gets the backend instance in use by the engine.
* @returns The backend instance, if set.
*/
getBackend() {
return this.backend;
return dependencyRegistry.get(Dependency.Backend);
}

/**
Expand All @@ -331,18 +321,18 @@ export class Engine {
await promise;
}
try {
await this.backend?.close();
await dependencyRegistry.get(Dependency.Backend)?.close();
} catch (error) {
logger("Engine").error("Error closing backend:", error);
}
this.config = undefined;
this.backend = undefined;
this.mainWorker = undefined;
// Reset the shutting down flag after closing
// This allows the engine to be reconfigured or restarted later
clearGracefulShutdown();
logger("Engine").debug("Sidequest engine closed.");
this.shuttingDown = false;
// Clear the dependency registry to allow fresh configuration later
dependencyRegistry.clear();
}
}

Expand All @@ -352,23 +342,25 @@ export class Engine {
* @returns A new JobBuilder instance for the job class.
*/
build<T extends JobClassType>(JobClass: T) {
if (!this.config || !this.backend) {
const backend = this.getBackend();
const config = this.getConfig();
if (!config || !backend) {
throw new Error("Engine not configured. Call engine.configure() or engine.start() first.");
}
if (this.shuttingDown) {
throw new Error("Engine is shutting down, cannot build job.");
}
logger("Engine").debug(`Building job for class: ${JobClass.name}`);
return new JobBuilder(
this.backend,
backend,
JobClass,
{
...this.config.jobDefaults,
...config.jobDefaults,
// We need to do this check again because available at is a getter. It needs to be set at job creation time.
// If not set, it will use the fallback value which is outdated from config.
availableAt: this.config.jobDefaults.availableAt ?? JOB_BUILDER_FALLBACK.availableAt!,
availableAt: config.jobDefaults.availableAt ?? JOB_BUILDER_FALLBACK.availableAt!,
},
this.config.manualJobResolution,
config.manualJobResolution,
);
}
}
9 changes: 9 additions & 0 deletions sidequest.jobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import {
EnqueueFromWithinJob,
FailingJob,
RetryJob,
SuccessJob,
TimeoutJob,
} from "./tests/integration/jobs/test-jobs.js";

export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob };
2 changes: 2 additions & 0 deletions tests/fixture.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { test as baseTest } from "vitest";
import { Backend } from "../packages/backends/backend/src";
import { Engine, NonNullableEngineConfig } from "../packages/engine/src";
import { dependencyRegistry } from "../packages/engine/src/dependency-registry";

export interface SidequestTestFixture {
engine: Engine;
Expand Down Expand Up @@ -29,6 +30,7 @@ export const sidequestTest = baseTest.extend<SidequestTestFixture>({
}

await engine.close();
dependencyRegistry.clear();
},

backend: async ({ engine }, use) => {
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/shared-test-suite.js
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM")
// Wait for the two scheduled executions
await vi.waitUntil(async () => {
const currentJobs = await Sidequest.job.list();
return currentJobs.length === 2 && currentJobs.every((job) => job.state === "completed");
return currentJobs.length >= 2 && currentJobs.every((job) => job.state === "completed");
}, 5000);
});

Expand Down Expand Up @@ -609,7 +609,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM")
let currentJobs;
await vi.waitUntil(async () => {
currentJobs = await Sidequest.job.list();
return currentJobs.length === 3 && currentJobs.every((job) => job.state === "completed");
return currentJobs.length >= 3 && currentJobs.every((job) => job.state === "completed");
}, 5000);

const job1Executions = currentJobs.filter((job) => job.args[0] === "job-1");
Expand All @@ -634,7 +634,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM")
let currentJobs;
await vi.waitUntil(async () => {
currentJobs = await Sidequest.job.list();
return currentJobs.length === 2 && currentJobs.every((job) => job.state === "completed");
return currentJobs.length >= 2 && currentJobs.every((job) => job.state === "completed");
}, 5000);

expect(currentJobs.length).toBe(2);
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/sidequest.jobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob } from "./jobs/test-jobs.js";

export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob };