Skip to main content
Glama

Snowflake MCP Agent System

client.py (24.5 kB)
""" LangGraph-based Agentic Architecture with Few-Shot QnA Training This module implements a sophisticated multi-agent system using LangGraph with support for few-shot learning and agent training capabilities. """ import os import json import asyncio import argparse import uuid import logging from typing import TypedDict, Optional, List, Dict, Any, Literal, Annotated from datetime import datetime from enum import Enum from dataclasses import dataclass, field from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate from langchain_openai import ChatOpenAI from langchain_mcp_adapters.client import MultiServerMCPClient from langgraph.graph import StateGraph, END from langgraph.prebuilt import ToolNode from langgraph.checkpoint.memory import MemorySaver from dotenv import load_dotenv # Load environment variables load_dotenv() os.environ["NO_PROXY"] = "127.0.0.1,localhost" # Setup logging logger = logging.getLogger(__name__) # ============================================================================ # Agent Archetypes and Few-Shot Training Data # ============================================================================ class AgentArchetype(Enum): """Define different agent archetypes with specific capabilities""" ANALYST = "analyst" LINEAGE_EXPERT = "lineage_expert" USAGE_AUDITOR = "usage_auditor" METADATA_CURATOR = "metadata_curator" ACCESS_CONTROLLER = "access_controller" QUERY_OPTIMIZER = "query_optimizer" ORCHESTRATOR = "orchestrator" @dataclass class FewShotExample: """Structure for few-shot learning examples""" query: str context: Dict[str, Any] response: str agent_type: AgentArchetype metadata: Dict[str, Any] = field(default_factory=dict) class FewShotTrainingStore: """Manages few-shot examples for agent training""" def __init__(self, storage_path: str = "./agent_training_data.json"): self.storage_path = storage_path self.examples: 
Dict[AgentArchetype, List[FewShotExample]] = { archetype: [] for archetype in AgentArchetype } self._load_examples() def _load_examples(self): """Load existing training examples from storage""" if os.path.exists(self.storage_path): with open(self.storage_path, 'r') as f: data = json.load(f) for archetype in AgentArchetype: if archetype.value in data: self.examples[archetype] = [ FewShotExample(**example) for example in data[archetype.value] ] def add_example(self, example: FewShotExample): """Add a new training example""" self.examples[example.agent_type].append(example) self._save_examples() def get_examples(self, agent_type: AgentArchetype, n: int = 3) -> List[FewShotExample]: """Retrieve n most relevant examples for an agent type""" return self.examples[agent_type][-n:] if self.examples[agent_type] else [] def _save_examples(self): """Persist examples to storage""" data = { archetype.value: [ { "query": ex.query, "context": ex.context, "response": ex.response, "agent_type": ex.agent_type.value, "metadata": ex.metadata } for ex in examples ] for archetype, examples in self.examples.items() } with open(self.storage_path, 'w') as f: json.dump(data, f, indent=2) # ============================================================================ # Enhanced Agent State # ============================================================================ class AgentState(TypedDict): """Enhanced state management for the multi-agent system""" # Core fields query: str messages: List[BaseMessage] session_id: str # Planning and execution execution_plan: List[Dict[str, Any]] current_step: int completed_steps: List[str] # Agent-specific data agent_responses: Dict[str, Any] metadata_info: Optional[Dict] lineage_info: Optional[Dict] usage_info: Optional[Dict] access_info: Optional[Dict] profiling_info: Optional[Dict] query_analysis: Optional[Dict] # Decision and routing next_agent: Optional[str] confidence_scores: Dict[str, float] # Final output final_response: Optional[str] 
recommendations: List[str] # Training feedback user_feedback: Optional[str] training_examples: List[FewShotExample] # ============================================================================ # Specialized Agent Classes # ============================================================================ class BaseAgent: """Base class for all specialized agents""" def __init__( self, model: ChatOpenAI, archetype: AgentArchetype, training_store: FewShotTrainingStore, tools: Optional[List] = None ): self.model = model self.archetype = archetype self.training_store = training_store self.tools = tools or [] self.prompt_template = self._build_prompt_template() def _build_prompt_template(self) -> ChatPromptTemplate: """Build prompt template with few-shot examples""" examples = self.training_store.get_examples(self.archetype) base_prompt = self._get_base_prompt() if examples: few_shot_prompt = self._create_few_shot_prompt(examples) return ChatPromptTemplate.from_messages([ SystemMessage(content=base_prompt), *few_shot_prompt, HumanMessage(content="{query}") ]) return ChatPromptTemplate.from_messages([ SystemMessage(content=base_prompt), HumanMessage(content="{query}") ]) def _get_base_prompt(self) -> str: """Get base system prompt for the agent""" return f"You are a {self.archetype.value} agent." 
def _create_few_shot_prompt(self, examples: List[FewShotExample]) -> List[BaseMessage]: """Create few-shot prompt messages from examples""" messages = [] for example in examples: messages.append(HumanMessage(content=example.query)) messages.append(AIMessage(content=example.response)) return messages async def process(self, state: AgentState) -> AgentState: """Process the current state and return updated state""" prompt = self.prompt_template.format_messages(query=state["query"]) response = await self.model.ainvoke(prompt) # Update state with agent's response state["agent_responses"][self.archetype.value] = response.content return state class AnalystAgent(BaseAgent): """Specialized agent for data analysis""" def _get_base_prompt(self) -> str: return """You are an expert Data Analyst Agent specializing in Snowflake data analysis. Your responsibilities: 1. Perform comprehensive EDA on datasets 2. Identify patterns, trends, and anomalies 3. Calculate statistical metrics and distributions 4. Generate insights from data correlations 5. Provide data-driven recommendations Focus on: - Accurate aggregations and counts - Meaningful statistical analysis - Clear visualization recommendations - Actionable insights from patterns""" class LineageExpertAgent(BaseAgent): """Specialized agent for data lineage analysis""" def _get_base_prompt(self) -> str: return """You are a Data Lineage Expert Agent specializing in understanding data flows. Your responsibilities: 1. Trace upstream and downstream data dependencies 2. Build comprehensive lineage maps (up to 3 levels) 3. Identify data flow bottlenecks 4. Analyze impact of changes across pipelines 5. 
Document data transformation logic Focus on: - Complete lineage traversal - Clear source-to-target mappings - Transformation logic documentation - Impact analysis for changes""" class UsageAuditorAgent(BaseAgent): """Specialized agent for usage auditing and optimization""" def _get_base_prompt(self) -> str: return """You are a Usage Auditor Agent specializing in resource utilization analysis. Your responsibilities: 1. Monitor and analyze usage patterns 2. Identify high-resource consumers 3. Detect usage anomalies and trends 4. Recommend optimization opportunities 5. Track user access patterns Focus on: - Resource consumption metrics - User behavior patterns - Cost optimization opportunities - Performance bottlenecks""" class QueryOptimizerAgent(BaseAgent): """Specialized agent for query optimization""" def _get_base_prompt(self) -> str: return """You are a Query Optimizer Agent specializing in SQL performance. Your responsibilities: 1. Analyze query execution plans 2. Identify performance bottlenecks 3. Recommend query optimizations 4. Suggest indexing strategies 5. Detect common anti-patterns Use the AAI_SQL_ANALYZER dataset to: - Identify frequently queried table combinations - Detect resource-intensive queries - Find optimization opportunities - Recommend data product creation""" class MetadataCuratorAgent(BaseAgent): """Specialized agent for metadata management and documentation""" def _get_base_prompt(self) -> str: return """You are a Metadata Curator Agent specializing in data documentation and governance. Your responsibilities: 1. Gather and organize table metadata and documentation 2. Identify data quality and completeness issues 3. Provide comprehensive data dictionary information 4. Analyze data lineage and dependencies 5. 
Recommend data governance improvements Focus on: - Complete metadata coverage - Clear documentation standards - Data quality assessments - Governance compliance""" # ============================================================================ # Orchestrator Agent # ============================================================================ class OrchestratorAgent: """Main orchestrator that coordinates other agents""" def __init__(self, model: ChatOpenAI, training_store: FewShotTrainingStore): self.model = model self.training_store = training_store def create_execution_plan(self, state: AgentState) -> AgentState: """Create an execution plan based on the query""" planning_prompt = f""" You are an orchestrator planning agent execution for this query: "{state['query']}" Available agents: - analyst: Data analysis and EDA - lineage_expert: Data flow and dependencies - usage_auditor: Usage patterns and optimization - metadata_curator: Metadata and documentation - access_controller: Access control and permissions - query_optimizer: Query performance analysis Create an execution plan as a sequence of agents to invoke. Consider dependencies between agents. Return a JSON list of steps: [ {{"agent": "agent_name", "reason": "why this agent is needed", "priority": 1}}, ... 
] """ response = self.model.invoke(planning_prompt) try: plan = json.loads(response.content) state["execution_plan"] = plan state["current_step"] = 0 except json.JSONDecodeError: # Fallback plan state["execution_plan"] = [ {"agent": "metadata_curator", "reason": "Gather context", "priority": 1}, {"agent": "analyst", "reason": "Analyze data", "priority": 2} ] state["current_step"] = 0 return state def route_next_agent(self, state: AgentState) -> str: """Determine next agent to execute""" if state["current_step"] < len(state["execution_plan"]): next_agent = state["execution_plan"][state["current_step"]]["agent"] state["next_agent"] = next_agent state["current_step"] += 1 return next_agent return "synthesizer" # ============================================================================ # Synthesis and Response Generation # ============================================================================ class SynthesizerAgent: """Synthesizes responses from all agents into final output""" def __init__(self, model: ChatOpenAI): self.model = model def synthesize(self, state: AgentState) -> AgentState: """Synthesize all agent responses into final output""" synthesis_prompt = f""" You are synthesizing responses from multiple agents for this query: "{state['query']}" Agent Responses: {json.dumps(state['agent_responses'], indent=2)} Create a comprehensive, well-structured response that: 1. Directly answers the user's question 2. Integrates insights from all agents 3. Provides actionable recommendations 4. Includes relevant visualizations if needed Format as a clear, professional response. 
""" response = self.model.invoke(synthesis_prompt) state["final_response"] = response.content # Generate training example if feedback is positive if state.get("user_feedback") == "positive": example = FewShotExample( query=state["query"], context={"agent_responses": state["agent_responses"]}, response=state["final_response"], agent_type=AgentArchetype.ORCHESTRATOR, metadata={"session_id": state["session_id"], "timestamp": datetime.now().isoformat()} ) state["training_examples"].append(example) return state # ============================================================================ # LangGraph Workflow Construction # ============================================================================ class SnowflakeAgenticSystem: """Main system orchestrating the LangGraph workflow""" def __init__(self, jwt_token: str = None): self.jwt_token = jwt_token or os.getenv("JWT_TOKEN") self.training_store = FewShotTrainingStore() self.memory = MemorySaver() self.setup_components() self.build_graph() def setup_components(self): """Initialize MCP client, model, and agents""" # MCP Client setup try: self.mcp_client = MultiServerMCPClient({ "snowflake": { "url": "http://127.0.0.1:8000/mcp", "transport": "streamable_http", } }) except Exception as e: logger.warning(f"Failed to initialize MCP client: {e}. 
Operating in standalone mode.") self.mcp_client = None # Model setup self.model = ChatOpenAI( base_url="https://askattapis-orchestration-stage.dev.att.com/api/v1", api_key=self.jwt_token, temperature=0.3, model_name="gpt-4.1" ) # Initialize agents self.orchestrator = OrchestratorAgent(self.model, self.training_store) self.synthesizer = SynthesizerAgent(self.model) # Specialized agents self.agents = { "analyst": AnalystAgent(self.model, AgentArchetype.ANALYST, self.training_store), "lineage_expert": LineageExpertAgent(self.model, AgentArchetype.LINEAGE_EXPERT, self.training_store), "usage_auditor": UsageAuditorAgent(self.model, AgentArchetype.USAGE_AUDITOR, self.training_store), "query_optimizer": QueryOptimizerAgent(self.model, AgentArchetype.QUERY_OPTIMIZER, self.training_store), "metadata_curator": MetadataCuratorAgent(self.model, AgentArchetype.METADATA_CURATOR, self.training_store), } def build_graph(self): """Build the LangGraph workflow""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("planner", self.orchestrator.create_execution_plan) workflow.add_node("router", self.route_to_agent) workflow.add_node("analyst", self.execute_analyst) workflow.add_node("lineage_expert", self.execute_lineage) workflow.add_node("usage_auditor", self.execute_usage) workflow.add_node("query_optimizer", self.execute_optimizer) workflow.add_node("metadata_curator", self.execute_metadata_curator) workflow.add_node("synthesizer", self.synthesizer.synthesize) # Add edges workflow.set_entry_point("planner") workflow.add_edge("planner", "router") # Conditional routing from router workflow.add_conditional_edges( "router", self.determine_next_node, { "analyst": "analyst", "lineage_expert": "lineage_expert", "usage_auditor": "usage_auditor", "query_optimizer": "query_optimizer", "metadata_curator": "metadata_curator", "synthesizer": "synthesizer", "end": END } ) # All agents route back to router for agent in ["analyst", "lineage_expert", "usage_auditor", "query_optimizer", 
"metadata_curator"]: workflow.add_edge(agent, "router") workflow.add_edge("synthesizer", END) # Compile the graph with memory self.app = workflow.compile(checkpointer=self.memory) def route_to_agent(self, state: AgentState) -> AgentState: """Router node that determines next agent""" next_agent = self.orchestrator.route_next_agent(state) state["next_agent"] = next_agent return state def determine_next_node(self, state: AgentState) -> str: """Conditional function to determine next node""" return state.get("next_agent", "synthesizer") async def execute_analyst(self, state: AgentState) -> AgentState: """Execute analyst agent""" agent = self.agents["analyst"] return await agent.process(state) async def execute_lineage(self, state: AgentState) -> AgentState: """Execute lineage expert agent""" agent = self.agents["lineage_expert"] return await agent.process(state) async def execute_usage(self, state: AgentState) -> AgentState: """Execute usage auditor agent""" agent = self.agents["usage_auditor"] return await agent.process(state) async def execute_optimizer(self, state: AgentState) -> AgentState: """Execute query optimizer agent""" agent = self.agents["query_optimizer"] return await agent.process(state) async def execute_metadata_curator(self, state: AgentState) -> AgentState: """Execute metadata curator agent""" agent = self.agents["metadata_curator"] return await agent.process(state) async def process_query(self, query: str, session_id: str = None) -> str: """Process a user query through the agent system""" session_id = session_id or str(uuid.uuid4()) # Initialize state initial_state = { "query": query, "messages": [HumanMessage(content=query)], "session_id": session_id, "execution_plan": [], "current_step": 0, "completed_steps": [], "agent_responses": {}, "metadata_info": None, "lineage_info": None, "usage_info": None, "access_info": None, "profiling_info": None, "query_analysis": None, "next_agent": None, "confidence_scores": {}, "final_response": None, 
"recommendations": [], "user_feedback": None, "training_examples": [] } # Get MCP tools if client is available tools = [] if self.mcp_client: try: tools = await self.mcp_client.get_tools() logger.info(f"Loaded {len(tools)} tools from MCP client") except Exception as e: logger.warning(f"Failed to get tools from MCP client: {e}") # Update agents with tools for agent in self.agents.values(): agent.tools = tools # Execute workflow config = {"configurable": {"thread_id": session_id}} result = await self.app.ainvoke(initial_state, config) return result["final_response"] def collect_feedback(self, session_id: str, feedback: str): """Collect user feedback for training improvement""" # Implementation for feedback collection pass # ============================================================================ # Interactive CLI Interface # ============================================================================ async def interactive_session(): """Run interactive chat session with the agent system""" print("Initializing Snowflake Agentic System...") system = SnowflakeAgenticSystem() print("\n" + "="*60) print("Snowflake MCP Agent - LangGraph Architecture") print("Type 'exit' to quit, 'feedback' to provide feedback") print("="*60 + "\n") session_id = str(uuid.uuid4()) print(f"Session ID: {session_id}\n") while True: try: user_input = input("\n🔍 Query: ").strip() if not user_input: continue if user_input.lower() == 'exit': print("\n👋 Goodbye!") break if user_input.lower() == 'feedback': feedback = input("Please provide feedback (positive/negative/neutral): ").strip() system.collect_feedback(session_id, feedback) print("Thank you for your feedback!") continue print("\n⚡ Processing query through agent network...") response = await system.process_query(user_input, session_id) print("\n" + "="*60) print("📊 Response:") print("="*60) print(response) except KeyboardInterrupt: print("\n\n👋 Session interrupted. 
Goodbye!") break except Exception as e: print(f"\n❌ Error: {str(e)}") print("Please try again or type 'exit' to quit.") async def batch_process(queries_file: str): """Process batch queries from a file""" system = SnowflakeAgenticSystem() with open(queries_file, 'r') as f: queries = [line.strip() for line in f if line.strip()] results = [] for i, query in enumerate(queries, 1): print(f"\nProcessing query {i}/{len(queries)}: {query[:50]}...") response = await system.process_query(query) results.append({ "query": query, "response": response, "timestamp": datetime.now().isoformat() }) # Save results output_file = f"batch_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(output_file, 'w') as f: json.dump(results, f, indent=2) print(f"\nResults saved to {output_file}") # ============================================================================ # Main Entry Point # ============================================================================ async def main(): parser = argparse.ArgumentParser(description="Snowflake MCP Agent with LangGraph") parser.add_argument( "--mode", choices=["interactive", "batch", "query"], default="interactive", help="Execution mode" ) parser.add_argument( "--query", type=str, help="Direct query to process (for 'query' mode)" ) parser.add_argument( "--file", type=str, help="File containing queries (for 'batch' mode)" ) parser.add_argument( "--train", action="store_true", help="Enable training mode to collect examples" ) args = parser.parse_args() if args.mode == "interactive": await interactive_session() elif args.mode == "query" and args.query: system = SnowflakeAgenticSystem() response = await system.process_query(args.query) print(response) elif args.mode == "batch" and args.file: await batch_process(args.file) else: print("Invalid arguments. Use --help for usage information.") if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/silverknight404/mcp_code2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.