diff --git a/packages/core/application/commands/process-commands.js b/packages/core/application/commands/process-commands.js new file mode 100644 index 000000000..bdc569775 --- /dev/null +++ b/packages/core/application/commands/process-commands.js @@ -0,0 +1,213 @@ +const { ProcessQueueService } = require('../../integrations/services/process-queue-service'); + +/** + * Checks if the process queue is enabled + * @returns {boolean} True if enabled + */ +function isQueueEnabled() { + const enabled = process.env.PROCESS_QUEUE_ENABLED; + return enabled ? enabled.toLowerCase() === 'true' : false; +} + +/** + * Gets or creates the ProcessQueueService singleton instance + * @returns {ProcessQueueService|null} Queue service instance or null if disabled + * @throws {Error} If queue is enabled but PROCESS_MANAGEMENT_QUEUE_URL is not set + */ +let queueServiceInstance = null; + +function getQueueService() { + if (!isQueueEnabled()) { + return null; + } + + if (!queueServiceInstance) { + const queueUrl = process.env.PROCESS_MANAGEMENT_QUEUE_URL; + + if (!queueUrl) { + throw new Error( + 'PROCESS_MANAGEMENT_QUEUE_URL environment variable is required when PROCESS_QUEUE_ENABLED=true' + ); + } + + queueServiceInstance = new ProcessQueueService({ queueUrl }); + } + + return queueServiceInstance; +} + +/** + * Create process command factory + * + * Process commands manage process state transitions via a FIFO queue to prevent + * race conditions when multiple workers update the same process concurrently. + * + * State Machine Concept: + * - Each command triggers a state transition or metrics update + * - Queue ensures ordered processing per process (FIFO) + * - Prevents lost updates from concurrent workers + * + * @returns {Object} Process command object with queue operations + * + * @example + * const commands = createProcessCommands(); + * + * // Queue state update (triggers state transition) + * await commands.queueStateUpdate(processId, 'RUNNING', { step: 1 }); + * + * // Queue metrics update (accumulative) + * await commands.queueMetricsUpdate(processId, { totalProcessed: 100 }); + * + * // Queue completion (terminal state) + * await commands.queueCompletion(processId); + * + * // Queue error (terminal state) + * await commands.queueError(processId, error); + */ +function createProcessCommands() { + return { + /** + * Check if process queue is enabled + * @returns {boolean} True if enabled + * + * @example + * if (commands.isQueueEnabled()) { + * await commands.queueStateUpdate(processId, 'RUNNING'); + * } + */ + isQueueEnabled, + + /** + * Queue a process state update + * + * State Machine Event: Triggers state transition + * - Validates transition is allowed (via HandleProcessUpdate) + * - Updates process.state and process.context + * - Ordered processing ensures consistency + * + * @param {string} processId - Process ID + * @param {string} state - New state (e.g., 'RUNNING', 'PAUSED', 'COMPLETED') + * @param {Object} [contextUpdates={}] - Context updates to merge + * @returns {Promise} SQS response or null if queue disabled + * + * @example + * // Transition to RUNNING state with context + * await commands.queueStateUpdate(processId, 'RUNNING', { + * currentBatch: 'batch-123', + * step: 2, + * lastProcessedId: 'record-456' + * }); + * + * @example + * // Transition to PAUSED state + * await commands.queueStateUpdate(processId, 'PAUSED', { + * pausedAt: new Date().toISOString(), + * pauseReason: 'Rate limit reached' + * }); + */ + async queueStateUpdate(processId, state, contextUpdates = {}) { + const queueService = getQueueService(); + if (!queueService) { + return null; // Queue disabled + } + + return await queueService.queueStateUpdate(processId, state, contextUpdates); + }, + + /** + * Queue a process metrics update + * + * State Machine Event: Updates metrics without changing state + * - Accumulates metrics (adds to existing values) + * - Calculates performance stats (duration, rate, ETA) + * - Appends errors to error log + * + * @param {string} processId - Process ID + * @param {Object} metricsUpdate - Metrics to add (cumulative) + * @param {number} [metricsUpdate.totalProcessed] - Records processed (added to total) + * @param {number} [metricsUpdate.totalFailed] - Records failed (added to total) + * @param {number} [metricsUpdate.totalSkipped] - Records skipped (added to total) + * @param {Array} [metricsUpdate.errors] - Error details to append + * @returns {Promise} SQS response or null if queue disabled + * + * @example + * // Update metrics after processing a batch + * await commands.queueMetricsUpdate(processId, { + * totalProcessed: 50, + * totalFailed: 2, + * totalSkipped: 1, + * errors: [{ + * id: 'record-123', + * message: 'Invalid data', + * timestamp: new Date().toISOString() + * }] + * }); + */ + async queueMetricsUpdate(processId, metricsUpdate) { + const queueService = getQueueService(); + if (!queueService) { + return null; // Queue disabled + } + + return await queueService.queueMetricsUpdate(processId, metricsUpdate); + }, + + /** + * Queue process completion + * + * State Machine Event: Transition to COMPLETED (terminal state) + * - Sets state to 'COMPLETED' + * - Records endTime + * - No further state transitions allowed + * + * @param {string} processId - Process ID + * @returns {Promise} SQS response or null if queue disabled + * + * @example + * // Mark process as completed + * await commands.queueCompletion(processId); + */ + async queueCompletion(processId) { + const queueService = getQueueService(); + if (!queueService) { + return null; // Queue disabled + } + + return await queueService.queueProcessCompletion(processId); + }, + + /** + * Queue error handling + * + * State Machine Event: Transition to ERROR (terminal state) + * - Sets state to 'ERROR' + * - Records error message and stack trace + * - Records errorTimestamp + * - No further state transitions allowed + * + * @param {string} processId - Process ID + * @param {Error} error - Error object + * @returns {Promise} SQS response or null if queue disabled + * + * @example + * try { + * await processRecords(processId, records); + * } catch (error) { + * await commands.queueError(processId, error); + * throw error; // Re-throw to fail worker + * } + */ + async queueError(processId, error) { + const queueService = getQueueService(); + if (!queueService) { + return null; // Queue disabled + } + + return await queueService.queueErrorHandling(processId, error); + }, + }; +} + +module.exports = { + createProcessCommands, +}; diff --git a/packages/core/application/index.js b/packages/core/application/index.js index 9a56b33ca..7eb786e47 100644 --- a/packages/core/application/index.js +++ b/packages/core/application/index.js @@ -7,6 +7,7 @@ const { createEntityCommands } = require('./commands/entity-commands'); const { createCredentialCommands, } = require('./commands/credential-commands'); +const { createProcessCommands } = require('./commands/process-commands'); /** * Create a unified command factory with all CRUD operations @@ -20,8 +21,15 @@ const { * * @example * const commands = createFriggCommands({ integrationClass: MyIntegration }); + * + * // User/credential/entity commands (direct CRUD) * const user = await commands.createUser({ username: 'user@example.com' }); * const credential = await commands.createCredential({ userId: user.id, ... }); + * + * // Process commands (queued operations for state machine) + * await commands.process.queueStateUpdate(processId, 'RUNNING', { step: 1 }); + * await commands.process.queueMetricsUpdate(processId, { totalProcessed: 100 }); + * await commands.process.queueCompletion(processId); */ function createFriggCommands({ integrationClass } = {}) { // All commands use Frigg's default repositories and use cases @@ -33,6 +41,8 @@ function createFriggCommands({ integrationClass } = {}) { const credentialCommands = createCredentialCommands(); + const processCommands = createProcessCommands(); + return { // Integration commands ...integrationCommands, @@ -45,6 +55,9 @@ function createFriggCommands({ integrationClass } = {}) { // Credential commands ...credentialCommands, + + // Process commands (nested namespace for state machine operations) + process: processCommands, }; } @@ -57,6 +70,7 @@ module.exports = { createUserCommands, createEntityCommands, createCredentialCommands, + createProcessCommands, // Legacy standalone function findIntegrationContextByExternalEntityId, diff --git a/packages/core/docs/PROCESS_MANAGEMENT_QUEUE_USAGE.md b/packages/core/docs/PROCESS_MANAGEMENT_QUEUE_USAGE.md new file mode 100644 index 000000000..455bd1f99 --- /dev/null +++ b/packages/core/docs/PROCESS_MANAGEMENT_QUEUE_USAGE.md @@ -0,0 +1,510 @@ +# Process Management Queue - Usage Guide + +## Overview + +The Process Management Queue is an **optional feature** that prevents race conditions when updating process records from concurrent workers. It uses a single FIFO (First-In-First-Out) SQS queue to ensure ordered processing of updates for each process. + +## Problem Statement + +Without the queue, concurrent workers can cause race conditions: + +``` +Time 1: Worker A reads process.totalSynced = 100 +Time 2: Worker B reads process.totalSynced = 100 +Time 3: Worker A adds 50 → writes totalSynced = 150 +Time 4: Worker B adds 30 → writes totalSynced = 130 (overwrites Worker A's update!) +``` + +**Result**: Lost updates, inconsistent metrics, data corruption. + +## Solution + +The Process Management Queue ensures ordered processing: + +``` +Worker A ──┐ +Worker B ──┼──→ queueProcessUpdate() ──→ FIFO Queue ──→ HandleProcessUpdate ──→ Database +Worker C ──┘ (ordered per process) +``` + +## Configuration + +### Environment Variables + +The feature is **disabled by default** and can be enabled via environment variables: + +```bash +# Enable the process management queue +PROCESS_QUEUE_ENABLED=true + +# Set the FIFO queue URL +PROCESS_MANAGEMENT_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo +``` + +### Infrastructure Setup + +#### 1. Create FIFO Queue (AWS SQS) + +Using AWS CLI: + +```bash +aws sqs create-queue \ + --queue-name process-management.fifo \ + --attributes '{ + "FifoQueue": "true", + "ContentBasedDeduplication": "true", + "MessageRetentionPeriod": "1209600", + "VisibilityTimeout": "30", + "ReceiveMessageWaitTimeSeconds": "20" + }' +``` + +Using Serverless Framework (`serverless.yml`): + +```yaml +resources: + Resources: + ProcessManagementQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: process-management.fifo + FifoQueue: true + ContentBasedDeduplication: true + MessageRetentionPeriod: 1209600 # 14 days + VisibilityTimeout: 30 + ReceiveMessageWaitTimeSeconds: 20 + RedrivePolicy: + deadLetterTargetArn: !GetAtt ProcessManagementDLQ.Arn + maxReceiveCount: 3 + + ProcessManagementDLQ: + Type: AWS::SQS::Queue + Properties: + QueueName: process-management-dlq.fifo + FifoQueue: true + MessageRetentionPeriod: 1209600 + + Outputs: + ProcessManagementQueueUrl: + Value: !Ref ProcessManagementQueue + Export: + Name: ProcessManagementQueueUrl +``` + +#### 2. Create Lambda Handler for Queue Processing + +Add a Lambda function to process messages from the queue: + +```yaml +functions: + processUpdateHandler: + handler: handlers/process-update-handler.handler + events: + - sqs: + arn: !GetAtt ProcessManagementQueue.Arn + batchSize: 1 # Process one update at a time per batch + maximumBatchingWindowInSeconds: 0 + timeout: 30 + environment: + PROCESS_QUEUE_ENABLED: true + PROCESS_MANAGEMENT_QUEUE_URL: !Ref ProcessManagementQueue +``` + +Create the handler file (`handlers/process-update-handler.js`): + +```javascript +const { + HandleProcessUpdate, + UpdateProcessState, + UpdateProcessMetrics, +} = require('@friggframework/core'); +const { createProcessRepository } = require('@friggframework/core/integrations/repositories/process-repository-factory'); + +// Initialize use cases +const processRepository = createProcessRepository(); +const updateProcessState = new UpdateProcessState({ processRepository }); +const updateProcessMetrics = new UpdateProcessMetrics({ processRepository }); + +const handleProcessUpdate = new HandleProcessUpdate({ + updateProcessState, + updateProcessMetrics, +}); + +exports.handler = async (event) => { + const results = []; + + for (const record of event.Records) { + try { + await handleProcessUpdate.executeFromSQS(record); + results.push({ messageId: record.messageId, status: 'success' }); + } catch (error) { + console.error('Failed to process message:', error); + results.push({ messageId: record.messageId, status: 'failed', error: error.message }); + throw error; // Re-throw to trigger SQS retry/DLQ + } + } + + return { batchItemFailures: results.filter(r => r.status === 'failed') }; +}; +``` + +## Usage + +### Simple API (Recommended) + +The easiest way to use the queue is via the `queueProcessUpdate` utility: + +```javascript +const { queueProcessUpdate } = require('@friggframework/core'); + +// Queue a state update +await queueProcessUpdate.queueStateUpdate( + processId, + 'RUNNING', + { step: 2, currentBatch: 'batch-456' } +); + +// Queue a metrics update +await queueProcessUpdate.queueMetricsUpdate(processId, { + totalProcessed: 100, + totalFailed: 2, + totalSkipped: 5, +}); + +// Queue process completion +await queueProcessUpdate.queueProcessCompletion(processId); + +// Queue error handling +try { + // ... processing logic +} catch (error) { + await queueProcessUpdate.queueErrorHandling(processId, error); + throw error; +} + +// Check if queue is enabled +if (queueProcessUpdate.isEnabled()) { + console.log('Process queue is enabled'); +} +``` + +### Behavior When Disabled + +If `PROCESS_QUEUE_ENABLED` is not `true`, all methods return `null` (no-op): + +```javascript +const result = await queueProcessUpdate.queueStateUpdate(processId, 'RUNNING'); +// result === null (queue is disabled) +``` + +This allows you to integrate the queue without breaking existing code when disabled. + +### Integration with Existing Code + +**Before (direct updates, race conditions possible):** + +```javascript +const { UpdateProcessState } = require('@friggframework/core'); + +class MyWorker { + async processRecords(processId, records) { + for (const record of records) { + await this.processRecord(record); + + // Direct update - race condition risk! + await this.updateProcessState.execute(processId, 'RUNNING', { + lastProcessedId: record.id, + }); + } + } +} +``` + +**After (queued updates, no race conditions):** + +```javascript +const { queueProcessUpdate } = require('@friggframework/core'); + +class MyWorker { + async processRecords(processId, records) { + for (const record of records) { + await this.processRecord(record); + + // Queued update - no race condition! + await queueProcessUpdate.queueStateUpdate(processId, 'RUNNING', { + lastProcessedId: record.id, + }); + } + } +} +``` + +### Advanced Usage: Direct Service Access + +For more control, use `ProcessQueueService` directly: + +```javascript +const { ProcessQueueService, ProcessUpdateMessage, ProcessUpdateOperation } = require('@friggframework/core'); + +const queueService = new ProcessQueueService({ + queueUrl: process.env.PROCESS_MANAGEMENT_QUEUE_URL, +}); + +// Create and send custom message +const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { + state: 'RUNNING', + contextUpdates: { step: 1 }, + }, +}); + +await queueService.sendMessage(message); +``` + +## How It Works + +### FIFO Queue Ordering + +Messages are grouped by `processId` using `MessageGroupId`: + +```javascript +MessageGroupId: 'process-{processId}' +``` + +This ensures: +- All updates for `proc-123` are processed in order +- Updates for `proc-123` and `proc-456` can be processed concurrently +- No race conditions within a single process + +### Message Deduplication + +Messages use `MessageDeduplicationId` to prevent duplicates: + +```javascript +MessageDeduplicationId: '{processId}-{operation}-{timestamp}' +``` + +This allows: +- Multiple operations on the same process (different IDs) +- Automatic deduplication of exact duplicates (same ID) + +### Operation Types + +Four operation types are supported: + +1. **UPDATE_STATE**: Updates process state and context +2. **UPDATE_METRICS**: Updates process metrics (cumulative) +3. **COMPLETE_PROCESS**: Marks process as completed +4. **HANDLE_ERROR**: Marks process as errored + +## Architecture + +### Hexagonal Architecture + +``` +┌─────────────────────────────────────────────┐ +│ Application Layer │ +│ - queueProcessUpdate (utility) │ +│ - HandleProcessUpdate (use case) │ +└──────────────────┬──────────────────────────┘ + │ +┌──────────────────▼──────────────────────────┐ +│ Domain Layer │ +│ - ProcessUpdateMessage (value object) │ +│ - ProcessQueueService (domain service) │ +└──────────────────┬──────────────────────────┘ + │ +┌──────────────────▼──────────────────────────┐ +│ Infrastructure │ +│ - SQS Client (AWS SDK) │ +│ - ProcessRepository (MongoDB/PostgreSQL) │ +└─────────────────────────────────────────────┘ +``` + +### Component Responsibilities + +| Component | Responsibility | Layer | +|-----------|----------------|-------| +| `queueProcessUpdate` | Public API for queueing updates | Application | +| `ProcessQueueService` | Sends messages to SQS queue | Domain Service | +| `ProcessUpdateMessage` | Immutable message value object | Domain | +| `HandleProcessUpdate` | Processes messages from queue | Application | +| `UpdateProcessState` | Updates process state in DB | Application | +| `UpdateProcessMetrics` | Updates process metrics in DB | Application | + +## Testing + +### Unit Tests + +```javascript +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('@friggframework/core'); + +describe('ProcessUpdateMessage', () => { + it('should create UPDATE_STATE message', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING', contextUpdates: {} }, + }); + + expect(message.processId).toBe('proc-123'); + expect(message.getMessageGroupId()).toBe('process-proc-123'); + }); +}); +``` + +### Integration Tests + +```javascript +const { queueProcessUpdate } = require('@friggframework/core'); + +describe('Process Queue Integration', () => { + beforeEach(() => { + process.env.PROCESS_QUEUE_ENABLED = 'true'; + process.env.PROCESS_MANAGEMENT_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123/test.fifo'; + }); + + it('should queue state update', async () => { + await queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING'); + // Assert message sent to SQS + }); +}); +``` + +## Performance Considerations + +### Latency + +- **Without Queue**: Immediate database update (~50-100ms) +- **With Queue**: Asynchronous via SQS (~100-500ms) + +The queue adds latency but prevents race conditions. For most use cases, the trade-off is acceptable. + +### Cost + +- **SQS FIFO**: $0.40 per million requests +- **Example**: 10,000 process updates/day = ~$0.12/month + +### Throughput + +- **Single Queue**: Handles all processes +- **MessageGroupId**: Ensures ordering per process +- **Concurrent Processing**: Different processes can be processed in parallel + +## Monitoring + +### CloudWatch Metrics + +Monitor these SQS metrics: + +- `NumberOfMessagesSent`: Messages queued +- `NumberOfMessagesReceived`: Messages processed +- `ApproximateAgeOfOldestMessage`: Queue backlog +- `NumberOfMessagesDeleted`: Successful processing + +### Dead Letter Queue (DLQ) + +Failed messages (after 3 retries) are sent to the DLQ: + +```bash +# Check DLQ for failed messages +aws sqs receive-message \ + --queue-url https://sqs.us-east-1.amazonaws.com/123456789/process-management-dlq.fifo +``` + +## Troubleshooting + +### Messages Not Being Processed + +1. Check Lambda function is subscribed to queue +2. Verify `PROCESS_QUEUE_ENABLED=true` in Lambda environment +3. Check CloudWatch Logs for errors + +### Race Conditions Still Occurring + +1. Verify `PROCESS_QUEUE_ENABLED=true` in worker environment +2. Ensure all workers use `queueProcessUpdate` instead of direct updates +3. Check MessageGroupId is set correctly (should be `process-{processId}`) + +### Messages in DLQ + +1. Check DLQ messages: `aws sqs receive-message --queue-url ` +2. Review error logs in CloudWatch +3. Fix underlying issue (e.g., database connection, invalid data) +4. Optionally replay DLQ messages + +## Migration Guide + +### Step 1: Add Infrastructure + +1. Create FIFO queue (see Configuration section) +2. Deploy Lambda handler +3. Set environment variables + +### Step 2: Update Code + +Replace direct updates: + +```javascript +// Before +await updateProcessState.execute(processId, 'RUNNING', context); + +// After +await queueProcessUpdate.queueStateUpdate(processId, 'RUNNING', context); +``` + +### Step 3: Enable Queue + +```bash +export PROCESS_QUEUE_ENABLED=true +export PROCESS_MANAGEMENT_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo +``` + +### Step 4: Monitor + +- Watch CloudWatch metrics +- Check DLQ for failures +- Verify no race conditions + +## Best Practices + +1. **Always Use the Queue for Concurrent Updates**: If multiple workers update the same process, use the queue +2. **Keep Messages Small**: Only include necessary data in updates +3. **Monitor the DLQ**: Set up alerts for messages in DLQ +4. **Set Appropriate Timeouts**: Lambda timeout should be > SQS visibility timeout +5. **Use Environment Variables**: Don't hardcode queue URLs +6. **Test Both Modes**: Test with queue enabled and disabled + +## FAQ + +### Q: Is the queue required? + +**A:** No, it's optional. Enable it only if you have concurrent workers updating the same process. + +### Q: What happens if I don't enable the queue? + +**A:** All `queueProcessUpdate` methods return `null` (no-op). Your code continues to work without changes. + +### Q: Can I use this for real-time updates? + +**A:** The queue adds latency (~100-500ms). For real-time requirements, consider direct updates with optimistic locking. + +### Q: How many queues do I need? + +**A:** One. The single queue handles all processes using `MessageGroupId` for ordering. + +### Q: What if a message fails? + +**A:** SQS retries up to 3 times, then sends to DLQ. Review DLQ messages and fix underlying issues. + +### Q: Can I mix queued and direct updates? + +**A:** Not recommended. Mixing can still cause race conditions. Choose one approach per process. + +## See Also + +- [PROCESS_MANAGEMENT_QUEUE_SPEC.md](./PROCESS_MANAGEMENT_QUEUE_SPEC.md) - Technical specification +- [AWS SQS FIFO Queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) +- [Process Management Documentation](./PROCESS_MANAGEMENT.md) diff --git a/packages/core/docs/PROCESS_STATE_MACHINE.md b/packages/core/docs/PROCESS_STATE_MACHINE.md new file mode 100644 index 000000000..9c73fd649 --- /dev/null +++ b/packages/core/docs/PROCESS_STATE_MACHINE.md @@ -0,0 +1,425 @@ +# Process State Machine + +## Overview + +The Process State Machine provides a structured approach to managing process lifecycle states and transitions. It ensures that processes move through valid states in a controlled manner, preventing invalid state changes and race conditions. + +## State Machine Concepts + +### States + +A **state** represents the current status of a process at a point in time. + +```javascript +const { ProcessState } = require('@friggframework/core'); + +ProcessState.INITIALIZING // Process is being set up +ProcessState.RUNNING // Process is actively executing +ProcessState.PAUSED // Process is temporarily paused +ProcessState.COMPLETED // Process finished successfully (terminal) +ProcessState.ERROR // Process failed with an error (terminal) +ProcessState.CANCELLED // Process was cancelled (terminal) +``` + +### Transitions + +A **transition** is a change from one state to another. Not all transitions are valid. + +```javascript +const { isValidTransition } = require('@friggframework/core'); + +// Valid transitions +isValidTransition('RUNNING', 'COMPLETED') // true +isValidTransition('RUNNING', 'PAUSED') // true +isValidTransition('PAUSED', 'RUNNING') // true + +// Invalid transitions +isValidTransition('COMPLETED', 'RUNNING') // false (terminal state) +isValidTransition('INITIALIZING', 'PAUSED') // false (must go through RUNNING) +``` + +### Terminal States + +**Terminal states** are final states with no further transitions allowed: + +- `COMPLETED` - Process finished successfully +- `ERROR` - Process failed +- `CANCELLED` - Process was cancelled + +Once a process reaches a terminal state, it cannot transition to any other state. + +### Guards + +**Guards** are conditions that must be met before allowing a transition: + +```javascript +const { validateTransition } = require('@friggframework/core'); + +const process = { state: 'RUNNING' }; + +// Validate before transitioning +const result = validateTransition(process, 'COMPLETED'); + +if (result.valid) { + // Proceed with transition + await commands.process.queueStateUpdate(processId, 'COMPLETED'); +} else { + console.error(result.error); +} +``` + +## State Diagram + +``` +┌─────────────┐ +│INITIALIZING │ +└──────┬──────┘ + │ + ├──→ RUNNING ──→ PAUSED ──┐ + │ │ │ + │ │←────────────────┘ + │ │ + │ ├──→ COMPLETED (terminal) + │ ├──→ ERROR (terminal) + │ └──→ CANCELLED (terminal) + │ + ├──→ ERROR (terminal) + └──→ CANCELLED (terminal) +``` + +## Valid State Transitions + +| From State | Valid Next States | Notes | +|---------------|-----------------------------------------------------|-------| +| INITIALIZING | RUNNING, ERROR, CANCELLED | Must start RUNNING or fail | +| RUNNING | RUNNING, PAUSED, COMPLETED, ERROR, CANCELLED | Self-transition for progress updates | +| PAUSED | RUNNING, COMPLETED, ERROR, CANCELLED | Can resume or terminate | +| COMPLETED | *(none)* | Terminal state | +| ERROR | *(none)* | Terminal state | +| CANCELLED | *(none)* | Terminal state | + +## Usage with friggCommands + +### Recommended Pattern + +```javascript +const { createFriggCommands, ProcessState } = require('@friggframework/core'); + +const commands = createFriggCommands({ integrationClass: MyIntegration }); + +class MyWorker { + async processRecords(processId, records) { + try { + // Start processing + await commands.process.queueStateUpdate( + processId, + ProcessState.RUNNING, + { startTime: new Date().toISOString() } + ); + + for (const record of records) { + await this.processRecord(record); + + // Update progress (self-transition on RUNNING) + await commands.process.queueMetricsUpdate(processId, { + totalProcessed: 1, + lastProcessedId: record.id, + }); + } + + // Complete successfully + await commands.process.queueCompletion(processId); + + } catch (error) { + // Transition to ERROR state + await commands.process.queueError(processId, error); + throw error; + } + } + + async pauseProcessing(processId) { + // Transition RUNNING -> PAUSED + await commands.process.queueStateUpdate( + processId, + ProcessState.PAUSED, + { pausedAt: new Date().toISOString() } + ); + } + + async resumeProcessing(processId) { + // Transition PAUSED -> RUNNING + await commands.process.queueStateUpdate( + processId, + ProcessState.RUNNING, + { resumedAt: new Date().toISOString() } + ); + } + + async cancelProcessing(processId) { + // Transition to CANCELLED (terminal) + await commands.process.queueStateUpdate( + processId, + ProcessState.CANCELLED, + { cancelledAt: new Date().toISOString() } + ); + } +} +``` + +## Common Patterns + +### Pattern 1: Simple Linear Flow + +Most processes follow a simple path: + +``` +INITIALIZING → RUNNING → COMPLETED +``` + +```javascript +// Create process +const process = await createProcess.execute({ + userId, + integrationId, + name: 'data-sync', + type: 'SYNC', +}); +// Default state: INITIALIZING + +// Start processing +await commands.process.queueStateUpdate(process.id, ProcessState.RUNNING); + +// ... do work ... + +// Complete +await commands.process.queueCompletion(process.id); +``` + +### Pattern 2: Pause/Resume Flow + +For long-running processes that can be paused: + +``` +RUNNING → PAUSED → RUNNING → COMPLETED +``` + +```javascript +// Start processing +await commands.process.queueStateUpdate(processId, ProcessState.RUNNING); + +// User pauses +await commands.process.queueStateUpdate(processId, ProcessState.PAUSED, { + pauseReason: 'User requested pause', +}); + +// Later, user resumes +await commands.process.queueStateUpdate(processId, ProcessState.RUNNING, { + resumedAt: new Date().toISOString(), +}); + +// Complete +await commands.process.queueCompletion(processId); +``` + +### Pattern 3: Error Handling + +Processes can fail at any point: + +``` +RUNNING → ERROR (terminal) +``` + +```javascript +try { + await commands.process.queueStateUpdate(processId, ProcessState.RUNNING); + + // Process data + await this.processData(); + + await commands.process.queueCompletion(processId); + +} catch (error) { + // Transition to ERROR state (terminal) + await commands.process.queueError(processId, error); + throw error; // Re-throw to fail worker +} +``` + +### Pattern 4: Batch Processing with Progress Updates + +Self-transitions on RUNNING state to track progress: + +``` +RUNNING → RUNNING → RUNNING → COMPLETED + (10%) (50%) (100%) +``` + +```javascript +await commands.process.queueStateUpdate(processId, ProcessState.RUNNING); + +for (let i = 0; i < batches.length; i++) { + await this.processBatch(batches[i]); + + // Self-transition on RUNNING to update context + await commands.process.queueStateUpdate( + processId, + ProcessState.RUNNING, + { + currentBatch: i + 1, + totalBatches: batches.length, + percentComplete: ((i + 1) / batches.length) * 100, + } + ); + + // Update metrics + await commands.process.queueMetricsUpdate(processId, { + totalProcessed: batches[i].length, + }); +} + +await commands.process.queueCompletion(processId); +``` + +## Integration with Queue + +The process management queue ensures state transitions are **ordered** and **consistent**: + +```javascript +// Multiple workers concurrently updating same process +Worker A: commands.process.queueMetricsUpdate(processId, { count: 50 }) +Worker B: commands.process.queueMetricsUpdate(processId, { count: 30 }) +Worker C: commands.process.queueCompletion(processId) + +// Queue ensures ordered processing: +// 1. Worker A's update (count: 50) +// 2. Worker B's update (count: 80 total) +// 3. Worker C's completion +// No race conditions! +``` + +### Why Queue + State Machine? + +| Without Queue | With Queue + State Machine | +|---------------|---------------------------| +| Race conditions | Ordered processing | +| Lost updates | All updates preserved | +| Inconsistent state | Valid transitions only | +| Hard to debug | Clear audit trail | + +## Validation + +### Validating Before Transition + +```javascript +const { validateTransition } = require('@friggframework/core'); + +const process = await getProcess.execute(processId); + +// Check if transition is allowed +const result = validateTransition(process, 'PAUSED'); + +if (!result.valid) { + throw new Error(`Cannot pause: ${result.error}`); +} + +// Proceed with transition +await commands.process.queueStateUpdate(processId, ProcessState.PAUSED); +``` + +### Getting Valid Next States + +```javascript +const { getValidNextStates } = require('@friggframework/core'); + +const process = await getProcess.execute(processId); +const validStates = getValidNextStates(process.state); + +console.log(`Valid next states: ${validStates.join(', ')}`); +// Output: "Valid next states: PAUSED, COMPLETED, ERROR, CANCELLED, RUNNING" +``` + +## Future Enhancements + +The current state machine implementation is intentionally simple. Future enhancements could include: + +### 1. XState Integration + +For complex workflows, integrate with [XState](https://xstate.js.org/): + +```javascript +import { createMachine } from 'xstate'; + +const processMachine = createMachine({ + id: 'process', + initial: 'initializing', + states: { + initializing: { on: { START: 'running' } }, + running: { + on: { + PAUSE: 'paused', + COMPLETE: 'completed', + ERROR: 'error', + }, + }, + paused: { on: { RESUME: 'running' } }, + completed: { type: 'final' }, + error: { type: 'final' }, + }, +}); +``` + +### 2. Actions on Transitions + +Execute callbacks when transitioning: + +```javascript +const transitions = { + onEnterRunning: async (process) => { + await notifyUser(process.userId, 'Process started'); + }, + onEnterCompleted: async (process) => { + await sendCompletionEmail(process); + }, +}; +``` + +### 3. Conditional Transitions + +Add complex business logic guards: + +```javascript +const guards = { + canComplete: (process) => { + return process.results.totalProcessed >= process.context.expectedTotal; + }, +}; +``` + +### 4. State History + +Track all state transitions for audit trail: + +```javascript +process.stateHistory = [ + { state: 'INITIALIZING', timestamp: '2024-01-01T10:00:00Z' }, + { state: 'RUNNING', timestamp: '2024-01-01T10:01:00Z' }, + { state: 'PAUSED', timestamp: '2024-01-01T10:05:00Z' }, + { state: 'RUNNING', timestamp: '2024-01-01T10:10:00Z' }, + { state: 'COMPLETED', timestamp: '2024-01-01T10:15:00Z' }, +]; +``` + +## Best Practices + +1. **Always validate transitions** before queuing state updates +2. **Use terminal states** to prevent accidental updates to completed processes +3. **Self-transition on RUNNING** to update progress without changing state +4. **Include context** in state updates for debugging and audit trails +5. **Handle errors gracefully** with proper error state transitions +6. **Check queue is enabled** before using process commands (gracefully degrade) + +## See Also + +- [Process Management Queue Usage](./PROCESS_MANAGEMENT_QUEUE_USAGE.md) +- [Process Management Queue Spec](./PROCESS_MANAGEMENT_QUEUE_SPEC.md) +- [friggCommands README](../application/commands/README.md) diff --git a/packages/core/docs/XSTATE_INTEGRATION.md b/packages/core/docs/XSTATE_INTEGRATION.md new file mode 100644 index 000000000..05f9a4d97 --- /dev/null +++ b/packages/core/docs/XSTATE_INTEGRATION.md @@ -0,0 +1,507 @@ +# Process State Machine vs XState Integration + +## Key Differences + +### Our Implementation (DB-First State Machine) + +**Purpose**: Manage long-running, distributed processes with state persisted in database + +**Characteristics**: +- ✅ **DB-backed**: State stored in MongoDB/PostgreSQL +- ✅ **Distributed**: Multiple workers update same process via queue +- ✅ **Asynchronous**: State transitions queued through SQS +- ✅ **Rehydration-native**: Load process from DB, state is already there +- ✅ **Audit trail**: Every state change persisted +- ✅ **Race condition safe**: FIFO queue ensures ordered updates +- ❌ **Simple transitions**: Just valid state → state mappings +- ❌ **No actions**: Can't trigger callbacks on transitions (yet) +- ❌ **No complex guards**: Basic validation only + +**Use Case**: +``` +Worker A (Lambda) ──→ Queue ──→ DB ──→ Worker B (Lambda) + ↓ ↓ +Processes batch 1 Processes batch 2 +State: RUNNING State: RUNNING (from DB) +``` + +### XState (In-Memory State Machine) + +**Purpose**: Manage complex UI/application state with rich transition logic + +**Characteristics**: +- ✅ **Rich transitions**: Actions, guards, services, activities +- ✅ **Hierarchical states**: Nested states, parallel states +- ✅ **History states**: Can return to previous state +- ✅ **Visualizable**: Generate state diagrams automatically +- ✅ **Type-safe**: Full TypeScript support +- ❌ **In-memory first**: State lives in JavaScript runtime +- ❌ **Single-process**: Designed for one client/server instance +- ❌ **Serialization needed**: Must manually persist/rehydrate + +**Use Case**: +``` +React Component ──→ XState Machine (in-memory) ──→ Re-render +``` + +--- + +## The Problem XState Has with Backend + +You're right - XState is hard for backend because: + +1. **State doesn't persist** - Each Lambda invocation starts fresh +2. **Multiple workers** - Can't share in-memory state across processes +3. **No queue** - Race conditions if multiple workers update same entity +4. **Rehydration overhead** - Must serialize/deserialize on every operation + +**Example problem:** +```javascript +// XState in Lambda (doesn't work well) +const machine = createMachine({ ... }); + +exports.handler = async (event) => { + // State machine is fresh each invocation! + const service = interpret(machine).start(); + + // How do we restore previous state? + // How do we share state with other Lambdas? + // How do we prevent race conditions? +}; +``` + +--- + +## How They Can Work Together (Layered Approach) + +### Architecture: Our DB + Queue + XState Logic + +``` +┌─────────────────────────────────────────────────────┐ +│ Application Layer (friggCommands) │ +│ commands.process.queueStateUpdate() │ +└────────────────────┬────────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────────┐ +│ XState Layer (Optional) │ +│ - Complex transition logic │ +│ - Actions (send email, notify, etc.) │ +│ - Hierarchical states │ +│ - Guards (complex conditions) │ +└────────────────────┬────────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────────┐ +│ Our State Machine (Persistence) │ +│ - Valid transitions (simple rules) │ +│ - DB storage (process.state, process.context) │ +│ - Queue ordering (FIFO per process) │ +└────────────────────┬────────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────────┐ +│ Infrastructure │ +│ SQS Queue → Lambda → Database │ +└─────────────────────────────────────────────────────┘ +``` + +### Integration Pattern: XState as "Business Logic Layer" + +Our state machine handles **persistence and distribution**, XState handles **complex transitions**. + +**File: `process-state-machine-xstate.js`** + +```javascript +const { createMachine, interpret } = require('xstate'); +const { ProcessState } = require('./process-state-machine'); + +/** + * XState machine definition for process lifecycle + * + * This is the "business logic" layer - complex transitions, actions, guards. + * Our DB-backed state machine handles persistence and distribution. + */ +const processXStateMachine = createMachine({ + id: 'process', + initial: ProcessState.INITIALIZING, + + states: { + [ProcessState.INITIALIZING]: { + on: { + START: { + target: ProcessState.RUNNING, + actions: ['notifyUserStarted', 'logProcessStart'], + cond: 'hasRequiredData', + }, + CANCEL: ProcessState.CANCELLED, + }, + }, + + [ProcessState.RUNNING]: { + // Nested states for complex workflows + initial: 'fetching', + states: { + fetching: { + on: { + FETCHED: 'processing', + }, + }, + processing: { + on: { + PROCESSED: 'saving', + }, + }, + saving: { + on: { + SAVED: { + target: '#process.COMPLETED', + actions: ['sendCompletionEmail'], + }, + }, + }, + }, + + on: { + PAUSE: { + target: ProcessState.PAUSED, + actions: ['saveCheckpoint'], + }, + ERROR: { + target: ProcessState.ERROR, + actions: ['logError', 'notifyAdmin'], + }, + CANCEL: ProcessState.CANCELLED, + }, + }, + + [ProcessState.PAUSED]: { + on: { + RESUME: { + target: ProcessState.RUNNING, + actions: ['restoreCheckpoint'], + }, + CANCEL: ProcessState.CANCELLED, + }, + }, + + [ProcessState.COMPLETED]: { + type: 'final', + entry: ['archiveProcess', 'sendSuccessNotification'], + }, + + [ProcessState.ERROR]: { + type: 'final', + entry: ['logErrorDetails', 'sendErrorNotification'], + }, + + [ProcessState.CANCELLED]: { + type: 'final', + entry: ['cleanupResources'], + }, + }, +}, { + // Actions (side effects) + actions: { + notifyUserStarted: (context, event) => { + console.log('Process started:', context.processId); + // Could send email, webhook, etc. + }, + logProcessStart: (context, event) => { + // Log to analytics + }, + sendCompletionEmail: (context, event) => { + // Send email when process completes + }, + saveCheckpoint: (context, event) => { + // Save current position for resume + }, + restoreCheckpoint: (context, event) => { + // Load saved position + }, + }, + + // Guards (conditions) + guards: { + hasRequiredData: (context, event) => { + return context.dataSourceId && context.userId; + }, + }, +}); + +/** + * Adapter: XState → Our DB-backed State Machine + * + * This translates XState transitions into our persistent state updates + */ +class XStateProcessAdapter { + constructor({ processId, commands }) { + this.processId = processId; + this.commands = commands; + this.service = null; + } + + /** + * Initialize XState service from persisted state + * @param {Object} process - Process from database + */ + async hydrate(process) { + // Restore XState from DB state + const service = interpret(processXStateMachine, { + // Restore context from process.context + context: { + processId: this.processId, + ...process.context, + }, + }); + + // Start from persisted state + service.start(process.state); + + // Listen for state changes → persist to DB + service.onTransition(async (state) => { + if (state.changed) { + // Persist XState state to our DB + await this.commands.process.queueStateUpdate( + this.processId, + state.value, // XState state + { + // Serialize XState context + xstateContext: state.context, + // Include nested state if hierarchical + xstateSubState: typeof state.value === 'object' + ? JSON.stringify(state.value) + : null, + } + ); + } + }); + + this.service = service; + return service; + } + + /** + * Send event to XState machine (triggers transitions) + */ + async send(event) { + if (!this.service) { + throw new Error('Service not hydrated. Call hydrate() first.'); + } + + this.service.send(event); + // State change automatically persisted via onTransition listener + } + + /** + * Get current state (from XState) + */ + getState() { + return this.service?.state; + } +} + +module.exports = { + processXStateMachine, + XStateProcessAdapter, +}; +``` + +### Usage: XState + Our State Machine + +```javascript +const { createFriggCommands, GetProcess } = require('@friggframework/core'); +const { XStateProcessAdapter } = require('./process-state-machine-xstate'); + +const commands = createFriggCommands({ integrationClass: MyIntegration }); +const getProcess = new GetProcess({ processRepository }); + +class MyWorker { + async processWithXState(processId) { + // 1. Load process from DB (our persistence layer) + const process = await getProcess.execute(processId); + + // 2. Hydrate XState from DB state (business logic layer) + const adapter = new XStateProcessAdapter({ + processId, + commands, + }); + await adapter.hydrate(process); + + // 3. Use XState for complex transitions + await adapter.send('START'); + // → XState validates transition + // → XState runs actions (notifyUserStarted, logProcessStart) + // → XState transitions to RUNNING + // → Adapter persists to DB via our queue + + // 4. Process data + for (const batch of batches) { + await adapter.send('FETCHED'); // RUNNING.fetching → RUNNING.processing + await this.processBatch(batch); + await adapter.send('PROCESSED'); // RUNNING.processing → RUNNING.saving + await this.saveBatch(batch); + await adapter.send('SAVED'); // RUNNING.saving → COMPLETED + } + + // XState automatically: + // - Runs sendCompletionEmail action + // - Transitions to COMPLETED (final state) + // - Persists to DB + } +} +``` + +--- + +## When to Use Each + +### Use Our Simple State Machine (Current Implementation) + +✅ **Simple linear workflows** +``` +INITIALIZING → RUNNING → COMPLETED +``` + +✅ **Distributed processing with queue ordering** +``` +Multiple workers → FIFO queue → Consistent state +``` + +✅ **Basic validation** +``` +Can only pause from RUNNING state +Cannot transition from terminal states +``` + +**Example**: Batch sync process with progress tracking + +--- + +### Add XState Layer When You Need + +✅ **Complex transition logic** +```javascript +on: { + SUBMIT: { + target: 'processing', + cond: (ctx) => ctx.retries < 3 && ctx.hasValidToken, + actions: ['incrementRetries', 'logAttempt'], + }, +} +``` + +✅ **Hierarchical states (sub-states)** +``` +RUNNING + ├── fetching + ├── processing + └── saving +``` + +✅ **Actions on transitions** +```javascript +entry: ['sendEmail', 'logToAnalytics', 'updateDashboard'], +exit: ['cleanup', 'saveCheckpoint'], +``` + +✅ **History states** +```javascript +// Return to previous sub-state when resuming +hist: { + type: 'history', +} +``` + +**Example**: Multi-step approval workflow with notifications, retries, and rollback + +--- + +## Layering Decision Tree + +``` +Do you need complex state logic? +├─ NO → Use our simple state machine +│ ✓ Fast +│ ✓ Simple +│ ✓ DB-backed +│ ✓ Queue-ordered +│ +└─ YES → Add XState layer + ├─ Hierarchical states? → XState + ├─ Actions on transitions? → XState + ├─ Complex guards? → XState + └─ Visual state diagrams? → XState + + Still get: + ✓ DB persistence (our layer) + ✓ Queue ordering (our layer) + ✓ Distributed workers (our layer) +``` + +--- + +## Implementation Strategy + +### Phase 1: Current (Simple State Machine) ✅ + +```javascript +// Already implemented +await commands.process.queueStateUpdate(processId, 'RUNNING'); +``` + +**Good for**: 90% of use cases + +### Phase 2: XState Integration (Future) + +```javascript +// Optional enhancement +const adapter = new XStateProcessAdapter({ processId, commands }); +await adapter.hydrate(process); +await adapter.send('START'); +``` + +**Add when needed**: Complex workflows requiring actions/guards/hierarchical states + +### Phase 3: Hybrid Approach (Best of Both) + +```javascript +// Simple processes: Use direct commands +if (workflow.isSimple) { + await commands.process.queueStateUpdate(processId, 'RUNNING'); +} + +// Complex processes: Use XState +if (workflow.isComplex) { + const adapter = new XStateProcessAdapter({ processId, commands }); + await adapter.send('START'); +} +``` + +--- + +## Summary + +| Aspect | Our State Machine | XState | Layered (Both) | +|--------|------------------|--------|----------------| +| **Persistence** | ✅ DB-backed | ❌ In-memory | ✅ DB-backed | +| **Distribution** | ✅ Multi-worker | ❌ Single-process | ✅ Multi-worker | +| **Queue Ordering** | ✅ FIFO | ❌ None | ✅ FIFO | +| **Rehydration** | ✅ Native | ❌ Manual | ✅ Native | +| **Complex Logic** | ❌ Simple | ✅ Rich | ✅ Rich | +| **Actions** | ❌ None | ✅ Full | ✅ Full | +| **Guards** | ⚠️ Basic | ✅ Complex | ✅ Complex | +| **Hierarchical** | ❌ Flat | ✅ Nested | ✅ Nested | +| **Visualize** | ❌ No | ✅ Yes | ✅ Yes | + +**Best of both worlds**: Use our state machine for persistence/distribution, optionally add XState for complex transition logic. + +--- + +## Recommendation + +1. **Start simple** - Use our current state machine for most processes +2. **Add XState when needed** - Complex workflows, actions, hierarchical states +3. **Keep both layers** - Our layer handles persistence, XState handles logic +4. **Adapter pattern** - XStateProcessAdapter bridges the two worlds + +The beauty is: **They're complementary, not competitive!** +- Our layer: "Where state is stored and how it's synchronized" +- XState layer: "What transitions are valid and what happens during transitions" diff --git a/packages/core/handlers/process-update-handler-example.js b/packages/core/handlers/process-update-handler-example.js new file mode 100644 index 000000000..7d7f3dfc5 --- /dev/null +++ b/packages/core/handlers/process-update-handler-example.js @@ -0,0 +1,152 @@ +/** + * Example Lambda Handler for Process Management Queue + * + * This handler processes messages from the process management FIFO queue. + * Deploy this as a Lambda function with SQS trigger. + * + * Configuration Required: + * - Environment Variables: + * - DB_TYPE: 'mongodb' or 'postgresql' + * - DATABASE_URL: Connection string for database + * - PROCESS_QUEUE_ENABLED: 'true' + * - PROCESS_MANAGEMENT_QUEUE_URL: Queue URL + * + * - IAM Permissions: + * - sqs:ReceiveMessage + * - sqs:DeleteMessage + * - sqs:GetQueueAttributes + * + * - SQS Trigger Configuration: + * - Batch size: 1 (process one message at a time) + * - Maximum batching window: 0 seconds + * - Report batch item failures: Enabled + * + * Usage in serverless.yml: + * ```yaml + * functions: + * processUpdateHandler: + * handler: node_modules/@friggframework/core/handlers/process-update-handler-example.handler + * events: + * - sqs: + * arn: !GetAtt ProcessManagementQueue.Arn + * batchSize: 1 + * maximumBatchingWindowInSeconds: 0 + * functionResponseType: ReportBatchItemFailures + * timeout: 30 + * environment: + * DB_TYPE: ${env:DB_TYPE} + * DATABASE_URL: ${env:DATABASE_URL} + * PROCESS_QUEUE_ENABLED: 'true' + * PROCESS_MANAGEMENT_QUEUE_URL: !Ref ProcessManagementQueue + * ``` + */ + +const { + HandleProcessUpdate, + UpdateProcessState, + UpdateProcessMetrics, +} = require('../integrations/use-cases'); +const { + createProcessRepository, +} = require('../integrations/repositories/process-repository-factory'); + +// Initialize dependencies (singleton pattern for Lambda container reuse) +let handleProcessUpdate; + +/** + * Initializes the HandleProcessUpdate use case with dependencies + * Reuses instances across Lambda invocations for performance + */ +function initializeUseCase() { + if (!handleProcessUpdate) { + console.log('Initializing HandleProcessUpdate use case...'); + + // Create repository + const processRepository = createProcessRepository(); + + // Create use cases + const updateProcessState = new UpdateProcessState({ processRepository }); + const updateProcessMetrics = new UpdateProcessMetrics({ processRepository }); + + // Create handler + handleProcessUpdate = new HandleProcessUpdate({ + updateProcessState, + updateProcessMetrics, + }); + + console.log('HandleProcessUpdate use case initialized'); + } + + return handleProcessUpdate; +} + +/** + * Lambda handler for SQS messages from process management queue + * + * @param {Object} event - Lambda event from SQS + * @param {Array} event.Records - Array of SQS records + * @param {Object} context - Lambda context + * @returns {Promise} Response with batch item failures + */ +exports.handler = async (event, context) => { + console.log('Process Update Handler invoked', { + recordCount: event.Records?.length || 0, + requestId: context.requestId, + }); + + // Initialize use case + const useCase = initializeUseCase(); + + const results = []; + const batchItemFailures = []; + + // Process each SQS record + for (const record of event.Records) { + const messageId = record.messageId; + + try { + console.log(`Processing message ${messageId}...`); + + // Parse and handle the message + await useCase.executeFromSQS(record); + + console.log(`Message ${messageId} processed successfully`); + results.push({ + messageId, + status: 'success', + }); + } catch (error) { + console.error(`Failed to process message ${messageId}:`, { + error: error.message, + stack: error.stack, + }); + + results.push({ + messageId, + status: 'failed', + error: error.message, + }); + + // Add to batch item failures for SQS retry + batchItemFailures.push({ + itemIdentifier: messageId, + }); + } + } + + // Log summary + const successCount = results.filter((r) => r.status === 'success').length; + const failureCount = results.filter((r) => r.status === 'failed').length; + + console.log('Processing complete', { + total: results.length, + success: successCount, + failed: failureCount, + }); + + // Return batch item failures for SQS to retry + // Successfully processed messages will be deleted from queue + return { + batchItemFailures, + }; +}; diff --git a/packages/core/index.js b/packages/core/index.js index 78e689f71..009c21eb7 100644 --- a/packages/core/index.js +++ b/packages/core/index.js @@ -64,6 +64,25 @@ const { const { GetProcess, } = require('./integrations/use-cases/get-process'); +const { + HandleProcessUpdate, +} = require('./integrations/use-cases/handle-process-update'); +const { + ProcessQueueService, +} = require('./integrations/services/process-queue-service'); +const { + ProcessUpdateMessage, + ProcessUpdateOperation, +} = require('./integrations/domain/process-update-message'); +const { + ProcessState, + isValidTransition, + validateTransition, + getValidNextStates, +} = require('./integrations/domain/process-state-machine'); +const { + queueProcessUpdate, +} = require('./integrations/utils/queue-process-update'); const { Cryptor } = require('./encrypt'); const { BaseError, @@ -158,6 +177,19 @@ module.exports = { UpdateProcessState, UpdateProcessMetrics, GetProcess, + HandleProcessUpdate, + + // process management queue + ProcessQueueService, + ProcessUpdateMessage, + ProcessUpdateOperation, + queueProcessUpdate, + + // process state machine + ProcessState, + isValidTransition, + validateTransition, + getValidNextStates, // application - Command factories for integration developers application, @@ -166,6 +198,7 @@ module.exports = { createUserCommands: application.createUserCommands, createEntityCommands: application.createEntityCommands, createCredentialCommands: application.createCredentialCommands, + createProcessCommands: application.createProcessCommands, findIntegrationContextByExternalEntityId: application.findIntegrationContextByExternalEntityId, integrationCommands: application.integrationCommands, diff --git a/packages/core/integrations/domain/process-state-machine.js b/packages/core/integrations/domain/process-state-machine.js new file mode 100644 index 000000000..b624bd7bb --- /dev/null +++ b/packages/core/integrations/domain/process-state-machine.js @@ -0,0 +1,253 @@ +/** + * Process State Machine + * + * Defines valid states, transitions, and guards for process lifecycle management. + * Inspired by state machine patterns but simplified for process tracking. + * + * State Machine Concepts: + * - States: Valid process states (INITIALIZING, RUNNING, etc.) + * - Transitions: Valid state changes (RUNNING -> COMPLETED) + * - Guards: Conditions that must be met for transitions + * - Events: Operations that trigger transitions (queueStateUpdate, queueError, etc.) + * + * Future Enhancement: + * - Could be formalized with XState or similar if needed + * - Could add action callbacks on state transitions + * - Could add history/audit trail of state changes + */ + +/** + * Valid process states + * @enum {string} + */ +const ProcessState = { + // Initial state when process is created + INITIALIZING: 'INITIALIZING', + + // Process is actively running + RUNNING: 'RUNNING', + + // Process is temporarily paused (can be resumed) + PAUSED: 'PAUSED', + + // Process completed successfully + COMPLETED: 'COMPLETED', + + // Process failed with an error + ERROR: 'ERROR', + + // Process was cancelled by user/system + CANCELLED: 'CANCELLED', +}; + +/** + * Valid state transitions + * Maps: fromState -> [toState1, toState2, ...] + * + * This defines which state transitions are allowed. + * Use isValidTransition() to check before transitioning. + */ +const StateTransitions = { + [ProcessState.INITIALIZING]: [ + ProcessState.RUNNING, + ProcessState.ERROR, + ProcessState.CANCELLED, + ], + [ProcessState.RUNNING]: [ + ProcessState.PAUSED, + ProcessState.COMPLETED, + ProcessState.ERROR, + ProcessState.CANCELLED, + ProcessState.RUNNING, // Allow self-transition for progress updates + ], + [ProcessState.PAUSED]: [ + ProcessState.RUNNING, + ProcessState.COMPLETED, + ProcessState.ERROR, + ProcessState.CANCELLED, + ], + [ProcessState.COMPLETED]: [ + // Terminal state - no transitions allowed + ], + [ProcessState.ERROR]: [ + // Terminal state - no transitions allowed + // Could allow ERROR -> RUNNING for retry scenarios in future + ], + [ProcessState.CANCELLED]: [ + // Terminal state - no transitions allowed + ], +}; + +/** + * Terminal states (no further transitions allowed) + */ +const TerminalStates = [ + ProcessState.COMPLETED, + ProcessState.ERROR, + ProcessState.CANCELLED, +]; + +/** + * Checks if a state is terminal (no further transitions) + * @param {string} state - State to check + * @returns {boolean} True if terminal + */ +function isTerminalState(state) { + return TerminalStates.includes(state); +} + +/** + * Checks if a state transition is valid + * @param {string} fromState - Current state + * @param {string} toState - Target state + * @returns {boolean} True if transition is allowed + * + * @example + * isValidTransition('RUNNING', 'COMPLETED') // true + * isValidTransition('COMPLETED', 'RUNNING') // false + */ +function isValidTransition(fromState, toState) { + const allowedTransitions = StateTransitions[fromState]; + return allowedTransitions ? allowedTransitions.includes(toState) : false; +} + +/** + * Gets all valid next states from current state + * @param {string} currentState - Current state + * @returns {string[]} Array of valid next states + * + * @example + * getValidNextStates('RUNNING') + * // ['PAUSED', 'COMPLETED', 'ERROR', 'CANCELLED', 'RUNNING'] + */ +function getValidNextStates(currentState) { + return StateTransitions[currentState] || []; +} + +/** + * Validates a state string + * @param {string} state - State to validate + * @returns {boolean} True if valid state + */ +function isValidState(state) { + return Object.values(ProcessState).includes(state); +} + +/** + * Guards for state transitions + * These are conditions that must be met before allowing a transition. + * Currently simple, but can be extended with complex business logic. + */ +const TransitionGuards = { + /** + * Check if process can transition to COMPLETED + * @param {Object} process - Process object + * @returns {boolean} True if allowed + */ + canComplete(process) { + // Could add business logic here: + // - Check all child processes are complete + // - Verify metrics meet completion criteria + // - etc. + return !isTerminalState(process.state); + }, + + /** + * Check if process can transition to ERROR + * @param {Object} process - Process object + * @returns {boolean} True if allowed + */ + canError(process) { + // Could add logic to prevent errors in certain states + return !isTerminalState(process.state); + }, + + /** + * Check if process can be paused + * @param {Object} process - Process object + * @returns {boolean} True if allowed + */ + canPause(process) { + return process.state === ProcessState.RUNNING; + }, + + /** + * Check if process can be resumed + * @param {Object} process - Process object + * @returns {boolean} True if allowed + */ + canResume(process) { + return process.state === ProcessState.PAUSED; + }, + + /** + * Check if process can be cancelled + * @param {Object} process - Process object + * @returns {boolean} True if allowed + */ + canCancel(process) { + return !isTerminalState(process.state); + }, +}; + +/** + * Validates a state transition with guards + * @param {Object} process - Process object + * @param {string} toState - Target state + * @returns {Object} { valid: boolean, error?: string } + * + * @example + * validateTransition(process, 'COMPLETED') + * // { valid: true } or { valid: false, error: 'Invalid transition' } + */ +function validateTransition(process, toState) { + const fromState = process.state; + + // Check if state is valid + if (!isValidState(toState)) { + return { + valid: false, + error: `Invalid state: ${toState}. Valid states: ${Object.values(ProcessState).join(', ')}`, + }; + } + + // Check if transition is allowed by state machine + if (!isValidTransition(fromState, toState)) { + return { + valid: false, + error: `Invalid transition from ${fromState} to ${toState}. ` + + `Valid next states: ${getValidNextStates(fromState).join(', ')}`, + }; + } + + // Check guards + if (toState === ProcessState.COMPLETED && !TransitionGuards.canComplete(process)) { + return { valid: false, error: 'Process cannot be completed in current state' }; + } + + if (toState === ProcessState.ERROR && !TransitionGuards.canError(process)) { + return { valid: false, error: 'Process cannot transition to ERROR in current state' }; + } + + if (toState === ProcessState.PAUSED && !TransitionGuards.canPause(process)) { + return { valid: false, error: 'Process can only be paused from RUNNING state' }; + } + + if (toState === ProcessState.CANCELLED && !TransitionGuards.canCancel(process)) { + return { valid: false, error: 'Process cannot be cancelled in terminal state' }; + } + + return { valid: true }; +} + +module.exports = { + ProcessState, + StateTransitions, + TerminalStates, + TransitionGuards, + isTerminalState, + isValidTransition, + getValidNextStates, + isValidState, + validateTransition, +}; diff --git a/packages/core/integrations/domain/process-state-machine.test.js b/packages/core/integrations/domain/process-state-machine.test.js new file mode 100644 index 000000000..25ec7a8ee --- /dev/null +++ b/packages/core/integrations/domain/process-state-machine.test.js @@ -0,0 +1,244 @@ +const { + ProcessState, + StateTransitions, + TerminalStates, + isTerminalState, + isValidTransition, + getValidNextStates, + isValidState, + validateTransition, +} = require('./process-state-machine'); + +describe('ProcessState', () => { + it('should define all process states', () => { + expect(ProcessState.INITIALIZING).toBe('INITIALIZING'); + expect(ProcessState.RUNNING).toBe('RUNNING'); + expect(ProcessState.PAUSED).toBe('PAUSED'); + expect(ProcessState.COMPLETED).toBe('COMPLETED'); + expect(ProcessState.ERROR).toBe('ERROR'); + expect(ProcessState.CANCELLED).toBe('CANCELLED'); + }); +}); + +describe('isValidState', () => { + it('should return true for valid states', () => { + expect(isValidState('INITIALIZING')).toBe(true); + expect(isValidState('RUNNING')).toBe(true); + expect(isValidState('COMPLETED')).toBe(true); + }); + + it('should return false for invalid states', () => { + expect(isValidState('INVALID')).toBe(false); + expect(isValidState('')).toBe(false); + expect(isValidState(null)).toBe(false); + }); +}); + +describe('isTerminalState', () => { + it('should identify terminal states', () => { + expect(isTerminalState(ProcessState.COMPLETED)).toBe(true); + expect(isTerminalState(ProcessState.ERROR)).toBe(true); + expect(isTerminalState(ProcessState.CANCELLED)).toBe(true); + }); + + it('should identify non-terminal states', () => { + expect(isTerminalState(ProcessState.INITIALIZING)).toBe(false); + expect(isTerminalState(ProcessState.RUNNING)).toBe(false); + expect(isTerminalState(ProcessState.PAUSED)).toBe(false); + }); +}); + +describe('StateTransitions', () => { + it('should define transitions from INITIALIZING', () => { + expect(StateTransitions.INITIALIZING).toContain('RUNNING'); + expect(StateTransitions.INITIALIZING).toContain('ERROR'); + expect(StateTransitions.INITIALIZING).toContain('CANCELLED'); + }); + + it('should define transitions from RUNNING', () => { + expect(StateTransitions.RUNNING).toContain('PAUSED'); + expect(StateTransitions.RUNNING).toContain('COMPLETED'); + expect(StateTransitions.RUNNING).toContain('ERROR'); + expect(StateTransitions.RUNNING).toContain('CANCELLED'); + expect(StateTransitions.RUNNING).toContain('RUNNING'); // Self-transition + }); + + it('should define transitions from PAUSED', () => { + expect(StateTransitions.PAUSED).toContain('RUNNING'); + expect(StateTransitions.PAUSED).toContain('COMPLETED'); + expect(StateTransitions.PAUSED).toContain('ERROR'); + expect(StateTransitions.PAUSED).toContain('CANCELLED'); + }); + + it('should not allow transitions from terminal states', () => { + expect(StateTransitions.COMPLETED).toEqual([]); + expect(StateTransitions.ERROR).toEqual([]); + expect(StateTransitions.CANCELLED).toEqual([]); + }); +}); + +describe('isValidTransition', () => { + describe('valid transitions', () => { + it('should allow INITIALIZING -> RUNNING', () => { + expect(isValidTransition('INITIALIZING', 'RUNNING')).toBe(true); + }); + + it('should allow RUNNING -> COMPLETED', () => { + expect(isValidTransition('RUNNING', 'COMPLETED')).toBe(true); + }); + + it('should allow RUNNING -> PAUSED', () => { + expect(isValidTransition('RUNNING', 'PAUSED')).toBe(true); + }); + + it('should allow PAUSED -> RUNNING', () => { + expect(isValidTransition('PAUSED', 'RUNNING')).toBe(true); + }); + + it('should allow RUNNING -> ERROR', () => { + expect(isValidTransition('RUNNING', 'ERROR')).toBe(true); + }); + + it('should allow RUNNING -> CANCELLED', () => { + expect(isValidTransition('RUNNING', 'CANCELLED')).toBe(true); + }); + + it('should allow RUNNING -> RUNNING (self-transition)', () => { + expect(isValidTransition('RUNNING', 'RUNNING')).toBe(true); + }); + }); + + describe('invalid transitions', () => { + it('should not allow COMPLETED -> RUNNING', () => { + expect(isValidTransition('COMPLETED', 'RUNNING')).toBe(false); + }); + + it('should not allow ERROR -> RUNNING', () => { + expect(isValidTransition('ERROR', 'RUNNING')).toBe(false); + }); + + it('should not allow CANCELLED -> RUNNING', () => { + expect(isValidTransition('CANCELLED', 'RUNNING')).toBe(false); + }); + + it('should not allow INITIALIZING -> PAUSED', () => { + expect(isValidTransition('INITIALIZING', 'PAUSED')).toBe(false); + }); + + it('should not allow INITIALIZING -> COMPLETED', () => { + expect(isValidTransition('INITIALIZING', 'COMPLETED')).toBe(false); + }); + }); +}); + +describe('getValidNextStates', () => { + it('should return valid next states from INITIALIZING', () => { + const nextStates = getValidNextStates('INITIALIZING'); + expect(nextStates).toContain('RUNNING'); + expect(nextStates).toContain('ERROR'); + expect(nextStates).toContain('CANCELLED'); + expect(nextStates.length).toBe(3); + }); + + it('should return valid next states from RUNNING', () => { + const nextStates = getValidNextStates('RUNNING'); + expect(nextStates).toContain('PAUSED'); + expect(nextStates).toContain('COMPLETED'); + expect(nextStates).toContain('ERROR'); + expect(nextStates).toContain('CANCELLED'); + expect(nextStates).toContain('RUNNING'); + expect(nextStates.length).toBe(5); + }); + + it('should return empty array for terminal states', () => { + expect(getValidNextStates('COMPLETED')).toEqual([]); + expect(getValidNextStates('ERROR')).toEqual([]); + expect(getValidNextStates('CANCELLED')).toEqual([]); + }); + + it('should return empty array for invalid states', () => { + expect(getValidNextStates('INVALID')).toEqual([]); + }); +}); + +describe('validateTransition', () => { + describe('valid transitions', () => { + it('should validate RUNNING -> COMPLETED', () => { + const process = { state: 'RUNNING' }; + const result = validateTransition(process, 'COMPLETED'); + expect(result.valid).toBe(true); + expect(result.error).toBeUndefined(); + }); + + it('should validate RUNNING -> PAUSED', () => { + const process = { state: 'RUNNING' }; + const result = validateTransition(process, 'PAUSED'); + expect(result.valid).toBe(true); + }); + + it('should validate PAUSED -> RUNNING', () => { + const process = { state: 'PAUSED' }; + const result = validateTransition(process, 'RUNNING'); + expect(result.valid).toBe(true); + }); + }); + + describe('invalid state', () => { + it('should reject invalid target state', () => { + const process = { state: 'RUNNING' }; + const result = validateTransition(process, 'INVALID_STATE'); + expect(result.valid).toBe(false); + expect(result.error).toContain('Invalid state'); + }); + }); + + describe('invalid transitions', () => { + it('should reject COMPLETED -> RUNNING', () => { + const process = { state: 'COMPLETED' }; + const result = validateTransition(process, 'RUNNING'); + expect(result.valid).toBe(false); + expect(result.error).toContain('Invalid transition'); + }); + + it('should reject INITIALIZING -> PAUSED', () => { + const process = { state: 'INITIALIZING' }; + const result = validateTransition(process, 'PAUSED'); + expect(result.valid).toBe(false); + expect(result.error).toContain('Invalid transition'); + }); + }); + + describe('guard validation', () => { + it('should enforce pause guard (only from RUNNING)', () => { + const process = { state: 'INITIALIZING' }; + const result = validateTransition(process, 'PAUSED'); + expect(result.valid).toBe(false); + }); + + it('should allow pause from RUNNING', () => { + const process = { state: 'RUNNING' }; + const result = validateTransition(process, 'PAUSED'); + expect(result.valid).toBe(true); + }); + + it('should not allow cancel from terminal state', () => { + const process = { state: 'COMPLETED' }; + const result = validateTransition(process, 'CANCELLED'); + expect(result.valid).toBe(false); + }); + }); + + describe('error messages', () => { + it('should provide helpful error for invalid state', () => { + const process = { state: 'RUNNING' }; + const result = validateTransition(process, 'INVALID'); + expect(result.error).toContain('Valid states:'); + }); + + it('should provide helpful error for invalid transition', () => { + const process = { state: 'COMPLETED' }; + const result = validateTransition(process, 'RUNNING'); + expect(result.error).toContain('Valid next states:'); + }); + }); +}); diff --git a/packages/core/integrations/domain/process-update-message.js b/packages/core/integrations/domain/process-update-message.js new file mode 100644 index 000000000..2ae2ecc32 --- /dev/null +++ b/packages/core/integrations/domain/process-update-message.js @@ -0,0 +1,161 @@ +/** + * Enumeration of valid process update operations + * @enum {string} + */ +const ProcessUpdateOperation = { + UPDATE_STATE: 'UPDATE_STATE', + UPDATE_METRICS: 'UPDATE_METRICS', + COMPLETE_PROCESS: 'COMPLETE_PROCESS', + HANDLE_ERROR: 'HANDLE_ERROR', + + /** + * List of all valid operation types + */ + VALID_OPERATIONS: ['UPDATE_STATE', 'UPDATE_METRICS', 'COMPLETE_PROCESS', 'HANDLE_ERROR'], + + /** + * Validates if an operation type is valid + * @param {string} operation - Operation to validate + * @returns {boolean} True if valid + */ + isValid(operation) { + return this.VALID_OPERATIONS.includes(operation); + }, +}; + +/** + * Value Object representing a process update message + * Immutable structure for queue messages + * + * Domain-Driven Design: Value Object + * - Immutable + * - Validates invariants in constructor + * - Equality based on values + */ +class ProcessUpdateMessage { + /** + * Creates a new ProcessUpdateMessage + * @param {Object} params - Message parameters + * @param {string} params.processId - Process ID to update + * @param {string} params.operation - Operation type (from ProcessUpdateOperation) + * @param {Object} params.data - Operation-specific data + * @param {Date} [params.timestamp] - Message timestamp (defaults to now) + * @throws {Error} If validation fails + */ + constructor({ processId, operation, data, timestamp } = {}) { + // Validate processId + if (processId === undefined || processId === null) { + throw new Error('processId is required'); + } + if (typeof processId !== 'string') { + throw new Error('processId must be a string'); + } + if (processId.trim() === '') { + throw new Error('processId cannot be empty'); + } + + // Validate operation + if (operation === undefined || operation === null) { + throw new Error('operation is required'); + } + if (!ProcessUpdateOperation.isValid(operation)) { + throw new Error( + `Invalid operation type: ${operation}. ` + + `Valid operations: ${ProcessUpdateOperation.VALID_OPERATIONS.join(', ')}` + ); + } + + // Validate data + if (data === undefined || data === null) { + throw new Error('data is required'); + } + if (typeof data !== 'object' || Array.isArray(data)) { + throw new Error('data must be an object'); + } + + // Validate timestamp if provided + const messageTimestamp = timestamp || new Date(); + if (!(messageTimestamp instanceof Date)) { + throw new Error('timestamp must be a Date object'); + } + + // Set immutable properties + Object.defineProperties(this, { + processId: { + value: processId, + writable: false, + enumerable: true, + }, + operation: { + value: operation, + writable: false, + enumerable: true, + }, + data: { + value: Object.freeze({ ...data }), // Deep freeze would be better for nested objects + writable: false, + enumerable: true, + }, + timestamp: { + value: messageTimestamp, + writable: false, + enumerable: true, + }, + }); + } + + /** + * Serializes the message to JSON format + * @returns {Object} JSON representation + */ + toJSON() { + return { + processId: this.processId, + operation: this.operation, + data: this.data, + timestamp: this.timestamp.toISOString(), + }; + } + + /** + * Deserializes a message from JSON format + * @param {Object|string} json - JSON object or string + * @returns {ProcessUpdateMessage} Deserialized message + * @throws {Error} If JSON is invalid + */ + static fromJSON(json) { + const obj = typeof json === 'string' ? JSON.parse(json) : json; + + return new ProcessUpdateMessage({ + processId: obj.processId, + operation: obj.operation, + data: obj.data, + timestamp: new Date(obj.timestamp), + }); + } + + /** + * Gets the MessageGroupId for FIFO queue + * Ensures messages for the same process are processed in order + * @returns {string} MessageGroupId + */ + getMessageGroupId() { + return `process-${this.processId}`; + } + + /** + * Gets a unique MessageDeduplicationId for FIFO queue + * Prevents duplicate message processing + * @returns {string} MessageDeduplicationId + */ + getMessageDeduplicationId() { + // Use processId + operation + timestamp to ensure uniqueness + // This allows multiple operations on same process but prevents exact duplicates + return `${this.processId}-${this.operation}-${this.timestamp.getTime()}`; + } +} + +module.exports = { + ProcessUpdateMessage, + ProcessUpdateOperation, +}; diff --git a/packages/core/integrations/domain/process-update-message.test.js b/packages/core/integrations/domain/process-update-message.test.js new file mode 100644 index 000000000..9b348d4d0 --- /dev/null +++ b/packages/core/integrations/domain/process-update-message.test.js @@ -0,0 +1,376 @@ +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('./process-update-message'); + +describe('ProcessUpdateOperation', () => { + it('should define all operation types', () => { + expect(ProcessUpdateOperation.UPDATE_STATE).toBe('UPDATE_STATE'); + expect(ProcessUpdateOperation.UPDATE_METRICS).toBe('UPDATE_METRICS'); + expect(ProcessUpdateOperation.COMPLETE_PROCESS).toBe('COMPLETE_PROCESS'); + expect(ProcessUpdateOperation.HANDLE_ERROR).toBe('HANDLE_ERROR'); + }); + + it('should have a list of valid operations', () => { + expect(ProcessUpdateOperation.VALID_OPERATIONS).toEqual([ + 'UPDATE_STATE', + 'UPDATE_METRICS', + 'COMPLETE_PROCESS', + 'HANDLE_ERROR', + ]); + }); + + it('should validate operation types', () => { + expect(ProcessUpdateOperation.isValid('UPDATE_STATE')).toBe(true); + expect(ProcessUpdateOperation.isValid('UPDATE_METRICS')).toBe(true); + expect(ProcessUpdateOperation.isValid('COMPLETE_PROCESS')).toBe(true); + expect(ProcessUpdateOperation.isValid('HANDLE_ERROR')).toBe(true); + expect(ProcessUpdateOperation.isValid('INVALID_OP')).toBe(false); + expect(ProcessUpdateOperation.isValid('')).toBe(false); + expect(ProcessUpdateOperation.isValid(null)).toBe(false); + }); +}); + +describe('ProcessUpdateMessage', () => { + describe('constructor', () => { + it('should create message with required fields', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING' }, + }); + + expect(message.processId).toBe('proc-123'); + expect(message.operation).toBe('UPDATE_STATE'); + expect(message.data).toEqual({ state: 'RUNNING' }); + expect(message.timestamp).toBeInstanceOf(Date); + }); + + it('should create message with custom timestamp', () => { + const customTime = new Date('2024-01-15T10:00:00Z'); + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING' }, + timestamp: customTime, + }); + + expect(message.timestamp).toEqual(customTime); + }); + + it('should throw if processId is missing', () => { + expect(() => new ProcessUpdateMessage({ + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + })).toThrow('processId is required'); + }); + + it('should throw if processId is not a string', () => { + expect(() => new ProcessUpdateMessage({ + processId: 123, + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + })).toThrow('processId must be a string'); + }); + + it('should throw if processId is empty string', () => { + expect(() => new ProcessUpdateMessage({ + processId: '', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + })).toThrow('processId cannot be empty'); + }); + + it('should throw if operation is missing', () => { + expect(() => new ProcessUpdateMessage({ + processId: 'proc-123', + data: {}, + })).toThrow('operation is required'); + }); + + it('should throw if operation is invalid', () => { + expect(() => new ProcessUpdateMessage({ + processId: 'proc-123', + operation: 'INVALID_OP', + data: {}, + })).toThrow('Invalid operation type: INVALID_OP'); + }); + + it('should throw if data is missing', () => { + expect(() => new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + })).toThrow('data is required'); + }); + + it('should throw if data is not an object', () => { + expect(() => new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: 'invalid', + })).toThrow('data must be an object'); + }); + + it('should allow empty data object', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }); + + expect(message.data).toEqual({}); + }); + + it('should throw if timestamp is not a Date', () => { + expect(() => new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + timestamp: 'not-a-date', + })).toThrow('timestamp must be a Date object'); + }); + }); + + describe('toJSON', () => { + it('should serialize to JSON format', () => { + const timestamp = new Date('2024-01-15T10:00:00Z'); + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING', context: { step: 1 } }, + timestamp, + }); + + const json = message.toJSON(); + + expect(json).toEqual({ + processId: 'proc-123', + operation: 'UPDATE_STATE', + data: { state: 'RUNNING', context: { step: 1 } }, + timestamp: '2024-01-15T10:00:00.000Z', + }); + }); + + it('should be JSON.stringify compatible', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { totalProcessed: 100 }, + }); + + const jsonString = JSON.stringify(message); + const parsed = JSON.parse(jsonString); + + expect(parsed.processId).toBe('proc-123'); + expect(parsed.operation).toBe('UPDATE_METRICS'); + expect(parsed.data).toEqual({ totalProcessed: 100 }); + expect(typeof parsed.timestamp).toBe('string'); + }); + }); + + describe('fromJSON', () => { + it('should deserialize from JSON format', () => { + const json = { + processId: 'proc-123', + operation: 'UPDATE_STATE', + data: { state: 'RUNNING' }, + timestamp: '2024-01-15T10:00:00.000Z', + }; + + const message = ProcessUpdateMessage.fromJSON(json); + + expect(message).toBeInstanceOf(ProcessUpdateMessage); + expect(message.processId).toBe('proc-123'); + expect(message.operation).toBe('UPDATE_STATE'); + expect(message.data).toEqual({ state: 'RUNNING' }); + expect(message.timestamp).toEqual(new Date('2024-01-15T10:00:00.000Z')); + }); + + it('should deserialize from JSON string', () => { + const jsonString = JSON.stringify({ + processId: 'proc-456', + operation: 'UPDATE_METRICS', + data: { totalProcessed: 50 }, + timestamp: '2024-01-15T11:00:00.000Z', + }); + + const message = ProcessUpdateMessage.fromJSON(jsonString); + + expect(message.processId).toBe('proc-456'); + expect(message.operation).toBe('UPDATE_METRICS'); + }); + + it('should throw if JSON is invalid', () => { + expect(() => ProcessUpdateMessage.fromJSON('invalid-json')) + .toThrow(); + }); + + it('should throw if deserialized data is invalid', () => { + const json = { + processId: 123, // Invalid type + operation: 'UPDATE_STATE', + data: {}, + timestamp: '2024-01-15T10:00:00.000Z', + }; + + expect(() => ProcessUpdateMessage.fromJSON(json)) + .toThrow('processId must be a string'); + }); + }); + + describe('getMessageGroupId', () => { + it('should return MessageGroupId for FIFO queue', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + }); + + expect(message.getMessageGroupId()).toBe('process-proc-123'); + }); + + it('should use processId in MessageGroupId', () => { + const message1 = new ProcessUpdateMessage({ + processId: 'proc-abc', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + }); + + const message2 = new ProcessUpdateMessage({ + processId: 'proc-xyz', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + }); + + expect(message1.getMessageGroupId()).toBe('process-proc-abc'); + expect(message2.getMessageGroupId()).toBe('process-proc-xyz'); + }); + }); + + describe('getMessageDeduplicationId', () => { + it('should return unique MessageDeduplicationId', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + }); + + const dedupId = message.getMessageDeduplicationId(); + + expect(dedupId).toContain('proc-123'); + expect(dedupId).toContain('UPDATE_STATE'); + expect(typeof dedupId).toBe('string'); + }); + + it('should generate different IDs for same process with different operations', () => { + const timestamp = new Date('2024-01-15T10:00:00.000Z'); + + const message1 = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + timestamp, + }); + + const message2 = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: {}, + timestamp, + }); + + const dedupId1 = message1.getMessageDeduplicationId(); + const dedupId2 = message2.getMessageDeduplicationId(); + + expect(dedupId1).not.toBe(dedupId2); + }); + + it('should generate different IDs for same process at different times', () => { + const message1 = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + timestamp: new Date('2024-01-15T10:00:00.000Z'), + }); + + const message2 = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: {}, + timestamp: new Date('2024-01-15T10:00:01.000Z'), + }); + + expect(message1.getMessageDeduplicationId()) + .not.toBe(message2.getMessageDeduplicationId()); + }); + }); + + describe('specific operation types', () => { + describe('UPDATE_STATE', () => { + it('should create UPDATE_STATE message', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { + state: 'RUNNING', + contextUpdates: { step: 2, batchId: 'batch-1' }, + }, + }); + + expect(message.operation).toBe('UPDATE_STATE'); + expect(message.data.state).toBe('RUNNING'); + expect(message.data.contextUpdates).toEqual({ step: 2, batchId: 'batch-1' }); + }); + }); + + describe('UPDATE_METRICS', () => { + it('should create UPDATE_METRICS message', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { + metricsUpdate: { + totalProcessed: 100, + totalFailed: 2, + }, + }, + }); + + expect(message.operation).toBe('UPDATE_METRICS'); + expect(message.data.metricsUpdate).toEqual({ + totalProcessed: 100, + totalFailed: 2, + }); + }); + }); + + describe('COMPLETE_PROCESS', () => { + it('should create COMPLETE_PROCESS message', () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }); + + expect(message.operation).toBe('COMPLETE_PROCESS'); + }); + }); + + describe('HANDLE_ERROR', () => { + it('should create HANDLE_ERROR message', () => { + const error = new Error('Test error'); + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.HANDLE_ERROR, + data: { + error: { + message: error.message, + stack: error.stack, + }, + }, + }); + + expect(message.operation).toBe('HANDLE_ERROR'); + expect(message.data.error.message).toBe('Test error'); + expect(message.data.error.stack).toBeDefined(); + }); + }); + }); +}); diff --git a/packages/core/integrations/services/process-queue-service.js b/packages/core/integrations/services/process-queue-service.js new file mode 100644 index 000000000..c124f6a4b --- /dev/null +++ b/packages/core/integrations/services/process-queue-service.js @@ -0,0 +1,177 @@ +const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs'); +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('../domain/process-update-message'); + +/** + * Gets AWS configuration based on environment + * @returns {Object} AWS SDK configuration + */ +const getAWSConfig = () => { + const config = {}; + if (process.env.IS_OFFLINE) { + console.log('ProcessQueueService: Running in offline mode'); + config.credentials = { + accessKeyId: 'test-aws-key', + secretAccessKey: 'test-aws-secret', + }; + config.region = 'us-east-1'; + } + if (process.env.AWS_ENDPOINT) { + config.endpoint = process.env.AWS_ENDPOINT; + } + return config; +}; + +/** + * Domain Service for managing process updates via FIFO queue + * + * Hexagonal Architecture: Domain Service (Port) + * - Orchestrates process update operations + * - Uses SQS adapter (infrastructure) + * - Prevents race conditions via FIFO queue ordering + * + * Key Features: + * - Single FIFO queue for all processes + * - MessageGroupId ensures ordered processing per process + * - MessageDeduplicationId prevents duplicate messages + */ +class ProcessQueueService { + /** + * Creates a new ProcessQueueService + * @param {Object} params - Service parameters + * @param {string} params.queueUrl - FIFO queue URL + * @throws {Error} If queueUrl is invalid + */ + constructor({ queueUrl } = {}) { + // Validate queueUrl + if (!queueUrl) { + throw new Error('queueUrl is required'); + } + if (typeof queueUrl !== 'string') { + throw new Error('queueUrl must be a string'); + } + + this.queueUrl = queueUrl; + this.sqsClient = new SQSClient(getAWSConfig()); + } + + /** + * Sends a ProcessUpdateMessage to the queue + * @param {ProcessUpdateMessage} message - Message to send + * @returns {Promise} SQS response + * @throws {Error} If message is invalid or send fails + */ + async sendMessage(message) { + // Validate message type + if (!(message instanceof ProcessUpdateMessage)) { + throw new Error('message must be an instance of ProcessUpdateMessage'); + } + + try { + const command = new SendMessageCommand({ + QueueUrl: this.queueUrl, + MessageBody: JSON.stringify(message.toJSON()), + MessageGroupId: message.getMessageGroupId(), + MessageDeduplicationId: message.getMessageDeduplicationId(), + }); + + console.log( + `ProcessQueueService: Sending ${message.operation} for process ${message.processId}` + ); + + return await this.sqsClient.send(command); + } catch (error) { + throw new Error(`Failed to send message to queue: ${error.message}`); + } + } + + /** + * Queues a process state update + * @param {string} processId - Process ID + * @param {string} state - New state + * @param {Object} [contextUpdates={}] - Context updates + * @returns {Promise} SQS response + */ + async queueStateUpdate(processId, state, contextUpdates = {}) { + // Validate inputs + if (!state || typeof state !== 'string') { + throw new Error('state is required'); + } + + const message = new ProcessUpdateMessage({ + processId, + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { + state, + contextUpdates: contextUpdates || {}, + }, + }); + + return this.sendMessage(message); + } + + /** + * Queues a process metrics update + * @param {string} processId - Process ID + * @param {Object} metricsUpdate - Metrics to update + * @returns {Promise} SQS response + */ + async queueMetricsUpdate(processId, metricsUpdate) { + // Validate inputs + if (!metricsUpdate || typeof metricsUpdate !== 'object') { + throw new Error('metricsUpdate is required'); + } + + const message = new ProcessUpdateMessage({ + processId, + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { + metricsUpdate, + }, + }); + + return this.sendMessage(message); + } + + /** + * Queues process completion + * @param {string} processId - Process ID + * @returns {Promise} SQS response + */ + async queueProcessCompletion(processId) { + const message = new ProcessUpdateMessage({ + processId, + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }); + + return this.sendMessage(message); + } + + /** + * Queues error handling + * @param {string} processId - Process ID + * @param {Error} error - Error object + * @returns {Promise} SQS response + */ + async queueErrorHandling(processId, error) { + // Validate error + if (!(error instanceof Error)) { + throw new Error('error must be an Error object'); + } + + const message = new ProcessUpdateMessage({ + processId, + operation: ProcessUpdateOperation.HANDLE_ERROR, + data: { + error: { + message: error.message, + stack: error.stack, + }, + }, + }); + + return this.sendMessage(message); + } +} + +module.exports = { ProcessQueueService }; diff --git a/packages/core/integrations/services/process-queue-service.test.js b/packages/core/integrations/services/process-queue-service.test.js new file mode 100644 index 000000000..4c584bfe3 --- /dev/null +++ b/packages/core/integrations/services/process-queue-service.test.js @@ -0,0 +1,297 @@ +const { ProcessQueueService } = require('./process-queue-service'); +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('../domain/process-update-message'); + +// Mock the SQS client +jest.mock('@aws-sdk/client-sqs'); +const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs'); + +describe('ProcessQueueService', () => { + let service; + let mockSQSClient; + const testQueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo'; + + beforeEach(() => { + // Clear all mocks + jest.clearAllMocks(); + + // Create mock SQS client + mockSQSClient = { + send: jest.fn().mockResolvedValue({ MessageId: 'test-message-id' }), + }; + SQSClient.mockImplementation(() => mockSQSClient); + + // Create service + service = new ProcessQueueService({ queueUrl: testQueueUrl }); + }); + + describe('constructor', () => { + it('should create service with queueUrl', () => { + expect(service.queueUrl).toBe(testQueueUrl); + expect(service.sqsClient).toBeDefined(); + }); + + it('should throw if queueUrl is missing', () => { + expect(() => new ProcessQueueService({})) + .toThrow('queueUrl is required'); + }); + + it('should throw if queueUrl is not a string', () => { + expect(() => new ProcessQueueService({ queueUrl: 123 })) + .toThrow('queueUrl must be a string'); + }); + + it('should use environment-based configuration in offline mode', () => { + process.env.IS_OFFLINE = 'true'; + process.env.AWS_ENDPOINT = 'http://localhost:4566'; + + new ProcessQueueService({ queueUrl: testQueueUrl }); + + expect(SQSClient).toHaveBeenCalledWith( + expect.objectContaining({ + credentials: expect.objectContaining({ + accessKeyId: 'test-aws-key', + secretAccessKey: 'test-aws-secret', + }), + region: 'us-east-1', + endpoint: 'http://localhost:4566', + }) + ); + + delete process.env.IS_OFFLINE; + delete process.env.AWS_ENDPOINT; + }); + }); + + describe('sendMessage', () => { + it('should send ProcessUpdateMessage to queue', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING' }, + }); + + await service.sendMessage(message); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(1); + const command = mockSQSClient.send.mock.calls[0][0]; + expect(command).toBeInstanceOf(SendMessageCommand); + expect(command.input).toMatchObject({ + QueueUrl: testQueueUrl, + MessageGroupId: 'process-proc-123', + }); + + const body = JSON.parse(command.input.MessageBody); + expect(body.processId).toBe('proc-123'); + expect(body.operation).toBe('UPDATE_STATE'); + }); + + it('should throw if message is not ProcessUpdateMessage', async () => { + await expect(service.sendMessage({ invalid: 'message' })) + .rejects.toThrow('message must be an instance of ProcessUpdateMessage'); + }); + + it('should include MessageDeduplicationId for FIFO queue', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING' }, + }); + + await service.sendMessage(message); + + const command = mockSQSClient.send.mock.calls[0][0]; + expect(command.input.MessageDeduplicationId).toBeDefined(); + expect(command.input.MessageDeduplicationId).toContain('proc-123'); + expect(command.input.MessageDeduplicationId).toContain('UPDATE_STATE'); + }); + + it('should handle SQS send errors', async () => { + mockSQSClient.send.mockRejectedValue(new Error('SQS error')); + + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING' }, + }); + + await expect(service.sendMessage(message)) + .rejects.toThrow('Failed to send message to queue: SQS error'); + }); + }); + + describe('queueStateUpdate', () => { + it('should queue state update message', async () => { + await service.queueStateUpdate('proc-123', 'RUNNING', { step: 1 }); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(1); + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.processId).toBe('proc-123'); + expect(body.operation).toBe('UPDATE_STATE'); + expect(body.data).toEqual({ + state: 'RUNNING', + contextUpdates: { step: 1 }, + }); + }); + + it('should queue state update without context updates', async () => { + await service.queueStateUpdate('proc-123', 'RUNNING'); + + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.data).toEqual({ + state: 'RUNNING', + contextUpdates: {}, + }); + }); + + it('should validate processId', async () => { + await expect(service.queueStateUpdate('', 'RUNNING')) + .rejects.toThrow(); + }); + + it('should validate state', async () => { + await expect(service.queueStateUpdate('proc-123', '')) + .rejects.toThrow('state is required'); + }); + }); + + describe('queueMetricsUpdate', () => { + it('should queue metrics update message', async () => { + const metricsUpdate = { + totalProcessed: 100, + totalFailed: 2, + }; + + await service.queueMetricsUpdate('proc-123', metricsUpdate); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(1); + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.processId).toBe('proc-123'); + expect(body.operation).toBe('UPDATE_METRICS'); + expect(body.data).toEqual({ + metricsUpdate, + }); + }); + + it('should validate processId', async () => { + await expect(service.queueMetricsUpdate('', {})) + .rejects.toThrow(); + }); + + it('should validate metricsUpdate is an object', async () => { + await expect(service.queueMetricsUpdate('proc-123', null)) + .rejects.toThrow('metricsUpdate is required'); + }); + }); + + describe('queueProcessCompletion', () => { + it('should queue process completion message', async () => { + await service.queueProcessCompletion('proc-123'); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(1); + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.processId).toBe('proc-123'); + expect(body.operation).toBe('COMPLETE_PROCESS'); + expect(body.data).toEqual({}); + }); + + it('should validate processId', async () => { + await expect(service.queueProcessCompletion('')) + .rejects.toThrow(); + }); + }); + + describe('queueErrorHandling', () => { + it('should queue error handling message', async () => { + const error = new Error('Test error'); + error.stack = 'Error stack trace'; + + await service.queueErrorHandling('proc-123', error); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(1); + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.processId).toBe('proc-123'); + expect(body.operation).toBe('HANDLE_ERROR'); + expect(body.data).toEqual({ + error: { + message: 'Test error', + stack: 'Error stack trace', + }, + }); + }); + + it('should handle errors without stack trace', async () => { + const error = new Error('Test error'); + delete error.stack; + + await service.queueErrorHandling('proc-123', error); + + const command = mockSQSClient.send.mock.calls[0][0]; + const body = JSON.parse(command.input.MessageBody); + + expect(body.data.error.message).toBe('Test error'); + expect(body.data.error.stack).toBeUndefined(); + }); + + it('should validate processId', async () => { + const error = new Error('Test error'); + await expect(service.queueErrorHandling('', error)) + .rejects.toThrow(); + }); + + it('should validate error is an Error object', async () => { + await expect(service.queueErrorHandling('proc-123', 'not an error')) + .rejects.toThrow('error must be an Error object'); + }); + }); + + describe('integration with different processes', () => { + it('should use same MessageGroupId for same process', async () => { + const processId = 'proc-same'; + + await service.queueStateUpdate(processId, 'RUNNING'); + await service.queueMetricsUpdate(processId, { count: 1 }); + + expect(mockSQSClient.send).toHaveBeenCalledTimes(2); + + const command1 = mockSQSClient.send.mock.calls[0][0]; + const command2 = mockSQSClient.send.mock.calls[1][0]; + + expect(command1.input.MessageGroupId).toBe('process-proc-same'); + expect(command2.input.MessageGroupId).toBe('process-proc-same'); + }); + + it('should use different MessageGroupId for different processes', async () => { + await service.queueStateUpdate('proc-1', 'RUNNING'); + await service.queueStateUpdate('proc-2', 'RUNNING'); + + const command1 = mockSQSClient.send.mock.calls[0][0]; + const command2 = mockSQSClient.send.mock.calls[1][0]; + + expect(command1.input.MessageGroupId).toBe('process-proc-1'); + expect(command2.input.MessageGroupId).toBe('process-proc-2'); + }); + + it('should use different MessageDeduplicationId for different operations', async () => { + const processId = 'proc-123'; + + await service.queueStateUpdate(processId, 'RUNNING'); + await service.queueMetricsUpdate(processId, { count: 1 }); + + const command1 = mockSQSClient.send.mock.calls[0][0]; + const command2 = mockSQSClient.send.mock.calls[1][0]; + + expect(command1.input.MessageDeduplicationId) + .not.toBe(command2.input.MessageDeduplicationId); + }); + }); +}); diff --git a/packages/core/integrations/use-cases/handle-process-update.js b/packages/core/integrations/use-cases/handle-process-update.js new file mode 100644 index 000000000..c1a581e5c --- /dev/null +++ b/packages/core/integrations/use-cases/handle-process-update.js @@ -0,0 +1,170 @@ +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('../domain/process-update-message'); + +/** + * Use Case: Handle Process Update from Queue + * + * Hexagonal Architecture: Application Layer + * - Consumes messages from process management queue + * - Orchestrates state and metrics updates + * - Delegates to UpdateProcessState and UpdateProcessMetrics use cases + * + * DDD: Application Service + * - Coordinates domain operations + * - Maintains transaction boundaries + * - No business logic (delegated to domain use cases) + * + * Prevents race conditions by processing messages in FIFO order per process + */ +class HandleProcessUpdate { + /** + * Creates a new HandleProcessUpdate use case + * @param {Object} params - Dependencies + * @param {Object} params.updateProcessState - UpdateProcessState use case + * @param {Object} params.updateProcessMetrics - UpdateProcessMetrics use case + */ + constructor({ updateProcessState, updateProcessMetrics } = {}) { + if (!updateProcessState) { + throw new Error('updateProcessState is required'); + } + if (!updateProcessMetrics) { + throw new Error('updateProcessMetrics is required'); + } + + this.updateProcessState = updateProcessState; + this.updateProcessMetrics = updateProcessMetrics; + } + + /** + * Handles a process update message + * @param {ProcessUpdateMessage} message - Update message + * @returns {Promise} Updated process + * @throws {Error} If message is invalid or operation fails + */ + async execute(message) { + // Validate message type + if (!(message instanceof ProcessUpdateMessage)) { + throw new Error('message must be an instance of ProcessUpdateMessage'); + } + + try { + console.log( + `HandleProcessUpdate: Processing ${message.operation} for process ${message.processId}` + ); + + // Route to appropriate handler based on operation type + switch (message.operation) { + case ProcessUpdateOperation.UPDATE_STATE: + return await this._handleUpdateState(message); + + case ProcessUpdateOperation.UPDATE_METRICS: + return await this._handleUpdateMetrics(message); + + case ProcessUpdateOperation.COMPLETE_PROCESS: + return await this._handleCompleteProcess(message); + + case ProcessUpdateOperation.HANDLE_ERROR: + return await this._handleError(message); + + default: + throw new Error(`Unknown operation type: ${message.operation}`); + } + } catch (error) { + console.error( + `HandleProcessUpdate: Failed to process ${message.operation} for process ${message.processId}:`, + error + ); + throw new Error(`Failed to handle process update: ${error.message}`); + } + } + + /** + * Handles UPDATE_STATE operation + * @param {ProcessUpdateMessage} message - Update message + * @returns {Promise} Updated process + * @private + */ + async _handleUpdateState(message) { + const { state, contextUpdates } = message.data; + + return await this.updateProcessState.execute( + message.processId, + state, + contextUpdates + ); + } + + /** + * Handles UPDATE_METRICS operation + * @param {ProcessUpdateMessage} message - Update message + * @returns {Promise} Updated process + * @private + */ + async _handleUpdateMetrics(message) { + const { metricsUpdate } = message.data; + + return await this.updateProcessMetrics.execute( + message.processId, + metricsUpdate + ); + } + + /** + * Handles COMPLETE_PROCESS operation + * @param {ProcessUpdateMessage} message - Update message + * @returns {Promise} Updated process + * @private + */ + async _handleCompleteProcess(message) { + return await this.updateProcessState.execute( + message.processId, + 'COMPLETED', + { + endTime: new Date().toISOString(), + } + ); + } + + /** + * Handles HANDLE_ERROR operation + * @param {ProcessUpdateMessage} message - Update message + * @returns {Promise} Updated process + * @private + */ + async _handleError(message) { + const { error } = message.data; + + return await this.updateProcessState.execute( + message.processId, + 'ERROR', + { + error: error.message, + errorStack: error.stack, + errorTimestamp: new Date().toISOString(), + } + ); + } + + /** + * Executes from an SQS message + * Parses the SQS message body and delegates to execute() + * + * @param {Object} sqsMessage - SQS message object + * @param {string} sqsMessage.body - JSON string message body + * @returns {Promise} Updated process + * @throws {Error} If message parsing or execution fails + */ + async executeFromSQS(sqsMessage) { + try { + // Parse the message body + const message = ProcessUpdateMessage.fromJSON(sqsMessage.body); + + // Execute the update + return await this.execute(message); + } catch (error) { + console.error('HandleProcessUpdate: Failed to parse SQS message:', error); + throw error; + } + } +} + +module.exports = { HandleProcessUpdate }; diff --git a/packages/core/integrations/use-cases/handle-process-update.test.js b/packages/core/integrations/use-cases/handle-process-update.test.js new file mode 100644 index 000000000..589ee21a8 --- /dev/null +++ b/packages/core/integrations/use-cases/handle-process-update.test.js @@ -0,0 +1,369 @@ +const { HandleProcessUpdate } = require('./handle-process-update'); +const { ProcessUpdateMessage, ProcessUpdateOperation } = require('../domain/process-update-message'); + +describe('HandleProcessUpdate', () => { + let useCase; + let mockUpdateProcessState; + let mockUpdateProcessMetrics; + + beforeEach(() => { + // Create mocks for dependencies + mockUpdateProcessState = { + execute: jest.fn().mockResolvedValue({ id: 'proc-123', state: 'RUNNING' }), + }; + + mockUpdateProcessMetrics = { + execute: jest.fn().mockResolvedValue({ id: 'proc-123', results: {} }), + }; + + // Create use case with mocked dependencies + useCase = new HandleProcessUpdate({ + updateProcessState: mockUpdateProcessState, + updateProcessMetrics: mockUpdateProcessMetrics, + }); + }); + + describe('constructor', () => { + it('should require updateProcessState', () => { + expect(() => new HandleProcessUpdate({ + updateProcessMetrics: mockUpdateProcessMetrics, + })).toThrow('updateProcessState is required'); + }); + + it('should require updateProcessMetrics', () => { + expect(() => new HandleProcessUpdate({ + updateProcessState: mockUpdateProcessState, + })).toThrow('updateProcessMetrics is required'); + }); + + it('should create use case with dependencies', () => { + const uc = new HandleProcessUpdate({ + updateProcessState: mockUpdateProcessState, + updateProcessMetrics: mockUpdateProcessMetrics, + }); + + expect(uc.updateProcessState).toBe(mockUpdateProcessState); + expect(uc.updateProcessMetrics).toBe(mockUpdateProcessMetrics); + }); + }); + + describe('execute', () => { + describe('UPDATE_STATE operation', () => { + it('should handle UPDATE_STATE message', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { + state: 'RUNNING', + contextUpdates: { step: 1 }, + }, + }); + + await useCase.execute(message); + + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'RUNNING', + { step: 1 } + ); + expect(mockUpdateProcessMetrics.execute).not.toHaveBeenCalled(); + }); + + it('should handle UPDATE_STATE without context updates', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { + state: 'RUNNING', + contextUpdates: {}, + }, + }); + + await useCase.execute(message); + + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'RUNNING', + {} + ); + }); + }); + + describe('UPDATE_METRICS operation', () => { + it('should handle UPDATE_METRICS message', async () => { + const metricsUpdate = { + totalProcessed: 100, + totalFailed: 2, + }; + + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { + metricsUpdate, + }, + }); + + await useCase.execute(message); + + expect(mockUpdateProcessMetrics.execute).toHaveBeenCalledWith( + 'proc-123', + metricsUpdate + ); + expect(mockUpdateProcessState.execute).not.toHaveBeenCalled(); + }); + }); + + describe('COMPLETE_PROCESS operation', () => { + it('should handle COMPLETE_PROCESS message', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }); + + await useCase.execute(message); + + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'COMPLETED', + expect.objectContaining({ + endTime: expect.any(String), + }) + ); + }); + + it('should set endTime as ISO string', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }); + + await useCase.execute(message); + + const contextUpdates = mockUpdateProcessState.execute.mock.calls[0][2]; + expect(contextUpdates.endTime).toMatch(/^\d{4}-\d{2}-\d{2}T/); // ISO format + }); + }); + + describe('HANDLE_ERROR operation', () => { + it('should handle HANDLE_ERROR message', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.HANDLE_ERROR, + data: { + error: { + message: 'Test error', + stack: 'Error stack trace', + }, + }, + }); + + await useCase.execute(message); + + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'ERROR', + expect.objectContaining({ + error: 'Test error', + errorStack: 'Error stack trace', + errorTimestamp: expect.any(String), + }) + ); + }); + + it('should handle errors without stack trace', async () => { + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.HANDLE_ERROR, + data: { + error: { + message: 'Test error', + }, + }, + }); + + await useCase.execute(message); + + const contextUpdates = mockUpdateProcessState.execute.mock.calls[0][2]; + expect(contextUpdates.error).toBe('Test error'); + expect(contextUpdates.errorStack).toBeUndefined(); + }); + }); + + describe('validation', () => { + it('should throw if message is not ProcessUpdateMessage', async () => { + await expect(useCase.execute({ invalid: 'message' })) + .rejects.toThrow('message must be an instance of ProcessUpdateMessage'); + }); + + it('should throw for unknown operation type', async () => { + // Create a mock message with invalid operation + const invalidMessage = { + processId: 'proc-123', + operation: 'UNKNOWN_OP', + data: {}, + getMessageGroupId: jest.fn(), + getMessageDeduplicationId: jest.fn(), + }; + + // Override instanceof check temporarily for this test + Object.setPrototypeOf(invalidMessage, ProcessUpdateMessage.prototype); + + await expect(useCase.execute(invalidMessage)) + .rejects.toThrow('Unknown operation type: UNKNOWN_OP'); + }); + }); + + describe('error handling', () => { + it('should propagate UpdateProcessState errors', async () => { + mockUpdateProcessState.execute.mockRejectedValue( + new Error('State update failed') + ); + + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING', contextUpdates: {} }, + }); + + await expect(useCase.execute(message)) + .rejects.toThrow('Failed to handle process update: State update failed'); + }); + + it('should propagate UpdateProcessMetrics errors', async () => { + mockUpdateProcessMetrics.execute.mockRejectedValue( + new Error('Metrics update failed') + ); + + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { metricsUpdate: { count: 1 } }, + }); + + await expect(useCase.execute(message)) + .rejects.toThrow('Failed to handle process update: Metrics update failed'); + }); + + it('should include processId and operation in error message', async () => { + mockUpdateProcessState.execute.mockRejectedValue( + new Error('Update failed') + ); + + const message = new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING', contextUpdates: {} }, + }); + + try { + await useCase.execute(message); + fail('Should have thrown'); + } catch (error) { + expect(error.message).toContain('Failed to handle process update'); + } + }); + }); + + describe('integration scenarios', () => { + it('should handle sequence of operations', async () => { + const messages = [ + new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_STATE, + data: { state: 'RUNNING', contextUpdates: {} }, + }), + new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { metricsUpdate: { count: 50 } }, + }), + new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.COMPLETE_PROCESS, + data: {}, + }), + ]; + + for (const message of messages) { + await useCase.execute(message); + } + + expect(mockUpdateProcessState.execute).toHaveBeenCalledTimes(2); // RUNNING + COMPLETED + expect(mockUpdateProcessMetrics.execute).toHaveBeenCalledTimes(1); + }); + + it('should handle error after processing', async () => { + const messages = [ + new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.UPDATE_METRICS, + data: { metricsUpdate: { count: 50 } }, + }), + new ProcessUpdateMessage({ + processId: 'proc-123', + operation: ProcessUpdateOperation.HANDLE_ERROR, + data: { + error: { message: 'Processing failed' }, + }, + }), + ]; + + for (const message of messages) { + await useCase.execute(message); + } + + expect(mockUpdateProcessMetrics.execute).toHaveBeenCalledWith( + 'proc-123', + { count: 50 } + ); + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'ERROR', + expect.objectContaining({ + error: 'Processing failed', + }) + ); + }); + }); + }); + + describe('executeFromSQS', () => { + it('should parse SQS message and execute', async () => { + const sqsMessage = { + body: JSON.stringify({ + processId: 'proc-123', + operation: 'UPDATE_STATE', + data: { state: 'RUNNING', contextUpdates: {} }, + timestamp: new Date().toISOString(), + }), + }; + + await useCase.executeFromSQS(sqsMessage); + + expect(mockUpdateProcessState.execute).toHaveBeenCalledWith( + 'proc-123', + 'RUNNING', + {} + ); + }); + + it('should handle malformed SQS message body', async () => { + const sqsMessage = { + body: 'invalid json', + }; + + await expect(useCase.executeFromSQS(sqsMessage)) + .rejects.toThrow(); + }); + + it('should handle missing body field', async () => { + const sqsMessage = {}; + + await expect(useCase.executeFromSQS(sqsMessage)) + .rejects.toThrow(); + }); + }); +}); diff --git a/packages/core/integrations/use-cases/index.js b/packages/core/integrations/use-cases/index.js index d7ce7a7fc..f8e44e22e 100644 --- a/packages/core/integrations/use-cases/index.js +++ b/packages/core/integrations/use-cases/index.js @@ -6,6 +6,7 @@ const { CreateProcess } = require('./create-process'); const { UpdateProcessState } = require('./update-process-state'); const { UpdateProcessMetrics } = require('./update-process-metrics'); const { GetProcess } = require('./get-process'); +const { HandleProcessUpdate } = require('./handle-process-update'); module.exports = { GetIntegrationsForUser, @@ -16,4 +17,5 @@ module.exports = { UpdateProcessState, UpdateProcessMetrics, GetProcess, + HandleProcessUpdate, }; \ No newline at end of file diff --git a/packages/core/integrations/utils/queue-process-update.js b/packages/core/integrations/utils/queue-process-update.js new file mode 100644 index 000000000..73e524141 --- /dev/null +++ b/packages/core/integrations/utils/queue-process-update.js @@ -0,0 +1,182 @@ +const { ProcessQueueService } = require('../services/process-queue-service'); + +/** + * Singleton instance of ProcessQueueService + * Lazy-initialized when first needed + */ +let queueServiceInstance = null; + +/** + * Gets or creates the ProcessQueueService instance + * @returns {ProcessQueueService} Queue service instance + * @throws {Error} If queue is enabled but PROCESS_MANAGEMENT_QUEUE_URL is not set + */ +function getQueueService() { + if (!queueServiceInstance) { + const queueUrl = process.env.PROCESS_MANAGEMENT_QUEUE_URL; + + if (!queueUrl) { + throw new Error( + 'PROCESS_MANAGEMENT_QUEUE_URL environment variable is required when process queue is enabled' + ); + } + + queueServiceInstance = new ProcessQueueService({ queueUrl }); + } + + return queueServiceInstance; +} + +/** + * Checks if the process management queue is enabled + * @returns {boolean} True if enabled + */ +function isEnabled() { + const enabled = process.env.PROCESS_QUEUE_ENABLED; + return enabled ? enabled.toLowerCase() === 'true' : false; +} + +/** + * Utility for queueing process updates + * + * Usage: + * - Set PROCESS_QUEUE_ENABLED=true to enable queue + * - Set PROCESS_MANAGEMENT_QUEUE_URL to the queue URL + * - Use queueProcessUpdate.queueStateUpdate(), etc. to queue updates + * - If queue is disabled, functions return null (no-op) + * + * Benefits: + * - Simple API for queueing process updates + * - Automatic enable/disable via environment variables + * - Prevents race conditions when enabled + * - Backward compatible (returns null when disabled) + * + * Example: + * ```javascript + * const { queueProcessUpdate } = require('@friggframework/core'); + * + * // Queue a state update + * await queueProcessUpdate.queueStateUpdate( + * processId, + * 'RUNNING', + * { step: 1, batchId: 'batch-123' } + * ); + * + * // Queue a metrics update + * await queueProcessUpdate.queueMetricsUpdate( + * processId, + * { totalProcessed: 100, totalFailed: 2 } + * ); + * + * // Queue process completion + * await queueProcessUpdate.queueProcessCompletion(processId); + * + * // Queue error handling + * await queueProcessUpdate.queueErrorHandling(processId, error); + * + * // Check if queue is enabled + * if (queueProcessUpdate.isEnabled()) { + * console.log('Process queue is enabled'); + * } + * ``` + */ +const queueProcessUpdate = { + /** + * Checks if the process management queue is enabled + * @returns {boolean} True if enabled + */ + isEnabled, + + /** + * Queues a process state update + * + * @param {string} processId - Process ID + * @param {string} state - New state + * @param {Object} [contextUpdates] - Context updates + * @returns {Promise} SQS response or null if disabled + * @throws {Error} If queue is enabled but configuration is invalid + * + * @example + * await queueProcessUpdate.queueStateUpdate( + * 'proc-123', + * 'RUNNING', + * { step: 2, currentBatch: 'batch-456' } + * ); + */ + async queueStateUpdate(processId, state, contextUpdates) { + if (!isEnabled()) { + return null; + } + + const queueService = getQueueService(); + return await queueService.queueStateUpdate(processId, state, contextUpdates); + }, + + /** + * Queues a process metrics update + * + * @param {string} processId - Process ID + * @param {Object} metricsUpdate - Metrics to update + * @returns {Promise} SQS response or null if disabled + * @throws {Error} If queue is enabled but configuration is invalid + * + * @example + * await queueProcessUpdate.queueMetricsUpdate( + * 'proc-123', + * { totalProcessed: 100, totalFailed: 2, totalSkipped: 5 } + * ); + */ + async queueMetricsUpdate(processId, metricsUpdate) { + if (!isEnabled()) { + return null; + } + + const queueService = getQueueService(); + return await queueService.queueMetricsUpdate(processId, metricsUpdate); + }, + + /** + * Queues process completion + * + * @param {string} processId - Process ID + * @returns {Promise} SQS response or null if disabled + * @throws {Error} If queue is enabled but configuration is invalid + * + * @example + * await queueProcessUpdate.queueProcessCompletion('proc-123'); + */ + async queueProcessCompletion(processId) { + if (!isEnabled()) { + return null; + } + + const queueService = getQueueService(); + return await queueService.queueProcessCompletion(processId); + }, + + /** + * Queues error handling + * + * @param {string} processId - Process ID + * @param {Error} error - Error object + * @returns {Promise} SQS response or null if disabled + * @throws {Error} If queue is enabled but configuration is invalid + * + * @example + * try { + * // ... processing + * } catch (error) { + * await queueProcessUpdate.queueErrorHandling('proc-123', error); + * } + */ + async queueErrorHandling(processId, error) { + if (!isEnabled()) { + return null; + } + + const queueService = getQueueService(); + return await queueService.queueErrorHandling(processId, error); + }, +}; + +module.exports = { queueProcessUpdate }; diff --git a/packages/core/integrations/utils/queue-process-update.test.js b/packages/core/integrations/utils/queue-process-update.test.js new file mode 100644 index 000000000..06a634872 --- /dev/null +++ b/packages/core/integrations/utils/queue-process-update.test.js @@ -0,0 +1,255 @@ +const { queueProcessUpdate } = require('./queue-process-update'); +const { ProcessQueueService } = require('../services/process-queue-service'); + +// Mock the ProcessQueueService +jest.mock('../services/process-queue-service'); + +describe('queueProcessUpdate', () => { + let mockQueueService; + const originalEnv = process.env; + + beforeEach(() => { + jest.clearAllMocks(); + process.env = { ...originalEnv }; + + // Create mock queue service + mockQueueService = { + queueStateUpdate: jest.fn().mockResolvedValue({ MessageId: 'msg-1' }), + queueMetricsUpdate: jest.fn().mockResolvedValue({ MessageId: 'msg-2' }), + queueProcessCompletion: jest.fn().mockResolvedValue({ MessageId: 'msg-3' }), + queueErrorHandling: jest.fn().mockResolvedValue({ MessageId: 'msg-4' }), + }; + + ProcessQueueService.mockImplementation(() => mockQueueService); + }); + + afterEach(() => { + process.env = originalEnv; + }); + + describe('when queue is enabled', () => { + beforeEach(() => { + process.env.PROCESS_QUEUE_ENABLED = 'true'; + process.env.PROCESS_MANAGEMENT_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo'; + }); + + describe('queueStateUpdate', () => { + it('should queue state update via ProcessQueueService', async () => { + await queueProcessUpdate.queueStateUpdate( + 'proc-123', + 'RUNNING', + { step: 1 } + ); + + expect(mockQueueService.queueStateUpdate).toHaveBeenCalledWith( + 'proc-123', + 'RUNNING', + { step: 1 } + ); + }); + + it('should handle state update without context', async () => { + await queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING'); + + expect(mockQueueService.queueStateUpdate).toHaveBeenCalledWith( + 'proc-123', + 'RUNNING', + undefined + ); + }); + }); + + describe('queueMetricsUpdate', () => { + it('should queue metrics update via ProcessQueueService', async () => { + const metrics = { totalProcessed: 100 }; + + await queueProcessUpdate.queueMetricsUpdate('proc-123', metrics); + + expect(mockQueueService.queueMetricsUpdate).toHaveBeenCalledWith( + 'proc-123', + metrics + ); + }); + }); + + describe('queueProcessCompletion', () => { + it('should queue process completion via ProcessQueueService', async () => { + await queueProcessUpdate.queueProcessCompletion('proc-123'); + + expect(mockQueueService.queueProcessCompletion).toHaveBeenCalledWith( + 'proc-123' + ); + }); + }); + + describe('queueErrorHandling', () => { + it('should queue error handling via ProcessQueueService', async () => { + const error = new Error('Test error'); + + await queueProcessUpdate.queueErrorHandling('proc-123', error); + + expect(mockQueueService.queueErrorHandling).toHaveBeenCalledWith( + 'proc-123', + error + ); + }); + }); + + it('should initialize ProcessQueueService with queue URL', async () => { + await queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING'); + + expect(ProcessQueueService).toHaveBeenCalledWith({ + queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo', + }); + }); + + it('should reuse ProcessQueueService instance', async () => { + await queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING'); + await queueProcessUpdate.queueMetricsUpdate('proc-123', { count: 1 }); + + // Should only create service once + expect(ProcessQueueService).toHaveBeenCalledTimes(1); + }); + }); + + describe('when queue is disabled', () => { + beforeEach(() => { + process.env.PROCESS_QUEUE_ENABLED = 'false'; + }); + + it('should return null for queueStateUpdate', async () => { + const result = await queueProcessUpdate.queueStateUpdate( + 'proc-123', + 'RUNNING' + ); + + expect(result).toBeNull(); + expect(mockQueueService.queueStateUpdate).not.toHaveBeenCalled(); + }); + + it('should return null for queueMetricsUpdate', async () => { + const result = await queueProcessUpdate.queueMetricsUpdate( + 'proc-123', + { count: 1 } + ); + + expect(result).toBeNull(); + expect(mockQueueService.queueMetricsUpdate).not.toHaveBeenCalled(); + }); + + it('should return null for queueProcessCompletion', async () => { + const result = await queueProcessUpdate.queueProcessCompletion('proc-123'); + + expect(result).toBeNull(); + expect(mockQueueService.queueProcessCompletion).not.toHaveBeenCalled(); + }); + + it('should return null for queueErrorHandling', async () => { + const error = new Error('Test error'); + const result = await queueProcessUpdate.queueErrorHandling('proc-123', error); + + expect(result).toBeNull(); + expect(mockQueueService.queueErrorHandling).not.toHaveBeenCalled(); + }); + }); + + describe('when queue environment variables are not set', () => { + beforeEach(() => { + delete process.env.PROCESS_QUEUE_ENABLED; + delete process.env.PROCESS_MANAGEMENT_QUEUE_URL; + }); + + it('should treat as disabled when PROCESS_QUEUE_ENABLED is not set', async () => { + const result = await queueProcessUpdate.queueStateUpdate( + 'proc-123', + 'RUNNING' + ); + + expect(result).toBeNull(); + expect(mockQueueService.queueStateUpdate).not.toHaveBeenCalled(); + }); + + it('should throw error when enabled but queue URL is missing', async () => { + process.env.PROCESS_QUEUE_ENABLED = 'true'; + + await expect( + queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING') + ).rejects.toThrow( + 'PROCESS_MANAGEMENT_QUEUE_URL environment variable is required when process queue is enabled' + ); + }); + }); + + describe('isEnabled', () => { + it('should return true when enabled', () => { + process.env.PROCESS_QUEUE_ENABLED = 'true'; + expect(queueProcessUpdate.isEnabled()).toBe(true); + }); + + it('should return false when disabled', () => { + process.env.PROCESS_QUEUE_ENABLED = 'false'; + expect(queueProcessUpdate.isEnabled()).toBe(false); + }); + + it('should return false when not set', () => { + delete process.env.PROCESS_QUEUE_ENABLED; + expect(queueProcessUpdate.isEnabled()).toBe(false); + }); + + it('should handle case-insensitive true values', () => { + process.env.PROCESS_QUEUE_ENABLED = 'TRUE'; + expect(queueProcessUpdate.isEnabled()).toBe(true); + + process.env.PROCESS_QUEUE_ENABLED = 'True'; + expect(queueProcessUpdate.isEnabled()).toBe(true); + }); + }); + + describe('error handling', () => { + beforeEach(() => { + process.env.PROCESS_QUEUE_ENABLED = 'true'; + process.env.PROCESS_MANAGEMENT_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789/process-management.fifo'; + }); + + it('should propagate errors from queueStateUpdate', async () => { + mockQueueService.queueStateUpdate.mockRejectedValue( + new Error('Queue error') + ); + + await expect( + queueProcessUpdate.queueStateUpdate('proc-123', 'RUNNING') + ).rejects.toThrow('Queue error'); + }); + + it('should propagate errors from queueMetricsUpdate', async () => { + mockQueueService.queueMetricsUpdate.mockRejectedValue( + new Error('Queue error') + ); + + await expect( + queueProcessUpdate.queueMetricsUpdate('proc-123', {}) + ).rejects.toThrow('Queue error'); + }); + + it('should propagate errors from queueProcessCompletion', async () => { + mockQueueService.queueProcessCompletion.mockRejectedValue( + new Error('Queue error') + ); + + await expect( + queueProcessUpdate.queueProcessCompletion('proc-123') + ).rejects.toThrow('Queue error'); + }); + + it('should propagate errors from queueErrorHandling', async () => { + mockQueueService.queueErrorHandling.mockRejectedValue( + new Error('Queue error') + ); + + const error = new Error('Test error'); + await expect( + queueProcessUpdate.queueErrorHandling('proc-123', error) + ).rejects.toThrow('Queue error'); + }); + }); +});