Skip to main content
Glama

Agentic MCP Weather System

by Shivbaj
simple_orchestrator.py13.6 kB
""" Simplified Agent Orchestrator for MCP Servers This module provides a simplified orchestrator that can coordinate MCP servers for agentic workflows without requiring LangGraph initially. """ import asyncio import httpx import json from typing import Dict, List, Any, Optional from datetime import datetime from enum import Enum from server_registry import MCPServerRegistry, registry class TaskType(Enum): """Types of tasks the orchestrator can handle.""" WEATHER_QUERY = "weather_query" FORECAST_ANALYSIS = "forecast_analysis" ALERT_MONITORING = "alert_monitoring" MULTI_LOCATION = "multi_location" GENERAL_INQUIRY = "general_inquiry" class SimpleOrchestrator: """Simplified orchestrator for weather-related agentic workflows.""" def __init__(self): self.registry = registry def _classify_task(self, query: str) -> TaskType: """Classify the user query to determine task type.""" query_lower = query.lower() if any(word in query_lower for word in ["forecast", "prediction", "future", "tomorrow", "week"]): return TaskType.FORECAST_ANALYSIS elif any(word in query_lower for word in ["alert", "warning", "storm", "emergency"]): return TaskType.ALERT_MONITORING elif any(word in query_lower for word in ["multiple", "compare", "cities", "locations"]): return TaskType.MULTI_LOCATION elif any(word in query_lower for word in ["weather", "temperature", "current", "now"]): return TaskType.WEATHER_QUERY else: return TaskType.GENERAL_INQUIRY def _extract_locations(self, query: str) -> List[str]: """Extract location information from the query using simple keyword matching.""" # Simple location extraction - in production, use proper NER common_cities = [ "london", "paris", "new york", "tokyo", "sydney", "berlin", "rome", "madrid", "amsterdam", "barcelona", "vienna", "prague", "moscow", "beijing", "mumbai", "delhi", "bangkok", "singapore", "los angeles", "chicago", "houston", "phoenix", "philadelphia", "san francisco", "seattle", "boston", "atlanta", "miami" ] us_states = [ "california", "texas", "florida", "new york", "pennsylvania", "illinois", "ohio", "georgia", "north carolina", "michigan" ] query_lower = query.lower() found_locations = [] # Check for cities for city in common_cities: if city in query_lower: # Capitalize properly found_locations.append(city.title()) # Check for US states for state in us_states: if state in query_lower: found_locations.append(state.title()) # Remove duplicates while preserving order return list(dict.fromkeys(found_locations)) async def _call_weather_tool(self, city: str) -> Dict[str, Any]: """Call the weather tool on the weather server.""" server = self.registry.get_server("weather-server") if not server: return {"error": "Weather server not found"} try: async with httpx.AsyncClient() as client: response = await client.post( f"{server.base_url}/tools/get_weather", json={"city": city}, timeout=10.0 ) return response.json() except Exception as e: return {"error": f"Failed to call weather tool: {str(e)}"} async def _call_forecast_tool(self, latitude: float, longitude: float) -> Dict[str, Any]: """Call the forecast tool on the weather server.""" server = self.registry.get_server("weather-server") if not server: return {"error": "Weather server not found"} try: async with httpx.AsyncClient() as client: response = await client.post( f"{server.base_url}/tools/get_forecast", json={"latitude": latitude, "longitude": longitude}, timeout=15.0 ) return response.json() except Exception as e: return {"error": f"Failed to call forecast tool: {str(e)}"} async def _call_alerts_tool(self, state_code: str) -> Dict[str, Any]: """Call the alerts tool on the weather server.""" server = self.registry.get_server("weather-server") if not server: return {"error": "Weather server not found"} try: async with httpx.AsyncClient() as client: response = await client.post( f"{server.base_url}/tools/get_alerts", json={"state": state_code}, timeout=10.0 ) return response.json() except Exception as e: return {"error": f"Failed to call alerts tool: {str(e)}"} def _get_state_code(self, location: str) -> Optional[str]: """Get US state code for alert queries.""" state_mapping = { "california": "CA", "texas": "TX", "florida": "FL", "new york": "NY", "pennsylvania": "PA", "illinois": "IL", "ohio": "OH", "georgia": "GA", "north carolina": "NC", "michigan": "MI" } return state_mapping.get(location.lower()) def _get_coordinates(self, location: str) -> Optional[tuple]: """Get coordinates for forecast queries (simplified mapping).""" coord_mapping = { "london": (51.5074, -0.1278), "paris": (48.8566, 2.3522), "new york": (40.7128, -74.0060), "tokyo": (35.6762, 139.6503), "sydney": (-33.8688, 151.2093), "berlin": (52.5200, 13.4050), "los angeles": (34.0522, -118.2437), "chicago": (41.8781, -87.6298) } return coord_mapping.get(location.lower()) def _format_response(self, task_type: TaskType, data: Dict[str, Any], locations: List[str]) -> str: """Format the response based on task type and data.""" if task_type == TaskType.WEATHER_QUERY: if len(locations) == 1 and locations[0].lower() in data: weather_data = data[locations[0].lower()] if "error" in weather_data: return f"❌ Error getting weather for {locations[0]}: {weather_data['error']}" return f"🌤️ Current weather in {weather_data.get('city', locations[0])}:\n" \ f" 🌡️ Temperature: {weather_data.get('temperature', 'N/A')}°C\n" \ f" 📝 Conditions: {weather_data.get('description', 'N/A')}" elif task_type == TaskType.MULTI_LOCATION: responses = [] for location in locations: if location.lower() in data: weather_data = data[location.lower()] if "error" not in weather_data: responses.append( f"🌤️ {weather_data.get('city', location)}: " f"{weather_data.get('temperature', 'N/A')}°C, " f"{weather_data.get('description', 'N/A')}" ) else: responses.append(f"❌ {location}: Error - {weather_data['error']}") if responses: return "🗺️ Weather comparison:\n" + "\n".join(f" {resp}" for resp in responses) elif task_type == TaskType.FORECAST_ANALYSIS: if "forecast" in data and locations: forecast_data = data["forecast"] if isinstance(forecast_data, str) and not forecast_data.startswith("Error"): return f"📅 Forecast for {locations[0]}:\n{forecast_data}" elif "error" in str(forecast_data): return f"❌ Error getting forecast: {forecast_data}" elif task_type == TaskType.ALERT_MONITORING: if "alerts" in data and locations: alert_data = data["alerts"] if isinstance(alert_data, str) and "No active alerts" in alert_data: return f"✅ No active weather alerts for {locations[0]}" elif isinstance(alert_data, str) and not alert_data.startswith("Error"): return f"🚨 Weather alerts for {locations[0]}:\n{alert_data}" elif "error" in str(alert_data): return f"❌ Error getting alerts: {alert_data}" # Fallback response return f"🤖 I processed your query but couldn't format a proper response. Raw data: {json.dumps(data, indent=2)}" async def process_query(self, user_query: str) -> Dict[str, Any]: """Process a user query through the simplified orchestrator.""" start_time = datetime.now() execution_log = [] try: # Step 1: Classify task task_type = self._classify_task(user_query) execution_log.append(f"Classified task as: {task_type.value}") # Step 2: Extract locations locations = self._extract_locations(user_query) execution_log.append(f"Extracted locations: {locations}") # Step 3: Gather data based on task type gathered_data = {} if task_type in [TaskType.WEATHER_QUERY, TaskType.MULTI_LOCATION]: # Get weather for all locations for location in locations: weather_data = await self._call_weather_tool(location) gathered_data[location.lower()] = weather_data execution_log.append(f"Gathered weather data for {len(locations)} locations") elif task_type == TaskType.FORECAST_ANALYSIS and locations: # Get forecast for first location (simplified) coords = self._get_coordinates(locations[0]) if coords: forecast_data = await self._call_forecast_tool(coords[0], coords[1]) gathered_data["forecast"] = forecast_data execution_log.append("Gathered forecast data") else: gathered_data["forecast"] = {"error": "Coordinates not available for this location"} execution_log.append("Could not get coordinates for forecast") elif task_type == TaskType.ALERT_MONITORING and locations: # Get alerts for US states for location in locations: state_code = self._get_state_code(location) if state_code: alert_data = await self._call_alerts_tool(state_code) gathered_data["alerts"] = alert_data execution_log.append(f"Gathered alert data for {location}") break # Only first US state for simplicity # Step 4: Format response response = self._format_response(task_type, gathered_data, locations) execution_log.append("Generated response") end_time = datetime.now() execution_time = (end_time - start_time).total_seconds() return { "success": True, "response": response, "task_type": task_type.value, "locations": locations, "execution_log": execution_log, "execution_time": execution_time, "raw_data": gathered_data, "metadata": { "timestamp": start_time.isoformat(), "query": user_query } } except Exception as e: execution_log.append(f"Error: {str(e)}") return { "success": False, "error": str(e), "execution_log": execution_log, "metadata": { "timestamp": start_time.isoformat(), "query": user_query } } async def main(): """Example usage of the simplified orchestrator.""" orchestrator = SimpleOrchestrator() test_queries = [ "What's the weather like in London?", "Compare weather in New York and Paris", "What's the forecast for Tokyo tomorrow?", "Any weather alerts in California?", ] print("🤖 Simplified Agent Orchestrator Demo") print("=" * 50) for query in test_queries: print(f"\n📝 Query: {query}") print("-" * 40) result = await orchestrator.process_query(query) if result["success"]: print(f"✅ Response ({result['execution_time']:.2f}s):") print(f" {result['response']}") print(f"🏷️ Task: {result['task_type']} | Locations: {result['locations']}") else: print(f"❌ Error: {result['error']}") if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shivbaj/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server