#!/usr/bin/env python3
"""
Structured Flow Demo
This demonstrates the complete structured data flow:
User -> Gateway -> Router -> LLM -> MCP -> Gateway -> User
Because we control both the gateway and the MCP server, every hop in the pipeline can be optimized.
"""
import asyncio
import json
import time
import sys
import os
from typing import Any, Dict, List, Optional
# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')))
try:
from structured_protocol import (
StructuredProtocol, ProtocolOptimizer, StructuredQuery, RouterDecision,
LLMRequest, LLMResponse, MCPRequest, MCPResponse, FinalResponse,
ToolCategory, ModelLocation
)
except ImportError:
# Fallback for different import paths
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.structured_protocol import (
StructuredProtocol, ProtocolOptimizer, StructuredQuery, RouterDecision,
LLMRequest, LLMResponse, MCPRequest, MCPResponse, FinalResponse,
ToolCategory, ModelLocation
)
class MockLLM:
"""Mock LLM for demonstration purposes."""
def __init__(self, name: str, location: ModelLocation):
self.name = name
self.location = location
self.cost_per_token = 0.0001 if location == ModelLocation.CLOUD else 0.00001
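        # Mock pricing only: cloud is 10x local, and hybrid reuses the local rate
        # here. Example: a 500-token cloud response costs 500 * 0.0001 = $0.05.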
async def process_request(self, request: LLMRequest) -> LLMResponse:
"""Process an LLM request."""
start_time = time.time()
# Simulate processing time based on model location
if self.location == ModelLocation.LOCAL:
await asyncio.sleep(0.1) # Fast local processing
elif self.location == ModelLocation.CLOUD:
await asyncio.sleep(0.5) # Slower cloud processing
else:
await asyncio.sleep(0.3) # Medium hybrid processing
# Simulate tool calls based on available tools
tool_calls = []
for tool in request.tools_available:
if tool["category"] in ["hosts", "vms", "ips", "vlans"]:
tool_calls.append({
"tool_name": tool["name"],
"parameters": {
"limit": 10,
"include_certainty": True
},
"reasoning": f"Need to query {tool['category']} for user request"
})
processing_time = time.time() - start_time
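        # Crude mock heuristic: ~2 tokens per query word plus ~10 per tool call.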
tokens_used = len(request.user_query.split()) * 2 + len(tool_calls) * 10
return LLMResponse(
query_id=request.query_id,
content=f"Based on your query '{request.user_query}', I'll query the infrastructure data to provide you with the most relevant information.",
tool_calls=tool_calls,
confidence=0.9,
reasoning="Analyzed user query and determined appropriate tools to use",
tokens_used=tokens_used,
processing_time=processing_time,
model_used=self.name
)
class MockMCP:
"""Mock MCP server for demonstration purposes."""
def __init__(self):
self.protocol = StructuredProtocol()
        self.cache = {}  # placeholder; the base mock never reads it (see CachedMockMCP below)
async def process_request(self, request: MCPRequest) -> MCPResponse:
"""Process an MCP request."""
start_time = time.time()
# Simulate processing time
await asyncio.sleep(0.2)
# Generate mock data based on tool name
data = self._generate_mock_data(request.tool_name, request.parameters)
processing_time = time.time() - start_time
return MCPResponse(
query_id=request.query_id,
tool_name=request.tool_name,
data=data,
metadata={
"source": "netbox",
"tool_category": self._get_tool_category(request.tool_name),
"timestamp": time.time()
},
confidence=0.95,
processing_time=processing_time,
cache_hit=False
)
def _generate_mock_data(self, tool_name: str, parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate mock data based on tool name."""
if "host" in tool_name:
return [
{
"id": 1,
"name": "web-server-01",
"status": "Active",
"primary_ip": "192.168.1.10/24",
"role": "Web Server",
"certainty_score": 0.95
},
{
"id": 2,
"name": "db-server-01",
"status": "Active",
"primary_ip": "192.168.1.20/24",
"role": "Database Server",
"certainty_score": 0.92
}
]
elif "vm" in tool_name:
return [
{
"id": 3,
"name": "app-vm-01",
"status": "Active",
"primary_ip": "192.168.1.30/24",
"role": "Application Server",
"certainty_score": 0.88
}
]
elif "ip" in tool_name:
return [
{
"id": 101,
"address": "192.168.1.10/24",
"status": "Active",
"assigned_to": "web-server-01",
"certainty_score": 0.95
}
]
elif "vlan" in tool_name:
return [
{
"id": 201,
"vid": 100,
"name": "Production VLAN",
"status": "Active",
"certainty_score": 0.90
}
]
else:
return []
def _get_tool_category(self, tool_name: str) -> str:
"""Get tool category from tool name."""
if "host" in tool_name:
return "hosts"
elif "vm" in tool_name:
return "vms"
elif "ip" in tool_name:
return "ips"
elif "vlan" in tool_name:
return "vlans"
else:
return "unknown"
class StructuredFlowDemo:
"""Demonstrates the complete structured data flow."""
def __init__(self):
self.protocol = StructuredProtocol()
self.optimizer = ProtocolOptimizer()
self.llm_models = {
ModelLocation.LOCAL: MockLLM("llama-3.1-8b", ModelLocation.LOCAL),
ModelLocation.CLOUD: MockLLM("gpt-4-turbo", ModelLocation.CLOUD),
ModelLocation.HYBRID: MockLLM("gpt-3.5-turbo", ModelLocation.HYBRID)
}
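        # Keyed by ModelLocation so Step 4 can index straight off the router decision.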
self.mcp = MockMCP()
self.metrics = {
"total_queries": 0,
"total_processing_time": 0.0,
"total_tokens_used": 0,
"total_cost": 0.0,
"cache_hits": 0
}
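        # cache_hits counts MCP responses that report cache_hit=True; the base
        # MockMCP never does, so the rate printed in print_metrics() stays at 0%.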
    async def process_user_query(self, user_id: str, query: str,
                                 context: Optional[Dict[str, Any]] = None) -> FinalResponse:
"""Process a complete user query through the structured flow."""
start_time = time.time()
self.metrics["total_queries"] += 1
print(f"\nProcessing Query: '{query}'")
print("=" * 60)
# Step 1: Create structured query
structured_query = self.protocol.create_query(
user_id=user_id,
query=query,
context=context or {}
)
print(f"Step 1 - Structured Query: {structured_query.id}")
# Step 2: Router decision
router_decision = self.optimizer.optimize_router_decision(structured_query)
print(f"Step 2 - Router Decision: {router_decision.model_location.value} model, {len(router_decision.tools_needed)} tools")
# Step 3: LLM request
llm_request = self.optimizer.optimize_llm_request(structured_query, router_decision)
print(f"Step 3 - LLM Request: {llm_request.model_config['name']}")
# Step 4: LLM processing
llm_model = self.llm_models[router_decision.model_location]
llm_response = await llm_model.process_request(llm_request)
print(f"Step 4 - LLM Response: {llm_response.tokens_used} tokens, {llm_response.processing_time:.2f}s")
# Step 5: MCP requests
mcp_responses = []
for tool_call in llm_response.tool_calls:
mcp_request = self.protocol.create_mcp_request(
query_id=structured_query.id,
tool_name=tool_call["tool_name"],
parameters=tool_call["parameters"],
context=structured_query.context
)
            mcp_response = await self.mcp.process_request(mcp_request)
            mcp_responses.append(mcp_response)
            if mcp_response.cache_hit:
                self.metrics["cache_hits"] += 1  # counted per MCP call
            print(f"Step 5 - MCP Response: {mcp_response.tool_name} -> {len(mcp_response.data)} items")
# Step 6: Generate final response
all_data = []
sources = []
tools_used = []
for mcp_response in mcp_responses:
all_data.extend(mcp_response.data)
sources.append({
"tool": mcp_response.tool_name,
"count": len(mcp_response.data),
"confidence": mcp_response.confidence
})
tools_used.append(mcp_response.tool_name)
# Calculate overall confidence
        overall_confidence = (
            sum(r.confidence for r in mcp_responses) / len(mcp_responses)
            if mcp_responses else 0.0
        )
# Generate answer
answer = self._generate_answer(query, all_data, llm_response.content)
processing_time = time.time() - start_time
# Create final response
final_response = self.protocol.create_final_response(
query_id=structured_query.id,
user_id=user_id,
answer=answer,
data=all_data,
sources=sources,
confidence=overall_confidence,
processing_time=processing_time,
model_used=llm_response.model_used,
tools_used=tools_used,
cost=llm_response.tokens_used * llm_model.cost_per_token
)
# Update metrics
self.metrics["total_processing_time"] += processing_time
self.metrics["total_tokens_used"] += llm_response.tokens_used
self.metrics["total_cost"] += final_response.cost
print(f"Final Response: {len(all_data)} items, {overall_confidence:.2f} confidence, {processing_time:.2f}s")
return final_response
def _generate_answer(self, query: str, data: List[Dict[str, Any]], llm_content: str) -> str:
"""Generate a human-readable answer from the data."""
if not data:
return "No data found for your query."
answer = f"{llm_content}\n\n"
answer += f"Found {len(data)} items:\n\n"
        for i, item in enumerate(data[:5], 1):  # Limit to 5 items
            # Fall back to "address" (IPs) or the record id so every item gets a line.
            label = item.get("name") or item.get("address") or f"item {item.get('id', i)}"
            answer += f"{i}. **{label}**"
            if "status" in item:
                answer += f" - {item['status']}"
            if "primary_ip" in item:
                answer += f" ({item['primary_ip']})"
            answer += "\n"
if len(data) > 5:
answer += f"\n... and {len(data) - 5} more items.\n"
return answer
def print_metrics(self):
"""Print performance metrics."""
print(f"\nPERFORMANCE METRICS")
print("=" * 40)
print(f"Total Queries: {self.metrics['total_queries']}")
print(f"Average Processing Time: {self.metrics['total_processing_time'] / max(self.metrics['total_queries'], 1):.2f}s")
print(f"Total Tokens Used: {self.metrics['total_tokens_used']}")
print(f"Total Cost: ${self.metrics['total_cost']:.4f}")
print(f"Cache Hit Rate: {self.metrics['cache_hits'] / max(self.metrics['total_queries'], 1) * 100:.1f}%")
async def main():
"""Main demonstration function."""
print("STRUCTURED FLOW DEMONSTRATION")
print("=" * 60)
print("Demonstrating: User -> Gateway -> Router -> LLM -> MCP -> Gateway -> User")
print("With full control over both sides for optimal performance")
print("=" * 60)
demo = StructuredFlowDemo()
# Test queries
test_queries = [
{
"user_id": "user123",
"query": "Show me all web servers in the infrastructure",
"context": {"department": "IT", "priority": "high"}
},
{
"user_id": "user456",
"query": "Find all VMs with high confidence scores",
"context": {"department": "Operations", "priority": "medium"}
},
{
"user_id": "user789",
"query": "List all IP addresses in the production VLAN",
"context": {"department": "Security", "priority": "high"}
}
]
# Process each query
for i, query_data in enumerate(test_queries, 1):
print(f"\nTEST QUERY {i}")
print("-" * 40)
response = await demo.process_user_query(
user_id=query_data["user_id"],
query=query_data["query"],
context=query_data["context"]
)
print(f"\nRESPONSE SUMMARY:")
print(f"Answer: {response.answer[:100]}...")
print(f"Data Items: {len(response.data)}")
print(f"Confidence: {response.confidence:.2f}")
print(f"Processing Time: {response.processing_time:.2f}s")
print(f"Model Used: {response.model_used}")
print(f"Tools Used: {', '.join(response.tools_used)}")
print(f"Cost: ${response.cost:.4f}")
# Print final metrics
demo.print_metrics()
print(f"\nDEMONSTRATION COMPLETE!")
print(f"Structured protocol working perfectly!")
print(f"Ready for production deployment!")
if __name__ == "__main__":
asyncio.run(main())