Skip to content

Conversation

@rkeene
Copy link
Member

@rkeene rkeene commented Jan 15, 2026

This change fixes a bug where partitioned queues may compete with each other in the PostgreSQL DB internal locking to run queues because the indexes were not sufficiently specific.

@rkeene rkeene self-assigned this Jan 15, 2026
@rkeene rkeene added bug Something isn't working queue labels Jan 15, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR reduces serialization errors with partitioned queues on PostgreSQL by implementing partition-aware composite indexes and query optimizations. The changes address an issue where partitioned queues competed with each other during concurrent operations due to insufficient index specificity.

Changes:

  • Added partition-aware composite indexes (including path column) to prevent cross-partition conflicts
  • Introduced FOR UPDATE lock in setStatus() to prevent read-write conflicts
  • Added ORDER BY RANDOM() to queries to distribute worker contention
  • Implemented schema versioning with migrations to support the new indexes
  • Added serialization retry tracking for testing and instrumentation
  • Created comprehensive tests validating reduced serialization conflicts

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File Description
src/lib/queue/drivers/queue_postgres.ts Implements schema versioning, partition-aware indexes, FOR UPDATE locks, random query ordering, and serialization retry tracking
src/lib/queue/index.test.ts Adds comprehensive concurrent operation tests validating serialization conflict reduction with single and multiple workers

if (currentVersion < 1) {
logger?.debug('Applying schema version 1: Initial tables and indexes');

await client.query('BEGIN');
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).

Suggested change
await client.query('BEGIN');
await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED');

Copilot uses AI. Check for mistakes.
await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_idempotent_keys_path_idempotent_id ON queue_idempotent_keys(path, idempotent_id)');

// Now transactionally drop old indexes and record version
await client.query('BEGIN');
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).

Suggested change
await client.query('BEGIN');
await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED');

Copilot uses AI. Check for mistakes.
}

/* Add multiple entries to each partition */
const entriesPerPartition = 2000;
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating 2000 entries per partition (6000 total) but only processing 10 items per worker seems wasteful. The test would be faster and equally effective with a smaller number like 100-200 entries per partition, which still provides ample concurrency testing while reducing setup time.

Suggested change
const entriesPerPartition = 2000;
const entriesPerPartition = 200;

Copilot uses AI. Check for mistakes.
* - Multiple workers query() for pending items (creates read dependency)
* - Then one worker's setStatus() with FOR UPDATE conflicts with another's read
*/
const maxExpectedRetries = kind === 'single' ? 3 : 10;
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic numbers 3 and 10 for retry thresholds lack documentation. Consider adding a comment explaining why these specific values were chosen or extracting them as named constants with explanatory names like MAX_RETRIES_SINGLE_WORKER and MAX_RETRIES_MULTIPLE_WORKERS.

Copilot uses AI. Check for mistakes.
}

/* Each partition adds items and updates status concurrently */
for (let workerIdx = 0; workerIdx < (kind === 'single' ? 1 : 3); workerIdx++) {
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition kind === 'single' ? 1 : 3 is checked inside the loop but kind is 'single' at this point in the code (line 1988 checks if (kind === 'single')). This should always evaluate to 1. If multiple workers are intended here, this logic needs to be moved outside the if (kind === 'single') block.

Suggested change
for (let workerIdx = 0; workerIdx < (kind === 'single' ? 1 : 3); workerIdx++) {
for (let workerIdx = 0; workerIdx < 1; workerIdx++) {

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working queue

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants