From 92cf702ce49f2066110c589d97b57b0ed3ab8fa4 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Mon, 17 Nov 2025 17:07:34 +0530
Subject: [PATCH 01/15] Add S3 Bedrock BDA ingestion support with user
confirmation and pymupdf4llm integration
---
common/requirements.txt | 3 +-
common/utils/image_data_extractor.py | 163 +++---------
common/utils/markdown_parsing.py | 63 +++++
common/utils/text_extractors.py | 254 +++++++++---------
graphrag-ui/src/pages/Setup.tsx | 370 +++++++++++++--------------
5 files changed, 403 insertions(+), 450 deletions(-)
create mode 100644 common/utils/markdown_parsing.py
diff --git a/common/requirements.txt b/common/requirements.txt
index 562c2f6..f0022f3 100644
--- a/common/requirements.txt
+++ b/common/requirements.txt
@@ -110,7 +110,8 @@ packaging==24.2
pandas==2.2.3
#pathtools==0.1.2
pillow==11.2.1
-PyMuPDF==1.26.4
+#PyMuPDF==1.26.4
+pymupdf4llm==0.2.0
platformdirs==4.3.8
pluggy==1.6.0
prometheus_client==0.22.1
diff --git a/common/utils/image_data_extractor.py b/common/utils/image_data_extractor.py
index bde9c97..74e8d2f 100644
--- a/common/utils/image_data_extractor.py
+++ b/common/utils/image_data_extractor.py
@@ -11,155 +11,54 @@
logger = logging.getLogger(__name__)
-
-
-def describe_image_with_llm(image_input):
+def describe_image_with_llm(file_path):
"""
- Send image (pixmap or PIL image) to LLM vision model and return description.
- Uses multimodal_service from config if available, otherwise falls back to completion_service.
- Currently supports: OpenAI, Azure OpenAI, Google GenAI, and Google VertexAI
+ Read an image file from disk, convert it to base64, and send it to the LLM vision model for a description.
"""
try:
+ from PIL import Image as PILImage
+
client = get_multimodal_service()
if not client:
return "[Image: Failed to create multimodal LLM client]"
-
+
+ # Read image and convert to base64
+ pil_image = PILImage.open(file_path)
buffer = io.BytesIO()
- # Convert to RGB if needed for better compatibility
- if image_input.mode != 'RGB':
- image_input = image_input.convert('RGB')
- image_input.save(buffer, format="JPEG", quality=95)
- b64_img = base64.b64encode(buffer.getvalue()).decode("utf-8")
+ if pil_image.mode != 'RGB':
+ pil_image = pil_image.convert('RGB')
+ pil_image.save(buffer, format="JPEG", quality=95)
+ image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
- # Build messages (system + human)
messages = [
- SystemMessage(
- content="You are a helpful assistant that describes images concisely for document analysis."
- ),
- HumanMessage(
- content=[
- {
- "type": "text",
- "text": (
- "Please describe what you see in this image and "
- "if the image has scanned text then extract all the text. "
- "if the image has any logo, icon, or branding element, try to describe it with text. "
- "Focus on any text, diagrams, charts, or other visual elements."
- "If the image is purely a logo, icon, or branding element, start your response with 'LOGO:' or 'ICON:'."
- ),
- },
- {
- "type": "image_url",
- "image_url": {"url": f"data:image/jpeg;base64,{b64_img}"},
- },
- ]
- ),
+ SystemMessage(
+ content="You are a helpful assistant that describes images concisely for document analysis."
+ ),
+ HumanMessage(
+ content=[
+ {
+ "type": "text",
+ "text": (
+ "Please describe what you see in this image and "
+ "if the image has scanned text then extract all the text. "
+ "If the image has any graph, chart, table, or other diagram, describe it. "
+ ),
+ },
+ {
+ "type": "image_url",
+ "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"},
+ },
+ ],
+ ),
]
- # Get response from LangChain LLM client
- # Access the underlying LangChain client
langchain_client = client.llm
response = langchain_client.invoke(messages)
- return response.content if hasattr(response, 'content') else str(response)
+ return response.content if hasattr(response, "content") else str(response)
except Exception as e:
logger.error(f"Failed to describe image with LLM: {str(e)}")
return "[Image: Error processing image description]"
-def save_image_and_get_markdown(image_input, context_info="", graphname=None):
- """
- Save image locally to static/images/ folder and return markdown reference with description.
-
- LEGACY/OLD APPROACH: Used for backward compatibility with JSONL-based loading.
- Images are saved as files and served via /ui/images/ endpoint with img:// protocol.
-
- For NEW direct loading approach, images are stored in Image vertex as base64
- and served via /ui/image_vertex/ endpoint with image:// protocol.
-
- Args:
- image_input: PIL Image object
- context_info: Optional context (e.g., "page 3 of invoice.pdf")
- graphname: Graph name to organize images by graph (optional)
-
- Returns:
- dict with:
- - 'markdown': Markdown string with img:// reference
- - 'image_id': Unique identifier for the saved image
- - 'image_path': Path where image was saved to static/images/
- """
- try:
- # FIRST: Get description from LLM to check if it's a logo
- description = describe_image_with_llm(image_input)
-
- # Check if the image is a logo, icon, or decorative element BEFORE saving
- # These should be filtered out as they're not content-relevant
- description_lower = description.lower()
- logo_indicators = ['logo', 'icon', 'branding', 'watermark', 'trademark', 'company logo', 'brand logo']
-
- if any(indicator in description_lower for indicator in logo_indicators):
- logger.info(f"Detected logo/icon in image, skipping: {description[:100]}")
- return None
-
- # If not a logo, proceed with saving the image
- # Generate unique image ID using hash of image content
- buffer = io.BytesIO()
- if image_input.mode != 'RGB':
- image_input = image_input.convert('RGB')
- image_input.save(buffer, format="JPEG", quality=95)
- image_bytes = buffer.getvalue()
-
- # Create hash-based ID (deterministic for same image)
- image_hash = hashlib.sha256(image_bytes).hexdigest()[:16]
- image_id = f"{image_hash}.jpg"
-
- # Save image to local storage directory organized by graphname
- project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
- # If graphname is provided, organize images by graph
- if graphname:
- images_dir = os.path.join(project_root, "static", "images", graphname)
- # Include graphname in the image reference for URL construction
- image_reference = f"{graphname}/{image_id}"
- else:
- images_dir = os.path.join(project_root, "static", "images")
- image_reference = image_id
-
- os.makedirs(images_dir, exist_ok=True)
-
- image_path = os.path.join(images_dir, image_id)
-
- # Save image file (skip if already exists with same hash)
- if not os.path.exists(image_path):
- with open(image_path, 'wb') as f:
- f.write(image_bytes)
- logger.info(f"Saved content image to: {image_path}")
- else:
- logger.debug(f"Image already exists: {image_path}")
-
- # Generate markdown with custom img:// protocol (will be replaced later)
- # Format: ![description](img://image_id) or ![description](img://graphname/image_id)
- markdown = f"![{description}](img://{image_reference})"
-
- logger.info(f"Created image reference: {image_reference} with description")
-
- return {
- 'markdown': markdown,
- 'image_id': image_reference,
- 'image_path': image_path,
- 'description': description
- }
-
- except Exception as e:
- logger.error(f"Failed to save image and generate markdown: {str(e)}")
- # Fallback to text description only
- fallback_desc = f"[Image: {context_info} - processing failed]"
- return {
- 'markdown': fallback_desc,
- 'image_id': None,
- 'image_path': None,
- 'description': fallback_desc
- }
-
-
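For context, a minimal sketch of how the reworked describe_image_with_llm is meant to be called now that it takes a file path; the wrapper below is illustrative only, and the bracketed sentinel strings come from the function's own error paths:

    from common.utils.image_data_extractor import describe_image_with_llm

    def describe_or_skip(image_path):
        # describe_image_with_llm now reads the file itself and returns a text description.
        description = describe_image_with_llm(image_path)
        # Failures come back as bracketed "[Image: ...]" strings rather than exceptions.
        if description.startswith("[Image:"):
            return None
        return description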
diff --git a/common/utils/markdown_parsing.py b/common/utils/markdown_parsing.py
new file mode 100644
index 0000000..7c8c476
--- /dev/null
+++ b/common/utils/markdown_parsing.py
@@ -0,0 +1,63 @@
+import re
+import os
+import pymupdf4llm
+
+class MarkdownProcessor:
+ """
+ A helper class to extract markdown image entries and
+ update descriptions based on image_id.
+ """
+
+ # regex for markdown images: ![alt](path)
+ _pattern = re.compile(r'!\[([^\]]*)\]\(([^)\s]+)\)')
+
+ @classmethod
+ def extract_images(cls, md_text):
+ """
+ Returns list of {"path": path, "image_id": image_id}
+ image_id = basename without extension
+ """
+ images = []
+ for m in cls._pattern.finditer(md_text):
+ path = m.group(2)
+ basename = os.path.basename(path)
+ image_id = os.path.splitext(basename)[0]
+ images.append({"path": path, "image_id": image_id})
+ return images
+
+ @classmethod
+ def insert_description_by_id(cls, md_text, image_id, description):
+ """
+ Replace the description for an image whose basename == image_id.
+ """
+
+ def repl(m):
+ old_path = m.group(2)
+ candidate_id = os.path.splitext(os.path.basename(old_path))[0]
+
+ if candidate_id == image_id:
+ # Insert new description
+ return f'![{description}]({old_path})'
+
+ return m.group(0)
+
+ return cls._pattern.sub(repl, md_text)
+
+ @classmethod
+ def replace_path_with_tg_protocol(cls, md_text, image_id, tg_reference):
+ """
+ Replace the file path for an image whose basename == image_id with tg:// protocol reference.
+ tg_reference should be like 'Graphs_image_1'
+ """
+ def repl(m):
+ old_path = m.group(2)
+ candidate_id = os.path.splitext(os.path.basename(old_path))[0]
+
+ if candidate_id == image_id:
+ # Replace path with tg:// protocol reference
+ alt_text = m.group(1)
+ return f'![{alt_text}](tg://{tg_reference})'
+
+ return m.group(0)
+
+ return cls._pattern.sub(repl, md_text)
\ No newline at end of file
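A short usage sketch of how these three helpers chain together on pymupdf4llm output; the sample markdown, image file name, and IDs below are made up for illustration:

    from common.utils.markdown_parsing import MarkdownProcessor

    md = "Intro text\n\n![](tg_temp_ab12cd34ef56/report.pdf-1-0.png)\n"

    # 1. Find image references and derive image_ids from the file basenames.
    refs = MarkdownProcessor.extract_images(md)
    # -> [{"path": "tg_temp_ab12cd34ef56/report.pdf-1-0.png", "image_id": "report.pdf-1-0"}]

    # 2. Put the LLM description into the alt text of the matching image.
    md = MarkdownProcessor.insert_description_by_id(md, refs[0]["image_id"], "Quarterly revenue chart")

    # 3. Swap the on-disk path for a tg:// vertex reference before loading.
    md = MarkdownProcessor.replace_path_with_tg_protocol(md, refs[0]["image_id"], "report_image_1")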
diff --git a/common/utils/text_extractors.py b/common/utils/text_extractors.py
index da3e22d..b900cae 100644
--- a/common/utils/text_extractors.py
+++ b/common/utils/text_extractors.py
@@ -183,137 +183,154 @@ def extract_text_from_file_with_images_as_docs(file_path, graphname=None):
def _extract_pdf_with_images_as_docs(file_path, base_doc_id, graphname=None):
"""
- Extract PDF as ONE markdown document with inline image references.
+ Extract PDF as ONE markdown document with inline image references using pymupdf4llm.
+ Uses a unique temporary folder per PDF so multiple PDFs can be processed in parallel;
+ the extracted image folder is deleted once processing finishes.
"""
+ # Use unique folder per PDF to allow parallel processing without conflicts
+ unique_folder_id = uuid.uuid4().hex[:12]
+ image_output_folder = Path(f"tg_temp_{unique_folder_id}")
+
try:
- import fitz # PyMuPDF
+ import pymupdf4llm
from PIL import Image as PILImage
+ from common.utils.image_data_extractor import describe_image_with_llm
+ from common.utils.markdown_parsing import MarkdownProcessor
+
+ # Ensure clean slate - remove folder if it exists from failed previous run
+ if image_output_folder.exists():
+ shutil.rmtree(image_output_folder, ignore_errors=True)
+
+ # Convert PDF to markdown with extracted image files
+ try:
+ markdown_content = pymupdf4llm.to_markdown(
+ file_path,
+ write_images=True,
+ image_path=str(image_output_folder), # unique folder per PDF
+ force_text=False,
+ margins=0,
+ image_size_limit=0.08,
+ )
+ except Exception as e:
+ logger.error(f"pymupdf4llm failed for {file_path}: {e}")
+ # Cleanup folder if it was created
+ if image_output_folder.exists():
+ shutil.rmtree(image_output_folder, ignore_errors=True)
+ return [{
+ "doc_id": base_doc_id,
+ "doc_type": "markdown",
+ "content": f"[PDF extraction failed: {e}]",
+ "position": 0
+ }]
+
+ if not markdown_content or not markdown_content.strip():
+ logger.warning(f"No content extracted from PDF: {file_path}")
+
+ # Extract image references from markdown
+ image_refs = MarkdownProcessor.extract_images(markdown_content)
+
+ if not image_refs:
+ # cleanup folder anyway
+ if image_output_folder.exists():
+ shutil.rmtree(image_output_folder, ignore_errors=True)
+
+ return [{
+ "doc_id": base_doc_id,
+ "doc_type": "markdown",
+ "content": markdown_content,
+ "position": 0
+ }]
- doc = fitz.open(file_path)
- markdown_parts = []
image_entries = []
image_counter = 0
- for page_num, page in enumerate(doc, start=1):
- if page_num > 1:
- markdown_parts.append("\n\n")
- markdown_parts.append(f"--- Page {page_num} ---\n") #Avoid to be splitted as a single chunk
-
- blocks = page.get_text("blocks", sort=True)
- text_blocks_with_pos = []
-
- for block in blocks:
- block_type = block[6] if len(block) > 6 else 0
- if block_type == 0:
- text = block[4].strip()
- if text:
- y_pos = block[1]
- text_blocks_with_pos.append({'type': 'text', 'content': text, 'y_pos': y_pos})
-
- image_list = page.get_images(full=True)
- images_with_pos = []
-
- if image_list:
- for img_index, img_info in enumerate(image_list):
- try:
- xref = img_info[0]
- base_image = doc.extract_image(xref)
- image_bytes = base_image["image"]
- image_ext = base_image["ext"]
-
- img_rects = page.get_image_rects(xref)
- y_pos = img_rects[0].y0 if img_rects else 999999
-
- pil_image = PILImage.open(io.BytesIO(image_bytes))
- if pil_image.width < 100 or pil_image.height < 100:
- continue
-
- from common.utils.image_data_extractor import describe_image_with_llm
- description = describe_image_with_llm(pil_image)
- description_lower = description.lower()
- logo_indicators = [
- 'logo:', 'icon:', 'logo', 'icon', 'branding',
- 'watermark', 'trademark', 'stylized letter',
- 'stylized text', 'word "', "word '"
- ]
- if any(indicator in description_lower for indicator in logo_indicators):
- continue
-
- buffer = io.BytesIO()
- if pil_image.mode != 'RGB':
- pil_image = pil_image.convert('RGB')
- pil_image.save(buffer, format="JPEG", quality=95)
- image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
-
- image_counter += 1
- image_doc_id = f"{base_doc_id}_image_{image_counter}"
-
- images_with_pos.append({
- 'type': 'image',
- 'image_doc_id': image_doc_id,
- 'description': description,
- 'y_pos': y_pos,
- 'image_data': image_base64,
- 'image_format': image_ext,
- 'width': pil_image.width,
- 'height': pil_image.height
- })
- except Exception as img_error:
- logger.warning(f"Failed to extract image on page {page_num}: {img_error}")
-
- all_elements = text_blocks_with_pos + images_with_pos
- all_elements.sort(key=lambda x: x['y_pos'])
-
- for element in all_elements:
- if element['type'] == 'text':
- markdown_parts.append(element['content'])
- markdown_parts.append("\n\n")
- else:
- # Add image description as text, then markdown image reference
- # Use short alt text in markdown, full description as regular text
- markdown_parts.append(f"![{element['description']}](tg://{element['image_doc_id']})\n\n")
-
- image_entries.append({
- "doc_id": element['image_doc_id'],
- "doc_type": "image",
- "image_description": element['description'],
- "image_data": element['image_data'],
- "image_format": element['image_format'],
- "parent_doc": base_doc_id,
- "page_number": page_num,
- "width": element['width'],
- "height": element['height'],
- "position": int(element['image_doc_id'].split('_')[-1])
- })
-
- doc.close()
-
- markdown_content = "".join(markdown_parts) if markdown_parts else "" #No content extracted from PDF
- if not markdown_content:
- return []
+ for img_ref in image_refs:
+ try:
+ img_path = Path(img_ref["path"]) # convert to Path
+ image_id = img_ref["image_id"]
+
+ # Image description
+ description = describe_image_with_llm(str(img_path))
+
+ markdown_content = MarkdownProcessor.insert_description_by_id(
+ markdown_content,
+ image_id,
+ description
+ )
+
+ # Convert image to base64
+ pil_image = PILImage.open(img_path)
+ buffer = io.BytesIO()
+
+ if pil_image.mode != "RGB":
+ pil_image = pil_image.convert("RGB")
+
+ pil_image.save(buffer, format="JPEG", quality=95)
+ image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
+
+ image_counter += 1
+ image_doc_id = f"{base_doc_id}_image_{image_counter}"
+
+ # Replace file path with tg:// protocol reference in markdown
+ markdown_content = MarkdownProcessor.replace_path_with_tg_protocol(
+ markdown_content,
+ image_id,
+ image_doc_id
+ )
+
+ image_entries.append({
+ "doc_id": image_doc_id,
+ "doc_type": "image",
+ "image_description": description,
+ "image_data": image_base64,
+ "image_format": "jpg",
+ "parent_doc": base_doc_id,
+ "page_number": 0,
+ "width": pil_image.width,
+ "height": pil_image.height,
+ "position": image_counter
+ })
+
+ except Exception as img_error:
+ logger.warning(f"Failed to process image {img_ref.get('path')}: {img_error}")
+
+ # FINAL CLEANUP — delete folder after processing everything
+ if image_output_folder.exists() and image_output_folder.is_dir():
+ try:
+ shutil.rmtree(image_output_folder)
+ logger.debug(f"Deleted image folder: {image_output_folder}")
+ except Exception as delete_err:
+ logger.warning(f"Failed to delete folder {image_output_folder}: {delete_err}")
+ # Build final result
result = [{
"doc_id": base_doc_id,
- "doc_type": "",
+ "doc_type": "markdown",
"content": markdown_content,
"position": 0
}]
result.extend(image_entries)
+
return result
- except ImportError:
- logger.error("PyMuPDF not available")
+ except ImportError as import_err:
+ logger.error(f"Required library missing: {import_err}")
+ # Cleanup on import error
+ if image_output_folder.exists():
+ shutil.rmtree(image_output_folder, ignore_errors=True)
return [{
"doc_id": base_doc_id,
- "doc_type": "",
- "content": "[PDF extraction requires PyMuPDF]",
+ "doc_type": "markdown",
+ "content": "[PDF extraction requires pymupdf4llm and PyMuPDF]",
"position": 0
}]
except Exception as e:
logger.error(f"Error extracting PDF: {e}")
+ # Cleanup on any other error
+ if image_output_folder.exists():
+ shutil.rmtree(image_output_folder, ignore_errors=True)
raise
-
def _extract_standalone_image_as_doc(file_path, base_doc_id, graphname=None):
"""
Extract standalone image file as ONE markdown document with inline image reference.
@@ -324,25 +341,15 @@ def _extract_standalone_image_as_doc(file_path, base_doc_id, graphname=None):
pil_image = PILImage.open(file_path)
if pil_image.width < 100 or pil_image.height < 100:
- return [{
- "doc_id": base_doc_id,
- "doc_type": "",
- "content": f"[Skipped small image: {file_path.name}]",
- "position": 0
- }]
+ pass
- description = describe_image_with_llm(pil_image)
+ description = describe_image_with_llm(str(Path(file_path).absolute()))
description_lower = description.lower()
logo_indicators = ['logo:', 'icon:', 'logo', 'icon', 'branding',
'watermark', 'trademark', 'stylized letter',
'stylized text', 'word "', "word '"]
if any(indicator in description_lower for indicator in logo_indicators):
- return [{
- "doc_id": base_doc_id,
- "doc_type": "",
- "content": f"[Skipped logo/icon: {file_path.name}]",
- "position": 0
- }]
+ return []
buffer = io.BytesIO()
if pil_image.mode != 'RGB':
@@ -353,7 +360,6 @@ def _extract_standalone_image_as_doc(file_path, base_doc_id, graphname=None):
image_id = f"{base_doc_id}_image_1"
# Put description as text, then markdown image reference with short alt text
content = f""
-
return [
{
"doc_id": base_doc_id,
@@ -379,7 +385,7 @@ def _extract_standalone_image_as_doc(file_path, base_doc_id, graphname=None):
logger.error(f"Error extracting image: {e}")
return [{
"doc_id": base_doc_id,
- "doc_type": "",
+ "doc_type": "markdown",
"content": f"[Image extraction failed: {str(e)}]",
"position": 0
}]
@@ -441,12 +447,10 @@ def get_doc_type_from_extension(extension):
if extension in ['.html', '.htm']:
return 'html'
- elif extension in ['.md']:
- return 'markdown'
elif extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']:
return 'image'
else:
- return ''
+ return 'markdown'
def get_supported_extensions():
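The extraction path above relies on pymupdf4llm writing page images to disk and embedding relative references in the returned markdown. A minimal standalone sketch with the same arguments (the input file name is a placeholder):

    import shutil
    import uuid
    from pathlib import Path
    import pymupdf4llm

    out_dir = Path(f"tg_temp_{uuid.uuid4().hex[:12]}")  # unique folder per PDF, as in the extractor

    md = pymupdf4llm.to_markdown(
        "sample.pdf",
        write_images=True,           # save embedded/rendered images as files
        image_path=str(out_dir),     # markdown references point into this folder
        force_text=False,
        margins=0,
        image_size_limit=0.08,       # ignore very small images relative to the page size
    )
    print(md[:500])                  # markdown text with ![](...) references into out_dir

    shutil.rmtree(out_dir, ignore_errors=True)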
diff --git a/graphrag-ui/src/pages/Setup.tsx b/graphrag-ui/src/pages/Setup.tsx
index b7d357d..2aaee99 100644
--- a/graphrag-ui/src/pages/Setup.tsx
+++ b/graphrag-ui/src/pages/Setup.tsx
@@ -2,7 +2,7 @@ import React, { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
-import { Database, Upload, RefreshCw, Loader2, Trash2, FolderUp, Cloud, ArrowLeft, CloudDownload, CloudLightning } from "lucide-react";
+import { Database, Upload, RefreshCw, Loader2, Trash2, FolderUp, Cloud, ArrowLeft, CloudDownload, CloudCog } from "lucide-react";
import {
Dialog,
DialogContent,
@@ -56,7 +56,6 @@ const Setup = () => {
const [uploadMessage, setUploadMessage] = useState("");
const [isIngesting, setIsIngesting] = useState(false);
const [ingestMessage, setIngestMessage] = useState("");
- const [activeTab, setActiveTab] = useState("upload");
// Refresh state
const [refreshOpen, setRefreshOpen] = useState(false);
@@ -67,12 +66,13 @@ const Setup = () => {
const [isCheckingStatus, setIsCheckingStatus] = useState(false);
// S3 state
+ const [fileFormat, setFileFormat] = useState<"json" | "multi">("json");
const [awsAccessKey, setAwsAccessKey] = useState("");
const [awsSecretKey, setAwsSecretKey] = useState("");
+ const [dataPath, setDataPath] = useState("");
const [inputBucket, setInputBucket] = useState("");
const [outputBucket, setOutputBucket] = useState("");
const [regionName, setRegionName] = useState("");
- const [skipBDAProcessing, setSkipBDAProcessing] = useState(false);
// Cloud Download state
const [cloudProvider, setCloudProvider] = useState<"s3" | "gcs" | "azure">("s3");
@@ -458,7 +458,7 @@ const Setup = () => {
}
const createData = await createResponse.json();
- //console.log("Create ingest response:", createData);
+ console.log("Create ingest response:", createData);
// Step 2: Run ingest
setIngestMessage("Step 2/2: Running document ingest...");
@@ -484,7 +484,7 @@ const Setup = () => {
}
const ingestData = await ingestResponse.json();
- //console.log("Ingest response:", ingestData);
+ console.log("Ingest response:", ingestData);
setIngestMessage(`✅ Data ingested successfully! Processed documents from ${folderPath}/`);
} catch (error: any) {
@@ -495,8 +495,8 @@ const Setup = () => {
}
};
- // Ingest files from S3 with Amazon BDA
- const handleAmazonBDAIngest = async () => {
+ // Ingest files from S3 with Bedrock BDA
+ const handleS3BedrockIngest = async () => {
if (!ingestGraphName) {
setIngestMessage("Please select a graph");
return;
@@ -508,112 +508,92 @@ const Setup = () => {
return;
}
- if (skipBDAProcessing) {
- // When skipping BDA, only output bucket and region are required
- if (!outputBucket || !regionName) {
- setIngestMessage("❌ Please provide Output Bucket and Region Name");
- return;
- }
- } else {
- // When using BDA, all fields are required
+ if (fileFormat === "multi") {
if (!inputBucket || !outputBucket || !regionName) {
setIngestMessage("❌ Please provide Input Bucket, Output Bucket, and Region Name");
return;
}
- }
- // Ask for confirmation
- const confirmMessage = skipBDAProcessing
- ? `You're skipping Amazon BDA processing and will ingest directly from the output bucket (${outputBucket}). Please confirm to proceed.`
- : `You're using Amazon BDA for multimodal document processing. This will trigger Amazon BDA to process your documents from the input bucket (${inputBucket}) and store the results in the output bucket (${outputBucket}) and then ingest them into your knowledge graph. Please confirm to proceed.`;
-
- const shouldProceed = await confirm(confirmMessage);
- if (!shouldProceed) {
- setIngestMessage("Operation cancelled by user.");
- return;
+ // Ask for confirmation if using Bedrock (multi format)
+ const shouldProceed = await confirm(
+ `You're using AWS Bedrock for multimodal document processing. This will trigger AWS Bedrock BDA to process your documents from the input bucket (${inputBucket}), store the results in the output bucket (${outputBucket}), and then ingest them into your knowledge graph. Please confirm to proceed.`
+ );
+ if (!shouldProceed) {
+ setIngestMessage("Operation cancelled by user.");
+ return;
+ }
+ } else if (fileFormat === "json") {
+ if (!dataPath) {
+ setIngestMessage("❌ Please provide Data Path (e.g., s3://bucket-name/path/to/data)");
+ return;
+ }
}
setIsIngesting(true);
+ setIngestMessage("Step 1/2: Creating ingest job...");
try {
const creds = localStorage.getItem("creds");
- let loadingInfo: any = {};
- if (skipBDAProcessing) {
- // Skip BDA processing - create ingest job that reads directly from output bucket
- const runIngestConfig: any = {
- data_source: "bda",
+ // Step 1: Create ingest job
+ const createIngestConfig: any = {
+ data_source: "s3",
+ data_source_config: {
aws_access_key: awsAccessKey,
aws_secret_key: awsSecretKey,
- output_bucket: outputBucket,
- region_name: regionName,
- bda_jobs:[],
- loader_config: {
- doc_id_field: "doc_id",
- content_field: "content",
- doc_type: "markdown",
- },
- file_format: "multi"
- };
-
- setIngestMessage("Step 1/2: Creating ingest job from output bucket...");
-
- // Run ingest directly
- loadingInfo = {
- load_job_id: "load_documents_content_json",
- data_source_id: runIngestConfig,
- file_path: outputBucket,
- };
- setIngestMessage(`Step 2/2: Running document ingestion for all files in ${outputBucket}...`);
- } else {
- // Step 1: Create ingest job with BDA processing
- const createIngestConfig: any = {
- data_source: "bda",
- data_source_config: {
- aws_access_key: awsAccessKey,
- aws_secret_key: awsSecretKey,
- input_bucket: inputBucket,
- output_bucket: outputBucket,
- region_name: regionName,
- },
- loader_config: {
- doc_id_field: "doc_id",
- content_field: "content",
- doc_type: "markdown",
- },
- file_format: "multi"
- };
+ },
+ loader_config: {
+ doc_id_field: "doc_id",
+ content_field: "content",
+ doc_type: fileFormat === "multi" ? "markdown" : "",
+ },
+ file_format: fileFormat
+ };
- setIngestMessage("Step 1/2: Triggering Amazon BDA processing and creating ingest job...");
+ // Add format-specific configuration
+ if (fileFormat === "multi") {
+ createIngestConfig.data_source_config.input_bucket = inputBucket;
+ createIngestConfig.data_source_config.output_bucket = outputBucket;
+ createIngestConfig.data_source_config.region_name = regionName;
+ setIngestMessage("Step 1/2: Creating ingest job and triggering AWS Bedrock BDA processing...");
+ } else if (fileFormat === "json") {
+ createIngestConfig.loader_config.doc_id_field = "url";
+ }
- const createResponse = await fetch(`/ui/${ingestGraphName}/create_ingest`, {
- method: "POST",
- headers: {
- "Content-Type": "application/json",
- Authorization: `Basic ${creds}`,
- },
- body: JSON.stringify(createIngestConfig),
- });
+ const createResponse = await fetch(`/ui/${ingestGraphName}/create_ingest`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `Basic ${creds}`,
+ },
+ body: JSON.stringify(createIngestConfig),
+ });
- if (!createResponse.ok) {
- const errorData = await createResponse.json();
- throw new Error(errorData.detail || `Failed to create ingest job: ${createResponse.statusText}`);
- }
+ if (!createResponse.ok) {
+ const errorData = await createResponse.json();
+ throw new Error(errorData.detail || `Failed to create ingest job: ${createResponse.statusText}`);
+ }
- const createData = await createResponse.json();
- //console.log("Create ingest response:", createData);
+ const createData = await createResponse.json();
+ console.log("Create ingest response:", createData);
- // Step 2: Run ingest
- loadingInfo = {
- load_job_id: createData.load_job_id,
- data_source_id: createData.data_source_id,
- file_path: outputBucket,
- };
+ // Step 2: Run ingest
+ setIngestMessage("Step 2/2: Running document ingest...");
- const filesToIngest = createData.data_source_id.bda_jobs.map((job: any) => job.jobId.split("/")[-1]);
- setIngestMessage(`Step 2/2: Running document ingest for ${filesToIngest.length} files in ${outputBucket}...`);
+ // Determine file path based on format
+ let filePath = "";
+ if (fileFormat === "multi") {
+ filePath = outputBucket; // For multi format, use output bucket
+ } else if (fileFormat === "json") {
+ filePath = dataPath; // For json format, use the provided data path
}
+ const loadingInfo = {
+ load_job_id: createData.load_job_id,
+ data_source_id: createData.data_source_id,
+ file_path: filePath,
+ };
+
const ingestResponse = await fetch(`/ui/${ingestGraphName}/ingest`, {
method: "POST",
headers: {
@@ -629,13 +609,15 @@ const Setup = () => {
}
const ingestData = await ingestResponse.json();
- //console.log("Ingest response:", ingestData);
- const filesIngested = ingestData.summary.map((file: any) => file.file_path);
-
- setIngestMessage(`✅ Document ingestion completed successfully! Ingested ${filesIngested.length} into your knowledge graph.`);
+ console.log("Ingest response:", ingestData);
+ if (fileFormat === "multi") {
+ setIngestMessage(`✅ Data ingested successfully! AWS Bedrock BDA processed documents from ${inputBucket} and loaded results from ${outputBucket}.`);
+ } else {
+ setIngestMessage(`✅ Data ingested successfully! Processed documents from ${dataPath}.`);
+ }
} catch (error: any) {
- console.error("Error ingesting files:", error);
+ console.error("Error ingesting S3 data:", error);
setIngestMessage(`❌ Error: ${error.message}`);
} finally {
setIsIngesting(false);
@@ -1121,8 +1103,8 @@ const Setup = () => {
-
- {/* Direct Ingestion Checkbox */}
-
- setDirectIngestion(e.target.checked)}
- className="mr-2 h-4 w-4 rounded border-gray-300 text-blue-600 focus:ring-blue-500"
- />
-
-
-
handleIngestDocuments("downloaded")}
- disabled={isIngesting}
+ onClick={handleRunIngest}
+ disabled={isIngesting || !tempSessionId}
className="gradient text-white w-full"
>
{isIngesting ? (
@@ -1813,71 +1910,6 @@ const Setup = () => {
{ingestMessage}
)}
-
- {/* Processed Temp Files - Review before ingesting */}
- {showTempFiles && tempFiles.length > 0 && (
-
-
-
- Processed Files ({tempFiles.length})
-
-
-
- Clear All
-
-
-
- Review the processed files below. You can delete any file before ingesting.
-
-
- {tempFiles.map((file, index) => (
-
-
-
- {file.doc_id}
-
-
- {(file.size / 1024).toFixed(2)} KB
-
-
-
handleDeleteTempFile(file.filename)}
- variant="outline"
- size="sm"
- className="ml-2 dark:border-[#3D3D3D]"
- >
-
-
-
- ))}
-
-
- {isIngesting ? (
- <>
-
- Ingesting...
- >
- ) : (
- <>
-
- Run Final Ingest
- >
- )}
-
-
- )}
)}
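For reference, the two create_ingest payload shapes the new handleS3BedrockIngest sends, written out as Python dictionaries for brevity; the bucket names, keys, and region are placeholders:

    # "multi": AWS Bedrock BDA processes raw documents from the input bucket
    # and the results are loaded from the output bucket.
    multi_payload = {
        "data_source": "s3",
        "data_source_config": {
            "aws_access_key": "<AWS_ACCESS_KEY>",
            "aws_secret_key": "<AWS_SECRET_KEY>",
            "input_bucket": "my-input-bucket",
            "output_bucket": "my-output-bucket",
            "region_name": "us-east-1",
        },
        "loader_config": {"doc_id_field": "doc_id", "content_field": "content", "doc_type": "markdown"},
        "file_format": "multi",
    }

    # "json": ingest pre-built JSON directly from an s3:// data path.
    json_payload = {
        "data_source": "s3",
        "data_source_config": {
            "aws_access_key": "<AWS_ACCESS_KEY>",
            "aws_secret_key": "<AWS_SECRET_KEY>",
        },
        "loader_config": {"doc_id_field": "url", "content_field": "content", "doc_type": ""},
        "file_format": "json",
    }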
From 37ccf7089b0acff64cf9aba7827061df097af4ac Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Tue, 18 Nov 2025 18:32:55 +0530
Subject: [PATCH 09/15] Update README for OpenAI and Bedrock config, add
pymupdf4llm license
---
README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 13c88b3..9469ad6 100644
--- a/README.md
+++ b/README.md
@@ -482,7 +482,7 @@ In addition to the `OPENAI_API_KEY`, `llm_model` and `model_name` can be edited
"model_kwargs": {
"temperature": 0
},
- "prompt_path": "./common/prompts/openai_gpt4/"
+ "prompt_path": "./app/prompts/openai_gpt4/"
},
"multimodal_service": {
"llm_service": "openai",
@@ -614,7 +614,7 @@ In addition to the `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_API_KEY`, and `azure_d
"temperature": 0,
"max_tokens": 4096
},
- "prompt_path": "./common/prompts/openai_gpt4/"
+ "prompt_path": "./app/prompts/aws_bedrock_claude3haiku/"
},
"multimodal_service": {
"llm_service": "bedrock",
From 74ce839bc08266433ba26d867eb39cd882a17f69 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Tue, 18 Nov 2025 22:55:22 +0530
Subject: [PATCH 10/15] Fix prompt_path to use ./common/prompts/ for OpenAI and
Bedrock
---
README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 9469ad6..13c88b3 100644
--- a/README.md
+++ b/README.md
@@ -482,7 +482,7 @@ In addition to the `OPENAI_API_KEY`, `llm_model` and `model_name` can be edited
"model_kwargs": {
"temperature": 0
},
- "prompt_path": "./app/prompts/openai_gpt4/"
+ "prompt_path": "./common/prompts/openai_gpt4/"
},
"multimodal_service": {
"llm_service": "openai",
@@ -614,7 +614,7 @@ In addition to the `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_API_KEY`, and `azure_d
"temperature": 0,
"max_tokens": 4096
},
- "prompt_path": "./app/prompts/aws_bedrock_claude3haiku/"
+ "prompt_path": "./common/prompts/openai_gpt4/"
},
"multimodal_service": {
"llm_service": "bedrock",
From 5090a71ded00a672bf248e3af5143ce8842bbe18 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Fri, 21 Nov 2025 20:57:53 +0530
Subject: [PATCH 11/15] Bug fixes: write processed documents to a single JSONL and load them in one call
---
graphrag/app/supportai/supportai.py | 79 +++++++++--------------------
1 file changed, 23 insertions(+), 56 deletions(-)
diff --git a/graphrag/app/supportai/supportai.py b/graphrag/app/supportai/supportai.py
index 88542dc..8dea43a 100644
--- a/graphrag/app/supportai/supportai.py
+++ b/graphrag/app/supportai/supportai.py
@@ -501,18 +501,17 @@ def create_ingest(
documents = server_processing_result.get("documents", [])
doc_count = len(documents)
- # Save each document as a separate JSON file
- for idx, doc_data in enumerate(documents):
- doc_filename = f"doc_{idx}_{doc_data.get('doc_id', 'unknown')}.json"
- doc_filepath = os.path.join(temp_folder, doc_filename)
- with open(doc_filepath, 'w', encoding='utf-8') as f:
- json.dump(doc_data, f, ensure_ascii=False, indent=2)
+ # Save all documents to a single JSONL file
+ jsonl_filepath = os.path.join(temp_folder, "processed_documents.jsonl")
+ with open(jsonl_filepath, 'w', encoding='utf-8') as f:
+ for doc_data in documents:
+ f.write(json.dumps(doc_data, ensure_ascii=False) + '\n')
# Clear documents from memory immediately after saving
documents.clear()
server_processing_result.clear()
- logger.info(f"Saved {doc_count} processed documents to {temp_folder}")
+ logger.info(f"Saved {doc_count} processed documents to {jsonl_filepath}")
res_ingest_config["temp_session_id"] = temp_session_id
res_ingest_config["temp_folder"] = temp_folder
@@ -671,7 +670,6 @@ def ingest(
}
elif ingest_config.get("data_source") == "server":
try:
- processed_files = []
data_source_id = ingest_config.get("data_source_id", "DocumentContent")
# Read from temporary folder
@@ -679,54 +677,23 @@ def ingest(
if not temp_folder or not os.path.exists(temp_folder):
raise Exception(f"Temporary folder not found: {temp_folder}")
- # Read all JSON files from temp folder
- json_files = [f for f in os.listdir(temp_folder) if f.endswith('.json')]
- logger.info(f"Reading {len(json_files)} documents from {temp_folder}")
+ # Read the processed_documents.jsonl file written by create_ingest
+ jsonl_file = os.path.join(temp_folder, "processed_documents.jsonl")
+ if not os.path.exists(jsonl_file):
+ raise Exception(f"Processed documents file not found: {jsonl_file}")
- for json_filename in json_files:
- json_filepath = os.path.join(temp_folder, json_filename)
- try:
- with open(json_filepath, 'r', encoding='utf-8') as f:
- doc_data = json.load(f)
-
- if not doc_data.get("doc_id"):
- logger.warning(f"Skipping invalid document: {json_filename}")
- continue
- # Skip documents with neither content nor image_data
- if not doc_data.get("content") and not doc_data.get("image_data"):
- logger.warning(f"Skipping document with no content: {json_filename}")
- continue
-
- if doc_data.get("image_data"):
- payload = {
- "doc_id": doc_data.get("doc_id", ""),
- "doc_type": "image",
- "image_data": doc_data.get("image_data", ""),
- "image_format": doc_data.get("image_format", "jpg"),
- "image_description": doc_data.get("image_description", ""),
- "parent_doc": doc_data.get("parent_doc", ""),
- "page_number": doc_data.get("page_number", 0),
- "width": doc_data.get("width", 0),
- "height": doc_data.get("height", 0),
- "position": doc_data.get("position", 0),
- "content": ""
- }
- else:
- payload = {
- "doc_id": doc_data.get("doc_id", ""),
- "doc_type": doc_data.get("doc_type", "markdown"),
- "content": doc_data.get("content", "")
- }
- payload_json = json.dumps(payload)
- conn.runLoadingJobWithData(payload_json, data_source_id, loader_info.load_job_id)
- processed_files.append({
- 'file_path': doc_data.get("doc_id", ""),
- 'parent_doc': doc_data.get("parent_doc", ""),
- })
- logger.info(f"Data uploading done for doc_id: {doc_data.get('doc_id', 'unknown')}")
- except Exception as file_error:
- logger.error(f"Error processing file {json_filename}: {file_error}")
- continue
+ # Read entire JSONL content as a single string
+ with open(jsonl_file, 'r', encoding='utf-8') as f:
+ jsonl_content = f.read()
+
+ # Count documents for logging
+ document_count = jsonl_content.count('\n') if jsonl_content.strip() else 0
+ logger.info(f"Ingesting {document_count} documents from {jsonl_file}")
+
+ # Pass entire JSONL content in ONE call (efficient!)
+ conn.runLoadingJobWithData(jsonl_content, data_source_id, loader_info.load_job_id)
+
+ logger.info(f"Successfully ingested {document_count} documents")
# Clean up temp folder after successful ingestion
try:
@@ -740,7 +707,7 @@ def ingest(
raise Exception(f"Error during server markdown extraction and TigerGraph loading: {e}")
return {
"job_name": loader_info.load_job_id,
- "summary": processed_files
+ "summary": f"Data ingestion successful - processed {document_count} documents"
}
else:
raise Exception("Data source and file format combination not implemented")
From 5417f8872fd55ef7d2598b95314b6e7d05eb9fd0 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Mon, 1 Dec 2025 17:16:28 +0530
Subject: [PATCH 12/15] Merge latest main and consolidate markdown_parsing.py
into text_extractors.py
---
common/requirements.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/common/requirements.txt b/common/requirements.txt
index f0022f3..3bbd096 100644
--- a/common/requirements.txt
+++ b/common/requirements.txt
@@ -108,7 +108,7 @@ ordered-set==4.1.0
orjson==3.10.18
packaging==24.2
pandas==2.2.3
-#pathtools==0.1.2
+pathtools==0.1.2
pillow==11.2.1
#PyMuPDF==1.26.4
pymupdf4llm==0.2.0
From eefcd67d012fd787a2555ba2f7825c585ae0a211 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Wed, 3 Dec 2025 15:23:46 +0530
Subject: [PATCH 13/15] Redesign temp file storage: save immediately during
file processing instead of after
---
common/utils/text_extractors.py | 77 ++++++++++++++++++++++++-----
graphrag/app/supportai/supportai.py | 14 +++++-
2 files changed, 77 insertions(+), 14 deletions(-)
diff --git a/common/utils/text_extractors.py b/common/utils/text_extractors.py
index ec5b140..4f22822 100644
--- a/common/utils/text_extractors.py
+++ b/common/utils/text_extractors.py
@@ -42,10 +42,11 @@ def __init__(self):
'.jpg': 'image/jpeg'
}
- async def _process_file_async(self, file_path, folder_path_obj, graphname):
+ async def _process_file_async(self, file_path, folder_path_obj, graphname, temp_folder=None, file_counter=None):
"""
Async helper to process a single file.
Runs in thread pool to avoid blocking on I/O operations.
+ If temp_folder is provided, saves documents immediately and returns metadata only.
"""
try:
loop = asyncio.get_event_loop()
@@ -57,6 +58,27 @@ async def _process_file_async(self, file_path, folder_path_obj, graphname):
graphname
)
+ # If temp_folder provided, save immediately and return metadata only
+ if temp_folder and doc_entries:
+ saved_files = []
+ for idx, doc_data in enumerate(doc_entries):
+ # Use file_counter for unique naming across all files
+ counter_val = next(file_counter) if file_counter else idx
+ doc_filename = f"doc_{counter_val}_{doc_data.get('doc_id', 'unknown')}.json"
+ doc_filepath = os.path.join(temp_folder, doc_filename)
+ with open(doc_filepath, 'w', encoding='utf-8') as f:
+ json.dump(doc_data, f, ensure_ascii=False, indent=2)
+ saved_files.append(doc_filename)
+
+ # Return metadata only, not full documents (memory efficient)
+ return {
+ 'success': True,
+ 'file_path': str(file_path),
+ 'saved_files': saved_files,
+ 'num_documents': len(doc_entries)
+ }
+
+ # No temp_folder - return documents in memory (legacy behavior)
return {
'success': True,
'file_path': str(file_path),
@@ -72,10 +94,11 @@ async def _process_file_async(self, file_path, folder_path_obj, graphname):
logger.warning(f"Failed to process file {file_path}: {e}")
return {'success': False, 'file_path': str(file_path), 'error': str(e)}
- async def _process_folder_async(self, folder_path, graphname=None, max_concurrent=10):
+ async def _process_folder_async(self, folder_path, graphname=None, max_concurrent=10, temp_folder=None):
"""
Async version of process_folder for parallel file processing.
This prevents conflicts when multiple users process folders simultaneously.
+ If temp_folder is provided, saves documents immediately to disk instead of holding in memory.
"""
logger.info(f"Processing local folder ASYNC: {folder_path} for graph: {graphname} (max_concurrent={max_concurrent})")
@@ -87,6 +110,11 @@ async def _process_folder_async(self, folder_path, graphname=None, max_concurren
if not folder_path_obj.is_dir():
raise Exception(f"Path is not a directory: {folder_path}")
+ # Create temp folder if provided
+ if temp_folder:
+ os.makedirs(temp_folder, exist_ok=True)
+ logger.info(f"Saving processed documents to: {temp_folder}")
+
def safe_walk(path):
try:
for item in path.iterdir():
@@ -111,16 +139,20 @@ def safe_walk(path):
logger.info(f"Found {len(files_to_process)} files to process")
semaphore = asyncio.Semaphore(max_concurrent)
+
+ # Thread-safe counter for unique file naming
+ file_counter = iter(range(100000)) if temp_folder else None
async def process_with_semaphore(file_path):
async with semaphore:
- return await self._process_file_async(file_path, folder_path_obj, graphname)
+ return await self._process_file_async(file_path, folder_path_obj, graphname, temp_folder, file_counter)
tasks = [process_with_semaphore(fp) for fp in files_to_process]
results = await asyncio.gather(*tasks, return_exceptions=True)
all_documents = []
processed_files_info = []
+ total_saved_files = []
for result in results:
if isinstance(result, Exception):
@@ -128,10 +160,15 @@ async def process_with_semaphore(file_path):
continue
if result.get('success'):
- all_documents.extend(result.get('documents', []))
+ # If temp_folder was used, documents are saved to disk
+ if temp_folder:
+ total_saved_files.extend(result.get('saved_files', []))
+ else:
+ all_documents.extend(result.get('documents', []))
+
processed_files_info.append({
'file_path': result['file_path'],
- 'num_documents': result.get('num_documents', len(result.get('documents', []))),
+ 'num_documents': result.get('num_documents', 0),
'status': 'success'
})
else:
@@ -141,23 +178,39 @@ async def process_with_semaphore(file_path):
'error': result.get('error', 'Unknown error')
})
- logger.info(f"Processed {len(processed_files_info)} files, extracted {len(all_documents)} total documents")
+ total_docs = len(total_saved_files) if temp_folder else len(all_documents)
+ logger.info(f"Processed {len(processed_files_info)} files, extracted {total_docs} total documents")
- return {
+ response = {
'statusCode': 200,
- 'message': f'Processed {len(processed_files_info)} files, {len(all_documents)} documents',
- 'documents': all_documents,
+ 'message': f'Processed {len(processed_files_info)} files, {total_docs} documents',
'files': processed_files_info,
- 'num_documents': len(all_documents)
+ 'num_documents': total_docs
}
+
+ # Only include documents in response if NOT saving to temp_folder
+ if temp_folder:
+ response['saved_to_temp'] = True
+ response['temp_folder'] = temp_folder
+ response['saved_files'] = total_saved_files
+ else:
+ response['documents'] = all_documents
+
+ return response
- def process_folder(self, folder_path, graphname=None):
+ def process_folder(self, folder_path, graphname=None, temp_folder=None):
"""
Process local folder with multiple file formats and extract text content.
Uses async processing internally for parallel file handling.
+
+ Args:
+ folder_path: Path to the folder containing files to process
+ graphname: Name of the graph (for context)
+ temp_folder: Optional path to save processed documents immediately.
+ If provided, documents are saved to disk instead of returned in memory.
"""
logger.info(f"Processing local folder: {folder_path} for graph: {graphname}")
- return asyncio.run(self._process_folder_async(folder_path, graphname))
+ return asyncio.run(self._process_folder_async(folder_path, graphname, temp_folder=temp_folder))
def extract_text_from_file_with_images_as_docs(file_path, graphname=None):
diff --git a/graphrag/app/supportai/supportai.py b/graphrag/app/supportai/supportai.py
index 8dea43a..63cfa3d 100644
--- a/graphrag/app/supportai/supportai.py
+++ b/graphrag/app/supportai/supportai.py
@@ -485,12 +485,22 @@ def create_ingest(
if data_path is None:
raise Exception("Folder path not provided for server processing")
try:
+ # Create temp folder BEFORE processing so extractor can save directly
+ temp_session_id = str(uuid.uuid4())
+ temp_folder = os.path.join("uploads", "ingestion_temp", graphname, temp_session_id)
+
+ # Process files and save immediately to temp folder (memory efficient)
extractor = TextExtractor()
- server_processing_result = extractor.process_folder(data_path, graphname=graphname)
+ server_processing_result = extractor.process_folder(
+ data_path,
+ graphname=graphname,
+ temp_folder=temp_folder # Extractor saves files as it processes
+ )
+
if server_processing_result.get("statusCode") != 200:
raise Exception(f"Server folder processing failed: {server_processing_result}")
- # Log only summary, NOT the full documents to avoid memory logging
+ doc_count = server_processing_result.get("num_documents", 0)
logger.info(f"Server folder processing completed: {server_processing_result.get('message')}")
# Save processed documents to temporary folder instead of keeping in memory
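Put together, the new create_ingest flow in this patch looks roughly like the following; graphname and data_path are placeholders, error handling is omitted, and the import path assumes TextExtractor lives in common.utils.text_extractors:

    import os
    import uuid
    from common.utils.text_extractors import TextExtractor

    graphname, data_path = "MyGraph", "uploads/MyGraph/downloaded"

    # One temp session per ingest; the extractor saves documents here as it processes files.
    temp_session_id = str(uuid.uuid4())
    temp_folder = os.path.join("uploads", "ingestion_temp", graphname, temp_session_id)

    extractor = TextExtractor()
    result = extractor.process_folder(data_path, graphname=graphname, temp_folder=temp_folder)

    assert result.get("statusCode") == 200
    print(result["num_documents"], "documents saved under", temp_folder)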
From 555a500f3d24bec8fafd556847c324b5f8f5761f Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Thu, 4 Dec 2025 21:02:50 +0530
Subject: [PATCH 14/15] Process files into a single JSONL during extraction, then
ingest them with one loading job
---
common/utils/text_extractors.py | 191 +++++++++++++++++++---------
graphrag-ui/src/pages/Setup.tsx | 107 ++++++++--------
graphrag/app/routers/ui.py | 140 +++++++++++++++-----
graphrag/app/supportai/supportai.py | 43 ++-----
4 files changed, 310 insertions(+), 171 deletions(-)
diff --git a/common/utils/text_extractors.py b/common/utils/text_extractors.py
index 4f22822..9b5b652 100644
--- a/common/utils/text_extractors.py
+++ b/common/utils/text_extractors.py
@@ -42,11 +42,11 @@ def __init__(self):
'.jpg': 'image/jpeg'
}
- async def _process_file_async(self, file_path, folder_path_obj, graphname, temp_folder=None, file_counter=None):
+ async def _process_file_async(self, file_path, folder_path_obj, graphname, temp_folder, jsonl_file, jsonl_lock):
"""
Async helper to process a single file.
Runs in thread pool to avoid blocking on I/O operations.
- If temp_folder is provided, saves documents immediately and returns metadata only.
+ Appends documents immediately to JSONL file.
"""
try:
loop = asyncio.get_event_loop()
@@ -58,31 +58,21 @@ async def _process_file_async(self, file_path, folder_path_obj, graphname, temp_
graphname
)
- # If temp_folder provided, save immediately and return metadata only
- if temp_folder and doc_entries:
- saved_files = []
- for idx, doc_data in enumerate(doc_entries):
- # Use file_counter for unique naming across all files
- counter_val = next(file_counter) if file_counter else idx
- doc_filename = f"doc_{counter_val}_{doc_data.get('doc_id', 'unknown')}.json"
- doc_filepath = os.path.join(temp_folder, doc_filename)
- with open(doc_filepath, 'w', encoding='utf-8') as f:
- json.dump(doc_data, f, ensure_ascii=False, indent=2)
- saved_files.append(doc_filename)
-
- # Return metadata only, not full documents (memory efficient)
- return {
- 'success': True,
- 'file_path': str(file_path),
- 'saved_files': saved_files,
- 'num_documents': len(doc_entries)
- }
+ # Append each document to JSONL file immediately
+ if doc_entries:
+ # Use lock to ensure thread-safe writing to JSONL file
+ async with jsonl_lock:
+ await loop.run_in_executor(
+ None,
+ self._append_to_jsonl,
+ jsonl_file,
+ doc_entries
+ )
- # No temp_folder - return documents in memory (legacy behavior)
+ # Return metadata only, documents already saved to JSONL
return {
'success': True,
'file_path': str(file_path),
- 'documents': doc_entries,
'num_documents': len(doc_entries)
}
@@ -93,12 +83,21 @@ async def _process_file_async(self, file_path, folder_path_obj, graphname, temp_
except Exception as e:
logger.warning(f"Failed to process file {file_path}: {e}")
return {'success': False, 'file_path': str(file_path), 'error': str(e)}
+
+ def _append_to_jsonl(self, jsonl_file, doc_entries):
+ """
+ Append document entries to JSONL file.
+ Each document is written as a separate line.
+ """
+ with open(jsonl_file, 'a', encoding='utf-8') as f:
+ for doc_data in doc_entries:
+ json_line = json.dumps(doc_data, ensure_ascii=False)
+ f.write(json_line + '\n')
- async def _process_folder_async(self, folder_path, graphname=None, max_concurrent=10, temp_folder=None):
+ async def _process_folder_async(self, folder_path, graphname, temp_folder, max_concurrent=10):
"""
Async version of process_folder for parallel file processing.
- This prevents conflicts when multiple users process folders simultaneously.
- If temp_folder is provided, saves documents immediately to disk instead of holding in memory.
+ Saves all documents immediately to a single JSONL file as they are processed.
"""
logger.info(f"Processing local folder ASYNC: {folder_path} for graph: {graphname} (max_concurrent={max_concurrent})")
@@ -110,10 +109,12 @@ async def _process_folder_async(self, folder_path, graphname=None, max_concurren
if not folder_path_obj.is_dir():
raise Exception(f"Path is not a directory: {folder_path}")
- # Create temp folder if provided
- if temp_folder:
- os.makedirs(temp_folder, exist_ok=True)
- logger.info(f"Saving processed documents to: {temp_folder}")
+ # Create temp folder and JSONL file
+ os.makedirs(temp_folder, exist_ok=True)
+ jsonl_file = os.path.join(temp_folder, "processed_documents.jsonl")
+ # Create async lock for thread-safe JSONL writing
+ jsonl_lock = asyncio.Lock()
+ logger.info(f"Saving processed documents to: {jsonl_file}")
def safe_walk(path):
try:
@@ -139,20 +140,16 @@ def safe_walk(path):
logger.info(f"Found {len(files_to_process)} files to process")
semaphore = asyncio.Semaphore(max_concurrent)
-
- # Thread-safe counter for unique file naming
- file_counter = iter(range(100000)) if temp_folder else None
async def process_with_semaphore(file_path):
async with semaphore:
- return await self._process_file_async(file_path, folder_path_obj, graphname, temp_folder, file_counter)
+ return await self._process_file_async(file_path, folder_path_obj, graphname, temp_folder, jsonl_file, jsonl_lock)
tasks = [process_with_semaphore(fp) for fp in files_to_process]
results = await asyncio.gather(*tasks, return_exceptions=True)
- all_documents = []
processed_files_info = []
- total_saved_files = []
+ total_docs = 0
for result in results:
if isinstance(result, Exception):
@@ -160,15 +157,12 @@ async def process_with_semaphore(file_path):
continue
if result.get('success'):
- # If temp_folder was used, documents are saved to disk
- if temp_folder:
- total_saved_files.extend(result.get('saved_files', []))
- else:
- all_documents.extend(result.get('documents', []))
+ num_docs = result.get('num_documents', 0)
+ total_docs += num_docs
processed_files_info.append({
'file_path': result['file_path'],
- 'num_documents': result.get('num_documents', 0),
+ 'num_documents': num_docs,
'status': 'success'
})
else:
@@ -178,39 +172,118 @@ async def process_with_semaphore(file_path):
'error': result.get('error', 'Unknown error')
})
- total_docs = len(total_saved_files) if temp_folder else len(all_documents)
logger.info(f"Processed {len(processed_files_info)} files, extracted {total_docs} total documents")
- response = {
+ return {
'statusCode': 200,
'message': f'Processed {len(processed_files_info)} files, {total_docs} documents',
'files': processed_files_info,
- 'num_documents': total_docs
+ 'num_documents': total_docs,
+ 'temp_folder': temp_folder,
+ 'jsonl_file': jsonl_file
}
-
- # Only include documents in response if NOT saving to temp_folder
- if temp_folder:
- response['saved_to_temp'] = True
- response['temp_folder'] = temp_folder
- response['saved_files'] = total_saved_files
- else:
- response['documents'] = all_documents
-
- return response
- def process_folder(self, folder_path, graphname=None, temp_folder=None):
+ def process_folder(self, folder_path, graphname, temp_folder):
"""
Process local folder with multiple file formats and extract text content.
Uses async processing internally for parallel file handling.
+ Saves all documents to JSONL file immediately as they are processed.
Args:
folder_path: Path to the folder containing files to process
graphname: Name of the graph (for context)
- temp_folder: Optional path to save processed documents immediately.
- If provided, documents are saved to disk instead of returned in memory.
+ temp_folder: Path to save processed documents as JSONL file
"""
logger.info(f"Processing local folder: {folder_path} for graph: {graphname}")
- return asyncio.run(self._process_folder_async(folder_path, graphname, temp_folder=temp_folder))
+ return asyncio.run(self._process_folder_async(folder_path, graphname, temp_folder))
+
+ def delete_file_from_jsonl(self, temp_folder, filename):
+ """
+ Delete all documents related to a specific file from the JSONL file.
+
+ Args:
+ temp_folder: Path to the temp folder containing processed_documents.jsonl
+ filename: Original filename (e.g., "report.pdf", "stock_gs200.jpg")
+
+ Returns:
+ dict with status and number of documents removed
+ """
+ jsonl_file = os.path.join(temp_folder, "processed_documents.jsonl")
+
+ if not os.path.exists(jsonl_file):
+ logger.warning(f"JSONL file not found: {jsonl_file}")
+ return {'success': False, 'error': 'JSONL file not found'}
+
+ # Get base name without extension to match doc_id
+ base_name = Path(filename).stem
+ logger.info(f"Deleting documents for file: {filename} (base_name: '{base_name}')")
+
+ # Read all lines and filter out ones matching this file
+ remaining_lines = []
+ removed_count = 0
+ removed_doc_ids = []
+
+ try:
+ with open(jsonl_file, 'r', encoding='utf-8') as f:
+ for line_num, line in enumerate(f, 1):
+ line = line.strip()
+ if not line:
+ continue
+
+ try:
+ doc_data = json.loads(line)
+ doc_id = doc_data.get('doc_id', '')
+
+ # Check if doc_id matches the base_name or starts with base_name_
+ # Handles: "stock_gs200" == "stock_gs200" or "stock_gs200_image_1".startswith("stock_gs200_")
+ if doc_id == base_name or doc_id.startswith(f"{base_name}_"):
+ removed_count += 1
+ removed_doc_ids.append(doc_id)
+ logger.info(f"Removing document: {doc_id}")
+ else:
+ remaining_lines.append(line)
+ except json.JSONDecodeError as e:
+ logger.warning(f"Skipping invalid JSON at line {line_num}: {e}")
+ # Keep invalid lines in case they're important
+ remaining_lines.append(line)
+
+ if removed_count == 0:
+ logger.warning(f"No documents found matching base_name: '{base_name}'")
+ return {
+ 'success': False,
+ 'error': f'No documents found for {filename}',
+ 'removed_count': 0
+ }
+
+ # If no lines remain, delete the entire temp folder
+ if not remaining_lines:
+ logger.info(f"No documents remaining, deleting temp folder: {temp_folder}")
+ import shutil
+ shutil.rmtree(temp_folder, ignore_errors=True)
+ return {
+ 'success': True,
+ 'removed_count': removed_count,
+ 'removed_doc_ids': removed_doc_ids,
+ 'temp_folder_deleted': True
+ }
+
+ # Write remaining lines back to JSONL
+ with open(jsonl_file, 'w', encoding='utf-8') as f:
+ for line in remaining_lines:
+ f.write(line + '\n')
+
+ logger.info(f"Removed {removed_count} documents ({', '.join(removed_doc_ids)}), {len(remaining_lines)} remaining")
+ return {
+ 'success': True,
+ 'removed_count': removed_count,
+ 'removed_doc_ids': removed_doc_ids,
+ 'remaining_count': len(remaining_lines),
+ 'temp_folder_deleted': False
+ }
+
+ except Exception as e:
+ logger.error(f"Error deleting from JSONL: {e}")
+ return {'success': False, 'error': str(e)}
def extract_text_from_file_with_images_as_docs(file_path, graphname=None):
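The concurrency-sensitive piece of this change is the shared JSONL file. A stripped-down sketch of the lock-plus-executor pattern used above, with dummy documents standing in for real extraction output:

    import asyncio
    import json

    def _append_to_jsonl(jsonl_file, doc_entries):
        # Blocking file append; one JSON document per line.
        with open(jsonl_file, "a", encoding="utf-8") as f:
            for doc in doc_entries:
                f.write(json.dumps(doc, ensure_ascii=False) + "\n")

    async def append_documents(jsonl_file, docs, lock):
        loop = asyncio.get_event_loop()
        # Serialize writers with an asyncio.Lock, then push the blocking I/O off the event loop.
        async with lock:
            await loop.run_in_executor(None, _append_to_jsonl, jsonl_file, docs)

    async def main():
        lock = asyncio.Lock()
        await asyncio.gather(*[
            append_documents("processed_documents.jsonl", [{"doc_id": f"doc_{i}", "content": "..."}], lock)
            for i in range(5)
        ])

    asyncio.run(main())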
diff --git a/graphrag-ui/src/pages/Setup.tsx b/graphrag-ui/src/pages/Setup.tsx
index 17f952c..c3e4689 100644
--- a/graphrag-ui/src/pages/Setup.tsx
+++ b/graphrag-ui/src/pages/Setup.tsx
@@ -398,11 +398,18 @@ const Setup = () => {
const data = await response.json();
if (data.status === "success") {
- setDownloadMessage(`✅ ${data.message}. Processing...`);
+ setDownloadMessage(`✅ ${data.message}. Processed ${data.doc_count || 0} document(s)`);
await fetchDownloadedFiles();
- // Step 2: Call create_ingest to process downloaded files
- await handleCreateIngestAfterUpload("downloaded");
+ // Save session ID from automatic processing
+ if (data.temp_session_id) {
+ setTempSessionId(data.temp_session_id);
+ await fetchTempFiles(data.temp_session_id);
+ }
+ } else if (data.status === "partial_success") {
+ setDownloadMessage(`⚠️ ${data.message}`);
+ await fetchDownloadedFiles();
+ // Don't call create_ingest if processing already attempted
} else if (data.status === "warning") {
setDownloadMessage(`⚠️ ${data.message}`);
} else {
@@ -421,22 +428,26 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
- const response = await fetch(
- `/ui/${ingestGraphName}/cloud/delete?filename=${encodeURIComponent(filename)}`,
- {
- method: "DELETE",
- headers: { Authorization: `Basic ${creds}` },
- }
- );
- const data = await response.json();
- // Also delete corresponding temp files if session exists
+ // Build URL with session_id if available
+ let deleteUrl = `/ui/${ingestGraphName}/cloud/delete?filename=${encodeURIComponent(filename)}`;
if (tempSessionId) {
- await handleDeleteTempFilesForOriginal(filename);
+ deleteUrl += `&session_id=${encodeURIComponent(tempSessionId)}`;
}
+ const response = await fetch(deleteUrl, {
+ method: "DELETE",
+ headers: { Authorization: `Basic ${creds}` },
+ });
+ const data = await response.json();
+
setDownloadMessage(`✅ ${data.message}`);
await fetchDownloadedFiles();
+
+ // Refresh temp files list if session exists
+ if (tempSessionId) {
+ await fetchTempFiles(tempSessionId);
+ }
} catch (error: any) {
setDownloadMessage(`❌ Error: ${error.message}`);
}
@@ -451,13 +462,26 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
- const response = await fetch(`/ui/${ingestGraphName}/cloud/delete`, {
+
+ // Build URL with session_id if available
+ let deleteUrl = `/ui/${ingestGraphName}/cloud/delete`;
+ if (tempSessionId) {
+ deleteUrl += `?session_id=${encodeURIComponent(tempSessionId)}`;
+ }
+
+ const response = await fetch(deleteUrl, {
method: "DELETE",
headers: { Authorization: `Basic ${creds}` },
});
const data = await response.json();
setDownloadMessage(`✅ ${data.message}`);
await fetchDownloadedFiles();
+
+ // Clear session ID and refresh temp files
+ if (tempSessionId) {
+ setTempSessionId(null);
+ setTempFiles([]);
+ }
} catch (error: any) {
setDownloadMessage(`❌ Error: ${error.message}`);
}
@@ -541,44 +565,24 @@ const Setup = () => {
}
try {
- // Extract base name without extension (e.g., "document.pdf" -> "document")
- const baseName = originalFilename.replace(/\.[^/.]+$/, "");
- console.log("Base name:", baseName);
-
const creds = localStorage.getItem("creds");
- // Fetch temp files to find matches
- const response = await fetch(`/ui/${ingestGraphName}/ingestion_temp/list?session_id=${tempSessionId}`, {
- headers: { Authorization: `Basic ${creds}` },
- });
- const data = await response.json();
- console.log("Temp files list response:", data);
-
- if (data.status === "success" && data.sessions.length > 0) {
- const files = data.sessions[0].files || [];
- console.log("All temp files:", files.map((f: any) => f.filename));
-
- // Find temp files matching pattern: doc_{idx}_{baseName}*.json
- const matchingFiles = files.filter((f: any) => f.filename.includes(`_${baseName}`));
- console.log("Matching files to delete:", matchingFiles.map((f: any) => f.filename));
-
- // Delete each matching file
- for (const file of matchingFiles) {
- console.log("Deleting temp file:", file.filename);
- const deleteResponse = await fetch(
- `/ui/${ingestGraphName}/ingestion_temp/delete?session_id=${tempSessionId}&filename=${encodeURIComponent(file.filename)}`,
- {
- method: "DELETE",
- headers: { Authorization: `Basic ${creds}` },
- }
- );
- const deleteData = await deleteResponse.json();
- console.log("Delete response:", deleteData);
+ // Call the delete endpoint with the original filename
+ // The backend will handle removing all related documents from the JSONL file
+ const deleteResponse = await fetch(
+ `/ui/${ingestGraphName}/ingestion_temp/delete?session_id=${tempSessionId}&filename=${encodeURIComponent(originalFilename)}`,
+ {
+ method: "DELETE",
+ headers: { Authorization: `Basic ${creds}` },
}
-
- console.log(`Successfully deleted ${matchingFiles.length} temp file(s)`);
+ );
+ const deleteData = await deleteResponse.json();
+ console.log("Delete temp files response:", deleteData);
+
+ if (deleteData.status === "success") {
+ console.log(`Successfully deleted processed documents for ${originalFilename}`);
} else {
- console.log("No temp files found or empty sessions");
+ console.error("Failed to delete temp files:", deleteData);
}
} catch (error: any) {
console.error("Error deleting temp files:", error);
@@ -621,7 +625,8 @@ const Setup = () => {
const ingestData = await ingestResponse.json();
console.log("Ingest response:", ingestData);
- setIngestMessage(`✅ Data ingested successfully! Processed ${tempFiles.length} documents.`);
+ const docCount = ingestData.document_count || tempFiles.length;
+ setIngestMessage(`✅ Data ingested successfully! Processed ${docCount} document(s).`);
// Clear temp state
setTempFiles([]);
@@ -691,7 +696,7 @@ const Setup = () => {
data_source_id: createData.data_source_id,
data_path: createData.data_path || createData.file_path,
});
- setIngestMessage(`✅ Processed ${createData.data_source_id.file_count} files. Review them below before ingesting.`);
+ setIngestMessage(`✅ Files processed successfully. Review them below before ingesting.`);
await fetchTempFiles(sessionId);
setIsIngesting(false);
} else {
@@ -802,7 +807,7 @@ const Setup = () => {
await handleRunIngest();
} else {
// Save for later - files ready for ingestion
- setUploadMessage(`✅ Successfully processed ${createData.data_source_id.file_count} files. Ready for ingestion.`);
+ setUploadMessage(`✅ Files processed successfully. Ready for ingestion.`);
}
} else {
console.warn("No session ID returned from create_ingest");
diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py
index 9b012ec..9d8cc33 100644
--- a/graphrag/app/routers/ui.py
+++ b/graphrag/app/routers/ui.py
@@ -1058,7 +1058,8 @@ async def download_from_cloud(
request_body: dict = Body(...),
):
"""
- Download files from cloud storage (S3, GCS, or Azure) to local directory.
+ Download files from cloud storage (S3, GCS, or Azure) to local directory
+ and automatically process them to create JSONL files for ingestion.
Parameters:
- graphname: The graph name to associate downloaded files with
@@ -1252,14 +1253,49 @@ async def download_from_cloud(
logger.info(f"Downloaded {len(downloaded_files)} file(s) from {provider} for graph {graphname}")
- return {
- "status": "success",
- "message": f"Successfully downloaded {len(downloaded_files)} file(s) from {provider}",
- "graphname": graphname,
- "provider": provider,
- "downloaded_files": downloaded_files,
- "local_path": download_dir,
- }
+ # Automatically process downloaded files to create JSONL
+ from common.utils.text_extractors import TextExtractor
+ temp_session_id = str(uuid.uuid4())
+ temp_folder = os.path.join("uploads", "ingestion_temp", graphname, temp_session_id)
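+        # process_folder is expected to write the extracted documents to
+        # processed_documents.jsonl inside this session folder for later review and ingest.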
+
+ try:
+ extractor = TextExtractor()
+ processing_result = extractor.process_folder(
+ download_dir,
+ graphname=graphname,
+ temp_folder=temp_folder
+ )
+
+ if processing_result.get("statusCode") != 200:
+ logger.error(f"Cloud file processing failed: {processing_result}")
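+                # Raising here is caught by the except below and reported as partial_success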
+ raise Exception(f"Failed to process downloaded files: {processing_result}")
+
+ doc_count = processing_result.get("num_documents", 0)
+ logger.info(f"Processed {doc_count} documents from downloaded files")
+
+ return {
+ "status": "success",
+ "message": f"Successfully downloaded and processed {len(downloaded_files)} file(s) from {provider}",
+ "graphname": graphname,
+ "provider": provider,
+ "downloaded_files": downloaded_files,
+ "local_path": download_dir,
+ "temp_session_id": temp_session_id,
+ "temp_folder": temp_folder,
+ "doc_count": doc_count,
+ }
+ except Exception as e:
+ logger.error(f"Error processing downloaded files: {e}")
+            # The download itself succeeded, so report partial_success and surface the processing error
+ return {
+ "status": "partial_success",
+ "message": f"Downloaded {len(downloaded_files)} file(s) but processing failed: {str(e)}",
+ "graphname": graphname,
+ "provider": provider,
+ "downloaded_files": downloaded_files,
+ "local_path": download_dir,
+ "processing_error": str(e),
+ }
except HTTPException:
raise
@@ -1322,13 +1358,16 @@ async def delete_cloud_downloads(
graphname: str,
credentials: Annotated[HTTPBase, Depends(security)],
filename: str = None,
+ session_id: str = None,
):
"""
Delete downloaded cloud files for a specific graph.
+ Also deletes corresponding processed documents from the JSONL file.
Parameters:
- graphname: The graph name whose downloaded files to clear
- filename: If provided, only delete this specific file. Otherwise, delete all files.
+ - session_id: The session ID for the temp folder containing processed JSONL
"""
try:
download_dir = os.path.join("downloaded_files_cloud", graphname)
@@ -1343,9 +1382,26 @@ async def delete_cloud_downloads(
deleted_files = []
if filename:
- # Delete specific file
+ # Delete specific file AND its processed documents from JSONL
file_path = os.path.join(download_dir, filename)
if os.path.exists(file_path) and os.path.isfile(file_path):
+ # If session_id provided, also delete from JSONL
+ if session_id:
+ from common.utils.text_extractors import TextExtractor
+ extractor = TextExtractor()
+
+ temp_folder = os.path.join("uploads", "ingestion_temp", graphname, session_id)
+ if os.path.exists(temp_folder):
+ # Delete from JSONL first
+ delete_result = extractor.delete_file_from_jsonl(temp_folder, filename)
+ logger.info(f"JSONL delete result for {filename}: {delete_result}")
+
+                        # If the JSONL cleanup failed, log a warning but still delete the original file
+                        if not delete_result.get('success'):
+                            logger.warning(f"Failed to delete from JSONL: {delete_result.get('error')}")
+
+ # Delete the original downloaded file
os.remove(file_path)
deleted_files.append(filename)
logger.info(f"Deleted cloud download {filename} for graph {graphname}")
@@ -1353,13 +1409,21 @@ async def delete_cloud_downloads(
raise HTTPException(status_code=404, detail=f"File {filename} not found")
else:
# Delete all files in the directory
- for filename in os.listdir(download_dir):
- file_path = os.path.join(download_dir, filename)
+ for fname in os.listdir(download_dir):
+ file_path = os.path.join(download_dir, fname)
if os.path.isfile(file_path):
os.remove(file_path)
- deleted_files.append(filename)
+ deleted_files.append(fname)
- # Remove the directory if it's empty
+ # If session_id provided, delete the entire temp folder
+ if session_id:
+ temp_folder = os.path.join("uploads", "ingestion_temp", graphname, session_id)
+ if os.path.exists(temp_folder):
+ import shutil
+ shutil.rmtree(temp_folder, ignore_errors=True)
+ logger.info(f"Deleted temp folder for session {session_id}")
+
+ # Remove the download directory if it's empty
if not os.listdir(download_dir):
os.rmdir(download_dir)
@@ -1478,25 +1542,41 @@ async def delete_ingestion_temp_files(
deleted_files = []
if filename:
- # Delete specific file
- file_path = os.path.join(session_dir, filename)
- if os.path.exists(file_path) and os.path.isfile(file_path):
- os.remove(file_path)
- deleted_files.append(filename)
- logger.info(f"Deleted temp file {filename} from session {session_id}")
-
- # If session folder is now empty, remove it
- if not os.listdir(session_dir):
- os.rmdir(session_dir)
- logger.info(f"Removed empty session folder {session_id}")
+            # Delete the processed documents for this original filename from the JSONL.
+            # Note: the session folder holds only processed_documents.jsonl, not the original files.
+ from common.utils.text_extractors import TextExtractor
+ extractor = TextExtractor()
+
+ # Delete from JSONL - MUST succeed
+ delete_result = extractor.delete_file_from_jsonl(session_dir, filename)
+ logger.info(f"JSONL delete result for {filename}: {delete_result}")
+
+ # If JSONL delete failed, return error
+ if not delete_result.get('success'):
+ error_msg = delete_result.get('error', 'Unknown error')
+ logger.error(f"Failed to delete from JSONL: {error_msg}")
+ raise HTTPException(status_code=500, detail=f"Failed to delete processed documents: {error_msg}")
+
+ deleted_files.append(filename)
+ logger.info(f"Deleted {delete_result.get('removed_count', 0)} processed documents for {filename} from JSONL")
+
+ # Check if temp folder was deleted by JSONL cleanup
+ if delete_result.get('temp_folder_deleted'):
+ logger.info(f"Session folder {session_id} was automatically deleted (no documents remaining)")
+ elif not os.path.exists(session_dir):
+ logger.info(f"Session folder {session_id} was deleted")
+ elif not os.listdir(session_dir):
+ # Clean up empty session folder
+ os.rmdir(session_dir)
+ logger.info(f"Removed empty session folder {session_id}")
else:
- raise HTTPException(status_code=404, detail=f"File {filename} not found")
+ logger.info(f"Removed {delete_result.get('removed_count', 0)} documents from JSONL, {delete_result.get('remaining_count', 0)} remaining")
else:
- # Delete entire session folder
+ # Delete entire session folder (including JSONL)
import shutil
- for filename in os.listdir(session_dir):
- if os.path.isfile(os.path.join(session_dir, filename)):
- deleted_files.append(filename)
+ for fname in os.listdir(session_dir):
+ if os.path.isfile(os.path.join(session_dir, fname)):
+ deleted_files.append(fname)
shutil.rmtree(session_dir)
logger.info(f"Deleted session folder {session_id} for graph {graphname}")
diff --git a/graphrag/app/supportai/supportai.py b/graphrag/app/supportai/supportai.py
index 63cfa3d..a9dbbe0 100644
--- a/graphrag/app/supportai/supportai.py
+++ b/graphrag/app/supportai/supportai.py
@@ -502,26 +502,6 @@ def create_ingest(
doc_count = server_processing_result.get("num_documents", 0)
logger.info(f"Server folder processing completed: {server_processing_result.get('message')}")
-
- # Save processed documents to temporary folder instead of keeping in memory
- temp_session_id = str(uuid.uuid4())
- temp_folder = os.path.join("uploads", "ingestion_temp", graphname, temp_session_id)
- os.makedirs(temp_folder, exist_ok=True)
-
- documents = server_processing_result.get("documents", [])
- doc_count = len(documents)
-
- # Save all documents to a single JSONL file (our new logic)
- jsonl_filepath = os.path.join(temp_folder, "processed_documents.jsonl")
- with open(jsonl_filepath, 'w', encoding='utf-8') as f:
- for doc_data in documents:
- f.write(json.dumps(doc_data, ensure_ascii=False) + '\n')
-
- # Clear documents from memory immediately after saving
- documents.clear()
- server_processing_result.clear()
-
- logger.info(f"Saved {doc_count} processed documents to {jsonl_filepath}")
res_ingest_config["temp_session_id"] = temp_session_id
res_ingest_config["temp_folder"] = temp_folder
@@ -682,28 +662,28 @@ def ingest(
try:
data_source_id = ingest_config.get("data_source_id", "DocumentContent")
- # Read from temporary folder
+ # Read from temporary folder's JSONL file
temp_folder = ingest_config.get("temp_folder")
if not temp_folder or not os.path.exists(temp_folder):
raise Exception(f"Temporary folder not found: {temp_folder}")
- # Read the processed_documents.jsonl file (our new logic)
+            # Locate the processed_documents.jsonl file in the temp folder
jsonl_file = os.path.join(temp_folder, "processed_documents.jsonl")
if not os.path.exists(jsonl_file):
- raise Exception(f"Processed documents file not found: {jsonl_file}")
+ raise Exception(f"JSONL file not found: {jsonl_file}")
- # Read entire JSONL content as a single string
+ logger.info(f"Reading JSONL file: {jsonl_file}")
+
+ # Read entire JSONL content
with open(jsonl_file, 'r', encoding='utf-8') as f:
jsonl_content = f.read()
- # Count documents for logging
- document_count = jsonl_content.count('\n') if jsonl_content.strip() else 0
- logger.info(f"Ingesting {document_count} documents from {jsonl_file}")
-
- # Pass entire JSONL content in ONE call (efficient!)
+ # Load all documents in one call - runLoadingJobWithData supports JSONL format
conn.runLoadingJobWithData(jsonl_content, data_source_id, loader_info.load_job_id)
- logger.info(f"Successfully ingested {document_count} documents")
+ # Count documents for reporting
+ doc_count = sum(1 for line in jsonl_content.strip().split('\n') if line.strip())
+ logger.info(f"Successfully ingested {doc_count} documents from JSONL")
# Clean up temp folder after successful ingestion
try:
@@ -717,7 +697,8 @@ def ingest(
raise Exception(f"Error during server markdown extraction and TigerGraph loading: {e}")
return {
"job_name": loader_info.load_job_id,
- "summary": f"Data ingestion successful - processed {document_count} documents"
+ "summary": f"Successfully ingested {doc_count} documents from JSONL",
+ "document_count": doc_count
}
else:
raise Exception("Data source and file format combination not implemented")
From 2d54b02a766ab209c65b8d27bd0c2741f906efb4 Mon Sep 17 00:00:00 2001
From: Prins Kumar
Date: Fri, 5 Dec 2025 21:52:01 +0530
Subject: [PATCH 15/15] Fix: Delete processed content from JSONL before
deleting original files
---
graphrag-ui/src/pages/Setup.tsx | 364 ++++++++++++++++----------------
graphrag/app/routers/ui.py | 170 ++++++---------
2 files changed, 238 insertions(+), 296 deletions(-)
diff --git a/graphrag-ui/src/pages/Setup.tsx b/graphrag-ui/src/pages/Setup.tsx
index c3e4689..0e15939 100644
--- a/graphrag-ui/src/pages/Setup.tsx
+++ b/graphrag-ui/src/pages/Setup.tsx
@@ -40,7 +40,7 @@ const Setup = () => {
const navigate = useNavigate();
const [confirm, confirmDialog, isConfirmDialogOpen] = useConfirm();
const [availableGraphs, setAvailableGraphs] = useState([]);
-
+
const [initializeGraphOpen, setInitializeGraphOpen] = useState(false);
const [graphName, setGraphName] = useState("");
const [isInitializing, setIsInitializing] = useState(false);
@@ -71,7 +71,7 @@ const Setup = () => {
const [refreshGraphName, setRefreshGraphName] = useState("");
const [isRebuildRunning, setIsRebuildRunning] = useState(false);
const [isCheckingStatus, setIsCheckingStatus] = useState(false);
-
+
// S3 state
const [fileFormat, setFileFormat] = useState<"json" | "multi">("json");
const [awsAccessKey, setAwsAccessKey] = useState("");
@@ -129,7 +129,7 @@ const Setup = () => {
}
const filesArray = Array.from(selectedFiles);
-
+
// Check if any single file exceeds the server limit
const oversizedFiles = filesArray.filter((file) => file.size > MAX_UPLOAD_SIZE_BYTES);
if (oversizedFiles.length > 0) {
@@ -169,7 +169,7 @@ const Setup = () => {
setUploadMessage(`✅ ${data.message} Processing...`);
setSelectedFiles(null);
await fetchUploadedFiles();
-
+
// Step 2: Call create_ingest to process uploaded files
console.log("Calling handleCreateIngestAfterUpload from main upload...");
await handleCreateIngestAfterUpload("uploaded");
@@ -200,9 +200,9 @@ const Setup = () => {
for (let i = 0; i < filesArray.length; i++) {
const file = filesArray[i];
const fileNumber = i + 1;
-
+
setUploadMessage(`Uploading file ${fileNumber}/${totalFiles}: ${file.name} (${formatBytes(file.size)})...`);
-
+
const formData = new FormData();
formData.append("files", file);
@@ -236,10 +236,10 @@ const Setup = () => {
} else {
setUploadMessage(`⚠️ Uploaded ${uploadedCount} files successfully, ${failedCount} failed. Processing...`);
}
-
+
setSelectedFiles(null);
await fetchUploadedFiles();
-
+
// Step 2: Call create_ingest to process uploaded files
console.log("Calling handleCreateIngestAfterUpload...");
await handleCreateIngestAfterUpload("uploaded");
@@ -261,25 +261,25 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
-
- // Also delete corresponding temp files FIRST if session exists
- if (tempSessionId) {
- console.log("Calling handleDeleteTempFilesForOriginal...");
- await handleDeleteTempFilesForOriginal(filename);
- }
-
- // Then delete original file
- const response = await fetch(
- `/ui/${ingestGraphName}/uploads?filename=${encodeURIComponent(filename)}`,
- {
- method: "DELETE",
- headers: { Authorization: `Basic ${creds}` },
- }
- );
+
+ // Delete original file (backend will also delete processed content from JSONL if session_id is provided)
+ const url = tempSessionId
+ ? `/ui/${ingestGraphName}/uploads?filename=${encodeURIComponent(filename)}&session_id=${tempSessionId}`
+ : `/ui/${ingestGraphName}/uploads?filename=${encodeURIComponent(filename)}`;
+
+ const response = await fetch(url, {
+ method: "DELETE",
+ headers: { Authorization: `Basic ${creds}` },
+ });
const data = await response.json();
-
+
setUploadMessage(`✅ ${data.message}`);
await fetchUploadedFiles();
+
+ // Refresh temp files list if session exists
+ if (tempSessionId) {
+ await fetchTempFiles(tempSessionId);
+ }
} catch (error: any) {
console.error("Delete error:", error);
setUploadMessage(`❌ Error: ${error.message}`);
@@ -300,12 +300,12 @@ const Setup = () => {
headers: { Authorization: `Basic ${creds}` },
});
const data = await response.json();
-
+
// Also clear temp session
if (tempSessionId) {
await handleDeleteAllTempFiles();
}
-
+
setUploadMessage(`✅ ${data.message}`);
await fetchUploadedFiles();
} catch (error: any) {
@@ -341,7 +341,7 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
-
+
// Prepare request body based on provider
let requestBody: any = { provider: cloudProvider };
@@ -398,18 +398,11 @@ const Setup = () => {
const data = await response.json();
if (data.status === "success") {
- setDownloadMessage(`✅ ${data.message}. Processed ${data.doc_count || 0} document(s)`);
+ setDownloadMessage(`✅ ${data.message}. Processing...`);
await fetchDownloadedFiles();
-
- // Save session ID from automatic processing
- if (data.temp_session_id) {
- setTempSessionId(data.temp_session_id);
- await fetchTempFiles(data.temp_session_id);
- }
- } else if (data.status === "partial_success") {
- setDownloadMessage(`⚠️ ${data.message}`);
- await fetchDownloadedFiles();
- // Don't call create_ingest if processing already attempted
+
+ // Step 2: Call create_ingest to process downloaded files
+ await handleCreateIngestAfterUpload("downloaded");
} else if (data.status === "warning") {
setDownloadMessage(`⚠️ ${data.message}`);
} else {
@@ -428,22 +421,21 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
-
- // Build URL with session_id if available
- let deleteUrl = `/ui/${ingestGraphName}/cloud/delete?filename=${encodeURIComponent(filename)}`;
- if (tempSessionId) {
- deleteUrl += `&session_id=${encodeURIComponent(tempSessionId)}`;
- }
-
- const response = await fetch(deleteUrl, {
+
+ // Delete original file (backend will also delete processed content from JSONL if session_id is provided)
+ const url = tempSessionId
+ ? `/ui/${ingestGraphName}/cloud/delete?filename=${encodeURIComponent(filename)}&session_id=${tempSessionId}`
+ : `/ui/${ingestGraphName}/cloud/delete?filename=${encodeURIComponent(filename)}`;
+
+ const response = await fetch(url, {
method: "DELETE",
headers: { Authorization: `Basic ${creds}` },
});
const data = await response.json();
-
+
setDownloadMessage(`✅ ${data.message}`);
await fetchDownloadedFiles();
-
+
// Refresh temp files list if session exists
if (tempSessionId) {
await fetchTempFiles(tempSessionId);
@@ -462,26 +454,13 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
-
- // Build URL with session_id if available
- let deleteUrl = `/ui/${ingestGraphName}/cloud/delete`;
- if (tempSessionId) {
- deleteUrl += `?session_id=${encodeURIComponent(tempSessionId)}`;
- }
-
- const response = await fetch(deleteUrl, {
+ const response = await fetch(`/ui/${ingestGraphName}/cloud/delete`, {
method: "DELETE",
headers: { Authorization: `Basic ${creds}` },
});
const data = await response.json();
setDownloadMessage(`✅ ${data.message}`);
await fetchDownloadedFiles();
-
- // Clear session ID and refresh temp files
- if (tempSessionId) {
- setTempSessionId(null);
- setTempFiles([]);
- }
} catch (error: any) {
setDownloadMessage(`❌ Error: ${error.message}`);
}
@@ -558,31 +537,51 @@ const Setup = () => {
// Delete temp files matching original filename
const handleDeleteTempFilesForOriginal = async (originalFilename: string) => {
console.log("handleDeleteTempFilesForOriginal called with:", originalFilename);
-
+
if (!ingestGraphName || !tempSessionId) {
console.log("No graph name or session ID, returning");
return;
}
try {
+ // Extract base name without extension (e.g., "document.pdf" -> "document")
+ const baseName = originalFilename.replace(/\.[^/.]+$/, "");
+ console.log("Base name:", baseName);
+
const creds = localStorage.getItem("creds");
-
- // Call the delete endpoint with the original filename
- // The backend will handle removing all related documents from the JSONL file
- const deleteResponse = await fetch(
- `/ui/${ingestGraphName}/ingestion_temp/delete?session_id=${tempSessionId}&filename=${encodeURIComponent(originalFilename)}`,
- {
- method: "DELETE",
- headers: { Authorization: `Basic ${creds}` },
+
+ // Fetch temp files to find matches
+ const response = await fetch(`/ui/${ingestGraphName}/ingestion_temp/list?session_id=${tempSessionId}`, {
+ headers: { Authorization: `Basic ${creds}` },
+ });
+ const data = await response.json();
+ console.log("Temp files list response:", data);
+
+ if (data.status === "success" && data.sessions.length > 0) {
+ const files = data.sessions[0].files || [];
+ console.log("All temp files:", files.map((f: any) => f.filename));
+
+ // Find temp files matching pattern: doc_{idx}_{baseName}*.json
+ const matchingFiles = files.filter((f: any) => f.filename.includes(`_${baseName}`));
+ console.log("Matching files to delete:", matchingFiles.map((f: any) => f.filename));
+
+ // Delete each matching file
+ for (const file of matchingFiles) {
+ console.log("Deleting temp file:", file.filename);
+ const deleteResponse = await fetch(
+ `/ui/${ingestGraphName}/ingestion_temp/delete?session_id=${tempSessionId}&filename=${encodeURIComponent(file.filename)}`,
+ {
+ method: "DELETE",
+ headers: { Authorization: `Basic ${creds}` },
+ }
+ );
+ const deleteData = await deleteResponse.json();
+ console.log("Delete response:", deleteData);
}
- );
- const deleteData = await deleteResponse.json();
- console.log("Delete temp files response:", deleteData);
-
- if (deleteData.status === "success") {
- console.log(`Successfully deleted processed documents for ${originalFilename}`);
+
+ console.log(`Successfully deleted ${matchingFiles.length} temp file(s)`);
} else {
- console.error("Failed to delete temp files:", deleteData);
+ console.log("No temp files found or empty sessions");
}
} catch (error: any) {
console.error("Error deleting temp files:", error);
@@ -625,9 +624,8 @@ const Setup = () => {
const ingestData = await ingestResponse.json();
console.log("Ingest response:", ingestData);
- const docCount = ingestData.document_count || tempFiles.length;
- setIngestMessage(`✅ Data ingested successfully! Processed ${docCount} document(s).`);
-
+ setIngestMessage(`✅ Data ingested successfully! Processed ${tempFiles.length} documents.`);
+
// Clear temp state
setTempFiles([]);
setShowTempFiles(false);
@@ -648,7 +646,7 @@ const Setup = () => {
return;
}
- const folderPath = sourceType === "uploaded"
+ const folderPath = sourceType === "uploaded"
? `uploads/${ingestGraphName}`
: `downloaded_files_cloud/${ingestGraphName}`;
@@ -687,7 +685,7 @@ const Setup = () => {
// Check if temp files were created (for server data source)
const sessionId = createData.data_source_id?.temp_session_id;
-
+
if (sessionId && !directIngestion) {
// Files are saved to temp storage - show them for review (only if not direct ingestion)
setTempSessionId(sessionId);
@@ -696,37 +694,37 @@ const Setup = () => {
data_source_id: createData.data_source_id,
data_path: createData.data_path || createData.file_path,
});
- setIngestMessage(`✅ Files processed successfully. Review them below before ingesting.`);
+ setIngestMessage(`✅ Processed ${createData.data_source_id.file_count} files. Review them below before ingesting.`);
await fetchTempFiles(sessionId);
setIsIngesting(false);
} else {
// No temp files (e.g., S3 Bedrock) OR direct ingestion enabled - proceed directly to ingest
- setIngestMessage("Step 2/2: Running document ingest...");
+ setIngestMessage("Step 2/2: Running document ingest...");
- const loadingInfo = {
- load_job_id: createData.load_job_id,
- data_source_id: createData.data_source_id,
+ const loadingInfo = {
+ load_job_id: createData.load_job_id,
+ data_source_id: createData.data_source_id,
file_path: createData.data_path || createData.file_path,
- };
+ };
- const ingestResponse = await fetch(`/ui/${ingestGraphName}/ingest`, {
- method: "POST",
- headers: {
- "Content-Type": "application/json",
- Authorization: `Basic ${creds}`,
- },
- body: JSON.stringify(loadingInfo),
- });
+ const ingestResponse = await fetch(`/ui/${ingestGraphName}/ingest`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `Basic ${creds}`,
+ },
+ body: JSON.stringify(loadingInfo),
+ });
- if (!ingestResponse.ok) {
- const errorData = await ingestResponse.json();
- throw new Error(errorData.detail || `Failed to run ingest: ${ingestResponse.statusText}`);
- }
+ if (!ingestResponse.ok) {
+ const errorData = await ingestResponse.json();
+ throw new Error(errorData.detail || `Failed to run ingest: ${ingestResponse.statusText}`);
+ }
- const ingestData = await ingestResponse.json();
- console.log("Ingest response:", ingestData);
+ const ingestData = await ingestResponse.json();
+ console.log("Ingest response:", ingestData);
- setIngestMessage(`✅ Data ingested successfully! Processed documents from ${folderPath}/`);
+ setIngestMessage(`✅ Data ingested successfully! Processed documents from ${folderPath}/`);
setIsIngesting(false);
}
} catch (error: any) {
@@ -740,16 +738,16 @@ const Setup = () => {
const handleCreateIngestAfterUpload = async (sourceType: "uploaded" | "downloaded" = "uploaded") => {
console.log("handleCreateIngestAfterUpload called with sourceType:", sourceType);
console.log("ingestGraphName:", ingestGraphName);
-
+
if (!ingestGraphName) {
console.log("No graph name, returning early");
return;
}
- const folderPath = sourceType === "uploaded"
+ const folderPath = sourceType === "uploaded"
? `uploads/${ingestGraphName}`
: `downloaded_files_cloud/${ingestGraphName}`;
-
+
console.log("folderPath:", folderPath);
try {
@@ -764,7 +762,7 @@ const Setup = () => {
loader_config: {},
file_format: "multi"
};
-
+
console.log("Calling create_ingest with config:", createIngestConfig);
const createResponse = await fetch(`/ui/${ingestGraphName}/create_ingest`, {
@@ -775,7 +773,7 @@ const Setup = () => {
},
body: JSON.stringify(createIngestConfig),
});
-
+
console.log("create_ingest response status:", createResponse.status);
if (!createResponse.ok) {
@@ -786,10 +784,10 @@ const Setup = () => {
const createData = await createResponse.json();
console.log("create_ingest response data:", createData);
-
+
const sessionId = createData.data_source_id?.temp_session_id;
console.log("Session ID:", sessionId);
-
+
if (sessionId) {
// Save session ID for later ingest
setTempSessionId(sessionId);
@@ -798,16 +796,16 @@ const Setup = () => {
data_source_id: createData.data_source_id,
data_path: createData.data_path || createData.file_path,
});
-
+
console.log("Direct ingestion enabled:", directIngestion);
-
+
if (directIngestion) {
// Direct ingestion - proceed to ingest immediately
setUploadMessage("Running direct ingestion...");
await handleRunIngest();
} else {
// Save for later - files ready for ingestion
- setUploadMessage(`✅ Files processed successfully. Ready for ingestion.`);
+ setUploadMessage(`✅ Successfully processed ${createData.data_source_id.file_count} files. Ready for ingestion.`);
}
} else {
console.warn("No session ID returned from create_ingest");
@@ -969,9 +967,9 @@ const Setup = () => {
const statusData = await statusResponse.json();
const wasRunning = isRebuildRunning;
const isCurrentlyRunning = statusData.is_running || false;
-
+
setIsRebuildRunning(isCurrentlyRunning);
-
+
if (isCurrentlyRunning) {
const startTime = statusData.started_at ? new Date(statusData.started_at * 1000).toLocaleString() : "unknown time";
setRefreshMessage(`⚠️ A rebuild is already in progress for "${graphName}" (started at ${startTime}). Please wait for it to complete.`);
@@ -1027,7 +1025,7 @@ const Setup = () => {
try {
const creds = localStorage.getItem("creds");
-
+
const response = await fetch(`/ui/${refreshGraphName}/rebuild_graph`, {
method: "POST",
headers: {
@@ -1058,12 +1056,12 @@ const Setup = () => {
if (refreshOpen && refreshGraphName) {
// Check status immediately when dialog opens
checkRebuildStatus(refreshGraphName, true);
-
+
// Set up polling to check status every 5 seconds while dialog is open
const intervalId = setInterval(() => {
checkRebuildStatus(refreshGraphName, false);
}, 5000);
-
+
return () => clearInterval(intervalId);
}
}, [refreshOpen, refreshGraphName]);
@@ -1163,10 +1161,10 @@ const Setup = () => {
setIsInitializing(false);
return;
}
-
+
setStatusMessage(`✅ Graph "${graphName}" created and initialized successfully! You can now close this dialog.`);
setStatusType("success");
-
+
// Add the new graph to the available graphs list
const newGraph = graphName;
setAvailableGraphs(prev => {
@@ -1180,7 +1178,7 @@ const Setup = () => {
}
return prev;
});
-
+
// Set the newly created graph as selected for ingestion
setIngestGraphName(graphName);
setRefreshGraphName(graphName);
@@ -1216,7 +1214,7 @@ const Setup = () => {
{/* Three cards displayed horizontally */}
-
+
{/* Section 1: Initialize Knowledge Graph */}
@@ -1231,7 +1229,7 @@ const Setup = () => {
- setInitializeGraphOpen(true)}
>
@@ -1255,7 +1253,7 @@ const Setup = () => {
- setIngestOpen(true)}
>
@@ -1279,7 +1277,7 @@ const Setup = () => {
- setRefreshOpen(true)}
>
@@ -1292,7 +1290,7 @@ const Setup = () => {
{/* Initialize Graph Dialog */}
-