gemini_search.py
""" Gemini-powered documentation search using Google Search grounding Replaces local BM25 index with real-time web search against docs.observeinc.com, eliminating need for documentation archives and providing always-current results. """ import os import time from typing import List, Dict, Any from datetime import datetime, timedelta from collections import deque from src.logging import semantic_logger # Import telemetry decorators try: from src.telemetry.decorators import trace_mcp_tool from src.telemetry.utils import add_mcp_context except ImportError: def trace_mcp_tool(tool_name=None, record_args=False, record_result=False): def decorator(func): return func return decorator def add_mcp_context(span, **kwargs): pass class RateLimiter: """ Rate limiter for Gemini API calls Tier 1 limits: 500 requests per day for Google Search grounding We'll be conservative and limit to 400 RPD to leave buffer. """ def __init__(self, max_requests_per_day: int = 400): self.max_requests = max_requests_per_day self.requests = deque() self.window = timedelta(days=1) def _clean_old_requests(self): """Remove requests older than the time window""" cutoff = datetime.now() - self.window while self.requests and self.requests[0] < cutoff: self.requests.popleft() def can_make_request(self) -> bool: """Check if we can make a request without exceeding limits""" self._clean_old_requests() return len(self.requests) < self.max_requests def record_request(self): """Record that a request was made""" self.requests.append(datetime.now()) def get_stats(self) -> Dict[str, Any]: """Get current rate limiter statistics""" self._clean_old_requests() return { "requests_in_window": len(self.requests), "max_requests": self.max_requests, "remaining": self.max_requests - len(self.requests), "window_hours": 24 } # Global rate limiter instance _rate_limiter = RateLimiter() async def search_docs_gemini(query: str, n_results: int = 5) -> List[Dict[str, Any]]: """ Search Observe documentation using Gemini Search grounding Args: query: Search query text n_results: Number of results to return (default: 5) Returns: List of search results with metadata, compatible with previous search format """ try: # Check rate limits if not _rate_limiter.can_make_request(): stats = _rate_limiter.get_stats() error_msg = f"Rate limit reached: {stats['requests_in_window']}/{stats['max_requests']} requests used in last 24h. Please try again later." semantic_logger.warning(f"gemini rate limit exceeded | {error_msg}") return [{ "id": "rate_limit", "score": 1.0, "text": error_msg, "source": "rate_limiter", "title": "Rate Limit Exceeded" }] semantic_logger.info(f"gemini docs search | query:'{query[:100]}' | n_results:{n_results}") # Import Gemini client try: from google import genai from google.genai import types except ImportError as e: error_msg = "Gemini API client not installed. 
            semantic_logger.error(f"gemini import failed | {error_msg}")
            return [{
                "id": "import_error",
                "score": 1.0,
                "text": error_msg,
                "source": "error",
                "title": "Gemini API Not Available"
            }]

        # Get API key from environment
        api_key = os.getenv('GEMINI_API_KEY')
        if not api_key:
            error_msg = "GEMINI_API_KEY environment variable not set"
            semantic_logger.error(f"gemini config error | {error_msg}")
            return [{
                "id": "config_error",
                "score": 1.0,
                "text": error_msg,
                "source": "error",
                "title": "Gemini API Key Missing"
            }]

        # Initialize Gemini client
        client = genai.Client(api_key=api_key)

        # Construct domain-scoped search query
        # Restrict to docs.observeinc.com for official Observe documentation
        domain_scope = "site:docs.observeinc.com"
        scoped_query = f"{domain_scope} {query}"

        # Configure with Google Search grounding tool
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            tools=[grounding_tool],
            temperature=0.1,  # Low temperature for factual documentation retrieval
        )

        # Create a focused prompt that requests ONLY grounded documentation
        prompt = f"""Search for: {scoped_query}

CRITICAL INSTRUCTIONS:
1. ONLY return information directly from the actual documentation pages you find via search
2. Quote OPAL code examples EXACTLY as written in the documentation - do not modify syntax
3. DO NOT add examples, explanations, or syntax from your general knowledge
4. DO NOT generate code examples if they are not present in the actual documentation
5. If documentation is found but doesn't contain what was asked for, say so explicitly

For each documentation page found, provide:
- Direct quotes or very close paraphrases from the actual page content
- OPAL code examples exactly as they appear (preserve all syntax)
- The specific page where this information was found"""

        # Record request before making it
        _rate_limiter.record_request()

        # Make the API call
        start_time = time.time()
        response = client.models.generate_content(
            model="gemini-2.5-flash",  # Fast and cost-effective
            contents=prompt,
            config=config,
        )
        elapsed = time.time() - start_time
        semantic_logger.info(f"gemini search complete | elapsed:{elapsed:.2f}s")

        # Parse the response
        results = _parse_gemini_response(response, query, n_results)

        # Log statistics
        stats = _rate_limiter.get_stats()
        semantic_logger.info(
            f"gemini stats | results:{len(results)} | "
            f"rate_limit:{stats['requests_in_window']}/{stats['max_requests']} | "
            f"remaining:{stats['remaining']}"
        )

        return results

    except Exception as e:
        semantic_logger.error(f"gemini search error | error:{e}")
        return [{
            "id": "error",
            "score": 1.0,
            "text": f"Error in Gemini search: {str(e)}. Your query was: {query}",
            "source": "error",
            "title": "Gemini Search Error"
        }]


def _parse_gemini_response(response: Any, query: str, n_results: int) -> List[Dict[str, Any]]:
    """
    Parse Gemini response and extract grounded sources

    Args:
        response: Gemini API response object
        query: Original search query (for logging)
        n_results: Maximum number of results to return

    Returns:
        Formatted search results
    """
    try:
        results = []

        # Extract the main text response
        if hasattr(response, 'text'):
            main_text = response.text
        else:
            main_text = str(response)

        # Extract grounding metadata if available
        grounding_metadata = None
        if hasattr(response, 'candidates') and response.candidates:
            candidate = response.candidates[0]
            if hasattr(candidate, 'grounding_metadata'):
                grounding_metadata = candidate.grounding_metadata

        # If we have grounding metadata, use it to create structured results
        if grounding_metadata:
            # Extract search entry point and grounding chunks
            grounding_chunks = []
            if hasattr(grounding_metadata, 'grounding_chunks'):
                grounding_chunks = grounding_metadata.grounding_chunks

            # Extract grounding supports (links between text and sources)
            grounding_supports = []
            if hasattr(grounding_metadata, 'grounding_supports'):
                grounding_supports = grounding_metadata.grounding_supports

            # Create result entries from grounding chunks
            seen_uris = set()
            seen_text = set()  # Track unique text content
            for i, chunk in enumerate(grounding_chunks[:n_results]):
                if hasattr(chunk, 'web') and chunk.web:
                    uri = chunk.web.uri if hasattr(chunk.web, 'uri') else ""
                    title = chunk.web.title if hasattr(chunk.web, 'title') else "Documentation"

                    # CRITICAL: Only include results from docs.observeinc.com
                    # Check both URI and title since URI may be a redirect URL
                    is_observe_doc = (
                        ("docs.observeinc.com" in uri.lower() if uri else False) or
                        ("observeinc.com" in title.lower() if title else False)
                    )
                    if not is_observe_doc:
                        semantic_logger.debug(f"skipping non-observe result | title:{title} | uri:{uri}")
                        continue

                    # Find relevant text segments that reference this chunk
                    relevant_text = _extract_relevant_text(main_text, chunk, grounding_supports, i)

                    # Skip duplicates by URI or by text content
                    text_hash = relevant_text[:200]  # Use first 200 chars as hash
                    if uri in seen_uris or text_hash in seen_text:
                        continue
                    seen_uris.add(uri)
                    seen_text.add(text_hash)

                    results.append({
                        "id": f"gemini_result_{i}",
                        "score": 1.0 - (i * 0.1),  # Decreasing score for ranking
                        "text": relevant_text,
                        "source": uri,
                        "title": title
                    })

            semantic_logger.debug(f"parsed {len(results)} grounded results from gemini")

        # ONLY return grounded results - no AI-generated fallback content
        if not results:
            # No grounded results found - suggest query rewording
            suggestion = _generate_query_suggestions(query)
            results.append({
                "id": "no_grounded_results",
                "score": 0.0,
                "text": (
                    f"No documentation found for: '{query}'.\n\n"
                    f"Suggestions:\n{suggestion}\n\n"
                    "Try:\n"
                    "- Using different keywords (e.g., 'OPAL' instead of 'SQL')\n"
                    "- Breaking complex queries into simpler parts\n"
                    "- Searching for specific function names or verbs\n"
                    "- Checking docs.observeinc.com directly"
                ),
                "source": "search_assistant",
                "title": "No Grounded Results - Try Rewording Query"
            })
            semantic_logger.info(f"no grounded results found for query: '{query}'")

        return results[:n_results]

    except Exception as e:
        semantic_logger.error(f"error parsing gemini response | error:{e}")
        return [{
            "id": "parse_error",
            "score": 1.0,
            "text": f"Error parsing search results: {str(e)}",
            "source": "error",
            "title": "Parse Error"
        }]


def _generate_query_suggestions(query: str) -> str:
""" Generate helpful suggestions for rewording failed queries Args: query: Original search query that returned no results Returns: Suggestion text """ suggestions = [] # Detect SQL-specific terms and suggest OPAL alternatives sql_terms = { 'select': 'Use OPAL verbs like filter, make_col, aggregate', 'from': 'OPAL queries start with dataset reference or align', 'where': 'Use filter verb in OPAL', 'group by': 'Use group_by() or statsby in OPAL', 'order by': 'Use sort verb in OPAL', 'join': 'Try searching for "lookup" or "join" in OPAL', 'over': 'OPAL uses window() function, not OVER clause', 'partition by': 'Use group_by() within window() in OPAL', } query_lower = query.lower() for sql_term, opal_alternative in sql_terms.items(): if sql_term in query_lower: suggestions.append(f"- Instead of '{sql_term}': {opal_alternative}") # General suggestions if no SQL terms detected if not suggestions: suggestions.append("- Try searching for specific OPAL function names (filter, statsby, window)") suggestions.append("- Search for the verb/operation you want to perform") suggestions.append("- Use more general terms (e.g., 'aggregation' instead of specific syntax)") return "\n".join(suggestions) def _extract_relevant_text(main_text: str, chunk: Any, supports: List[Any], chunk_index: int) -> str: """ Extract text segments that are grounded in this specific chunk Args: main_text: Full response text chunk: Grounding chunk supports: List of grounding supports linking text to sources chunk_index: Index of this chunk in the grounding_chunks list Returns: Relevant text segment unique to this chunk """ try: # First priority: Use the chunk's own content if available if hasattr(chunk, 'web') and chunk.web: # Try to get snippet or excerpt from the web chunk itself # DO NOT truncate - Gemini already provides appropriately-sized chunks if hasattr(chunk.web, 'snippet') and chunk.web.snippet: semantic_logger.debug(f"using chunk snippet | length:{len(chunk.web.snippet)}") return chunk.web.snippet if hasattr(chunk.web, 'text') and chunk.web.text: semantic_logger.debug(f"using chunk text | length:{len(chunk.web.text)}") return chunk.web.text # Second priority: Find supports that reference THIS specific chunk relevant_segments = [] for support in supports: if not hasattr(support, 'segment') or not hasattr(support, 'grounding_chunk_indices'): continue # Check if this support references our specific chunk index chunk_indices = support.grounding_chunk_indices if support.grounding_chunk_indices else [] if chunk_index in chunk_indices: # Extract the text segment for THIS chunk only if hasattr(support.segment, 'text'): segment_text = support.segment.text if segment_text and segment_text not in relevant_segments: relevant_segments.append(segment_text) # Combine segments specific to this chunk - DO NOT truncate if relevant_segments: combined = " ".join(relevant_segments) semantic_logger.debug(f"using combined segments | count:{len(relevant_segments)} | length:{len(combined)}") return combined # Last resort: Use offset portion of main text to ensure uniqueness # Use chunk_index to get different portions for different chunks offset = chunk_index * 2000 fallback = main_text[offset:offset+3000] if len(main_text) > offset else main_text[:3000] semantic_logger.debug(f"using fallback text | offset:{offset} | length:{len(fallback)}") return fallback except Exception as e: semantic_logger.debug(f"error extracting relevant text | error:{e}") return main_text[:3000] if main_text else "Documentation available at source URL" def 

def get_rate_limiter_stats() -> Dict[str, Any]:
    """Get current rate limiter statistics for monitoring"""
    return _rate_limiter.get_stats()
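
A minimal usage sketch, not part of the file above: search_docs_gemini is a coroutine, so it must be awaited from an event loop. This assumes GEMINI_API_KEY is set in the environment, the google-genai package is installed, and that the import path matches wherever gemini_search.py lives in the repository; the module path and example query below are assumptions, not taken from the source.

# demo_search.py - hypothetical driver script for gemini_search.py
import asyncio

# Assumed import path; adjust to the module's actual location in the repo
from gemini_search import search_docs_gemini, get_rate_limiter_stats


async def main():
    # Illustrative query; any documentation question works here
    results = await search_docs_gemini("filter events by field value", n_results=3)
    for result in results:
        print(f"[{result['score']:.1f}] {result['title']} ({result['source']})")
        print(result["text"][:200])
        print()

    # Inspect remaining daily quota after the call
    print(get_rate_limiter_stats())


if __name__ == "__main__":
    asyncio.run(main())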
