"""
Document processor for extracting business insights using Gemma3.
This module processes document content from various sources (Google Drive, etc.)
and extracts structured business insights for case study generation.
"""
import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from gemma3_client import Gemma3ClientError, get_gemma3_client
from prompts import get_analysis_config, get_document_prompt
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DocumentProcessorError(Exception):
"""Base exception for document processing errors."""
pass
class DocumentProcessor:
    """
    Processes document content to extract business insights using Gemma3.

    Handles different document types (proposals, case studies, contracts,
    general) and extracts structured information suitable for case study
    generation.
    """

    # Document types understood by the prompt templates; anything else is
    # mapped by keyword heuristics in _validate_doc_type() or falls back
    # to "general".
    VALID_DOC_TYPES = frozenset({"proposal", "case_study", "contract", "general"})

    # Conservative character budget so document + prompt fit the model's
    # context window.
    MAX_TEXT_LENGTH = 6000

    # (pattern, replacement) pairs applied in order by _preprocess_text().
    # Order matters: remove unexpected characters first (this introduces new
    # spaces), then collapse horizontal whitespace, then trim whitespace
    # around newlines, then squeeze runs of 3+ newlines to a paragraph break.
    # FIX: the previous ordering collapsed *all* whitespace (newlines
    # included) first, which made the newline rule unreachable and destroyed
    # paragraph structure.
    CLEANUP_PATTERNS = [
        (r'[^\w\s\.\,\!\?\:\;\-\(\)\[\]\'\"\/\@\#\$\%\&\*\+\=]', ' '),  # Remove special chars
        (r'[^\S\n]+', ' '),            # Collapse runs of spaces/tabs
        (r'[^\S\n]*\n[^\S\n]*', '\n'), # Trim whitespace hugging newlines
        (r'\n{3,}', '\n\n'),           # 3+ newlines -> one blank line
    ]

    def __init__(self, gemma3_client=None):
        """
        Initialize document processor.

        Args:
            gemma3_client: Optional Gemma3Client instance. If None, uses the
                global client from get_gemma3_client().
        """
        self.gemma3 = gemma3_client or get_gemma3_client()
        # Kept as instance attributes for backward compatibility with code
        # that inspected or overrode them on instances.
        self.valid_doc_types = set(self.VALID_DOC_TYPES)
        self.cleanup_patterns = list(self.CLEANUP_PATTERNS)

    def _preprocess_text(self, text: str) -> str:
        """
        Clean and preprocess document text for better analysis.

        Non-string or empty input yields "". Output longer than
        MAX_TEXT_LENGTH is truncated with a "... [truncated]" marker.

        Args:
            text: Raw document text.

        Returns:
            Cleaned (and possibly truncated) text.
        """
        if not text or not isinstance(text, str):
            return ""
        cleaned = text.strip()
        for pattern, replacement in self.cleanup_patterns:
            cleaned = re.sub(pattern, replacement, cleaned)
        if len(cleaned) > self.MAX_TEXT_LENGTH:
            cleaned = cleaned[:self.MAX_TEXT_LENGTH] + "... [truncated]"
            logger.warning("Document truncated to %d characters", self.MAX_TEXT_LENGTH)
        return cleaned.strip()

    def _validate_doc_type(self, doc_type: str) -> str:
        """
        Validate and normalize a document type string.

        Unknown types are classified by keyword ("rfp" -> proposal, "sow" ->
        contract, ...); anything unrecognized becomes "general".

        Args:
            doc_type: Document type string (may be None or non-string).

        Returns:
            One of the VALID_DOC_TYPES values.
        """
        if not doc_type or not isinstance(doc_type, str):
            return "general"
        normalized = doc_type.lower().strip()
        if normalized in self.valid_doc_types:
            return normalized
        # Guess the document type from substrings of the supplied label.
        keyword_map = (
            ("proposal", ("proposal", "rfp", "bid")),
            ("case_study", ("case", "study", "success")),
            ("contract", ("contract", "agreement", "sow")),
        )
        for mapped_type, keywords in keyword_map:
            if any(keyword in normalized for keyword in keywords):
                return mapped_type
        return "general"

    def _validate_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate and sanitize a Gemma3 response.

        Missing fields are filled with defaults; list fields are coerced to
        lists of non-empty strings and string fields to strings. A response
        carrying an "error" key yields the fallback structure instead.

        Args:
            response: Raw response dict from Gemma3.

        Returns:
            Validated and structured response.
        """
        if "error" in response:
            logger.warning("Gemma3 processing error: %s", response.get("error"))
            return self._create_fallback_response(response.get("raw_response", ""))
        # Expected fields for document analysis (defaults define the type).
        expected_fields: Dict[str, Any] = {
            "challenges": [],
            "solutions": [],
            "metrics": [],
            "context": "",
            "key_stakeholders": [],
            "timeline": ""
        }
        validated: Dict[str, Any] = {}
        for field, default in expected_fields.items():
            value = response.get(field, default)
            if isinstance(default, list):
                # Coerce a bare string to a one-element list; drop anything
                # else that is not a list, then filter out non-string/empty
                # items.
                if not isinstance(value, list):
                    value = [value] if isinstance(value, str) and value else default
                value = [item for item in value if item and isinstance(item, str)]
            else:
                if not isinstance(value, str):
                    value = str(value) if value else default
            validated[field] = value
        return validated

    def _create_fallback_response(self, raw_text: str) -> Dict[str, Any]:
        """
        Create a fallback response when Gemma3 processing fails.

        Args:
            raw_text: Original document text (or a prefix of it).

        Returns:
            Basic structured response flagged for manual review.
        """
        return {
            "challenges": ["Unable to extract specific challenges from document"],
            "solutions": ["Document contains business information requiring manual review"],
            "metrics": [],
            "context": "Document processing failed - manual review recommended",
            "key_stakeholders": [],
            "timeline": "Not specified",
            "processing_note": "Fallback response due to processing error",
            "raw_text_sample": raw_text[:200] + "..." if len(raw_text) > 200 else raw_text
        }

    def _prepare_input(self, text: str, doc_type: str) -> tuple:
        """Validate raw input and return (cleaned_text, validated_doc_type).

        Raises:
            DocumentProcessorError: If the text is missing, not a string, or
                empty after preprocessing.
        """
        if not text or not isinstance(text, str):
            raise DocumentProcessorError("Invalid or empty document text")
        cleaned_text = self._preprocess_text(text)
        validated_doc_type = self._validate_doc_type(doc_type)
        if not cleaned_text:
            raise DocumentProcessorError("Document text is empty after preprocessing")
        return cleaned_text, validated_doc_type

    def _build_result(self, response: Dict[str, Any], doc_type: str,
                      original_length: int, processed_length: int) -> Dict[str, Any]:
        """Validate the model response and attach processing metadata."""
        result = self._validate_response(response)
        result.update({
            "document_type": doc_type,
            "document_length": original_length,
            "processed_length": processed_length,
            "processing_success": True
        })
        return result

    def _gemma3_error_fallback(self, text: str, doc_type: str,
                               error: Exception) -> Dict[str, Any]:
        """Build the degraded result returned when the Gemma3 client fails."""
        logger.error("Gemma3 client error: %s", error)
        fallback = self._create_fallback_response(text[:500])
        fallback["document_type"] = self._validate_doc_type(doc_type)
        fallback["processing_success"] = False
        fallback["error_type"] = "gemma3_error"
        return fallback

    def process_document_content(self, text: str, doc_type: str = "general") -> Dict[str, Any]:
        """
        Process document content to extract business insights.

        Args:
            text: Document content text.
            doc_type: Type of document (proposal, case_study, contract, general).

        Returns:
            Structured business insights dictionary. When the Gemma3 client
            fails, a fallback dictionary with processing_success=False is
            returned instead of raising.

        Raises:
            DocumentProcessorError: If the input is invalid or processing
                fails for a non-Gemma3 reason.
        """
        cleaned_text, validated_doc_type = self._prepare_input(text, doc_type)
        logger.info("Processing %s document (%d chars)", validated_doc_type, len(cleaned_text))
        try:
            prompt = get_document_prompt(validated_doc_type, cleaned_text)
            response = self.gemma3.process_with_json(prompt, "document_processing")
            result = self._build_result(response, validated_doc_type,
                                        len(text), len(cleaned_text))
        except Gemma3ClientError as e:
            # Degrade gracefully instead of failing the whole pipeline.
            return self._gemma3_error_fallback(text, doc_type, e)
        except Exception as e:
            logger.error("Document processing failed: %s", e)
            # FIX: chain the original exception for debuggability.
            raise DocumentProcessorError(f"Processing failed: {e}") from e
        logger.info("Document processing completed successfully")
        return result

    async def process_document_content_async(self, text: str, doc_type: str = "general") -> Dict[str, Any]:
        """
        Process document content asynchronously.

        Mirrors process_document_content() but awaits the async Gemma3 call.

        Args:
            text: Document content text.
            doc_type: Type of document.

        Returns:
            Structured business insights dictionary (or fallback dict on
            Gemma3 client failure).

        Raises:
            DocumentProcessorError: If the input is invalid or processing
                fails for a non-Gemma3 reason.
        """
        cleaned_text, validated_doc_type = self._prepare_input(text, doc_type)
        logger.info("Processing %s document (%d chars) [async]",
                    validated_doc_type, len(cleaned_text))
        try:
            prompt = get_document_prompt(validated_doc_type, cleaned_text)
            response = await self.gemma3.process_with_json_async(prompt, "document_processing")
            result = self._build_result(response, validated_doc_type,
                                        len(text), len(cleaned_text))
        except Gemma3ClientError as e:
            # Degrade gracefully instead of failing the whole pipeline.
            return self._gemma3_error_fallback(text, doc_type, e)
        except Exception as e:
            logger.error("Document processing failed: %s", e)
            # FIX: chain the original exception for debuggability.
            raise DocumentProcessorError(f"Processing failed: {e}") from e
        logger.info("Document processing completed successfully [async]")
        return result

    def process_multiple_documents(self, documents: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """
        Process multiple documents sequentially.

        Args:
            documents: List of documents with 'text' and 'doc_type' keys.

        Returns:
            One result dict per input (in order), each tagged with
            'document_index'; per-document failures become error entries
            rather than aborting the batch.
        """
        results = []
        for index, doc in enumerate(documents):
            try:
                result = self.process_document_content(
                    doc.get("text", ""), doc.get("doc_type", "general"))
                result["document_index"] = index
            except Exception as e:
                logger.error("Failed to process document %d: %s", index, e)
                result = {
                    "document_index": index,
                    "processing_success": False,
                    "error": str(e),
                    "document_type": doc.get("doc_type", "general")
                }
            results.append(result)
        return results

    async def process_multiple_documents_async(self, documents: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """
        Process multiple documents concurrently.

        Args:
            documents: List of documents with 'text' and 'doc_type' keys.

        Returns:
            One result dict per input (in order); exceptions from individual
            documents are converted to error entries.
        """
        tasks = [
            self._process_single_document_async(
                doc.get("text", ""), doc.get("doc_type", "general"), index)
            for index, doc in enumerate(documents)
        ]
        # return_exceptions=True so one failing document cannot cancel the rest.
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)
        processed_results = []
        for index, result in enumerate(raw_results):
            if isinstance(result, Exception):
                logger.error("Failed to process document %d: %s", index, result)
                processed_results.append({
                    "document_index": index,
                    "processing_success": False,
                    "error": str(result),
                    "document_type": documents[index].get("doc_type", "general")
                })
            else:
                processed_results.append(result)
        return processed_results

    async def _process_single_document_async(self, text: str, doc_type: str, index: int) -> Dict[str, Any]:
        """Process one document asynchronously and tag it with its index."""
        result = await self.process_document_content_async(text, doc_type)
        result["document_index"] = index
        return result

    def get_processing_summary(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate summary statistics for a batch of processing results.

        Args:
            results: List of processing result dictionaries.

        Returns:
            Counts, success rate, per-type breakdown, and an ISO-8601 UTC
            timestamp of when the summary was generated.
        """
        total_docs = len(results)
        successful = sum(1 for r in results if r.get("processing_success", False))
        doc_types: Dict[str, int] = {}
        for result in results:
            doc_type = result.get("document_type", "unknown")
            doc_types[doc_type] = doc_types.get(doc_type, 0) + 1
        return {
            "total_documents": total_docs,
            "successful_processing": successful,
            "failed_processing": total_docs - successful,
            "success_rate": successful / total_docs if total_docs > 0 else 0,
            "document_types": doc_types,
            # FIX: emit a real timestamp instead of the placeholder string.
            "processing_timestamp": datetime.now(timezone.utc).isoformat()
        }