diff --git a/.changeset/worker-code-quality-phase-3.md b/.changeset/worker-code-quality-phase-3.md new file mode 100644 index 00000000..4b50cdc5 --- /dev/null +++ b/.changeset/worker-code-quality-phase-3.md @@ -0,0 +1,5 @@ +--- +'@bilbomd/worker': minor +--- + +Enhance worker code quality with centralized configuration and comprehensive test coverage. Extract all magic numbers to config/constants.ts (worker concurrency, polling intervals, retry settings, progress calculation). Consolidate duplicated error handling into shared helpers/errors.ts utility. Add 100% test coverage for mongo-utils.ts and workerControl.ts, plus 63% coverage for job-utils.ts (39 new tests total). Improve runPythonStep.ts coverage from 88% to 92%. Remove dead/commented code across worker files. diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 8a886106..4874c33d 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -19,7 +19,10 @@ "Bash(pnpm -C apps/ui run test:*)", "Bash(pnpm -C apps/worker run test:*)", "Bash(gh run view:*)", - "Bash(gh pr checks:*)" + "Bash(gh pr checks:*)", + "Bash(pnpm -F @bilbomd/worker test:*)", + "Bash(gh pr view:*)", + "Bash(git fetch:*)" ] } -} \ No newline at end of file +} diff --git a/CLAUDE.md b/CLAUDE.md index e7281fb7..52f79030 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -91,6 +91,55 @@ pnpm changeset # Select packages, choose semver bump, write summary Changesets are applied on merge to `main`, producing git tags and Docker image semver tags. `updateInternalDependencies: "patch"` is enabled — bumping an internal package auto-bumps dependents. +### Manual Changeset Creation + +**IMPORTANT**: The `pnpm changeset` command is interactive and won't work in non-TTY environments (like Claude Code CLI). When this happens, create changeset files manually: + +1. 
**Create a new file** in `.changeset/` with a descriptive kebab-case name: + - Pattern: `.changeset/descriptive-name.md` + - Examples: `worker-code-quality-improvements.md`, `backend-security-fixes.md` + +2. **File format** (YAML front matter + description): + ```markdown + --- + '@bilbomd/package-name': patch|minor|major + --- + + Brief description of changes. Focus on user/developer impact, not implementation details. + ``` + +3. **Semver guidelines**: + - `patch` - Bug fixes, minor improvements, internal refactoring + - `minor` - New features, significant improvements (backwards compatible) + - `major` - Breaking changes + +4. **Examples**: + ```markdown + --- + '@bilbomd/worker': patch + --- + + Improve worker reliability with graceful shutdown handling and MongoDB connection retry logic. + ``` + + ```markdown + --- + '@bilbomd/worker': minor + --- + + Add comprehensive test coverage for critical infrastructure (mongo-utils, job-utils, workerControl). Extract magic numbers to centralized config/constants.ts. Consolidate error handling utilities. + ``` + +5. **Multiple packages** (if changes affect multiple): + ```markdown + --- + '@bilbomd/backend': patch + '@bilbomd/mongodb-schema': patch + --- + + Fix user authentication schema validation and update backend handlers. + ``` + ## Git Branch Naming Convention Use standardized branch prefixes to indicate the type of work. Branch names should use kebab-case (lowercase with hyphens). diff --git a/apps/worker/src/config/constants.ts b/apps/worker/src/config/constants.ts new file mode 100644 index 00000000..3bb13f67 --- /dev/null +++ b/apps/worker/src/config/constants.ts @@ -0,0 +1,76 @@ +/** + * Worker configuration constants + * + * This file centralizes all magic numbers and hardcoded values used throughout + * the worker application to improve maintainability and configuration flexibility. 
+ */ + +// Worker concurrency settings +export const WORKER_CONCURRENCY = { + NERSC: 50, + LOCAL: 1, + MOVIE: 1, + MULTI_MD: 1 +} as const + +// BullMQ lock settings (in milliseconds) +export const LOCK_SETTINGS = { + DURATION: 60_000, // 1 minute + RENEW_TIME: 30_000 // 30 seconds +} as const + +// Polling and monitoring intervals (in milliseconds) +export const INTERVALS = { + TOKEN_CHECK: 300_000, // 5 minutes + JOB_MONITORING: 60_000, // 1 minute + NERSC_TASK_POLL: 2_000, // 2 seconds + NERSC_JOB_POLL: 60_000 // 1 minute +} as const + +// NERSC API retry configuration +export const NERSC_RETRY = { + MAX_ATTEMPTS: 11, + MAX_JOB_RETRIES: 10, + MAX_ITERATIONS: 1_440, // 1440 x 60s = 24 hours + RETRY_DELAY: 60_000 // 1 minute +} as const + +// Progress calculation constants +export const PROGRESS = { + MIN: 20, // Minimum progress percentage + MAX: 90, // Maximum progress percentage + SCALE_FACTOR: 70 // Scale factor for progress calculation (20% to 90%) +} as const + +// Step weights for progress calculation +export const STEP_WEIGHTS: Record<string, number> = { + alphafold: 20, + pdb2crd: 5, + pae: 5, + autorg: 5, + minimize: 10, + initfoxs: 5, + heat: 10, + md: 30, + dcd2pdb: 10, + foxs: 10, + multifoxs: 10, + copy_results_to_cfs: 5, + results: 3, + email: 1, + nersc_prepare_slurm_batch: 5, + nersc_submit_slurm_batch: 5, + nersc_job_status: 5, + nersc_copy_results_to_cfs: 5 +} as const + +// Server configuration +export const SERVER = { + PORT: 3000 +} as const + +// NERSC paths +// TODO: Make this configurable via environment variable +export const NERSC_PATHS = { + SCRIPT_LOGS_DIR: '/global/homes/s/sclassen/script-logs' +} as const diff --git a/apps/worker/src/helpers/__tests__/runPythonStep.test.ts b/apps/worker/src/helpers/__tests__/runPythonStep.test.ts index a390b6b6..50020fc3 100644 --- a/apps/worker/src/helpers/__tests__/runPythonStep.test.ts +++ b/apps/worker/src/helpers/__tests__/runPythonStep.test.ts @@ -79,4 +79,10 @@ describe('runPythonStep', () => { const result =
await runPythonStep('a.py', 'b.yaml') expect(result).toEqual({ code: 42, signal: 'SIGUSR1' }) }) + + it('throws error when spawn fails', async () => { + const spawnError = new Error('ENOENT: python binary not found') + setTimeout(() => mockChild.emit('error', spawnError), 10) + await expect(runPythonStep('a.py', 'b.yaml')).rejects.toThrow('ENOENT: python binary not found') + }) }) diff --git a/apps/worker/src/helpers/errors.ts b/apps/worker/src/helpers/errors.ts new file mode 100644 index 00000000..dca6797a --- /dev/null +++ b/apps/worker/src/helpers/errors.ts @@ -0,0 +1,14 @@ +/** + * Error handling utilities + * + * Shared error handling functions used throughout the worker application + */ + +/** + * Extracts a readable error message from various error types + * + * @param e - The error to extract a message from + * @returns A string representation of the error + */ +export const getErrorMessage = (e: unknown): string => + e instanceof Error ? e.message : typeof e === 'string' ? e : JSON.stringify(e) diff --git a/apps/worker/src/services/functions/__tests__/job-utils.test.ts b/apps/worker/src/services/functions/__tests__/job-utils.test.ts new file mode 100644 index 00000000..e1d443c8 --- /dev/null +++ b/apps/worker/src/services/functions/__tests__/job-utils.test.ts @@ -0,0 +1,428 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { + initializeJob, + cleanupJob, + makeDir, + makeFile, + generateInputFile, + handleError +} from '../job-utils.js' +import { User, type IJob, type IUser } from '@bilbomd/mongodb-schema' +import { Job as BullMQJob } from 'bullmq' +import { logger } from '../../../helpers/loggers.js' +import { sendJobCompleteEmail } from '../../../helpers/mailer.js' +import { config } from '../../../config/config.js' +import fs from 'fs-extra' +import { updateStepStatus, updateJobStatus } from '../mongo-utils.js' +import { Types } from 'mongoose' + +vi.mock('../../../helpers/loggers.js', () => ({ + logger: { + error: vi.fn(), + 
info: vi.fn(), + warn: vi.fn(), + debug: vi.fn() + } +})) + +vi.mock('../../../helpers/mailer.js', () => ({ + sendJobCompleteEmail: vi.fn() +})) + +vi.mock('../mongo-utils.js', () => ({ + updateStepStatus: vi.fn(), + updateJobStatus: vi.fn() +})) + +vi.mock('fs-extra', () => ({ + default: { + ensureDir: vi.fn(), + ensureFile: vi.fn(), + readFile: vi.fn(), + promises: { + writeFile: vi.fn() + } + } +})) + +vi.mock('@bilbomd/mongodb-schema', async () => { + const actual = await vi.importActual('@bilbomd/mongodb-schema') + return { + ...actual, + User: { + findById: vi.fn() + } + } +}) + +describe('job-utils', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + describe('initializeJob', () => { + it('should initialize job successfully', async () => { + const mockMQJob = { + clearLogs: vi.fn().mockResolvedValue(undefined) + } as unknown as BullMQJob + + const mockDBJob = { + status: 'Pending', + time_started: undefined, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + await initializeJob(mockMQJob, mockDBJob) + + expect(mockMQJob.clearLogs).toHaveBeenCalledTimes(1) + expect(mockDBJob.status).toBe('Running') + expect(mockDBJob.time_started).toBeInstanceOf(Date) + expect(mockDBJob.save).toHaveBeenCalledTimes(1) + }) + + it('should log and throw error if initialization fails', async () => { + const mockMQJob = { + clearLogs: vi.fn().mockResolvedValue(undefined) + } as unknown as BullMQJob + + const mockDBJob = { + status: 'Pending', + save: vi.fn().mockRejectedValue(new Error('Database error')) + } as unknown as IJob + + await expect(initializeJob(mockMQJob, mockDBJob)).rejects.toThrow('Database error') + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Error in initializeJob') + ) + }) + }) + + describe('cleanupJob', () => { + it('should cleanup job with user and send email', async () => { + const mockUser: IUser = { + _id: new Types.ObjectId(), + email: 'user@example.com', + username: 'testuser' + } as IUser + + const 
mockMQJob = { + log: vi.fn().mockResolvedValue(undefined) + } as unknown as BullMQJob + + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'test-uuid', + title: 'Test Job', + user: mockUser, + status: 'Running', + time_completed: undefined, + progress: 0, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + vi.mocked(config).sendEmailNotifications = true + vi.mocked(config).bilbomdUrl = 'http://localhost:3000' + + await cleanupJob(mockMQJob, mockDBJob) + + expect(mockDBJob.status).toBe('Completed') + expect(mockDBJob.time_completed).toBeInstanceOf(Date) + expect(mockDBJob.save).toHaveBeenCalled() + expect(updateStepStatus).toHaveBeenCalledWith( + mockDBJob, + 'email', + expect.objectContaining({ status: 'Running' }) + ) + expect(sendJobCompleteEmail).toHaveBeenCalledWith( + 'user@example.com', + 'http://localhost:3000', + mockDBJob._id.toString(), + 'Test Job', + false + ) + }) + + it('should cleanup job without user and skip email', async () => { + const mockMQJob = {} as unknown as BullMQJob + + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'test-uuid-no-user', + user: undefined, + status: 'Running', + time_completed: undefined, + progress: 0, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + await cleanupJob(mockMQJob, mockDBJob) + + expect(mockDBJob.status).toBe('Completed') + expect(mockDBJob.progress).toBe(100) + expect(mockDBJob.save).toHaveBeenCalled() + expect(logger.info).toHaveBeenCalledWith( + expect.stringContaining('no user associated with job') + ) + expect(sendJobCompleteEmail).not.toHaveBeenCalled() + }) + + it('should handle email sending errors gracefully', async () => { + const mockUser: IUser = { + _id: new Types.ObjectId(), + email: 'user@example.com', + username: 'testuser' + } as IUser + + const mockMQJob = { + log: vi.fn().mockResolvedValue(undefined) + } as unknown as BullMQJob + + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'test-uuid', + title: 'Test Job', + user: 
mockUser, + status: 'Running', + time_completed: undefined, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + vi.mocked(config).sendEmailNotifications = true + vi.mocked(sendJobCompleteEmail).mockImplementation(() => { + throw new Error('Email service unavailable') + }) + + await cleanupJob(mockMQJob, mockDBJob) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to send email') + ) + expect(updateStepStatus).toHaveBeenCalledWith( + mockDBJob, + 'email', + expect.objectContaining({ status: 'Error' }) + ) + }) + + it('should skip email if notifications are disabled', async () => { + const mockUser: IUser = { + _id: new Types.ObjectId(), + email: 'user@example.com', + username: 'testuser' + } as IUser + + const mockMQJob = {} as unknown as BullMQJob + + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'test-uuid', + user: mockUser, + status: 'Running', + time_completed: undefined, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + vi.mocked(config).sendEmailNotifications = false + + await cleanupJob(mockMQJob, mockDBJob) + + expect(sendJobCompleteEmail).not.toHaveBeenCalled() + expect(logger.info).toHaveBeenCalledWith( + expect.stringContaining('email notifications disabled') + ) + }) + + it('should handle user lookup from ObjectId', async () => { + const userId = new Types.ObjectId() + const mockUser: IUser = { + _id: userId, + email: 'found@example.com', + username: 'founduser' + } as IUser + + const mockMQJob = {} as unknown as BullMQJob + + // User as string ObjectId (as it comes from MongoDB before population) + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'test-uuid', + user: userId.toString(), + status: 'Running', + time_completed: undefined, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + vi.mocked(User.findById).mockReturnValue({ + lean: vi.fn().mockReturnValue({ + exec: vi.fn().mockResolvedValue(mockUser) + }) + } as unknown as ReturnType<typeof User.findById>) + +
vi.mocked(config).sendEmailNotifications = false + + await cleanupJob(mockMQJob, mockDBJob) + + expect(User.findById).toHaveBeenCalledWith(userId.toString()) + expect(mockDBJob.status).toBe('Completed') + }) + }) + + describe('makeDir', () => { + it('should create directory and log', async () => { + const directory = '/tmp/test-dir' + + await makeDir(directory) + + expect(fs.ensureDir).toHaveBeenCalledWith(directory) + expect(logger.info).toHaveBeenCalledWith(`Create Dir: ${directory}`) + }) + }) + + describe('makeFile', () => { + it('should ensure file exists', async () => { + const file = '/tmp/test-file.txt' + + await makeFile(file) + + expect(fs.ensureFile).toHaveBeenCalledWith(file) + }) + }) + + describe('generateInputFile', () => { + it('should generate input file from template', async () => { + const mockParams = { + charmm_template: 'minimize', + charmm_inp_file: 'minimize.inp', + out_dir: '/tmp/job-dir' + } + + const mockTemplate = 'CHARMM input template with {{charmm_inp_file}}' + + vi.mocked(fs.readFile).mockResolvedValue(mockTemplate) + vi.mocked(config).charmmTemplateDir = '/templates' + + await generateInputFile(mockParams) + + expect(fs.readFile).toHaveBeenCalledWith( + '/templates/minimize.handlebars', + 'utf8' + ) + expect(fs.promises.writeFile).toHaveBeenCalledWith( + '/tmp/job-dir/minimize.inp', + expect.any(String) + ) + }) + + it('should handle template read errors', async () => { + const mockParams = { + charmm_template: 'nonexistent', + charmm_inp_file: 'test.inp', + out_dir: '/tmp' + } + + vi.mocked(fs.readFile).mockRejectedValue(new Error('Template not found')) + vi.mocked(config).charmmTemplateDir = '/templates' + + await expect(generateInputFile(mockParams)).rejects.toThrow('Template not found') + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Error in readTemplate') + ) + }) + }) + + describe('handleError', () => { + it('should handle Error objects and update job/step status', async () => { + const mockDBJob = { + 
_id: new Types.ObjectId(), + uuid: 'error-test-uuid', + title: 'Error Test Job', + __t: 'BilboMDPDBJob', + status: 'Running' + } as unknown as IJob + + const error = new Error('Test error message') + const step = 'minimize' + + await expect(handleError(error, mockDBJob, step)).rejects.toThrow( + "BilboMD failed in step 'minimize': Test error message" + ) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('handleError - Error object details') + ) + expect(updateJobStatus).toHaveBeenCalledWith(mockDBJob, 'Error') + expect(updateStepStatus).toHaveBeenCalledWith( + mockDBJob, + 'minimize', + expect.objectContaining({ + status: 'Error', + message: expect.stringContaining('Test error message') + }) + ) + }) + + it('should handle non-Error objects', async () => { + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'error-test-uuid', + title: 'Error Test Job', + __t: 'BilboMDPDBJob', + status: 'Running' + } as unknown as IJob + + const error = 'String error' + const step = 'heat' + + await expect(handleError(error, mockDBJob, step)).rejects.toThrow( + "BilboMD failed in step 'heat': String error" + ) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('handleError - Non-Error object') + ) + }) + + it('should handle errors without step information', async () => { + const mockDBJob = { + _id: new Types.ObjectId(), + uuid: 'error-test-uuid', + title: 'Error Test Job', + __t: 'BilboMDPDBJob', + status: 'Running' + } as unknown as IJob + + const error = new Error('General error') + + await expect(handleError(error, mockDBJob)).rejects.toThrow( + "BilboMD failed in step 'unknown': General error" + ) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Step not provided when handling error') + ) + expect(updateJobStatus).toHaveBeenCalledWith(mockDBJob, 'Error') + expect(updateStepStatus).not.toHaveBeenCalled() + }) + + it('should handle updateJobStatus failures gracefully', async () => { + const mockDBJob = { 
+ _id: new Types.ObjectId(), + uuid: 'error-test-uuid', + title: 'Error Test Job', + __t: 'BilboMDPDBJob', + status: 'Running' + } as unknown as IJob + + const error = new Error('Original error') + vi.mocked(updateJobStatus).mockRejectedValue(new Error('Update failed')) + + await expect(handleError(error, mockDBJob, 'md')).rejects.toThrow( + "BilboMD failed in step 'md': Original error" + ) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to update job status') + ) + }) + }) +}) diff --git a/apps/worker/src/services/functions/__tests__/mongo-utils.test.ts b/apps/worker/src/services/functions/__tests__/mongo-utils.test.ts new file mode 100644 index 00000000..afad4c20 --- /dev/null +++ b/apps/worker/src/services/functions/__tests__/mongo-utils.test.ts @@ -0,0 +1,204 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { updateStepStatus, handleStepError, updateJobStatus } from '../mongo-utils.js' +import { Job, type IJob, type IMultiJob, type IBilboMDSteps } from '@bilbomd/mongodb-schema' +import { logger } from '../../../helpers/loggers.js' + +vi.mock('../../../helpers/loggers.js', () => ({ + logger: { + error: vi.fn(), + info: vi.fn(), + warn: vi.fn() + } +})) + +vi.mock('@bilbomd/mongodb-schema', async () => { + const actual = await vi.importActual('@bilbomd/mongodb-schema') + return { + ...actual, + Job: { + findByIdAndUpdate: vi.fn() + } + } +}) + +describe('mongo-utils', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + describe('updateStepStatus', () => { + it('should update step status and save job successfully', async () => { + const mockJob = { + _id: 'test-job-id', + steps: { + minimize: { status: 'Pending', message: '' } + } as IBilboMDSteps, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + const newStatus = { status: 'Success' as const, message: 'Completed' } + + await updateStepStatus(mockJob, 'minimize', newStatus) + + expect(mockJob.steps.minimize).toEqual(newStatus) 
+ expect(mockJob.save).toHaveBeenCalledTimes(1) + }) + + it('should initialize steps object if it does not exist', async () => { + const mockJob = { + _id: 'test-job-id', + steps: undefined, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + const newStatus = { status: 'Running' as const, message: 'In progress' } + + await updateStepStatus(mockJob, 'heat', newStatus) + + expect(mockJob.steps).toBeDefined() + expect(mockJob.steps?.heat).toEqual(newStatus) + expect(mockJob.save).toHaveBeenCalledTimes(1) + }) + + it('should handle save errors gracefully', async () => { + const mockJob = { + _id: 'test-job-id', + steps: {} as IBilboMDSteps, + save: vi.fn().mockRejectedValue(new Error('Database connection failed')) + } as unknown as IJob + + const newStatus = { status: 'Error' as const, message: 'Failed' } + + await updateStepStatus(mockJob, 'md', newStatus) + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Error updating step status for job test-job-id') + ) + }) + + it('should work with IMultiJob instances', async () => { + const mockMultiJob = { + _id: 'multi-job-id', + steps: {} as IBilboMDSteps, + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IMultiJob + + const newStatus = { status: 'Success' as const, message: 'Done' } + + await updateStepStatus(mockMultiJob, 'multifoxs', newStatus) + + expect(mockMultiJob.steps.multifoxs).toEqual(newStatus) + expect(mockMultiJob.save).toHaveBeenCalledTimes(1) + }) + }) + + describe('handleStepError', () => { + it('should update step status to Error and log the error with Error object', async () => { + const jobId = 'job-123' + const stepName = 'foxs' + const error = new Error('FoXS calculation failed') + + vi.mocked(Job.findByIdAndUpdate).mockResolvedValue(null) + + await handleStepError(jobId, stepName, error) + + expect(Job.findByIdAndUpdate).toHaveBeenCalledWith( + jobId, + { 'steps.foxs.status': 'Error' }, + { new: true } + ) + 
expect(logger.error).toHaveBeenCalledWith( + 'Error in foxs: FoXS calculation failed' + ) + }) + + it('should handle non-Error objects by converting to string', async () => { + const jobId = 'job-456' + const stepName = 'minimize' + const error = 'String error message' + + vi.mocked(Job.findByIdAndUpdate).mockResolvedValue(null) + + await handleStepError(jobId, stepName, error) + + expect(Job.findByIdAndUpdate).toHaveBeenCalledWith( + jobId, + { 'steps.minimize.status': 'Error' }, + { new: true } + ) + expect(logger.error).toHaveBeenCalledWith( + 'Error in minimize: String error message' + ) + }) + + it('should handle non-string error values', async () => { + const jobId = 'job-789' + const stepName = 'heat' + const error = { code: 500, message: 'Server error' } + + vi.mocked(Job.findByIdAndUpdate).mockResolvedValue(null) + + await handleStepError(jobId, stepName, error) + + expect(Job.findByIdAndUpdate).toHaveBeenCalledWith( + jobId, + { 'steps.heat.status': 'Error' }, + { new: true } + ) + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Error in heat:') + ) + }) + }) + + describe('updateJobStatus', () => { + it('should update job status and save successfully', async () => { + const mockJob = { + _id: 'job-update-test', + status: 'Pending', + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + await updateJobStatus(mockJob, 'Running') + + expect(mockJob.status).toBe('Running') + expect(mockJob.save).toHaveBeenCalledTimes(1) + }) + + it('should update job status to Completed', async () => { + const mockJob = { + _id: 'job-complete', + status: 'Running', + save: vi.fn().mockResolvedValue(undefined) + } as unknown as IJob + + await updateJobStatus(mockJob, 'Completed') + + expect(mockJob.status).toBe('Completed') + expect(mockJob.save).toHaveBeenCalledTimes(1) + }) + + it('should update job status to Error', async () => { + const mockJob = { + _id: 'job-error', + status: 'Running', + save: vi.fn().mockResolvedValue(undefined) + 
} as unknown as IJob + + await updateJobStatus(mockJob, 'Error') + + expect(mockJob.status).toBe('Error') + expect(mockJob.save).toHaveBeenCalledTimes(1) + }) + + it('should throw error if save fails', async () => { + const mockJob = { + _id: 'job-save-fail', + status: 'Pending', + save: vi.fn().mockRejectedValue(new Error('Save failed')) + } as unknown as IJob + + await expect(updateJobStatus(mockJob, 'Running')).rejects.toThrow('Save failed') + }) + }) +}) diff --git a/apps/worker/src/services/functions/bilbomd-functions.ts b/apps/worker/src/services/functions/bilbomd-functions.ts index 77a4922f..f25ec858 100644 --- a/apps/worker/src/services/functions/bilbomd-functions.ts +++ b/apps/worker/src/services/functions/bilbomd-functions.ts @@ -15,12 +15,10 @@ import path from 'path' import { updateStepStatus } from './mongo-utils.js' import { config } from '../../config/config.js' import { logger } from '../../helpers/loggers.js' +import { getErrorMessage } from '../../helpers/errors.js' import fs from 'fs-extra' import { Job as BullMQJob } from 'bullmq' -const getErrorMessage = (e: unknown): string => - e instanceof Error ? e.message : typeof e === 'string' ? 
e : JSON.stringify(e) - interface FoxsRunDir { dir: string rg: number @@ -213,7 +211,6 @@ const writeSegidToChainid = async (inputFile: string): Promise<void> => { // Join the modified lines and overwrite the original file await fs.promises.writeFile(inputFile, modifiedLines.join('\n'), 'utf-8') - // logger.info(`Processed PDB file saved as ${inputFile}`) } catch (error: unknown) { logger.error('Error processing the PDB file:', error) } diff --git a/apps/worker/src/services/functions/job-utils.ts b/apps/worker/src/services/functions/job-utils.ts index 8ee6b76c..d08aef55 100644 --- a/apps/worker/src/services/functions/job-utils.ts +++ b/apps/worker/src/services/functions/job-utils.ts @@ -14,11 +14,9 @@ import path from 'path' import { spawn, ChildProcess } from 'node:child_process' import Handlebars from 'handlebars' import { updateStepStatus, updateJobStatus } from './mongo-utils.js' +import { getErrorMessage } from '../../helpers/errors.js' import { Types } from 'mongoose' -const getErrorMessage = (e: unknown): string => - e instanceof Error ? e.message : typeof e === 'string' ?
e : JSON.stringify(e) - const initializeJob = async (MQJob: BullMQJob, DBjob: IJob): Promise<void> => { try { // Clear the BullMQ Job logs in the case this job is being re-run diff --git a/apps/worker/src/services/functions/nersc-api-functions.ts b/apps/worker/src/services/functions/nersc-api-functions.ts index f79aaa25..fe52cf25 100644 --- a/apps/worker/src/services/functions/nersc-api-functions.ts +++ b/apps/worker/src/services/functions/nersc-api-functions.ts @@ -4,6 +4,13 @@ import axiosRetry from 'axios-retry' import qs from 'qs' import { logger } from '../../helpers/loggers.js' import { config } from '../../config/config.js' +import { + STEP_WEIGHTS, + PROGRESS, + NERSC_RETRY, + INTERVALS, + NERSC_PATHS +} from '../../config/constants.js' import { IBilboMDSteps, IJob, @@ -17,29 +24,11 @@ import { Job as BullMQJob } from 'bullmq' const environment: string = process.env.NODE_ENV || 'development' -const stepWeights: { [key: string]: number } = { - alphafold: 20, - pdb2crd: 5, - pae: 5, - autorg: 5, - minimize: 10, - initfoxs: 5, - heat: 10, - md: 30, - dcd2pdb: 10, - foxs: 10, - multifoxs: 10, - copy_results_to_cfs: 5, - results: 3, - email: 1, - nersc_prepare_slurm_batch: 5, - nersc_submit_slurm_batch: 5, - nersc_job_status: 5, - nersc_copy_results_to_cfs: 5 -} - // Configure axios to retry on failure -axiosRetry(axios, { retries: 11, retryDelay: axiosRetry.exponentialDelay }) +axiosRetry(axios, { + retries: NERSC_RETRY.MAX_ATTEMPTS, + retryDelay: axiosRetry.exponentialDelay +}) const executeNerscScript = async ( scriptName: string, @@ -55,8 +44,7 @@ const executeNerscScript = async ( Authorization: `Bearer ${token}` } const scriptBaseName = path.basename(scriptName) - // /global/homes/${username_first_letter}/${username}/script-logs/${scriptBaseName}-${new Date().toISOString()}.log - const logFile = `/global/homes/s/sclassen/script-logs/${scriptBaseName}-${new Date().toISOString()}.log` + const logFile = `${NERSC_PATHS.SCRIPT_LOGS_DIR}/${scriptBaseName}-${new
Date().toISOString()}.log` const cmd = `ENVIRONMENT=${environment} ${scriptName} ${scriptArgs} > ${logFile} 2>&1` logger.info(`Executing command: ${cmd}`) @@ -152,7 +140,7 @@ const monitorTaskAtNERSC = async ( do { await makeRequest() - await new Promise((resolve) => setTimeout(resolve, 2000)) + await new Promise((resolve) => setTimeout(resolve, INTERVALS.NERSC_TASK_POLL)) } while (status !== 'completed' && status !== 'failed') if (!statusResponse) { @@ -177,8 +165,8 @@ const monitorJobAtNERSC = async ( api_error: '' } - const maxRetries = 10 // Maximum number of retries for failed attempts - const maxIterations = 1440 // 1440 x 60s = 24 hours + const maxRetries = NERSC_RETRY.MAX_JOB_RETRIES + const maxIterations = NERSC_RETRY.MAX_ITERATIONS let retryCount = 0 let iterationCount = 0 @@ -234,7 +222,7 @@ const monitorJobAtNERSC = async ( logger.error(`Max retries reached for job ${jobID}`) throw new Error(`Max retries reached for job ${jobID}`) } - await new Promise((resolve) => setTimeout(resolve, 60000)) // Wait before retrying + await new Promise((resolve) => setTimeout(resolve, NERSC_RETRY.RETRY_DELAY)) continue // Retry the request } } @@ -258,7 +246,7 @@ const monitorJobAtNERSC = async ( break default: iterationCount++ - await new Promise((resolve) => setTimeout(resolve, 60000)) // Continue polling otherwise + await new Promise((resolve) => setTimeout(resolve, INTERVALS.NERSC_JOB_POLL)) break } } @@ -372,7 +360,7 @@ const updateStatus = async (MQjob: BullMQJob, DBJob: IJob) => { const calculateProgress = (steps: IBilboMDSteps): number => { if (!steps || Object.keys(steps).length === 0) { logger.warn('Steps are empty or undefined.') - return 20 // Minimum progress + return PROGRESS.MIN } logger.info('Printing all steps and their statuses:') @@ -384,25 +372,25 @@ const calculateProgress = (steps: IBilboMDSteps): number => { ) } - const totalWeight = Object.values(stepWeights).reduce( + const totalWeight = Object.values(STEP_WEIGHTS).reduce( (acc, weight) => acc + 
weight, 0 ) if (totalWeight === 0) { - logger.error('Total weight is zero. Check stepWeights configuration.') - return 20 // Minimum progress + logger.error('Total weight is zero. Check STEP_WEIGHTS configuration.') + return PROGRESS.MIN } let completedWeight = 0 - // Iterate only over valid keys in `steps` that exist in `stepWeights` - for (const step of Object.keys(steps).filter((key) => key in stepWeights)) { + // Iterate only over valid keys in `steps` that exist in `STEP_WEIGHTS` + for (const step of Object.keys(steps).filter((key) => key in STEP_WEIGHTS)) { const status = steps[step as keyof IBilboMDSteps]?.status logger.info(`Step: ${step}, Status: ${status}`) if (status === 'Success') { - const weight = stepWeights[step] || 0 + const weight = STEP_WEIGHTS[step] || 0 completedWeight += weight } } @@ -412,8 +400,9 @@ const calculateProgress = (steps: IBilboMDSteps): number => { ) // Calculate progress - const progress = (completedWeight / totalWeight) * 70 + 20 // Scale between 20% and 90% - return Math.min(progress, 90) // Ensure it doesn't exceed 90% + const progress = + (completedWeight / totalWeight) * PROGRESS.SCALE_FACTOR + PROGRESS.MIN + return Math.min(progress, PROGRESS.MAX) } export { diff --git a/apps/worker/src/worker.ts b/apps/worker/src/worker.ts index afa17c15..3b533d7d 100644 --- a/apps/worker/src/worker.ts +++ b/apps/worker/src/worker.ts @@ -4,12 +4,19 @@ import { connectDB } from './helpers/db.js' import { Worker, WorkerOptions } from 'bullmq' import { logger } from './helpers/loggers.js' import { config } from './config/config.js' +import { + WORKER_CONCURRENCY, + LOCK_SETTINGS, + INTERVALS, + SERVER +} from './config/constants.js' import { createBilboMdWorker } from './workers/bilboMdWorker.js' import { createMovieWorker } from './workers/movieWorker.js' import { createMultiMDWorker } from './workers/multiMdWorker.js' import { checkNERSC } from './workers/workerControl.js' import { monitorAndCleanupJobs } from 
'./workers/bilboMdNerscJobMonitor.js' import { redis } from './queues/redisConn.js' +import { getErrorMessage } from './helpers/errors.js' dotenv.config() @@ -17,9 +24,6 @@ const environment: string = process.env.NODE_ENV || 'development' const version: string = process.env.BILBOMD_WORKER_VERSION || '0.0.0' const gitHash: string = process.env.BILBOMD_WORKER_GIT_HASH || '321cba' -const getErrorMessage = (e: unknown): string => - e instanceof Error ? e.message : typeof e === 'string' ? e : JSON.stringify(e) - if (environment === 'production') { logger.info('Running in production mode') } else { @@ -32,23 +36,23 @@ let bilboMdWorker: Worker | null = null let movieWorker: Worker | null = null let multimdWorker: Worker | null = null -// 9000000 is 2 hours and 30 minutes const workerOptions: WorkerOptions = { connection: redis, - concurrency: config.runOnNERSC ? 50 : 1, - // lockDuration: config.runOnNERSC ? 9000000 : 9000000 - lockDuration: 60_000, - lockRenewTime: 30_000 + concurrency: config.runOnNERSC + ? WORKER_CONCURRENCY.NERSC + : WORKER_CONCURRENCY.LOCAL, + lockDuration: LOCK_SETTINGS.DURATION, + lockRenewTime: LOCK_SETTINGS.RENEW_TIME } const movieWorkerOptions: WorkerOptions = { connection: redis, - concurrency: 1 + concurrency: WORKER_CONCURRENCY.MOVIE } const multimdWorkerOptions: WorkerOptions = { connection: redis, - concurrency: 1 + concurrency: WORKER_CONCURRENCY.MULTI_MD } const startWorkers = async () => { @@ -119,7 +123,7 @@ if (config.runOnNERSC) { } } } - }, 300000) // Check every 300 seconds i.e. 
5 minutes + }, INTERVALS.TOKEN_CHECK) intervals.push(tokenCheckInterval) // Start monitoring and cleanup process @@ -140,7 +144,7 @@ if (config.runOnNERSC) { } finally { isMonitoring = false } - }, 60000) + }, INTERVALS.JOB_MONITORING) intervals.push(monitoringInterval) } @@ -204,8 +208,7 @@ app.get('/config', (req, res) => { }) // Start the Express server -const PORT = 3000 logger.info('Starting the Express server...') -app.listen(PORT, () => { - logger.info(`Worker configuration server running on port ${PORT}`) +app.listen(SERVER.PORT, () => { + logger.info(`Worker configuration server running on port ${SERVER.PORT}`) }) diff --git a/apps/worker/src/workers/__tests__/workerControl.test.ts b/apps/worker/src/workers/__tests__/workerControl.test.ts new file mode 100644 index 00000000..64275584 --- /dev/null +++ b/apps/worker/src/workers/__tests__/workerControl.test.ts @@ -0,0 +1,194 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { pauseProcessing, resumeProcessing, checkNERSC } from '../workerControl.js' +import { Worker } from 'bullmq' +import { logger } from '../../helpers/loggers.js' +import { ensureValidToken } from '../../services/functions/nersc-api-token-functions.js' + +vi.mock('../../helpers/loggers.js', () => ({ + logger: { + error: vi.fn(), + info: vi.fn(), + warn: vi.fn() + } +})) + +vi.mock('../../services/functions/nersc-api-token-functions.js', () => ({ + ensureValidToken: vi.fn() +})) + +describe('workerControl', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + describe('pauseProcessing', () => { + it('should pause all workers', async () => { + const mockWorker1 = { + pause: vi.fn().mockResolvedValue(undefined) + } as unknown as Worker + + const mockWorker2 = { + pause: vi.fn().mockResolvedValue(undefined) + } as unknown as Worker + + const workers = [ + { worker: mockWorker1, name: 'BilboMD Worker' }, + { worker: mockWorker2, name: 'Movie Worker' } + ] + + await pauseProcessing(workers) + + 
expect(mockWorker1.pause).toHaveBeenCalledTimes(1) + expect(mockWorker2.pause).toHaveBeenCalledTimes(1) + expect(logger.info).toHaveBeenCalledWith('BilboMD Worker paused due to invalid NERSC tokens') + expect(logger.info).toHaveBeenCalledWith('Movie Worker paused due to invalid NERSC tokens') + }) + + it('should handle empty workers array', async () => { + const workers: { worker: Worker; name: string }[] = [] + + await pauseProcessing(workers) + + expect(logger.info).not.toHaveBeenCalled() + }) + + it('should skip null workers', async () => { + const workers = [ + { worker: null as unknown as Worker, name: 'Null Worker' } + ] + + await pauseProcessing(workers) + + expect(logger.info).not.toHaveBeenCalled() + }) + }) + + describe('resumeProcessing', () => { + it('should resume all workers', async () => { + const mockWorker1 = { + resume: vi.fn().mockResolvedValue(undefined) + } as unknown as Worker + + const mockWorker2 = { + resume: vi.fn().mockResolvedValue(undefined) + } as unknown as Worker + + const workers = [ + { worker: mockWorker1, name: 'BilboMD Worker' }, + { worker: mockWorker2, name: 'Movie Worker' } + ] + + await resumeProcessing(workers) + + expect(mockWorker1.resume).toHaveBeenCalledTimes(1) + expect(mockWorker2.resume).toHaveBeenCalledTimes(1) + expect(logger.info).toHaveBeenCalledWith('BilboMD Worker resumed') + expect(logger.info).toHaveBeenCalledWith('Movie Worker resumed') + }) + + it('should handle empty workers array', async () => { + const workers: { worker: Worker; name: string }[] = [] + + await resumeProcessing(workers) + + expect(logger.info).not.toHaveBeenCalled() + }) + + it('should skip null workers', async () => { + const workers = [ + { worker: null as unknown as Worker, name: 'Null Worker' } + ] + + await resumeProcessing(workers) + + expect(logger.info).not.toHaveBeenCalled() + }) + }) + + describe('checkNERSC', () => { + it('should return true when valid token is obtained', async () => { + const mockToken = 
'valid-token-1234567890' + vi.mocked(ensureValidToken).mockResolvedValue(mockToken) + + const result = await checkNERSC() + + expect(result).toBe(true) + expect(ensureValidToken).toHaveBeenCalledTimes(1) + expect(logger.info).toHaveBeenCalledWith( + `Successfully obtained NERSC token: ${mockToken.slice(0, 10)}...` + ) + }) + + it('should return false when token is too short', async () => { + const mockToken = 'short' + vi.mocked(ensureValidToken).mockResolvedValue(mockToken) + + const result = await checkNERSC() + + expect(result).toBe(false) + expect(logger.warn).toHaveBeenCalledWith( + `Did not successfully obtain NERSC token: ${mockToken}` + ) + }) + + it('should return false when token is empty string', async () => { + const mockToken = '' + vi.mocked(ensureValidToken).mockResolvedValue(mockToken) + + const result = await checkNERSC() + + expect(result).toBe(false) + expect(logger.warn).toHaveBeenCalledWith( + `Did not successfully obtain NERSC token: ${mockToken}` + ) + }) + + it('should return false and log error when ensureValidToken throws', async () => { + const error = new Error('Token service unavailable') + vi.mocked(ensureValidToken).mockRejectedValue(error) + + const result = await checkNERSC() + + expect(result).toBe(false) + expect(logger.error).toHaveBeenCalledWith( + `Failed to obtain NERSC token: ${error}` + ) + }) + + it('should return false when ensureValidToken throws non-Error', async () => { + const error = 'String error' + vi.mocked(ensureValidToken).mockRejectedValue(error) + + const result = await checkNERSC() + + expect(result).toBe(false) + expect(logger.error).toHaveBeenCalledWith( + `Failed to obtain NERSC token: ${error}` + ) + }) + + it('should handle token of exactly 10 characters', async () => { + const mockToken = '1234567890' + vi.mocked(ensureValidToken).mockResolvedValue(mockToken) + + const result = await checkNERSC() + + expect(result).toBe(false) + expect(logger.warn).toHaveBeenCalledWith( + `Did not successfully obtain 
NERSC token: ${mockToken}` + ) + }) + + it('should handle token of 11 characters (valid)', async () => { + const mockToken = '12345678901' + vi.mocked(ensureValidToken).mockResolvedValue(mockToken) + + const result = await checkNERSC() + + expect(result).toBe(true) + expect(logger.info).toHaveBeenCalledWith( + `Successfully obtained NERSC token: ${mockToken.slice(0, 10)}...` + ) + }) + }) +}) diff --git a/apps/worker/src/workers/workerControl.ts b/apps/worker/src/workers/workerControl.ts index f4f2c419..5dd711af 100644 --- a/apps/worker/src/workers/workerControl.ts +++ b/apps/worker/src/workers/workerControl.ts @@ -28,16 +28,6 @@ export const resumeProcessing = async (workers: WorkerInfo[]) => { export const checkNERSC = async () => { try { - // Eventually could have various checks here. - - // Perlmutter status - // const response = await someAPI.healthCheck() - // if (response.status !== 'ok') { - // throw new Error('API is not healthy') - // } - - // Valid client - // Able to get access token const token: string = await ensureValidToken() if (typeof token === 'string' && token.length > 10) {