Skip to main content
Glama

Crawl4AI RAG MCP Server

by Chillbruhhh
utils.py•29.5 kB
""" Utility functions for the Crawl4AI MCP server. """ import os import concurrent.futures from typing import List, Dict, Any, Optional, Tuple import json from supabase import create_client, Client from urllib.parse import urlparse import openai import re import time # Load OpenAI API key for embeddings openai.api_key = os.getenv("OPENAI_API_KEY") # Initialize OpenRouter client for chat completions def get_openrouter_client(): """ Get an OpenRouter-configured OpenAI client for chat completions. Returns: OpenAI client configured for OpenRouter """ openrouter_api_key = os.getenv("OPENROUTER_API_KEY") if not openrouter_api_key: raise ValueError("OPENROUTER_API_KEY must be set in environment variables for chat completions") return openai.OpenAI( base_url="https://openrouter.ai/api/v1", api_key=openrouter_api_key, default_headers={ "HTTP-Referer": os.getenv("YOUR_SITE_URL", ""), # Optional for rankings "X-Title": os.getenv("YOUR_SITE_NAME", "Crawl4AI-MCP"), # Optional for rankings } ) def get_supabase_client() -> Client: """ Get a Supabase client with the URL and key from environment variables. Returns: Supabase client instance """ url = os.getenv("SUPABASE_URL") key = os.getenv("SUPABASE_SERVICE_KEY") if not url or not key: raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set in environment variables") return create_client(url, key) def create_embeddings_batch(texts: List[str]) -> List[List[float]]: """ Create embeddings for multiple texts in a single API call. Args: texts: List of texts to create embeddings for Returns: List of embeddings (each embedding is a list of floats) """ if not texts: return [] max_retries = 3 retry_delay = 1.0 # Start with 1 second delay for retry in range(max_retries): try: response = openai.embeddings.create( model="text-embedding-3-small", # Hardcoding embedding model for now, will change this later to be more dynamic input=texts ) return [item.embedding for item in response.data] except Exception as e: if retry < max_retries - 1: print(f"Error creating batch embeddings (attempt {retry + 1}/{max_retries}): {e}") print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff else: print(f"Failed to create batch embeddings after {max_retries} attempts: {e}") # Try creating embeddings one by one as fallback print("Attempting to create embeddings individually...") embeddings = [] successful_count = 0 for i, text in enumerate(texts): try: individual_response = openai.embeddings.create( model="text-embedding-3-small", input=[text] ) embeddings.append(individual_response.data[0].embedding) successful_count += 1 except Exception as individual_error: print(f"Failed to create embedding for text {i}: {individual_error}") # Add zero embedding as fallback embeddings.append([0.0] * 1536) print(f"Successfully created {successful_count}/{len(texts)} embeddings individually") return embeddings def create_embedding(text: str) -> List[float]: """ Create an embedding for a single text using OpenAI's API. Args: text: Text to create an embedding for Returns: List of floats representing the embedding """ try: embeddings = create_embeddings_batch([text]) return embeddings[0] if embeddings else [0.0] * 1536 except Exception as e: print(f"Error creating embedding: {e}") # Return empty embedding if there's an error return [0.0] * 1536 def generate_contextual_embedding(full_document: str, chunk: str) -> Tuple[str, bool]: """ Generate contextual information for a chunk within a document to improve retrieval. Args: full_document: The complete document text chunk: The specific chunk of text to generate context for Returns: Tuple containing: - The contextual text that situates the chunk within the document - Boolean indicating if contextual embedding was performed """ model_choice = os.getenv("MODEL_CHOICE", "openai/gpt-4.1-nano") try: # Get OpenRouter client for chat completions openrouter_client = get_openrouter_client() # Create the prompt for generating contextual information prompt = f"""<document> {full_document[:25000]} </document> Here is the chunk we want to situate within the whole document <chunk> {chunk} </chunk> Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else.""" # Call the OpenRouter API to generate contextual information response = openrouter_client.chat.completions.create( model=model_choice, messages=[ {"role": "system", "content": "You are a helpful assistant that provides concise contextual information."}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=200 ) # Extract the generated context context = response.choices[0].message.content.strip() # Combine the context with the original chunk contextual_text = f"{context}\n---\n{chunk}" return contextual_text, True except Exception as e: print(f"Error generating contextual embedding: {e}. Using original chunk instead.") return chunk, False def process_chunk_with_context(args): """ Process a single chunk with contextual embedding. This function is designed to be used with concurrent.futures. Args: args: Tuple containing (url, content, full_document) Returns: Tuple containing: - The contextual text that situates the chunk within the document - Boolean indicating if contextual embedding was performed """ url, content, full_document = args return generate_contextual_embedding(full_document, content) def add_documents_to_supabase( client: Client, urls: List[str], chunk_numbers: List[int], contents: List[str], metadatas: List[Dict[str, Any]], url_to_full_document: Dict[str, str], batch_size: int = 20 ) -> None: """ Add documents to the Supabase crawled_pages table in batches. Deletes existing records with the same URLs before inserting to prevent duplicates. Args: client: Supabase client urls: List of URLs chunk_numbers: List of chunk numbers contents: List of document contents metadatas: List of document metadata url_to_full_document: Dictionary mapping URLs to their full document content batch_size: Size of each batch for insertion """ # Get unique URLs to delete existing records unique_urls = list(set(urls)) # Delete existing records for these URLs in a single operation try: if unique_urls: # Use the .in_() filter to delete all records with matching URLs client.table("crawled_pages").delete().in_("url", unique_urls).execute() except Exception as e: print(f"Batch delete failed: {e}. Trying one-by-one deletion as fallback.") # Fallback: delete records one by one for url in unique_urls: try: client.table("crawled_pages").delete().eq("url", url).execute() except Exception as inner_e: print(f"Error deleting record for URL {url}: {inner_e}") # Continue with the next URL even if one fails # Check if MODEL_CHOICE is set for contextual embeddings use_contextual_embeddings = os.getenv("USE_CONTEXTUAL_EMBEDDINGS", "false") == "true" print(f"\n\nUse contextual embeddings: {use_contextual_embeddings}\n\n") # Process in batches to avoid memory issues for i in range(0, len(contents), batch_size): batch_end = min(i + batch_size, len(contents)) # Get batch slices batch_urls = urls[i:batch_end] batch_chunk_numbers = chunk_numbers[i:batch_end] batch_contents = contents[i:batch_end] batch_metadatas = metadatas[i:batch_end] # Apply contextual embedding to each chunk if MODEL_CHOICE is set if use_contextual_embeddings: # Prepare arguments for parallel processing process_args = [] for j, content in enumerate(batch_contents): url = batch_urls[j] full_document = url_to_full_document.get(url, "") process_args.append((url, content, full_document)) # Process in parallel using ThreadPoolExecutor contextual_contents = [] with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit all tasks and collect results future_to_idx = {executor.submit(process_chunk_with_context, arg): idx for idx, arg in enumerate(process_args)} # Process results as they complete for future in concurrent.futures.as_completed(future_to_idx): idx = future_to_idx[future] try: result, success = future.result() contextual_contents.append(result) if success: batch_metadatas[idx]["contextual_embedding"] = True except Exception as e: print(f"Error processing chunk {idx}: {e}") # Use original content as fallback contextual_contents.append(batch_contents[idx]) # Sort results back into original order if needed if len(contextual_contents) != len(batch_contents): print(f"Warning: Expected {len(batch_contents)} results but got {len(contextual_contents)}") # Use original contents as fallback contextual_contents = batch_contents else: # If not using contextual embeddings, use original contents contextual_contents = batch_contents # Create embeddings for the entire batch at once batch_embeddings = create_embeddings_batch(contextual_contents) batch_data = [] for j in range(len(contextual_contents)): # Extract metadata fields chunk_size = len(contextual_contents[j]) # Extract source_id from URL parsed_url = urlparse(batch_urls[j]) source_id = parsed_url.netloc or parsed_url.path # Prepare data for insertion data = { "url": batch_urls[j], "chunk_number": batch_chunk_numbers[j], "content": contextual_contents[j], # Store original content "metadata": { "chunk_size": chunk_size, **batch_metadatas[j] }, "source_id": source_id, # Add source_id field "embedding": batch_embeddings[j] # Use embedding from contextual content } batch_data.append(data) # Insert batch into Supabase with retry logic max_retries = 3 retry_delay = 1.0 # Start with 1 second delay for retry in range(max_retries): try: client.table("crawled_pages").insert(batch_data).execute() # Success - break out of retry loop break except Exception as e: if retry < max_retries - 1: print(f"Error inserting batch into Supabase (attempt {retry + 1}/{max_retries}): {e}") print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff else: # Final attempt failed print(f"Failed to insert batch after {max_retries} attempts: {e}") # Optionally, try inserting records one by one as a last resort print("Attempting to insert records individually...") successful_inserts = 0 for record in batch_data: try: client.table("crawled_pages").insert(record).execute() successful_inserts += 1 except Exception as individual_error: print(f"Failed to insert individual record for URL {record['url']}: {individual_error}") if successful_inserts > 0: print(f"Successfully inserted {successful_inserts}/{len(batch_data)} records individually") def search_documents( client: Client, query: str, match_count: int = 10, filter_metadata: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """ Search for documents in Supabase using vector similarity. Args: client: Supabase client query: Query text match_count: Maximum number of results to return filter_metadata: Optional metadata filter Returns: List of matching documents """ # Create embedding for the query query_embedding = create_embedding(query) # Execute the search using the match_crawled_pages function try: # Only include filter parameter if filter_metadata is provided and not empty params = { 'query_embedding': query_embedding, 'match_count': match_count } # Only add the filter if it's actually provided and not empty if filter_metadata: params['filter'] = filter_metadata # Pass the dictionary directly, not JSON-encoded result = client.rpc('match_crawled_pages', params).execute() return result.data except Exception as e: print(f"Error searching documents: {e}") return [] def extract_code_blocks(markdown_content: str, min_length: int = 1000) -> List[Dict[str, Any]]: """ Extract code blocks from markdown content along with context. Args: markdown_content: The markdown content to extract code blocks from min_length: Minimum length of code blocks to extract (default: 1000 characters) Returns: List of dictionaries containing code blocks and their context """ code_blocks = [] # Skip if content starts with triple backticks (edge case for files wrapped in backticks) content = markdown_content.strip() start_offset = 0 if content.startswith('```'): # Skip the first triple backticks start_offset = 3 print("Skipping initial triple backticks") # Find all occurrences of triple backticks backtick_positions = [] pos = start_offset while True: pos = markdown_content.find('```', pos) if pos == -1: break backtick_positions.append(pos) pos += 3 # Process pairs of backticks i = 0 while i < len(backtick_positions) - 1: start_pos = backtick_positions[i] end_pos = backtick_positions[i + 1] # Extract the content between backticks code_section = markdown_content[start_pos+3:end_pos] # Check if there's a language specifier on the first line lines = code_section.split('\n', 1) if len(lines) > 1: # Check if first line is a language specifier (no spaces, common language names) first_line = lines[0].strip() if first_line and not ' ' in first_line and len(first_line) < 20: language = first_line code_content = lines[1].strip() if len(lines) > 1 else "" else: language = "" code_content = code_section.strip() else: language = "" code_content = code_section.strip() # Skip if code block is too short if len(code_content) < min_length: i += 2 # Move to next pair continue # Extract context before (1000 chars) context_start = max(0, start_pos - 1000) context_before = markdown_content[context_start:start_pos].strip() # Extract context after (1000 chars) context_end = min(len(markdown_content), end_pos + 3 + 1000) context_after = markdown_content[end_pos + 3:context_end].strip() code_blocks.append({ 'code': code_content, 'language': language, 'context_before': context_before, 'context_after': context_after, 'full_context': f"{context_before}\n\n{code_content}\n\n{context_after}" }) # Move to next pair (skip the closing backtick we just processed) i += 2 return code_blocks def generate_code_example_summary(code: str, context_before: str, context_after: str) -> str: """ Generate a summary for a code example using its surrounding context. Args: code: The code example context_before: Context before the code context_after: Context after the code Returns: A summary of what the code example demonstrates """ model_choice = os.getenv("MODEL_CHOICE", "openai/gpt-4.1-nano") # Create the prompt prompt = f"""<context_before> {context_before[-500:] if len(context_before) > 500 else context_before} </context_before> <code_example> {code[:1500] if len(code) > 1500 else code} </code_example> <context_after> {context_after[:500] if len(context_after) > 500 else context_after} </context_after> Based on the code example and its surrounding context, provide a concise summary (2-3 sentences) that describes what this code example demonstrates and its purpose. Focus on the practical application and key concepts illustrated. """ try: # Get OpenRouter client for chat completions openrouter_client = get_openrouter_client() response = openrouter_client.chat.completions.create( model=model_choice, messages=[ {"role": "system", "content": "You are a helpful assistant that provides concise code example summaries."}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=300 ) return response.choices[0].message.content.strip() except Exception as e: print(f"Error generating code example summary: {e}") return "Code example for demonstration purposes." def add_code_examples_to_supabase( client: Client, urls: List[str], chunk_numbers: List[int], code_examples: List[str], summaries: List[str], metadatas: List[Dict[str, Any]], batch_size: int = 20 ): """ Add code examples to the Supabase code_examples table in batches. Args: client: Supabase client urls: List of URLs chunk_numbers: List of chunk numbers code_examples: List of code example contents summaries: List of code example summaries metadatas: List of metadata dictionaries batch_size: Size of each batch for insertion """ if not urls: return # Delete existing records for these URLs unique_urls = list(set(urls)) for url in unique_urls: try: client.table('code_examples').delete().eq('url', url).execute() except Exception as e: print(f"Error deleting existing code examples for {url}: {e}") # Process in batches total_items = len(urls) for i in range(0, total_items, batch_size): batch_end = min(i + batch_size, total_items) batch_texts = [] # Create combined texts for embedding (code + summary) for j in range(i, batch_end): combined_text = f"{code_examples[j]}\n\nSummary: {summaries[j]}" batch_texts.append(combined_text) # Create embeddings for the batch embeddings = create_embeddings_batch(batch_texts) # Check if embeddings are valid (not all zeros) valid_embeddings = [] for embedding in embeddings: if embedding and not all(v == 0.0 for v in embedding): valid_embeddings.append(embedding) else: print(f"Warning: Zero or invalid embedding detected, creating new one...") # Try to create a single embedding as fallback single_embedding = create_embedding(batch_texts[len(valid_embeddings)]) valid_embeddings.append(single_embedding) # Prepare batch data batch_data = [] for j, embedding in enumerate(valid_embeddings): idx = i + j # Extract source_id from URL parsed_url = urlparse(urls[idx]) source_id = parsed_url.netloc or parsed_url.path batch_data.append({ 'url': urls[idx], 'chunk_number': chunk_numbers[idx], 'content': code_examples[idx], 'summary': summaries[idx], 'metadata': metadatas[idx], # Store as JSON object, not string 'source_id': source_id, 'embedding': embedding }) # Insert batch into Supabase with retry logic max_retries = 3 retry_delay = 1.0 # Start with 1 second delay for retry in range(max_retries): try: client.table('code_examples').insert(batch_data).execute() # Success - break out of retry loop break except Exception as e: if retry < max_retries - 1: print(f"Error inserting batch into Supabase (attempt {retry + 1}/{max_retries}): {e}") print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff else: # Final attempt failed print(f"Failed to insert batch after {max_retries} attempts: {e}") # Optionally, try inserting records one by one as a last resort print("Attempting to insert records individually...") successful_inserts = 0 for record in batch_data: try: client.table('code_examples').insert(record).execute() successful_inserts += 1 except Exception as individual_error: print(f"Failed to insert individual record for URL {record['url']}: {individual_error}") if successful_inserts > 0: print(f"Successfully inserted {successful_inserts}/{len(batch_data)} records individually") print(f"Inserted batch {i//batch_size + 1} of {(total_items + batch_size - 1)//batch_size} code examples") def update_source_info(client: Client, source_id: str, summary: str, word_count: int): """ Update or insert source information in the sources table. Args: client: Supabase client source_id: The source ID (domain) summary: Summary of the source word_count: Total word count for the source """ try: # Try to update existing source result = client.table('sources').update({ 'summary': summary, 'total_word_count': word_count, 'updated_at': 'now()' }).eq('source_id', source_id).execute() # If no rows were updated, insert new source if not result.data: client.table('sources').insert({ 'source_id': source_id, 'summary': summary, 'total_word_count': word_count }).execute() print(f"Created new source: {source_id}") else: print(f"Updated source: {source_id}") except Exception as e: print(f"Error updating source {source_id}: {e}") def extract_source_summary(source_id: str, content: str, max_length: int = 500) -> str: """ Extract a summary for a source from its content using an LLM. This function uses the OpenRouter API to generate a concise summary of the source content. Args: source_id: The source ID (domain) content: The content to extract a summary from max_length: Maximum length of the summary Returns: A summary string """ # Default summary if we can't extract anything meaningful default_summary = f"Content from {source_id}" if not content or len(content.strip()) == 0: return default_summary # Get the model choice from environment variables model_choice = os.getenv("MODEL_CHOICE", "openai/gpt-4.1-nano") # Limit content length to avoid token limits truncated_content = content[:25000] if len(content) > 25000 else content # Create the prompt for generating the summary prompt = f"""<source_content> {truncated_content} </source_content> The above content is from the documentation for '{source_id}'. Please provide a concise summary (3-5 sentences) that describes what this library/tool/framework is about. The summary should help understand what the library/tool/framework accomplishes and the purpose. """ try: # Get OpenRouter client for chat completions openrouter_client = get_openrouter_client() # Call the OpenRouter API to generate the summary response = openrouter_client.chat.completions.create( model=model_choice, messages=[ {"role": "system", "content": "You are a helpful assistant that provides concise library/tool/framework summaries."}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=300 ) # Extract the generated summary summary = response.choices[0].message.content.strip() # Ensure the summary is not too long if len(summary) > max_length: summary = summary[:max_length] + "..." return summary except Exception as e: print(f"Error generating summary with LLM for {source_id}: {e}. Using default summary.") return default_summary def search_code_examples( client: Client, query: str, match_count: int = 10, filter_metadata: Optional[Dict[str, Any]] = None, source_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Search for code examples in Supabase using vector similarity. Args: client: Supabase client query: Query text match_count: Maximum number of results to return filter_metadata: Optional metadata filter source_id: Optional source ID to filter results Returns: List of matching code examples """ # Create a more descriptive query for better embedding match # Since code examples are embedded with their summaries, we should make the query more descriptive enhanced_query = f"Code example for {query}\n\nSummary: Example code showing {query}" # Create embedding for the enhanced query query_embedding = create_embedding(enhanced_query) # Execute the search using the match_code_examples function try: # Only include filter parameter if filter_metadata is provided and not empty params = { 'query_embedding': query_embedding, 'match_count': match_count } # Only add the filter if it's actually provided and not empty if filter_metadata: params['filter'] = filter_metadata # Add source filter if provided if source_id: params['source_filter'] = source_id result = client.rpc('match_code_examples', params).execute() return result.data except Exception as e: print(f"Error searching code examples: {e}") return []

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Chillbruhhh/crawl4ai-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server