
feat: Add Invoice Processing at Scale Infrastructure #32

Status: Open

mrrahman1517 wants to merge 7 commits into `main` from `feature/invoice-processing-at-scale`

Conversation

@mrrahman1517 (Collaborator)

Invoice Processing at Scale

Summary

This PR introduces a comprehensive invoice processing infrastructure for DocEX, enabling reliable extraction, validation, and processing of invoices at scale with human-in-the-loop support.

Key Features

Reliable Invoice Extraction

  • InvoicePipeline: End-to-end pipeline (ingest → extract → normalize → validate → persist)
  • Pydantic Models: Strict JSON schema validation with automatic normalization
  • InvoiceValidator: Business rules validation (totals match, dates logical, tax calculations)
  • InvoiceNormalizer: Date standardization, currency detection, address parsing
  • Confidence Routing: Automatic NEEDS_REVIEW status for low-confidence extractions
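The confidence-routing rule above can be sketched as a small helper. This is an illustrative sketch, not the actual DocEX implementation; the `InvoiceStatus` values and function name are assumptions, though the two thresholds mirror the `confidence_threshold` and `auto_approve_threshold` settings shown in the usage example below.

```python
from enum import Enum

class InvoiceStatus(str, Enum):
    APPROVED = "approved"
    NEEDS_REVIEW = "needs_review"

def route_by_confidence(confidence,
                        confidence_threshold=0.8,
                        auto_approve_threshold=0.95):
    """Route an extraction result based on its confidence score.

    Scores at or above auto_approve_threshold are auto-approved;
    anything below confidence_threshold (or a missing score) is
    flagged for human review.
    """
    if confidence is None or confidence < confidence_threshold:
        return InvoiceStatus.NEEDS_REVIEW
    if confidence >= auto_approve_threshold:
        return InvoiceStatus.APPROVED
    # Middle band: passes validation but is not confident enough
    # to auto-approve, so a human still takes a look.
    return InvoiceStatus.NEEDS_REVIEW
```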

Real-World Invoice Handling

  • PDFOCRProcessor: Auto-detects scanned PDFs (low text density) and applies Tesseract OCR
  • LineItemExtractor: Two-stage extraction (heuristic parsing + LLM refinement)
  • Entity Normalization: Vendor/customer deduplication keys from tax_id, email, name
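One way to build the deduplication keys mentioned above is to prefer the strongest identifier available (tax ID, then email, then a normalized name). The `dedup_key` helper below is a hypothetical sketch of that idea, not the actual DocEX code:

```python
import re

def dedup_key(name=None, tax_id=None, email=None):
    """Build a stable deduplication key for a vendor/customer entity.

    Prefers the strongest identifier available: tax_id, then email,
    then a whitespace/punctuation-normalized name.
    """
    if tax_id:
        return "tax:" + re.sub(r"[^0-9A-Za-z]", "", tax_id).lower()
    if email:
        return "email:" + email.strip().lower()
    if name:
        # Collapse punctuation and whitespace for stable matching
        normalized = re.sub(r"[^a-z0-9]+", " ", name.lower()).strip()
        return "name:" + normalized
    return None
```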

Scalability Infrastructure

  • Worker: Async job worker with concurrency control, retries, dead-letter handling
  • JobQueue: Priority-based job queue with dependencies and idempotency keys
  • RateLimiter: Token bucket algorithm with per-tenant isolation
  • CostTracker: LLM API cost tracking per model
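A token bucket with per-tenant isolation, as the RateLimiter bullet describes, works by giving each tenant its own bucket that refills at a fixed rate and is capped at a fixed capacity. The class below is a minimal sketch of that algorithm, assuming a synchronous API; the real DocEX RateLimiter interface may differ:

```python
import time
from collections import defaultdict

class TokenBucket:
    """Token-bucket rate limiter with per-tenant isolation.

    Each tenant gets an independent bucket of `capacity` tokens
    that refills at `rate` tokens per second.
    """

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self._tokens = defaultdict(lambda: capacity)
        self._last = defaultdict(time.monotonic)

    def allow(self, tenant: str, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        elapsed = now - self._last[tenant]
        self._last[tenant] = now
        self._tokens[tenant] = min(
            self.capacity, self._tokens[tenant] + elapsed * self.rate
        )
        if self._tokens[tenant] >= cost:
            self._tokens[tenant] -= cost
            return True
        return False
```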

Export Connectors

  • WebhookConnector: HTTP POST with HMAC signing and batch support
  • S3Connector: AWS S3 upload with encryption and tagging
  • DatabaseConnector: Direct DB insert with upsert support
  • CSVExporter: File export with compression and rotation
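The HMAC signing used by the WebhookConnector follows a common pattern: the sender computes an HMAC-SHA256 over the raw request body with a shared secret and sends it in a header; the receiver recomputes it and compares. The helpers below sketch that pattern (the function names and `sha256=` header format are assumptions, not necessarily DocEX's exact wire format):

```python
import hashlib
import hmac
import json

def sign_payload(payload: dict, secret: str):
    """Serialize a payload and compute an HMAC-SHA256 signature.

    Returns the raw body bytes and a signature header value the
    receiver can verify with the shared secret.
    """
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return body, "sha256=" + digest

def verify_signature(body: bytes, header: str, secret: str) -> bool:
    """Recompute the HMAC over the raw body and compare in constant time."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest("sha256=" + expected, header)
```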

Components Added

```
docex/
├── connectors/                     # Export connectors (webhook, S3, DB, CSV)
├── jobs/                           # Async worker, queue, rate limiter
├── models/invoice.py               # Pydantic invoice models
├── processors/invoice/             # Extractor, normalizer, validator, pipeline
├── processors/pdf_ocr.py           # OCR for scanned PDFs
└── services/invoice_service.py     # High-level service API
examples/
└── invoice_processing_example.py   # Complete usage example
```

Testing

All features tested end-to-end with Ollama (local LLM):

| Feature | Status |
| --- | --- |
| Invoice extraction with LLM | ✅ |
| Pydantic schema validation | ✅ |
| Data normalization | ✅ |
| Job queue operations | ✅ |
| Rate limiting | ✅ |
| Cost tracking | ✅ |

Usage Example

```python
from docex.services.invoice_service import InvoiceService
from docex.models.invoice import InvoiceProcessingConfig

# Configure
config = InvoiceProcessingConfig(
    confidence_threshold=0.8,
    auto_approve_threshold=0.95,
    enable_ocr=True
)

# Process invoice
service = InvoiceService(db, llm_adapter, config)
result = await service.process_invoice(document)
```

Breaking Changes

None - this is a new feature addition.

Dependencies

Optional dependencies for full functionality:

  • pytesseract + pdf2image: OCR support
  • boto3: S3 connector
  • httpx or aiohttp: Webhook connector

Sprint 1 - Make invoice extraction reliable:
- Invoice-specific pipeline entrypoint (InvoicePipeline)
- Pydantic invoice model with strict JSON schema validation
- Invoice validator and normalizer processors
- Confidence routing + NEEDS_REVIEW status for human-in-the-loop

Sprint 2 - Handle real invoices:
- OCR processor with scanned PDF auto-detection
- Two-stage line item extraction (heuristic + LLM refinement)
- Vendor/customer normalization with deduplication keys

Sprint 3 - Scale infrastructure:
- Async job worker using existing Operation model
- Job queue with priorities and dependencies
- Rate limiting + batching for LLM calls
- Cost tracking per model

Sprint 4 - Export connectors:
- Webhook connector with HMAC signing
- S3 connector with encryption support
- Database connector with upsert
- CSV exporter with compression

Key components:
- docex/models/invoice.py: Pydantic models for invoice data
- docex/processors/invoice/: Extractor, normalizer, validator, pipeline
- docex/processors/pdf_ocr.py: OCR for scanned PDFs
- docex/services/invoice_service.py: High-level service API
- docex/jobs/: Worker, queue, rate limiter
- docex/connectors/: Webhook, S3, database, CSV export
- examples/invoice_processing_example.py: Complete usage example
- Add _call_llm method that supports different adapter APIs:
  - LocalLLMService.generate_completion()
  - OllamaAdapter.generate()
  - OpenAI-style chat() method
- Fix confidence score handling when None
- Fix example to use correct DocEX API
- Pass actual document IDs to job queue example
@tommyGPT2S (Owner)

Thanks for preparing this pull request. Can you first review the concept of connectors? DocEX already has storage components supporting both S3 and the file system, and it also has a Postgres backend.

P.S. I just updated S3 storage to fix a few issues around provisioning and prefixes; it is now pretty stable. **Please fetch the latest code** and review whether the connectors are necessary. Happy new year!

@tommyGPT2S (Owner) left a comment

Please review whether the connectors are necessary. Ideally, we should leverage the current storage components.

Address PR review comments:
- Removed S3Connector (docex/connectors/s3.py) as it duplicates
  existing S3Storage functionality in docex/storage/s3_storage.py
- Added StorageExporter that wraps existing storage infrastructure
  for structured data export use cases
- Updated connector documentation to clarify the distinction between
  Connectors (structured data export) vs Storage (document content)
- Added convenience functions export_to_s3() and export_to_filesystem()
  that use existing storage components

For raw document storage, use docex.storage (S3Storage, FilesystemStorage)
For structured data export, use docex.connectors (WebhookConnector,
DatabaseConnector, CSVExporter, StorageExporter)
@mrrahman1517 (Collaborator, Author)

> Thanks for preparing this pull request. Can you first review the concept of connectors? DocEX already has storage components supporting both S3 and the file system, and it also has a Postgres backend. P.S. I just updated S3 storage to fix a few issues around provisioning and prefixes; it is now pretty stable. **Please fetch the latest code** and review whether the connectors are necessary. Happy new year!

@tommyGPT2S Thanks for the review and happy new year!

I've addressed the feedback:

Changes Made

  1. Merged latest from main - pulled in your S3 storage fixes
  2. Removed S3Connector - you're right, it was duplicating S3Storage
  3. Added StorageExporter - a thin wrapper that uses existing StorageFactory under the hood for structured data export

Clarification: Connectors vs Storage

After review, I kept 3 connectors because they serve a different purpose than storage:

| Component | Purpose | Why needed |
| --- | --- | --- |
| Storage (S3Storage, FilesystemStorage) | Store raw document content (PDFs, Word files) | Already exists ✅ |
| WebhookConnector | Deliver structured JSON to external HTTP endpoints | Unique: no existing webhook capability |
| DatabaseConnector | Export extracted data to external database tables (ERP, data warehouse) | Unique: different from DocEX's internal DB |
| CSVExporter | Export processing results to CSV files | Unique: no existing CSV export |
| StorageExporter | Uses existing StorageFactory for S3/filesystem export | Leverages existing storage |

Example Use Case

When processing invoices:

  • Storage: Stores the original PDF invoice
  • Connectors: Exports the extracted data (invoice number, total, line items) to a webhook, external DB, or CSV report

Verification

  • JobQueue and Worker use existing Operation model ✅
  • StorageExporter uses existing StorageFactory ✅
  • No duplicate S3/filesystem implementations ✅

Let me know if you'd like me to make any other adjustments!

… storage

Updated documentation to clearly distinguish:
- DatabaseConnector: For exporting structured data to EXTERNAL systems
  (customer ERPs, data warehouses, reporting databases)
- docex.db.*: For DocEX INTERNAL storage (documents, metadata, operations)

This addresses PR feedback about potential confusion with existing
PostgreSQL backend.
FileSystemStorage.save() expects a file-like object with read() method.
Wrap JSON content in BytesIO for compatibility with existing storage.
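The BytesIO adaptation described in the commit above is a small shim: serialize the structured data to JSON bytes and wrap them so the storage backend's `save()` can `read()` them like a file. The helper name below is hypothetical:

```python
import io
import json

def to_file_like(data: dict) -> io.BytesIO:
    """Wrap a JSON-serializable dict in a BytesIO.

    Lets structured export data be passed to a storage backend
    (e.g. FileSystemStorage.save()) that expects a file-like
    object exposing read().
    """
    return io.BytesIO(json.dumps(data, indent=2).encode("utf-8"))
```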
@mrrahman1517 (Collaborator, Author)

@tommyGPT2S Thanks for the review and happy new year!

I've addressed the feedback:

Changes Made

  1. Merged latest from main - pulled in your S3 storage fixes
  2. Removed S3Connector - you're right, it was duplicating S3Storage
  3. Added StorageExporter - a thin wrapper that uses existing StorageFactory under the hood

Clarification: Connectors vs Storage vs Database

After review, the remaining connectors serve a different purpose than existing components:

| Component | Purpose | Duplicates existing? |
| --- | --- | --- |
| Storage (S3Storage, FilesystemStorage) | Store raw document content | Already exists ✅ |
| Database (PostgresDatabase, SQLiteDatabase) | DocEX internal tables (documents, operations, metadata) | Already exists ✅ |
| WebhookConnector | Deliver structured JSON to external HTTP endpoints | Unique ✅ |
| DatabaseConnector | Export extracted data to external DB tables (ERP, data warehouse) | Unique: different from DocEX internal DB ✅ |
| CSVExporter | Export processing results to CSV files | Unique ✅ |
| StorageExporter | Uses existing StorageFactory for S3/filesystem | Reuses existing storage |

Why We Did NOT Reimplement PostgreSQL

The DatabaseConnector is NOT a reimplementation of PostgresDatabase. Here's the key distinction:

| Existing PostgresDatabase | New DatabaseConnector |
| --- | --- |
| Manages DocEX internal tables | Exports to external tables |
| Tables: document, operations, docbasket, metadata | Tables: processed_invoices, invoice_exports (user-defined) |
| Used by DocEX core services | Used for downstream integrations |
| Single DocEX database | Can connect to separate external databases |

Example flow:

  1. Invoice PDF uploaded → stored via S3Storage (existing)
  2. Document record created → stored in PostgresDatabase (existing)
  3. LLM extracts invoice data → tracked in operations table (existing)
  4. Extracted JSON exported → sent to customer's ERP via DatabaseConnector (new - external system)

The DatabaseConnector can even use the DocEX database connection to write to custom tables (like invoice_exports), but it never touches the core DocEX tables managed by PostgresDatabase.
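The upsert-style export the DatabaseConnector performs can be sketched as below. This uses SQLite purely for a self-contained illustration (a real deployment would more likely target Postgres or a warehouse), and the `invoice_exports` table schema is hypothetical:

```python
import sqlite3

def upsert_invoice_export(conn: sqlite3.Connection, record: dict) -> None:
    """Upsert an extracted-invoice record into a user-defined export
    table, keyed on invoice_number. The table lives alongside, but is
    entirely separate from, any core application tables.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS invoice_exports (
            invoice_number TEXT PRIMARY KEY,
            vendor TEXT,
            total REAL
        )""")
    # ON CONFLICT ... DO UPDATE makes the export idempotent:
    # re-exporting the same invoice updates the row in place.
    conn.execute("""
        INSERT INTO invoice_exports (invoice_number, vendor, total)
        VALUES (:invoice_number, :vendor, :total)
        ON CONFLICT(invoice_number) DO UPDATE SET
            vendor = excluded.vendor,
            total = excluded.total
        """, record)
    conn.commit()
```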

Verification

All tests pass:

  • StorageExporter uses existing StorageFactory ✅
  • JobQueue and Worker use existing Operation model ✅
  • No duplicate S3/filesystem/PostgreSQL implementations ✅

Let me know if you'd like me to make any other adjustments!

