146 changes: 146 additions & 0 deletions packages/Classification_backend/app/services/ocr/analyzer/analyze.py
@@ -0,0 +1,146 @@
# analyzer/analyze.py

from schema_base import FileResults
from utils.logger import setup_logger
from analyzer.llm import DocumentLLM
from documents.document_type.prompt import document_type_prompt
from documents.document_type.schema import DocumentCategoryAndType
from typing import Optional, List, Tuple
import difflib

logger = setup_logger(__name__)


def _normalize_value(s: str, allowed: Optional[List[str]] = None) -> str:
    """
    Normalize a string to lowercase with underscores and, if an allowed list
    is provided, fuzzy-match the result against it.
    """
    base = str(s or "unknown").strip().lower().replace(" ", "_").replace("-", "_")
    if allowed:
        match = difflib.get_close_matches(base, allowed, n=1, cutoff=0.8)
        if match:
            return match[0]
    return base


def _normalize_pair(
    cat: str, typ: str,
    allowed_cats: Optional[List[str]] = None,
    allowed_types: Optional[List[str]] = None
) -> Tuple[str, str]:
    return (
        _normalize_value(cat, allowed_cats),
        _normalize_value(typ, allowed_types)
    )


def analyze_document(
    file_results: FileResults,
    allowed_pairs: Optional[List[dict]] = None,
    stop_on_failure: bool = False
) -> Optional[FileResults]:
    """
    Run classification on the provided file_results (uses its page images).
    Returns the updated FileResults with normalized category/type and status,
    or None when there is nothing to analyze (or on failure when
    stop_on_failure is True).
    """
    if not file_results or not file_results.properties or not file_results.properties.page_paths:
        logger.debug("No file_results or no page_paths present")
        return None

    # Build the allowed (category, type) set and collect normalized values
    # for fuzzy matching.
    allowed_pairs_set = set()
    allowed_cats = []
    allowed_types = []
    if allowed_pairs:
        for p in allowed_pairs:
            try:
                c = p.get("document_category")
                t = p.get("document_type")
            except Exception:
                continue
            cat_n = _normalize_value(c)
            typ_n = _normalize_value(t)
            allowed_pairs_set.add((cat_n, typ_n))
            allowed_cats.append(cat_n)
            allowed_types.append(typ_n)

    document_llm = DocumentLLM()
    logger.info("Analyzing pages: %s", file_results.properties.page_paths)

    response = document_llm.call_llm_api(
        prompt=document_type_prompt,
        image_path=file_results.properties.page_paths
    )
    parsed = response.get("parsed")
    raw = response.get("raw")
    error = response.get("error")

    # Default fallback before the LLM response is validated.
    file_results.document_category_details.document_category = "unknown"
    file_results.document_category_details.document_type = "unknown"
    try:
        file_results.document_category_details.status = "unknown"
        file_results.document_category_details.note = None
    except Exception:
        # Tolerate FileResults variants that lack status/note fields.
        pass

    if parsed:
        try:
            # Validate/coerce with the Pydantic model.
            doc_cat_type = DocumentCategoryAndType.model_validate(parsed)
            cat_raw = str(doc_cat_type.document_category)
            typ_raw = str(doc_cat_type.document_type)

            # Normalize with fuzzy matching.
            cat_n, typ_n = _normalize_pair(cat_raw, typ_raw, allowed_cats, allowed_types)

            # Determine status relative to allowed_pairs.
            if allowed_pairs_set and (cat_n, typ_n) not in allowed_pairs_set:
                status = "extra"
                note = "Pair not in allowed list (user-provided)"
            else:
                status = "classified"
                note = None

            # Write back.
            file_results.document_category_details.document_category = cat_n
            file_results.document_category_details.document_type = typ_n
            try:
                file_results.document_category_details.status = status
                file_results.document_category_details.note = note
            except Exception:
                pass

            # Store raw LLM + parsed outputs.
            file_results.ocr_results = {
                "llm_raw": raw,
                "llm_parsed": parsed,
                "status": status,
                "note": note
            }

            logger.info("Classification result: %s, %s -> %s", cat_raw, typ_raw, status)
            return file_results

        except Exception as e:
            logger.exception("Failed to validate LLM parsed output: %s", e)
            file_results.document_category_details.document_category = "unknown"
            file_results.document_category_details.document_type = "unknown"
            file_results.document_category_details.status = "unknown"
            file_results.document_category_details.note = "validation_failed"
            file_results.ocr_results = {"llm_raw": raw, "error": "validation_failed"}
            if stop_on_failure:
                return None
            return file_results

    else:
        logger.warning("LLM returned no parsed JSON (error=%s)", error)
        file_results.document_category_details.document_category = "unknown"
        file_results.document_category_details.document_type = "unknown"
        file_results.document_category_details.status = "unknown"
        file_results.document_category_details.note = error or "no_parsed_response"
        file_results.ocr_results = {"error": error}
        if stop_on_failure:
            return None
        return file_results
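
Since the fuzzy normalization above is pure stdlib, it is easy to sanity-check without the LLM stack. A minimal sketch with a hypothetical allowed list; the inline `normalize` mirrors `_normalize_value` and its 0.8 cutoff:

```python
import difflib

def normalize(s, allowed=None):
    # Mirrors _normalize_value above: snake_case, then fuzzy match at cutoff 0.8.
    base = str(s or "unknown").strip().lower().replace(" ", "_").replace("-", "_")
    if allowed:
        match = difflib.get_close_matches(base, allowed, n=1, cutoff=0.8)
        if match:
            return match[0]
    return base

allowed_types = ["driving_license", "passport", "bank_statement"]  # hypothetical list

print(normalize("Driving Licence", allowed_types))  # -> driving_license (fuzzy hit on the UK spelling)
print(normalize("Utility Bill", allowed_types))     # -> utility_bill (no close match, kept as-is)
```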
67 changes: 67 additions & 0 deletions packages/Classification_backend/app/services/ocr/analyzer/llm.py
@@ -0,0 +1,67 @@
# analyzer/llm.py
from pydantic import BaseModel
from dotenv import load_dotenv
import os
import json
import re
import time
from vertexai.generative_models import Part, Image, GenerativeModel
import vertexai
from utils.logger import setup_logger

logger = setup_logger(__name__)
load_dotenv()

project = os.getenv("GOOGLE_CLOUD_PROJECT")
location = os.getenv("GOOGLE_CLOUD_LOCATION")
if project and location:
    try:
        vertexai.init(project=project, location=location)
    except Exception:
        logger.exception("vertexai.init failed (maybe running locally without credentials).")


class DocumentLLM(BaseModel):
    """
    Wrapper around a Vertex/Gemini model that sends images plus a prompt and
    returns both parsed and raw outputs.
    """

    def call_llm_api(self, prompt: str, image_path: list[str], retries: int = 2, backoff: float = 1.0) -> dict:
        """
        Call the generative model with images + prompt.
        Returns: {"parsed": <json|None>, "raw": <raw_text|None>, "error": <error_str|None>}
        """
        try:
            model = GenerativeModel(model_name="gemini-2.0-flash-001")
        except Exception as e:
            # If model creation fails, re-raise so the caller can fall back.
            logger.exception("Failed to create GenerativeModel: %s", e)
            raise

        text_part = Part.from_text(prompt)
        image_parts = []
        for p in image_path:
            try:
                image_parts.append(Part.from_image(Image.load_from_file(p)))
            except Exception:
                logger.exception("Failed loading image for LLM: %s", p)
                # Still proceed: the model may accept fewer images.

        last_exc = None
        for attempt in range(retries + 1):
            try:
                response = model.generate_content([*image_parts, text_part])
                raw_text = response.text
                # First try a direct JSON parse.
                try:
                    parsed = json.loads(raw_text)
                    return {"parsed": parsed, "raw": raw_text, "error": None}
                except Exception:
                    # Strip fenced code blocks and try again.
                    cleaned = re.sub(r"^```json\s*|\s*```$", "", raw_text, flags=re.MULTILINE)
                    parsed = json.loads(cleaned)
                    return {"parsed": parsed, "raw": raw_text, "error": None}
            except Exception as e:
                last_exc = e
                logger.warning("LLM call failed (attempt %d/%d): %s", attempt + 1, retries + 1, str(e))
                if attempt < retries:
                    # Exponential backoff; no sleep after the final attempt.
                    time.sleep(backoff * (2 ** attempt))

        logger.error("LLM call failed after all retries: %s", last_exc)
        return {"parsed": None, "raw": None, "error": str(last_exc)}
2 changes: 2 additions & 0 deletions packages/Classification_backend/app/services/ocr/documents/document_type/__init__.py
@@ -0,0 +1,2 @@
from .prompt import document_type_prompt
from .schema import DocumentCategoryAndType
48 changes: 48 additions & 0 deletions packages/Classification_backend/app/services/ocr/documents/document_type/prompt.py
@@ -0,0 +1,48 @@
document_type_prompt = """
Document Type Identification Agent Prompt
You are a document classification assistant.

You will be given one or more images of a document. Analyze carefully and output the most appropriate
document_category and document_type.

Canonical categories and example types:

{
"identity_verification_document": ["passport","driving_license","national_identity_card","other"],
"bank_statement": ["bank_statement","other"],
"income_document": ["payslip","p60","contract_of_employment","other"],
"expenditure": ["bank_statement","other"]
}

Examples:

Example 1:
Input: passport image
Output:
{"document_category": "identity_verification_document","document_type": "passport"}

Example 2:
Input: payslip
Output:
{"document_category": "income_document","document_type": "payslip"}

Example 3:
Input: bank statement
Output:
{"document_category": "bank_statement","document_type": "bank_statement"}

Example 4:
Input: driving licence
Output:
{"document_category": "identity_verification_document","document_type": "driving_license"}

Example 5:
Input: irrelevant or unclear
Output:
{"document_category": "unknown","document_type": "unknown"}

Instructions:
- Always choose from canonical values if possible.
- If unsure, use "unknown".
- Respond with a single JSON object only, no extra commentary.
"""
89 changes: 89 additions & 0 deletions packages/Classification_backend/app/services/ocr/documents/document_type/schema.py
@@ -0,0 +1,89 @@
# schema.py
from pydantic import BaseModel, Field, field_validator
from typing import Any
from schema_base import StrEnumBase


class DocumentCategoryEnum(StrEnumBase):
    IDENTITY_VERIFICATION_DOCUMENT = "identity_verification_document"
    BANK_STATEMENT = "bank_statement"
    INCOME_DOCUMENT = "income_document"
    EXPENDITURE = "expenditure"
    CREDIT_REPORT = "credit_report"
    OTHER = "other"
    UNKNOWN = "unknown"


class DocumentTypeEnum(StrEnumBase):
    PASSPORT = "passport"
    DRIVING_LICENSE = "driving_license"
    NATIONAL_IDENTITY_CARD = "national_identity_card"
    BANK_STATEMENT = "bank_statement"
    PAYSLIP = "payslip"
    P60 = "p60"
    CONTRACT_OF_EMPLOYMENT = "contract_of_employment"
    MARRIAGE_CERTIFICATE = "marriage_certificate"
    PRE_MATERNITY_PAYSLIP = "pre_maternity_payslip"
    PENSION_PAYSLIP = "pension_payslip"
    ANNUAL_PENSION_STATEMENT = "pension_annual_statement"
    EMPLOYER_LETTER = "letter_from_employer"
    CREDIT_TRANSUNION = "transunion"
    CREDIT_EXPERIAN = "experian"
    OTHER = "other"
    UNKNOWN = "unknown"


# Mapping dictionaries that resolve common name mismatches to canonical values.
CATEGORY_MAPPING = {
    "income": DocumentCategoryEnum.INCOME_DOCUMENT.value,
    "income_document": DocumentCategoryEnum.INCOME_DOCUMENT.value,
    "id proof": DocumentCategoryEnum.IDENTITY_VERIFICATION_DOCUMENT.value,
    "identity": DocumentCategoryEnum.IDENTITY_VERIFICATION_DOCUMENT.value,
    "identity_document": DocumentCategoryEnum.IDENTITY_VERIFICATION_DOCUMENT.value,
    "identity_verification_document": DocumentCategoryEnum.IDENTITY_VERIFICATION_DOCUMENT.value,
    "expenditure": DocumentCategoryEnum.EXPENDITURE.value,
    "bank statement": DocumentCategoryEnum.BANK_STATEMENT.value,
    "credit report": DocumentCategoryEnum.CREDIT_REPORT.value,
}

TYPE_MAPPING = {
    "pay slip": DocumentTypeEnum.PAYSLIP.value,
    "payslip": DocumentTypeEnum.PAYSLIP.value,
    "p60": DocumentTypeEnum.P60.value,
    "marriage certificate": DocumentTypeEnum.MARRIAGE_CERTIFICATE.value,
    "contract of employment": DocumentTypeEnum.CONTRACT_OF_EMPLOYMENT.value,
    "previous contract of employment": DocumentTypeEnum.CONTRACT_OF_EMPLOYMENT.value,
    "pre maternity pay slip": DocumentTypeEnum.PRE_MATERNITY_PAYSLIP.value,
    "annual pension scheme statement": DocumentTypeEnum.ANNUAL_PENSION_STATEMENT.value,
    "confirmation of pension scheme": DocumentTypeEnum.ANNUAL_PENSION_STATEMENT.value,
    "pension pay slip": DocumentTypeEnum.PENSION_PAYSLIP.value,
    "pension annual statement": DocumentTypeEnum.ANNUAL_PENSION_STATEMENT.value,
    "uk paaport": DocumentTypeEnum.PASSPORT.value,  # misspelled key kept on purpose; maps observed typo to passport
    "passport": DocumentTypeEnum.PASSPORT.value,
    "share code": DocumentTypeEnum.NATIONAL_IDENTITY_CARD.value,
    "indefinite leave to remain": DocumentTypeEnum.NATIONAL_IDENTITY_CARD.value,
    "bank statements": DocumentTypeEnum.BANK_STATEMENT.value,
    "transunion": DocumentTypeEnum.CREDIT_TRANSUNION.value,
    "experian": DocumentTypeEnum.CREDIT_EXPERIAN.value,
}


class DocumentCategoryAndType(BaseModel):
    document_category: str = Field(..., description="Category of the document")
    document_type: str = Field(..., description="Type of the document")

    @field_validator("document_category", mode="before")
    @classmethod
    def _coerce_category(cls, v: Any) -> str:
        if v is None:
            return DocumentCategoryEnum.UNKNOWN.value
        s = str(v).strip().lower()
        return CATEGORY_MAPPING.get(s, s.replace(" ", "_").replace("-", "_"))

    @field_validator("document_type", mode="before")
    @classmethod
    def _coerce_type(cls, v: Any) -> str:
        if v is None:
            return DocumentTypeEnum.UNKNOWN.value
        s = str(v).strip().lower()
        return TYPE_MAPPING.get(s, s.replace(" ", "_").replace("-", "_"))
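
A brief sketch of how the before-mode validators coerce messy labels. It assumes the package root is on sys.path so the module imports as documents.document_type.schema (matching analyze.py's imports); the input dicts are illustrative:

```python
from documents.document_type.schema import DocumentCategoryAndType

# Mapped aliases resolve through CATEGORY_MAPPING / TYPE_MAPPING.
doc = DocumentCategoryAndType.model_validate(
    {"document_category": "ID Proof", "document_type": "Pay Slip"}
)
print(doc.document_category)  # -> identity_verification_document
print(doc.document_type)      # -> payslip

# Unmapped values fall through to plain snake_casing; None coerces to "unknown".
doc2 = DocumentCategoryAndType.model_validate(
    {"document_category": "Bank-Statement", "document_type": None}
)
print(doc2.document_category)  # -> bank_statement
print(doc2.document_type)      # -> unknown
```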