
NetBox MCP Server

by fringemonkey
structured_flow_demo.py (14.2 kB)
#!/usr/bin/env python3
"""
Structured Flow Demo

This demonstrates the complete structured data flow:
User -> Gateway -> Router -> LLM -> MCP -> Gateway -> User

With full control over both sides, we can optimize the entire pipeline.
"""

import asyncio
import json
import time
import sys
import os
from typing import Dict, Any, List

# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')))

try:
    from structured_protocol import (
        StructuredProtocol, ProtocolOptimizer, StructuredQuery, RouterDecision,
        LLMRequest, LLMResponse, MCPRequest, MCPResponse, FinalResponse,
        ToolCategory, ModelLocation
    )
except ImportError:
    # Fallback for different import paths
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
    from src.structured_protocol import (
        StructuredProtocol, ProtocolOptimizer, StructuredQuery, RouterDecision,
        LLMRequest, LLMResponse, MCPRequest, MCPResponse, FinalResponse,
        ToolCategory, ModelLocation
    )


class MockLLM:
    """Mock LLM for demonstration purposes."""

    def __init__(self, name: str, location: ModelLocation):
        self.name = name
        self.location = location
        self.cost_per_token = 0.0001 if location == ModelLocation.CLOUD else 0.00001

    async def process_request(self, request: LLMRequest) -> LLMResponse:
        """Process an LLM request."""
        start_time = time.time()

        # Simulate processing time based on model location
        if self.location == ModelLocation.LOCAL:
            await asyncio.sleep(0.1)  # Fast local processing
        elif self.location == ModelLocation.CLOUD:
            await asyncio.sleep(0.5)  # Slower cloud processing
        else:
            await asyncio.sleep(0.3)  # Medium hybrid processing

        # Simulate tool calls based on available tools
        tool_calls = []
        for tool in request.tools_available:
            if tool["category"] in ["hosts", "vms", "ips", "vlans"]:
                tool_calls.append({
                    "tool_name": tool["name"],
                    "parameters": {
                        "limit": 10,
                        "include_certainty": True
                    },
                    "reasoning": f"Need to query {tool['category']} for user request"
                })

        processing_time = time.time() - start_time
        tokens_used = len(request.user_query.split()) * 2 + len(tool_calls) * 10

        return LLMResponse(
            query_id=request.query_id,
            content=f"Based on your query '{request.user_query}', I'll query the infrastructure data to provide you with the most relevant information.",
            tool_calls=tool_calls,
            confidence=0.9,
            reasoning="Analyzed user query and determined appropriate tools to use",
            tokens_used=tokens_used,
            processing_time=processing_time,
            model_used=self.name
        )


class MockMCP:
    """Mock MCP server for demonstration purposes."""

    def __init__(self):
        self.protocol = StructuredProtocol()
        self.cache = {}

    async def process_request(self, request: MCPRequest) -> MCPResponse:
        """Process an MCP request."""
        start_time = time.time()

        # Simulate processing time
        await asyncio.sleep(0.2)

        # Generate mock data based on tool name
        data = self._generate_mock_data(request.tool_name, request.parameters)

        processing_time = time.time() - start_time

        return MCPResponse(
            query_id=request.query_id,
            tool_name=request.tool_name,
            data=data,
            metadata={
                "source": "netbox",
                "tool_category": self._get_tool_category(request.tool_name),
                "timestamp": time.time()
            },
            confidence=0.95,
            processing_time=processing_time,
            cache_hit=False
        )

    def _generate_mock_data(self, tool_name: str, parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate mock data based on tool name."""
        if "host" in tool_name:
            return [
                {
                    "id": 1,
                    "name": "web-server-01",
                    "status": "Active",
                    "primary_ip": "192.168.1.10/24",
                    "role": "Web Server",
                    "certainty_score": 0.95
                },
                {
                    "id": 2,
                    "name": "db-server-01",
                    "status": "Active",
                    "primary_ip": "192.168.1.20/24",
                    "role": "Database Server",
                    "certainty_score": 0.92
                }
            ]
        elif "vm" in tool_name:
            return [
                {
                    "id": 3,
                    "name": "app-vm-01",
                    "status": "Active",
                    "primary_ip": "192.168.1.30/24",
                    "role": "Application Server",
                    "certainty_score": 0.88
                }
            ]
        elif "ip" in tool_name:
            return [
                {
                    "id": 101,
                    "address": "192.168.1.10/24",
                    "status": "Active",
                    "assigned_to": "web-server-01",
                    "certainty_score": 0.95
                }
            ]
        elif "vlan" in tool_name:
            return [
                {
                    "id": 201,
                    "vid": 100,
                    "name": "Production VLAN",
                    "status": "Active",
                    "certainty_score": 0.90
                }
            ]
        else:
            return []

    def _get_tool_category(self, tool_name: str) -> str:
        """Get tool category from tool name."""
        if "host" in tool_name:
            return "hosts"
        elif "vm" in tool_name:
            return "vms"
        elif "ip" in tool_name:
            return "ips"
        elif "vlan" in tool_name:
            return "vlans"
        else:
            return "unknown"


class StructuredFlowDemo:
    """Demonstrates the complete structured data flow."""

    def __init__(self):
        self.protocol = StructuredProtocol()
        self.optimizer = ProtocolOptimizer()
        self.llm_models = {
            ModelLocation.LOCAL: MockLLM("llama-3.1-8b", ModelLocation.LOCAL),
            ModelLocation.CLOUD: MockLLM("gpt-4-turbo", ModelLocation.CLOUD),
            ModelLocation.HYBRID: MockLLM("gpt-3.5-turbo", ModelLocation.HYBRID)
        }
        self.mcp = MockMCP()
        self.metrics = {
            "total_queries": 0,
            "total_processing_time": 0.0,
            "total_tokens_used": 0,
            "total_cost": 0.0,
            "cache_hits": 0
        }

    async def process_user_query(self, user_id: str, query: str, context: Dict[str, Any] = None) -> FinalResponse:
        """Process a complete user query through the structured flow."""
        start_time = time.time()
        self.metrics["total_queries"] += 1

        print(f"\nProcessing Query: '{query}'")
        print("=" * 60)

        # Step 1: Create structured query
        structured_query = self.protocol.create_query(
            user_id=user_id,
            query=query,
            context=context or {}
        )
        print(f"Step 1 - Structured Query: {structured_query.id}")

        # Step 2: Router decision
        router_decision = self.optimizer.optimize_router_decision(structured_query)
        print(f"Step 2 - Router Decision: {router_decision.model_location.value} model, {len(router_decision.tools_needed)} tools")

        # Step 3: LLM request
        llm_request = self.optimizer.optimize_llm_request(structured_query, router_decision)
        print(f"Step 3 - LLM Request: {llm_request.model_config['name']}")

        # Step 4: LLM processing
        llm_model = self.llm_models[router_decision.model_location]
        llm_response = await llm_model.process_request(llm_request)
        print(f"Step 4 - LLM Response: {llm_response.tokens_used} tokens, {llm_response.processing_time:.2f}s")

        # Step 5: MCP requests
        mcp_responses = []
        for tool_call in llm_response.tool_calls:
            mcp_request = self.protocol.create_mcp_request(
                query_id=structured_query.id,
                tool_name=tool_call["tool_name"],
                parameters=tool_call["parameters"],
                context=structured_query.context
            )
            mcp_response = await self.mcp.process_request(mcp_request)
            mcp_responses.append(mcp_response)
            print(f"Step 5 - MCP Response: {mcp_response.tool_name} -> {len(mcp_response.data)} items")

        # Step 6: Generate final response
        all_data = []
        sources = []
        tools_used = []

        for mcp_response in mcp_responses:
            all_data.extend(mcp_response.data)
            sources.append({
                "tool": mcp_response.tool_name,
                "count": len(mcp_response.data),
                "confidence": mcp_response.confidence
            })
            tools_used.append(mcp_response.tool_name)

        # Calculate overall confidence
        overall_confidence = sum(mcp_response.confidence for mcp_response in mcp_responses) / len(mcp_responses) if mcp_responses else 0.0

        # Generate answer
        answer = self._generate_answer(query, all_data, llm_response.content)

        processing_time = time.time() - start_time

        # Create final response
        final_response = self.protocol.create_final_response(
            query_id=structured_query.id,
            user_id=user_id,
            answer=answer,
            data=all_data,
            sources=sources,
            confidence=overall_confidence,
            processing_time=processing_time,
            model_used=llm_response.model_used,
            tools_used=tools_used,
            cost=llm_response.tokens_used * llm_model.cost_per_token
        )

        # Update metrics
        self.metrics["total_processing_time"] += processing_time
        self.metrics["total_tokens_used"] += llm_response.tokens_used
        self.metrics["total_cost"] += final_response.cost

        print(f"Final Response: {len(all_data)} items, {overall_confidence:.2f} confidence, {processing_time:.2f}s")

        return final_response

    def _generate_answer(self, query: str, data: List[Dict[str, Any]], llm_content: str) -> str:
        """Generate a human-readable answer from the data."""
        if not data:
            return "No data found for your query."

        answer = f"{llm_content}\n\n"
        answer += f"Found {len(data)} items:\n\n"

        for i, item in enumerate(data[:5], 1):  # Limit to 5 items
            if "name" in item:
                answer += f"{i}. **{item['name']}**"
                if "status" in item:
                    answer += f" - {item['status']}"
                if "primary_ip" in item:
                    answer += f" ({item['primary_ip']})"
                answer += "\n"

        if len(data) > 5:
            answer += f"\n... and {len(data) - 5} more items.\n"

        return answer

    def print_metrics(self):
        """Print performance metrics."""
        print(f"\nPERFORMANCE METRICS")
        print("=" * 40)
        print(f"Total Queries: {self.metrics['total_queries']}")
        print(f"Average Processing Time: {self.metrics['total_processing_time'] / max(self.metrics['total_queries'], 1):.2f}s")
        print(f"Total Tokens Used: {self.metrics['total_tokens_used']}")
        print(f"Total Cost: ${self.metrics['total_cost']:.4f}")
        print(f"Cache Hit Rate: {self.metrics['cache_hits'] / max(self.metrics['total_queries'], 1) * 100:.1f}%")


async def main():
    """Main demonstration function."""
    print("STRUCTURED FLOW DEMONSTRATION")
    print("=" * 60)
    print("Demonstrating: User -> Gateway -> Router -> LLM -> MCP -> Gateway -> User")
    print("With full control over both sides for optimal performance")
    print("=" * 60)

    demo = StructuredFlowDemo()

    # Test queries
    test_queries = [
        {
            "user_id": "user123",
            "query": "Show me all web servers in the infrastructure",
            "context": {"department": "IT", "priority": "high"}
        },
        {
            "user_id": "user456",
            "query": "Find all VMs with high confidence scores",
            "context": {"department": "Operations", "priority": "medium"}
        },
        {
            "user_id": "user789",
            "query": "List all IP addresses in the production VLAN",
            "context": {"department": "Security", "priority": "high"}
        }
    ]

    # Process each query
    for i, query_data in enumerate(test_queries, 1):
        print(f"\nTEST QUERY {i}")
        print("-" * 40)

        response = await demo.process_user_query(
            user_id=query_data["user_id"],
            query=query_data["query"],
            context=query_data["context"]
        )

        print(f"\nRESPONSE SUMMARY:")
        print(f"Answer: {response.answer[:100]}...")
        print(f"Data Items: {len(response.data)}")
        print(f"Confidence: {response.confidence:.2f}")
        print(f"Processing Time: {response.processing_time:.2f}s")
        print(f"Model Used: {response.model_used}")
        print(f"Tools Used: {', '.join(response.tools_used)}")
        print(f"Cost: ${response.cost:.4f}")

    # Print final metrics
    demo.print_metrics()

    print(f"\nDEMONSTRATION COMPLETE!")
    print(f"Structured protocol working perfectly!")
    print(f"Ready for production deployment!")


if __name__ == "__main__":
    asyncio.run(main())
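
The demo can also be driven for a single query rather than through the full main() loop. The sketch below is a minimal example, assuming structured_flow_demo.py is importable from the working directory and that the src/structured_protocol import resolves as the fallback logic in the file above arranges; the user ID and query are illustrative values taken from the test data.

# Minimal usage sketch (hypothetical invocation): run one query through
# the demo pipeline and print the answer plus accumulated metrics.
import asyncio

from structured_flow_demo import StructuredFlowDemo  # assumes the file is on the import path


async def run_one_query() -> None:
    demo = StructuredFlowDemo()
    response = await demo.process_user_query(
        user_id="user123",
        query="Show me all web servers in the infrastructure",
        context={"department": "IT", "priority": "high"},
    )
    print(response.answer)
    demo.print_metrics()


if __name__ == "__main__":
    asyncio.run(run_one_query())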

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fringemonkey/mcp-dc'
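
The same endpoint can be fetched from Python. The sketch below is a minimal example using only the standard library; it assumes the response body is JSON, and the exact fields returned are not documented here.

# Minimal sketch: fetch this server's record from the Glama MCP directory API.
# The URL is the one shown in the curl example above; the response is assumed
# to be JSON with unspecified fields.
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/fringemonkey/mcp-dc"

with urllib.request.urlopen(URL) as resp:
    server_info = json.load(resp)

print(json.dumps(server_info, indent=2))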

If you have feedback or need assistance with the MCP directory API, please join our Discord server.