rag_processor.py
""" RAG (Retrieval Augmented Generation) processor for web content analysis. Implements HTMLRAG-style functionality for question answering. """ import os import re from typing import List, Dict, Any, Optional from dataclasses import dataclass import openai from .web_extractor import url_to_markdown @dataclass class TextChunk: """Represents a chunk of text with metadata.""" content: str score: float = 0.0 source_type: str = "text" # text, table, image, etc. class RAGProcessor: """Processes web content for RAG-based question answering.""" def __init__(self, api_key: Optional[str] = None): """Initialize RAG processor with OpenAI client.""" self.api_key = api_key or os.getenv('OPENAI_API_KEY') self.client = None if self.api_key: try: self.client = openai.OpenAI(api_key=self.api_key) except Exception: pass self.max_chunk_size = 1000 self.overlap_size = 100 def chunk_content(self, content: str) -> List[TextChunk]: """Split content into manageable chunks for processing.""" # Split by sections (markdown headers and paragraph breaks) sections = re.split(r'\n(?=#{1,3}\s|\n)', content) chunks = [] for section in sections: section = section.strip() if not section: continue # Determine source type based on content markers source_type = "text" if section.startswith('|') or '[Table]' in section: source_type = "table" elif section.startswith('![') or '[Image]' in section: source_type = "image" elif '[Video]' in section or '[Popup]' in section: source_type = "media" # If section is too long, split it further if len(section) > self.max_chunk_size: # Split by sentences sentences = re.split(r'(?<=[.!?])\s+', section) current_chunk = "" for sentence in sentences: if len(current_chunk + sentence) > self.max_chunk_size: if current_chunk: chunks.append(TextChunk( content=current_chunk.strip(), source_type=source_type )) current_chunk = sentence else: current_chunk += " " + sentence if current_chunk else sentence if current_chunk: chunks.append(TextChunk( content=current_chunk.strip(), source_type=source_type )) else: chunks.append(TextChunk( content=section, source_type=source_type )) return chunks def score_relevance(self, query: str, chunk: TextChunk) -> float: """Score how relevant a chunk is to the query.""" query_lower = query.lower() content_lower = chunk.content.lower() # Basic keyword matching score query_words = set(re.findall(r'\b\w+\b', query_lower)) content_words = set(re.findall(r'\b\w+\b', content_lower)) if not query_words: return 0.0 # Calculate overlap common_words = query_words.intersection(content_words) keyword_score = len(common_words) / len(query_words) # Boost score based on source type relevance type_boost = { "text": 1.0, "table": 1.2, # Tables often contain structured important info "image": 0.8, "media": 0.6 } # Boost for exact phrase matches phrase_boost = 1.0 if len(query) > 10: # Only for longer queries # Check for phrase matches (simplified) query_phrases = [phrase.strip() for phrase in query.split() if len(phrase) > 3] for phrase in query_phrases: if phrase.lower() in content_lower: phrase_boost += 0.2 final_score = keyword_score * type_boost.get(chunk.source_type, 1.0) * phrase_boost return min(final_score, 2.0) # Cap at 2.0 def select_relevant_chunks(self, query: str, chunks: List[TextChunk], max_chunks: int = 5) -> List[TextChunk]: """Select the most relevant chunks for a query.""" # Score all chunks for chunk in chunks: chunk.score = self.score_relevance(query, chunk) # Sort by relevance and take top chunks relevant_chunks = sorted(chunks, key=lambda x: x.score, reverse=True) # 
Filter out chunks with very low scores relevant_chunks = [chunk for chunk in relevant_chunks if chunk.score > 0.1] return relevant_chunks[:max_chunks] def generate_answer(self, query: str, relevant_chunks: List[TextChunk]) -> str: """Generate an answer using OpenAI based on relevant chunks.""" if not relevant_chunks: return "I couldn't find relevant information in the provided content to answer your question." # Check if OpenAI client is available if not self.client: return "OpenAI API key not configured. Please set OPENAI_API_KEY environment variable to use Q&A functionality." # Prepare context from chunks context_parts = [] for i, chunk in enumerate(relevant_chunks, 1): context_parts.append(f"[Context {i}] ({chunk.source_type}):\n{chunk.content}\n") context = "\n".join(context_parts) # Prepare prompt system_prompt = """You are a helpful AI assistant that answers questions based on provided web content. Use only the information from the given context to answer the question. If the context doesn't contain enough information to answer the question, say so clearly. Be concise but comprehensive in your answer.""" user_prompt = f"""Based on the following web content, please answer this question: {query} Context: {context} Question: {query} Answer:""" try: response = self.client.chat.completions.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], max_tokens=500, temperature=0.1 ) return response.choices[0].message.content.strip() except Exception as e: return f"Error generating answer: {str(e)}" def process_web_qna(self, url: str, question: str) -> str: """ Process a URL and answer a question about its content. This is the main RAG function that combines web extraction and QA. Args: url: The URL to analyze question: The question to answer based on the URL content Returns: str: The answer to the question based on the web content """ try: # Extract content from URL markdown_content = url_to_markdown(url) if markdown_content.startswith("Error"): return f"Could not process the URL: {markdown_content}" # Chunk the content chunks = self.chunk_content(markdown_content) if not chunks: return "No content could be extracted from the URL to answer your question." # Select relevant chunks relevant_chunks = self.select_relevant_chunks(question, chunks) if not relevant_chunks: return f"The content from {url} doesn't seem to contain information relevant to your question: '{question}'" # Generate answer answer = self.generate_answer(question, relevant_chunks) return answer except Exception as e: return f"Error processing question about {url}: {str(e)}"
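
For reference, a minimal usage sketch of the class above follows. The package import path, the sample markdown, and the example URL are illustrative assumptions, not taken from this repository's docs; OPENAI_API_KEY must be set (or an api_key passed to the constructor) for generate_answer to reach the OpenAI API, and process_web_qna additionally relies on url_to_markdown from the sibling web_extractor module.

# Usage sketch (illustrative): the import path "web_analyzer.rag_processor",
# the sample markdown, and the URL below are assumptions for demonstration.
import os

from web_analyzer.rag_processor import RAGProcessor  # hypothetical package name

processor = RAGProcessor(api_key=os.getenv("OPENAI_API_KEY"))

# Component-level use: chunk arbitrary markdown and rank chunks for a query.
chunks = processor.chunk_content("# Pricing\n\nThe basic plan costs $10 per month.")
top_chunks = processor.select_relevant_chunks("How much does the basic plan cost?", chunks)

# End-to-end use: fetch a page, chunk it, select context, and ask the model.
answer = processor.process_web_qna(
    url="https://example.com/article",
    question="What is the main topic of this page?",
)
print(answer)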
