
AI Tutoring RAG System

file_processor.py • 11.2 kB
""" File processor for extracting text from PDFs and DOCX files. Integrates with the RAG system to store extracted content. """ import io import json from typing import Dict, Any, Optional from datetime import datetime import pymupdf # PyMuPDF from docx import Document from rag.types import LearningContext, MemoryType from rag.system import TutoringRAGSystem class FileProcessor: """ Processes uploaded files (PDF, DOCX) and extracts text content for integration with the RAG system. """ def __init__(self, rag_system: TutoringRAGSystem): """ Initialize the file processor. Args: rag_system: Instance of TutoringRAGSystem for storing extracted content """ self.rag_system = rag_system def extract_text_from_pdf(self, file_content: bytes) -> Dict[str, Any]: """ Extract text from a PDF file using PyMuPDF. Args: file_content: PDF file content as bytes Returns: Dictionary containing extracted text and metadata """ try: # Open PDF from bytes pdf_document = pymupdf.open(stream=file_content, filetype="pdf") extracted_text = [] metadata = { "total_pages": pdf_document.page_count, "page_texts": [], } # Extract text from each page for page_num in range(pdf_document.page_count): page = pdf_document[page_num] page_text = page.get_text() if page_text.strip(): # Only include non-empty pages extracted_text.append(f"[Page {page_num + 1}]\n{page_text}") metadata["page_texts"].append( { "page_number": page_num + 1, "text_length": len(page_text), "has_images": len(page.get_images()) > 0, } ) # Close the document pdf_document.close() full_text = "\n\n".join(extracted_text) return { "status": "success", "text": full_text, "metadata": metadata, "total_characters": len(full_text), } except Exception as e: return { "status": "error", "message": f"Failed to extract text from PDF: {str(e)}", } def extract_text_from_docx(self, file_content: bytes) -> Dict[str, Any]: """ Extract text from a DOCX file. Args: file_content: DOCX file content as bytes Returns: Dictionary containing extracted text and metadata """ try: # Open DOCX from bytes docx_stream = io.BytesIO(file_content) document = Document(docx_stream) extracted_text = [] metadata = { "total_paragraphs": len(document.paragraphs), "total_tables": len(document.tables), } # Extract text from paragraphs for para in document.paragraphs: if para.text.strip(): extracted_text.append(para.text) # Extract text from tables table_texts = [] for table in document.tables: for row in table.rows: row_text = " | ".join([cell.text for cell in row.cells]) if row_text.strip(): table_texts.append(row_text) if table_texts: extracted_text.append("\n[Tables]\n" + "\n".join(table_texts)) full_text = "\n\n".join(extracted_text) return { "status": "success", "text": full_text, "metadata": metadata, "total_characters": len(full_text), } except Exception as e: return { "status": "error", "message": f"Failed to extract text from DOCX: {str(e)}", } def process_and_store_file( self, file_content: bytes, filename: str, student_id: str, subject: str, topic: Optional[str] = None, difficulty_level: int = 5, additional_metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Process a file (PDF or DOCX) and store extracted content in the RAG system. 
Args: file_content: File content as bytes filename: Original filename student_id: Student identifier subject: Subject area topic: Optional specific topic difficulty_level: Difficulty level (1-10) additional_metadata: Optional additional metadata Returns: Dictionary with processing results and stored document IDs """ try: # Determine file type file_extension = filename.lower().split(".")[-1] # Extract text based on file type if file_extension == "pdf": extraction_result = self.extract_text_from_pdf(file_content) elif file_extension in ["docx", "doc"]: extraction_result = self.extract_text_from_docx(file_content) else: return { "status": "error", "message": f"Unsupported file type: {file_extension}", } if extraction_result["status"] == "error": return extraction_result extracted_text = extraction_result["text"] file_metadata = extraction_result["metadata"] # Split text into chunks if it's too long (optional but recommended) chunks = self._chunk_text(extracted_text, chunk_size=1000, overlap=200) # Store each chunk in the RAG system stored_doc_ids = [] for i, chunk in enumerate(chunks): # Prepare metadata - Pinecone only accepts simple types # Convert complex metadata to JSON strings metadata = { "filename": filename, "file_type": file_extension, "chunk_index": i, "total_chunks": len(chunks), "extraction_metadata": json.dumps( file_metadata ), # Serialize to JSON string **(additional_metadata or {}), } # Ensure all metadata values are simple types for key, value in list(metadata.items()): if isinstance(value, (dict, list)) and not isinstance(value, str): metadata[key] = json.dumps(value) # Create learning context context = LearningContext( student_id=student_id, subject=subject, topic=topic or f"uploaded_content_{filename}", difficulty_level=difficulty_level, learning_style="document_upload", timestamp=datetime.now(), content=chunk, memory_type=MemoryType.CONTENT_MASTERY, metadata=metadata, ) # Store in RAG system doc_id = self.rag_system.store_learning_interaction(context) stored_doc_ids.append(doc_id) return { "status": "success", "message": f"Successfully processed and stored {len(chunks)} chunks from {filename}", "filename": filename, "file_type": file_extension, "total_characters": extraction_result["total_characters"], "chunks_stored": len(chunks), "document_ids": stored_doc_ids, "extraction_metadata": file_metadata, } except Exception as e: return { "status": "error", "message": f"Failed to process file: {str(e)}", } def _chunk_text( self, text: str, chunk_size: int = 1000, overlap: int = 200 ) -> list[str]: """ Split text into overlapping chunks for better retrieval. Args: text: Text to chunk chunk_size: Maximum characters per chunk overlap: Number of characters to overlap between chunks Returns: List of text chunks """ if len(text) <= chunk_size: return [text] chunks = [] start = 0 while start < len(text): # Get chunk end = start + chunk_size chunk = text[start:end] # Try to break at sentence or paragraph boundary if end < len(text): # Look for sentence endings last_period = chunk.rfind(". ") last_newline = chunk.rfind("\n") break_point = max(last_period, last_newline) if break_point > chunk_size * 0.5: # Only break if we're past halfway chunk = text[start : start + break_point + 1] end = start + break_point + 1 chunks.append(chunk.strip()) # Move start position with overlap start = end - overlap return chunks def extract_text_preview( self, file_content: bytes, filename: str, max_chars: int = 500 ) -> Dict[str, Any]: """ Extract a preview of text from a file without storing it. 
Useful for showing users what was extracted before storing. Args: file_content: File content as bytes filename: Original filename max_chars: Maximum characters to return in preview Returns: Dictionary with preview text and metadata """ try: file_extension = filename.lower().split(".")[-1] if file_extension == "pdf": extraction_result = self.extract_text_from_pdf(file_content) elif file_extension in ["docx", "doc"]: extraction_result = self.extract_text_from_docx(file_content) else: return { "status": "error", "message": f"Unsupported file type: {file_extension}", } if extraction_result["status"] == "error": return extraction_result full_text = extraction_result["text"] preview = full_text[:max_chars] if len(full_text) > max_chars: preview += "..." return { "status": "success", "preview": preview, "total_characters": len(full_text), "metadata": extraction_result["metadata"], } except Exception as e: return { "status": "error", "message": f"Failed to extract preview: {str(e)}", }
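
For reference, a minimal sketch of how this class is typically driven end to end: preview the extraction first, then store it. The no-argument TutoringRAGSystem() call, the notes.pdf path, and the student/subject values are illustrative assumptions; the real constructor signature lives in rag/system.py, which is not shown on this page.

from pathlib import Path

from rag.system import TutoringRAGSystem
from file_processor import FileProcessor

# Assumption: TutoringRAGSystem() is constructible without arguments here;
# check rag/system.py for the actual signature.
rag = TutoringRAGSystem()
processor = FileProcessor(rag_system=rag)

file_bytes = Path("notes.pdf").read_bytes()  # hypothetical uploaded file

# Show the user what was extracted before committing it to storage.
preview = processor.extract_text_preview(file_bytes, "notes.pdf")
if preview["status"] == "success":
    print(preview["preview"])
    result = processor.process_and_store_file(
        file_content=file_bytes,
        filename="notes.pdf",
        student_id="student-123",
        subject="biology",
        difficulty_level=4,
    )
    print(result["message"])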
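The overlap parameter in _chunk_text exists so that a sentence cut by one chunk's boundary is still fully present in the neighboring chunk, which keeps retrieval from missing text that straddles a split. Because _chunk_text never touches self.rag_system, it can be exercised in isolation; the sketch below passes None for the RAG system, which is safe only for this demo.

from file_processor import FileProcessor

fp = FileProcessor(rag_system=None)  # fine here: _chunk_text ignores it

text = ". ".join(f"Sentence number {i}" for i in range(1, 81)) + "."
chunks = fp._chunk_text(text, chunk_size=120, overlap=30)

# Each consecutive pair shares roughly `overlap` characters at the boundary.
for left, right in zip(chunks, chunks[1:]):
    print(repr(left[-35:]), "...", repr(right[:35]))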
