
PM Counter Monitoring MCP Server

streamlit_app.py (12.6 kB)
""" Streamlit frontend with chat bot interface for PM Counter Monitoring """ import streamlit as st import requests import pandas as pd from datetime import datetime, timedelta from typing import Dict, Any, List import json from config import Config # Page configuration st.set_page_config( page_title="PM Counter Monitor", page_icon="📊", layout="wide" ) # API endpoints - use service names in Docker, localhost otherwise import os docker_env = os.getenv("DOCKER_ENV", "false").lower() == "true" API_HOST = os.getenv("API_HOST", "localhost" if not docker_env else "api_server") MCP_HOST = os.getenv("MCP_HOST", "localhost" if not docker_env else "mcp_server") API_BASE_URL = f"http://{API_HOST}:{Config.API_PORT}" MCP_BASE_URL = f"http://{MCP_HOST}:{Config.MCP_PORT}" # Initialize session state if "messages" not in st.session_state: st.session_state.messages = [] if "fetch_interval_minutes" not in st.session_state: # Convert hours to minutes for default st.session_state.fetch_interval_minutes = int(Config.FETCH_INTERVAL_HOURS * 60) def call_api(endpoint: str, params: Dict[str, Any] = None, method: str = "GET", files: Dict = None, data: Dict = None) -> Any: """Call API endpoint""" try: if method == "GET": response = requests.get(f"{API_BASE_URL}{endpoint}", params=params, timeout=30) elif method == "POST": if files: response = requests.post(f"{API_BASE_URL}{endpoint}", files=files, data=data, timeout=30) else: response = requests.post(f"{API_BASE_URL}{endpoint}", json=params, timeout=30) else: response = requests.request(method, f"{API_BASE_URL}{endpoint}", params=params, timeout=30) response.raise_for_status() return response.json() except Exception as e: return {"error": str(e)} def call_mcp(method: str, params: Dict[str, Any] = None) -> Any: """Call MCP server""" try: response = requests.post( f"{MCP_BASE_URL}/mcp", json={"method": method, "params": params or {}}, timeout=10 ) response.raise_for_status() result = response.json() return result.get("result") if "result" in result else result except Exception as e: return {"error": str(e)} def update_fetch_interval(minutes: int): """Update fetch interval in job server""" try: # Call job server API to update interval (if we add that endpoint) # For now, we'll just note it needs to be updated hours = minutes / 60.0 return {"success": True, "hours": hours, "minutes": minutes} except Exception as e: return {"error": str(e)} def process_query(query: str) -> str: """ Process natural language query - MCP first, then RAG fallback """ from query_router import QueryRouter # Wrapper functions for router def call_mcp_wrapper(method: str, params: Dict[str, Any] = None) -> Any: """Wrapper for MCP calls""" result = call_mcp(method, params) # Return in format router expects if isinstance(result, dict) and "error" in result: return result return result def call_rag_wrapper(question: str) -> Dict[str, Any]: """Wrapper for RAG calls""" try: result = call_api("/rag/query", params={"question": question}, method="GET") if "error" not in result and "response" in result: return result else: return {"error": result.get("error", "Unknown RAG error")} except Exception as e: return {"error": str(e)} # Create router and process query router = QueryRouter(call_mcp_wrapper, call_rag_wrapper) routed_result = router.route_query(query) # Handle special cases that don't go through router query_lower = query.lower() # Network elements (direct API call) if "network" in query_lower and "element" in query_lower: result = call_api("/network-elements") if "error" in result: return f"Error: 
{result['error']}" if result: return f"Network Elements:\n{json.dumps(result, indent=2)}" return "No network elements found." # Stats/summary (direct API call) if "stats" in query_lower or "summary" in query_lower or "status" in query_lower: result = call_api("/stats/summary") if "error" in result: return f"Error: {result['error']}" return f"System Statistics:\n" \ f"- Total Files: {result.get('total_files', 0)}\n" \ f"- Processed Files: {result.get('processed_files', 0)}\n" \ f"- Total Intervals: {result.get('total_intervals', 0)}\n" \ f"- Network Elements: {result.get('total_network_elements', 0)}" # Return routed response response = routed_result.get("response", "No response generated") source = routed_result.get("source", "unknown") # Add source indicator (optional, for debugging) if source == "mcp": return f"[MCP] {response}" elif source == "rag": return f"[RAG] {response}" else: return response def extract_hours(text: str, default: int = 24) -> int: """Extract hours from query text""" import re patterns = [ r"last\s+(\d+)\s+hours?", r"(\d+)\s+hours?", r"past\s+(\d+)\s+hours?" ] for pattern in patterns: match = re.search(pattern, text) if match: return int(match.group(1)) return default def extract_interface_name(text: str) -> str: """Extract interface name from query""" import re patterns = [ r"interface\s+([\w/]+)", r"([Gg]igabit[Ee]thernet[\w/]+)", r"([Tt]en[Gg]igabit[Ee]thernet[\w/]+)" ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1) return "" def extract_counter_name(text: str) -> str: """Extract counter name from query""" counters = ["ifInOctets", "ifOutOctets", "ifInErrors", "ifOutErrors", "ifUtilizationIn", "ifUtilizationOut", "ifInUcastPkts", "ifOutUcastPkts"] for counter in counters: if counter.lower() in text.lower(): return counter return "" def main(): st.title("📊 PM Counter Monitoring System") # Sidebar for configuration with st.sidebar: st.header("⚙️ Settings") # Fetch interval setting in minutes st.subheader("Job Server") fetch_interval_minutes = st.number_input( "Fetch Interval (minutes)", min_value=1, max_value=1440, value=st.session_state.fetch_interval_minutes, step=1, help="How often to fetch files from SFTP (in minutes). Restart job server to apply changes." 
) if fetch_interval_minutes != st.session_state.fetch_interval_minutes: st.session_state.fetch_interval_minutes = fetch_interval_minutes hours = fetch_interval_minutes / 60.0 st.info(f"⚠️ Set FETCH_INTERVAL_HOURS={hours:.2f} in .env and restart job server to apply") st.divider() # Quick stats st.subheader("Quick Stats") try: stats = call_api("/stats/summary") if "error" not in stats: st.metric("Processed Files", stats.get("processed_files", 0)) st.metric("Total Intervals", stats.get("total_intervals", 0)) except: st.error("Unable to fetch stats") st.divider() # API status st.subheader("Status") try: response = requests.get(f"{API_BASE_URL}/", timeout=5) if response.status_code == 200: st.success("✅ API Online") else: st.error("❌ API Error") except: st.error("❌ API Offline") # File Upload Section with st.expander("📤 Upload XML Files", expanded=False): st.write("Upload PM counter XML files to process them immediately") uploaded_files = st.file_uploader( "Choose XML files to upload", type=['xml'], accept_multiple_files=True, help="Select one or more PM counter XML files to process" ) if uploaded_files: st.write(f"**{len(uploaded_files)} file(s) selected:**") for f in uploaded_files: st.write(f"- {f.name} ({f.size:,} bytes)") if st.button("🚀 Upload and Process", type="primary"): progress_bar = st.progress(0) status_text = st.empty() results = [] total_files = len(uploaded_files) for idx, uploaded_file in enumerate(uploaded_files): try: status_text.text(f"Processing {uploaded_file.name}... ({idx + 1}/{total_files})") progress_bar.progress((idx + 1) / total_files) # Prepare file for upload files = {'file': (uploaded_file.name, uploaded_file.getvalue(), 'application/xml')} # Upload single file result = call_api("/upload", method="POST", files=files) if "error" in result: results.append({ "filename": uploaded_file.name, "status": "error", "message": result["error"] }) else: results.append(result) except Exception as e: results.append({ "filename": uploaded_file.name, "status": "error", "message": str(e) }) progress_bar.empty() status_text.empty() # Display results if results: successful = sum(1 for r in results if r.get("status") == "processed") skipped = sum(1 for r in results if r.get("status") == "skipped") errors = sum(1 for r in results if r.get("status") == "error") if successful > 0: st.success(f"✅ {successful} file(s) processed successfully") if skipped > 0: st.warning(f"⚠️ {skipped} file(s) already processed (skipped)") if errors > 0: st.error(f"❌ {errors} file(s) had errors") # Detailed results for result in results: if result.get("status") == "processed": st.success(f"✅ **{result.get('filename')}** - {result.get('intervals', 0)} intervals processed") elif result.get("status") == "skipped": st.warning(f"⚠️ **{result.get('filename')}** - Already processed") else: st.error(f"❌ **{result.get('filename')}** - {result.get('message', 'Unknown error')}") st.divider() # Chat interface st.subheader("💬 Chat Interface") st.markdown("Ask questions about your PM counter data") # Display chat history for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # Chat input if prompt := st.chat_input("Ask me about PM counters, interfaces, metrics, etc."): # Add user message st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # Get response with st.chat_message("assistant"): with st.spinner("Thinking..."): response = process_query(prompt) st.markdown(response) 
st.session_state.messages.append({"role": "assistant", "content": response}) if __name__ == "__main__": main()
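
The frontend talks to the MCP server by POSTing a {"method", "params"} payload to /mcp, as call_mcp() above shows. For quick testing outside the Streamlit UI, the same request shape can be exercised with a small standalone script. The sketch below is only an illustration: the host, port, and the "get_summary" method name are placeholders, not confirmed parts of this server's API; substitute whatever config.py and your MCP server actually define.

"""Minimal sketch: call the MCP server the same way call_mcp() does.

Assumptions (not confirmed by this file): the MCP server listens on
localhost:9000 and exposes a method named "get_summary". Adjust the
base URL and method name to match your deployment (see config.py).
"""
import requests

MCP_BASE_URL = "http://localhost:9000"  # placeholder for Config.MCP_PORT in the app


def call_mcp(method: str, params: dict | None = None) -> dict:
    """POST a {"method", "params"} payload to /mcp, mirroring streamlit_app.py."""
    response = requests.post(
        f"{MCP_BASE_URL}/mcp",
        json={"method": method, "params": params or {}},
        timeout=10,
    )
    response.raise_for_status()
    result = response.json()
    # The app unwraps a top-level "result" key when the server returns one
    return result.get("result", result)


if __name__ == "__main__":
    print(call_mcp("get_summary"))  # hypothetical method name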

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ApoorvBrooklyn/Networking-MCP'
