Skip to content

Add MongoDB Change Streams Support #56

@Artmann

Description

@Artmann

Add support for MongoDB change streams to enable real-time monitoring of collection changes. This will allow applications to react to database changes in real-time, useful for features like GraphQL subscriptions, cache invalidation, and background sync notifications.

Motivation

Currently, there's no way to listen for real-time changes on Esix models. Applications that need to react to database changes must rely on polling or external pub/sub systems. MongoDB change streams provide a native solution for this use case.

Proposed API

Basic Usage

// Listen for all changes on a collection
const unsubscribe = Book.listenForChanges((id: string, change: ChangeEvent<Book>) => {
  console.log(`Book ${id} was ${change.operationType}d`)
})

// Cleanup when done
unsubscribe()

Advanced Usage with Options

const unsubscribe = Book.listenForChanges({
  filter: { userId: currentUserId },
  operationType: ['insert', 'update'],
  fullDocument: 'updateLookup',
  pipeline: [
    { $match: { 'fullDocument.published': true } }
  ],
  onChange: (id: string, change: ChangeEvent<Book>) => {
    // Handle change
  },
  onError: (error: Error) => {
    // Handle stream errors
  }
})

Operation-Specific Helpers (Optional)

Book.listenForInserts((id: string, document: Book) => {
  // New book created
})

Book.listenForUpdates((id: string, change: ChangeEvent<Book>) => {
  // Book updated - change.updateDescription contains what changed
})

Book.listenForDeletes((id: string) => {
  // Book deleted
})

Type Definitions

interface ChangeEvent<T> {
  _id: { _data: string } // Resume token
  clusterTime: Timestamp
  documentKey: { _id: string }
  fullDocument?: T
  ns: { db: string; coll: string }
  operationType: 'insert' | 'update' | 'delete' | 'replace' | 'drop' | 'rename'
  updateDescription?: {
    removedFields: string[]
    updatedFields: Record<string, any>
  }
}

interface ListenOptions<T> {
  filter?: FilterQuery<T>
  operationType?: OperationType | OperationType[]
  pipeline?: Document[]
  fullDocument?: 'updateLookup'
  onChange: (id: string, change: ChangeEvent<T>) => void
  onError?: (error: Error) => void
}

type UnsubscribeFn = () => void

Implementation Considerations

  • MongoDB Version: Requires MongoDB 3.6+ (for change streams support)
  • Replica Set: MongoDB must be configured as a replica set
  • Connection Reuse: Use existing Esix MongoDB connection
  • Error Recovery: Implement automatic reconnection with exponential backoff
  • Resume Tokens: Store resume tokens to recover from disconnections
  • Resource Management: Ensure change streams are properly closed to prevent memory leaks
  • Multiple Listeners: Support multiple independent listeners on the same collection

Example Use Cases

GraphQL Subscription

// In GraphQL resolver
const subscribeToBookUpdates = (userId: string) => {
  return Book.listenForChanges({
    filter: { userId },
    onChange: (id, change) => {
      pubsub.publish('BOOK_UPDATED', { 
        bookId: id,
        operationType: change.operationType 
      })
    }
  })
}

Cache Invalidation

Book.listenForChanges((id, change) => {
  if (change.operationType === 'update' || change.operationType === 'delete') {
    cache.delete(`book:${id}`)
  }
})

Background Sync Notifications

ExternalResource.listenForChanges({
  filter: { syncStatus: 'completed' },
  onChange: (id, change) => {
    notifyClient(id, 'Sync completed')
  }
})

References

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions