Skip to main content
Glama
vertex_ai_rag.py10.8 kB
""" Google Vertex AI RAG MCP tool. This tool provides integration with Google Vertex AI's RAG (Retrieval-Augmented Generation) capabilities for enhanced document search and question answering. Environment variables: GOOGLE_CLOUD_PROJECT: Google Cloud project ID (required) GOOGLE_APPLICATION_CREDENTIALS: Path to service account key file (required) VERTEX_AI_LOCATION: Vertex AI location/region (default: us-central1) """ import os from typing import Any, Dict, List, Optional import json # Conditional imports for Google Cloud dependencies try: from google.cloud import aiplatform from google.cloud import discoveryengine_v1beta as discoveryengine GOOGLE_CLOUD_AVAILABLE = True except ImportError: GOOGLE_CLOUD_AVAILABLE = False from ..core.base import BaseMCPTool class VertexAIRAGMCPTool(BaseMCPTool): """Google Vertex AI RAG tool for document search and Q&A.""" def __init__(self): """Initialize the Vertex AI RAG tool.""" super().__init__() if not GOOGLE_CLOUD_AVAILABLE: raise ImportError( "Google Cloud dependencies not available. Install with: " "pip install google-cloud-aiplatform google-cloud-discoveryengine" ) self.project_id = os.getenv("GOOGLE_CLOUD_PROJECT") self.location = os.getenv("VERTEX_AI_LOCATION", "us-central1") if not self.project_id: raise ValueError("GOOGLE_CLOUD_PROJECT environment variable is required") # Initialize Vertex AI aiplatform.init(project=self.project_id, location=self.location) # Initialize Discovery Engine client self.discovery_client = discoveryengine.SearchServiceClient() def search_documents( self, query: str, search_engine_id: str, data_store_id: str, num_results: int = 10, filter_expression: Optional[str] = None ) -> Dict[str, Any]: """ Search documents using Vertex AI Discovery Engine. Args: query: Search query string search_engine_id: ID of the search engine data_store_id: ID of the data store num_results: Maximum number of results to return filter_expression: Optional filter expression for results Returns: Dictionary containing search results and metadata """ try: # Construct the search request search_request = discoveryengine.SearchRequest( serving_config=f"projects/{self.project_id}/locations/{self.location}/dataStores/{data_store_id}/servingConfigs/{search_engine_id}", query=query, page_size=num_results, filter=filter_expression, ) # Execute the search response = self.discovery_client.search(search_request) # Process results results = [] for result in response.results: doc_data = result.document.derived_struct_data results.append({ "title": doc_data.get("title", ""), "content": doc_data.get("content", ""), "url": doc_data.get("link", ""), "score": result.relevance_score, "metadata": dict(doc_data) }) return { "results": results, "total_count": len(results), "query": query, "error": None } except Exception as e: return { "results": [], "total_count": 0, "query": query, "error": str(e) } def rag_query( self, question: str, search_engine_id: str, data_store_id: str, context_documents: Optional[List[str]] = None ) -> Dict[str, Any]: """ Perform RAG (Retrieval-Augmented Generation) query. Args: question: Question to answer search_engine_id: ID of the search engine data_store_id: ID of the data store context_documents: Optional list of document IDs to focus on Returns: Dictionary containing answer and supporting documents """ try: # First, search for relevant documents search_results = self.search_documents( query=question, search_engine_id=search_engine_id, data_store_id=data_store_id, num_results=5 ) if search_results["error"]: return { "answer": "", "supporting_documents": [], "confidence": 0.0, "error": search_results["error"] } # Extract context from search results context_parts = [] supporting_docs = [] for result in search_results["results"]: context_parts.append(result["content"]) supporting_docs.append({ "title": result["title"], "url": result["url"], "relevance_score": result["score"] }) # Combine context for RAG context = "\n\n".join(context_parts) # Use Vertex AI to generate answer based on retrieved context answer_prompt = f""" Based on the following context documents, please answer the question: {question} Context: {context} Please provide a comprehensive answer based on the context. If the context doesn't contain enough information to answer the question, please state that clearly. """ # Initialize Vertex AI model for text generation from vertexai.language_models import TextGenerationModel model = TextGenerationModel.from_pretrained("text-bison@001") response = model.predict( prompt=answer_prompt, max_output_tokens=1024, temperature=0.1 ) return { "answer": response.text, "supporting_documents": supporting_docs, "confidence": 0.8, # Placeholder confidence score "error": None } except Exception as e: return { "answer": "", "supporting_documents": [], "confidence": 0.0, "error": str(e) } def create_document_index( self, documents: List[Dict[str, Any]], data_store_id: str, batch_size: int = 100 ) -> Dict[str, Any]: """ Create or update document index in Vertex AI Discovery Engine. Args: documents: List of documents to index data_store_id: ID of the data store batch_size: Number of documents to process in each batch Returns: Dictionary containing indexing results """ try: from google.cloud import discoveryengine_v1beta as discoveryengine # Initialize document service client document_client = discoveryengine.DocumentServiceClient() indexed_count = 0 errors = [] # Process documents in batches for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] for doc in batch: try: # Create document document = discoveryengine.Document( id=doc.get("id", ""), struct_data=doc.get("content", {}), content=discoveryengine.Document.Content( mime_type="application/json", uri=doc.get("uri", "") ) ) # Create document request request = discoveryengine.CreateDocumentRequest( parent=f"projects/{self.project_id}/locations/{self.location}/dataStores/{data_store_id}/branches/default_branch", document=document ) # Create the document response = document_client.create_document(request) indexed_count += 1 except Exception as e: errors.append(f"Document {doc.get('id', 'unknown')}: {str(e)}") return { "indexed_count": indexed_count, "total_documents": len(documents), "errors": errors, "success": len(errors) == 0 } except Exception as e: return { "indexed_count": 0, "total_documents": len(documents), "errors": [str(e)], "success": False } def get_data_stores(self) -> Dict[str, Any]: """ List available data stores in the project. Returns: Dictionary containing list of data stores """ try: from google.cloud import discoveryengine_v1beta as discoveryengine # Initialize data store service client data_store_client = discoveryengine.DataStoreServiceClient() # List data stores parent = f"projects/{self.project_id}/locations/{self.location}" request = discoveryengine.ListDataStoresRequest(parent=parent) response = data_store_client.list_data_stores(request) data_stores = [] for data_store in response: data_stores.append({ "id": data_store.name.split("/")[-1], "name": data_store.display_name, "type": data_store.content_config.name, "state": data_store.state.name }) return { "data_stores": data_stores, "error": None } except Exception as e: return { "data_stores": [], "error": str(e) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/connectaman/Pitchlense-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server