Skip to main content
Glama

Agentic MCP Weather System

by Shivbaj
streamlit_app.py (20.9 kB)
"""
Streamlit Web UI for Weather Agent Coordination System

A ChatGPT-like interface for interacting with your weather MCP system
with agent-to-agent coordination capabilities.
"""

import streamlit as st
import asyncio
import json
import time
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import sys
import os

# Configure Streamlit page settings FIRST
st.set_page_config(
    page_title="Weather AI Assistant",
    page_icon="🌤️",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Add current directory to path for imports
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import your agent coordination system.  Any failure is recorded rather than
# raised so the UI can still start and show the reason to the user.
try:
    from agent_coordination_hub import AgentCoordinationHub
    from smart_alert_agent import AlertAgent
    from weather_intelligence_agent import WeatherIntelligenceAgent
    from simple_orchestrator import SimpleOrchestrator
    AGENTS_AVAILABLE = True
    AGENT_ERROR = None
except Exception as e:
    AGENTS_AVAILABLE = False
    AGENT_ERROR = str(e)


# Number of alerts listed inline in a chat answer; the rest stay in the
# "Alert Details" expander.
_MAX_INLINE_ALERTS = 3

# Markdown hard line break: a bare "\n" would merge adjacent lines into one
# paragraph when rendered.
_MD_BREAK = "  \n"


def _format_percent(value: Any, default: str = "N/A") -> str:
    """Return a 0-1 fraction as a whole-number percentage string.

    Args:
        value: The fraction to format.
        default: Text returned when *value* is not a real number.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return f"{value * 100:.0f}%"


def _describe_alert(alert: Any) -> Tuple[str, str]:
    """Return ``(location, conditions_text)`` for one alert entry.

    Accepts both the ``conditions`` (list) key used by the formatters and a
    single ``condition`` key, and falls back to ``str(alert)`` when the alert
    is not a dict.
    """
    if not isinstance(alert, dict):
        return "Unknown", str(alert)

    location = str(alert.get("location", "Unknown"))
    conditions = alert.get("conditions")
    if isinstance(conditions, (list, tuple, set)):
        text = ", ".join(str(item) for item in conditions)
    elif conditions:
        text = str(conditions)
    else:
        text = ""
    if not text:
        text = str(alert.get("condition", "Unknown condition"))
    return location, text


def _consensus_value(consensus: Any, name: str, default: Any = None) -> Any:
    """Read field *name* from a consensus given as a dict or as an object."""
    if isinstance(consensus, dict):
        return consensus.get(name, default)
    return getattr(consensus, name, default)


class StreamlitWeatherChat:
    """Streamlit-based chat interface for weather agent coordination."""

    def __init__(self):
        """Prepare session state and try to construct the agent objects.

        Sets ``self.system_available`` to ``False`` (and shows an error) when
        the agent modules failed to import or an agent constructor raises.
        """
        self.initialize_session_state()

        if AGENTS_AVAILABLE:
            try:
                self.coordination_hub = AgentCoordinationHub()
                self.alert_agent = AlertAgent()
                self.intelligence_agent = WeatherIntelligenceAgent()
                self.simple_orchestrator = SimpleOrchestrator()
                self.system_available = True
            except Exception as e:
                st.error(f"Failed to initialize agents: {e}")
                self.system_available = False
        else:
            st.error(f"⚠️ Agent system not available: {AGENT_ERROR}")
            self.system_available = False

    def setup_page_config(self):
        """Configure Streamlit page settings."""
        # This is now done at module level
        pass

    def initialize_session_state(self):
        """Initialize Streamlit session state variables."""
        defaults = {
            "messages": [],
            "alert_subscriptions": [],
            "last_query_type": None,
            "processing": False,
            # Query queued by a sidebar sample button, consumed by
            # render_chat_interface on the next run.
            "pending_query": None,
        }
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    def render_header(self):
        """Render the main header."""
        st.markdown(
            '<div style="text-align: center; padding: 20px; '
            'background: linear-gradient(90deg, #1e3c72, #2a5298); '
            'color: white; border-radius: 10px; margin-bottom: 20px;">'
            "<h1>🌤️ Weather AI Assistant</h1>"
            "<h3>Agent Coordination System with Multi-Source Intelligence</h3>"
            "<p>ChatGPT-like interface for weather, alerts, and intelligent forecasting!</p>"
            "</div>",
            unsafe_allow_html=True,
        )

    def render_sidebar(self):
        """Render the sidebar with system info and controls."""
        with st.sidebar:
            st.markdown("## 🤖 Agent System Status")

            if self.system_available:
                st.success("✅ All agents online")

                # Agent status indicators
                st.markdown("### Active Agents:")
                st.markdown("- 🌤️ **Weather Agent**: Real-time weather data")
                st.markdown("- 🚨 **Alert Agent**: Proactive monitoring")
                st.markdown("- 🧠 **Intelligence Agent**: Multi-source consensus")
                st.markdown("- 🎯 **Coordination Hub**: Smart routing")
            else:
                st.error("❌ Agent system unavailable")
                st.markdown("Please check Docker containers are running.")

            st.markdown("---")

            # Quick action buttons
            st.markdown("## 🚀 Quick Actions")

            if st.button("🔄 System Status Check"):
                self.run_system_check()

            if st.button("🚨 View Active Alerts"):
                self.show_active_alerts()

            if st.button("🧠 Test Intelligence"):
                self.test_intelligence_agent()

            if st.button("🗑️ Clear Chat History"):
                st.session_state.messages = []
                st.rerun()

            st.markdown("---")

            # Sample queries
            st.markdown("## 💡 Sample Queries")
            sample_queries = [
                "What's the weather in San Francisco?",
                "Set up weather alerts for New York",
                "Get accurate weather from multiple sources for Chicago",
                "Compare weather in Paris and London",
                "Monitor severe weather for my locations",
                "Plan travel weather for Tokyo this weekend"
            ]

            for index, query in enumerate(sample_queries):
                # Index-based keys are stable across processes, unlike
                # hash(str), which is salted per interpreter start.
                if st.button(f"💬 {query}", key=f"sample_{index}"):
                    # Queue the query instead of only appending it to the
                    # history, so the chat interface actually sends it to the
                    # agents and an answer is produced.
                    st.session_state.pending_query = query
                    st.rerun()

    def render_chat_interface(self):
        """Render the chat history and handle a typed or queued query."""
        # Chat container
        chat_container = st.container()

        with chat_container:
            # Display chat messages
            for message in st.session_state.messages:
                with st.chat_message(message["role"]):
                    if message["role"] == "assistant":
                        self.render_assistant_message(message)
                    else:
                        st.markdown(message["content"])

        # Chat input
        typed_prompt = st.chat_input(
            "Ask about weather, set up alerts, or get intelligent forecasts..."
        )

        # A sample query chosen in the sidebar is handled exactly like typed
        # input; clear it first so it is processed only once.
        queued_prompt = st.session_state.get("pending_query")
        st.session_state.pending_query = None

        prompt = typed_prompt or queued_prompt
        if prompt:
            # Add user message
            st.session_state.messages.append({"role": "user", "content": prompt})

            # Display user message
            with st.chat_message("user"):
                st.markdown(prompt)

            # Process and display assistant response
            with st.chat_message("assistant"):
                self.process_user_query(prompt)

    def render_assistant_message(self, message: Dict[str, Any]):
        """Render assistant message with rich formatting.

        Args:
            message: Chat message whose ``"content"`` is either plain text or
                a dict with ``response`` and optional ``metadata``,
                ``execution_log`` and ``additional_data`` entries.
        """
        content = message["content"]

        # Anything that is not a structured response is shown as plain text.
        if not isinstance(content, dict):
            st.markdown(str(content) if content is not None else "No response available")
            return

        # Display main response
        response_text = content.get("response", "No response available")
        if response_text:
            st.markdown(response_text)

        # Display metadata if available
        metadata = content.get("metadata")
        if metadata and isinstance(metadata, dict):
            with st.expander("🔍 Query Details"):
                col1, col2, col3 = st.columns(3)

                with col1:
                    coordination_type = metadata.get("coordination_type", "Unknown")
                    st.metric("Coordination Type", coordination_type)

                with col2:
                    sources_used = metadata.get("sources_used")
                    if sources_used is not None:
                        st.metric("Data Sources", sources_used)

                with col3:
                    confidence = metadata.get("confidence")
                    if confidence is not None:
                        # Non-numeric confidence is shown verbatim instead of
                        # raising on the multiplication.
                        st.metric(
                            "Confidence",
                            _format_percent(confidence, default=str(confidence)),
                        )

        # Show execution log if available
        execution_log = content.get("execution_log")
        if execution_log:
            with st.expander("📋 Execution Log"):
                for i, log_entry in enumerate(execution_log, 1):
                    st.text(f"{i}. {log_entry}")

        # Show additional data if available
        additional = content.get("additional_data")
        if additional and isinstance(additional, dict):
            if additional.get("alerts"):
                with st.expander("🚨 Alert Details"):
                    for alert in additional["alerts"]:
                        alert_location, alert_condition = _describe_alert(alert)
                        st.warning(f"**{alert_location}**: {alert_condition}")

            if "weather_data" in additional:
                with st.expander("🌤️ Detailed Weather Data"):
                    weather_data = additional["weather_data"]
                    # Check if weather_data is valid and not empty
                    if weather_data and isinstance(weather_data, (dict, list)):
                        try:
                            st.json(weather_data)
                        except Exception as e:
                            st.warning(f"Unable to display weather data: {e}")
                            st.text(str(weather_data))
                    else:
                        st.info("No detailed weather data available")

    def process_user_query(self, query: str):
        """Process user query through the agent coordination system.

        The formatted answer (or an error message) is both rendered and
        appended to the chat history in session state.
        """
        if not self.system_available:
            st.error("❌ Agent system is not available. Please check if Docker containers are running.")
            return

        # Show processing indicator
        with st.spinner("🤖 Processing your query through agent coordination..."):
            try:
                # Run async query processing
                result = asyncio.run(self._async_process_query(query))

                # Format and display response
                formatted_response = self.format_agent_response(result, query)

                # Add to chat history
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": formatted_response
                })

                # Display the response
                self.render_assistant_message({
                    "role": "assistant",
                    "content": formatted_response
                })

            except Exception as e:
                error_msg = f"❌ Error processing query: {str(e)}"
                st.error(error_msg)
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": {"response": error_msg}
                })

    async def _async_process_query(self, query: str) -> Dict[str, Any]:
        """Async wrapper for query processing."""
        return await self.coordination_hub.process_coordinated_query(query)

    def format_agent_response(self, result: Dict[str, Any], query: str) -> Dict[str, Any]:
        """Format agent response for display in chat.

        Args:
            result: Raw result returned by the coordination hub.
            query: The user's query (currently unused by the formatters).

        Returns:
            A dict with ``response`` and ``metadata`` plus, depending on the
            coordination type, ``execution_log`` and/or ``additional_data``.
        """
        if not result or not isinstance(result, dict):
            return {
                "response": "❌ **Invalid response**: Unable to process query response",
                "metadata": {
                    "coordination_type": "error",
                    "success": False
                }
            }

        if not result.get("success", False):
            error_msg = result.get('error', 'Unknown error')
            return {
                "response": f"❌ **Query failed**: {error_msg}",
                "metadata": {
                    "coordination_type": result.get("coordination_used", "unknown"),
                    "success": False
                }
            }

        coordination_type = result.get("coordination_used", "unknown")

        # Format response based on coordination type
        try:
            if coordination_type == "intelligence":
                return self.format_intelligence_response(result)
            elif coordination_type == "alerts":
                return self.format_alert_response(result)
            elif isinstance(coordination_type, str) and coordination_type.endswith("weather_agent"):
                return self.format_weather_response(result)
            else:
                return self.format_generic_response(result)
        except Exception as e:
            # Fallback for any formatting errors
            return {
                "response": f"✅ **Query processed successfully**\n\nResponse: {result.get('response', 'No response available')}",
                "metadata": {
                    "coordination_type": coordination_type,
                    "success": True,
                    "format_error": str(e)
                },
                "execution_log": result.get("execution_log", [])
            }

    def format_intelligence_response(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Format intelligence agent response.

        ``result["consensus"]`` may be an object exposing ``temperature`` and
        related attributes, a dict with the same keys, or missing/empty.
        """
        consensus = result.get("consensus")
        coordination_label = "Multi-Source Intelligence"

        is_object = bool(consensus) and hasattr(consensus, 'temperature')
        is_mapping = bool(consensus) and isinstance(consensus, dict)

        if not (is_object or is_mapping):
            return {
                "response": "🧠 **Multi-source intelligence analysis completed**",
                "metadata": {
                    "coordination_type": coordination_label,
                    "sources_used": 0,
                    "confidence": 0,
                },
                "additional_data": {"weather_data": result.get("consensus", {})}
            }

        if is_object:
            # Handle consensus object with attributes
            temperature = consensus.temperature
            conditions = consensus.conditions
            humidity = consensus.humidity
            confidence = consensus.confidence_score
            source_count = consensus.source_count
            data_sources = consensus.data_sources
            source_names = ', '.join(str(name) for name in (data_sources or []))
            sources_text = f"{source_count} ({source_names})"
            weather_data = {
                "temperature": temperature,
                "conditions": conditions,
                "humidity": humidity,
                "confidence": confidence,
                "sources": data_sources
            }
        else:
            # Handle consensus as dictionary
            temperature = consensus.get('temperature', 'N/A')
            conditions = consensus.get('conditions', 'N/A')
            humidity = consensus.get('humidity', 'N/A')
            confidence = consensus.get('confidence_score', 0)
            source_count = consensus.get('source_count', 0)
            sources_text = f"{source_count}"
            weather_data = consensus

        summary = _MD_BREAK.join([
            f"**Temperature**: {temperature}°F",
            f"**Conditions**: {conditions}",
            f"**Humidity**: {humidity}%",
            f"**Confidence**: {_format_percent(confidence)}",
            f"**Sources**: {sources_text}",
        ])
        report = result.get('intelligence_report', 'No detailed analysis available')
        response = (
            "## 🧠 Multi-Source Weather Intelligence\n\n"
            f"{summary}\n\n"
            "### Intelligence Analysis\n\n"
            f"{report}"
        )

        return {
            "response": response.strip(),
            "metadata": {
                "coordination_type": coordination_label,
                "sources_used": source_count,
                "confidence": confidence,
            },
            "additional_data": {"weather_data": weather_data}
        }

    def format_alert_response(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Format alert agent response.

        Lists at most ``_MAX_INLINE_ALERTS`` alerts in the text; the full list
        is returned under ``additional_data["alerts"]``.
        """
        alerts = result.get("alerts") or []

        if alerts:
            lines = []
            for alert in alerts[:_MAX_INLINE_ALERTS]:  # Show top alerts only
                location, conditions = _describe_alert(alert)
                severity = alert.get('severity', 'Unknown') if isinstance(alert, dict) else 'Unknown'
                lines.append(f"**📍 {location}**: {severity} - {conditions}")

            hidden = len(alerts) - _MAX_INLINE_ALERTS
            if hidden > 0:
                lines.append(f"…and {hidden} more (see Alert Details)")

            response = (
                f"🚨 **Active Weather Alerts** ({len(alerts)} found)\n\n"
                + _MD_BREAK.join(lines)
                + "\n"
            )
        else:
            response = "✅ **No active weather alerts** - All monitored locations have normal conditions"

        return {
            "response": response,
            "metadata": {
                "coordination_type": "Smart Weather Alerts",
                "alerts_count": len(alerts)
            },
            "additional_data": {
                "alerts": alerts
            }
        }

    def format_weather_response(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Format standard weather response."""
        response_text = result.get("response", "Weather information retrieved")

        return {
            "response": f"🌤️ **Weather Information**\n\n{response_text}",
            "metadata": {
                "coordination_type": "Weather Data",
                "locations": result.get("locations", [])
            },
            "execution_log": result.get("execution_log", [])
        }

    def format_generic_response(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Format generic response."""
        return {
            "response": result.get("response", "Query processed successfully"),
            "metadata": {
                "coordination_type": result.get("coordination_used", "Unknown"),
                "success": result.get("success", False)
            },
            "execution_log": result.get("execution_log", [])
        }

    def run_system_check(self):
        """Run a quick system status check.

        Sends one probe query through the coordination hub and reports
        whether the call completed without raising.
        """
        with st.spinner("🔍 Checking system status..."):
            try:
                if self.system_available:
                    # Test basic functionality
                    probe_query = "What's the weather?"
                    result = asyncio.run(self._async_process_query(probe_query))
                    coordination = (
                        result.get('coordination_used', 'Unknown')
                        if isinstance(result, dict) else 'Unknown'
                    )
                    st.success("✅ System check passed - All agents responding")
                    st.info(f"Coordination available: {coordination}")
                else:
                    st.error("❌ System check failed - Agents not available")
            except Exception as e:
                st.error(f"❌ System check failed: {e}")

    def show_active_alerts(self):
        """Show current active alerts."""
        with st.spinner("🚨 Checking active alerts..."):
            try:
                alerts = asyncio.run(self.alert_agent.check_alerts_for_all_subscriptions())

                if alerts:
                    st.warning(f"🚨 {len(alerts)} active alerts found:")
                    for alert in alerts:
                        location, conditions = _describe_alert(alert)
                        st.error(f"**{location}**: {conditions}")
                else:
                    st.success("✅ No active alerts - All conditions normal")
            except Exception as e:
                st.error(f"❌ Error checking alerts: {e}")

    def test_intelligence_agent(self):
        """Test the intelligence agent with a sample query."""
        with st.spinner("🧠 Testing multi-source intelligence..."):
            try:
                result = asyncio.run(self.intelligence_agent.get_consensus_weather("San Francisco"))

                if result.get("success"):
                    # The consensus may be an object or a dict, matching what
                    # format_intelligence_response accepts.
                    consensus = result.get("consensus")
                    temperature = _consensus_value(consensus, "temperature", "N/A")
                    confidence = _consensus_value(consensus, "confidence_score")
                    st.success("✅ Intelligence agent test successful")
                    st.info(f"Temperature: {temperature}°F, Confidence: {_format_percent(confidence)}")
                else:
                    st.warning("⚠️ Intelligence agent test completed with warnings")
            except Exception as e:
                st.error(f"❌ Intelligence agent test failed: {e}")

    def run(self):
        """Run the Streamlit application."""
        self.render_header()

        # render_sidebar() writes into st.sidebar itself, so it needs no
        # column; wrapping it in one only left an empty column beside the
        # chat and nested st.chat_input inside a column.
        self.render_sidebar()
        self.render_chat_interface()

        # Footer
        st.markdown("---")
        st.markdown(
            '<div style="text-align: center; color: #666; font-size: 12px;">'
            "Weather AI Assistant v1.0 | Agent Coordination System | Powered by MCP & Ollama"
            "</div>",
            unsafe_allow_html=True,
        )


def main():
    """Main application entry point."""
    app = StreamlitWeatherChat()
    app.run()


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shivbaj/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.