"""
Streamlit frontend with a chatbot interface for PM counter monitoring.
"""
import streamlit as st
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, Any, List
import json
from config import Config
# Page configuration: sets the browser-tab title and icon and selects the wide
# page layout.
# NOTE(review): Streamlit's documentation asks for set_page_config to be the
# first st.* command executed — keep it ahead of any other Streamlit call.
st.set_page_config(
    page_title="PM Counter Monitor",
    page_icon="📊",
    layout="wide"
)
# Backend endpoints. When DOCKER_ENV is "true" the default hosts are the
# service names (api_server / mcp_server); otherwise they default to localhost.
# An explicit API_HOST or MCP_HOST environment variable overrides the default.
import os
docker_env = os.environ.get("DOCKER_ENV", "false").lower() == "true"
API_HOST = os.environ.get("API_HOST", "api_server" if docker_env else "localhost")
MCP_HOST = os.environ.get("MCP_HOST", "mcp_server" if docker_env else "localhost")
API_BASE_URL = f"http://{API_HOST}:{Config.API_PORT}"
MCP_BASE_URL = f"http://{MCP_HOST}:{Config.MCP_PORT}"
# Initialize session state (only on the first run of a browser session; later
# reruns keep whatever the user has already set).
if "messages" not in st.session_state:
    # Chat history: a list of {"role": ..., "content": ...} dicts.
    st.session_state.messages = []
if "fetch_interval_minutes" not in st.session_state:
    # Convert the configured hours to whole minutes. round() avoids losing a
    # minute to float truncation (a product such as 68.999... would become 68
    # under int()), and the clamp keeps the default inside the 1..1440 range
    # used by the "Fetch Interval (minutes)" number_input in main(), which
    # would otherwise be handed an out-of-range initial value.
    st.session_state.fetch_interval_minutes = min(
        1440, max(1, round(Config.FETCH_INTERVAL_HOURS * 60))
    )
def call_api(endpoint: str, params: Dict[str, Any] = None, method: str = "GET", files: Dict = None, data: Dict = None) -> Any:
    """Send a request to the backend REST API and return the decoded JSON body.

    Args:
        endpoint: Path appended to ``API_BASE_URL`` (e.g. ``"/stats/summary"``).
        params: Query parameters for GET and for verbs other than POST; sent
            as the JSON body for a POST that carries no ``files``.
        method: HTTP verb, matched case-insensitively.
        files: Multipart file payload; only used for POST.
        data: Form fields sent alongside ``files``; only used for POST with
            ``files``.

    Returns:
        The parsed JSON response, or ``{"error": "<message>"}`` when the
        request fails, the server replies with an error status, or the body
        cannot be decoded as JSON. This function does not raise.
    """
    try:
        url = f"{API_BASE_URL}{endpoint}"
        # Normalise the verb so "get"/"post" take the same branch as
        # "GET"/"POST"; previously a lowercase "post" fell through to the
        # generic branch and sent params as a query string.
        verb = method.upper()
        if verb == "GET":
            response = requests.get(url, params=params, timeout=30)
        elif verb == "POST" and files:
            response = requests.post(url, files=files, data=data, timeout=30)
        elif verb == "POST":
            response = requests.post(url, json=params, timeout=30)
        else:
            response = requests.request(verb, url, params=params, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Boundary handler: callers test for an "error" key instead of
        # catching exceptions.
        return {"error": str(e)}
def call_mcp(method: str, params: Dict[str, Any] = None) -> Any:
    """Invoke ``method`` on the MCP server.

    Posts ``{"method": ..., "params": ...}`` as JSON to ``{MCP_BASE_URL}/mcp``
    (an empty dict is sent when ``params`` is falsy). Returns the value stored
    under the reply's ``"result"`` key when that key is present, otherwise the
    whole decoded reply. Any exception is reported as
    ``{"error": "<message>"}`` rather than raised.
    """
    try:
        payload = {"method": method, "params": params or {}}
        reply = requests.post(f"{MCP_BASE_URL}/mcp", json=payload, timeout=10)
        reply.raise_for_status()
        body = reply.json()
        if "result" in body:
            return body.get("result")
        return body
    except Exception as exc:
        return {"error": str(exc)}
def update_fetch_interval(minutes: int):
    """Report the requested fetch interval in both minutes and hours.

    Nothing is sent to the job server: this only converts the value and echoes
    it back as ``{"success": True, "hours": ..., "minutes": ...}``. If the
    conversion raises, ``{"error": "<message>"}`` is returned instead.
    """
    try:
        # Placeholder until the job server exposes an endpoint for this.
        outcome = {"success": True, "hours": minutes / 60.0, "minutes": minutes}
    except Exception as exc:
        outcome = {"error": str(exc)}
    return outcome
def process_query(query: str) -> str:
    """Answer a natural-language query about the PM counter system.

    Keyword-matched queries are served straight from the REST API:

    * text containing both "network" and "element" -> ``/network-elements``
    * text containing "stats", "summary" or "status" -> ``/stats/summary``

    Anything else is delegated to ``QueryRouter`` (described in this module as
    "MCP first, then RAG fallback"; the routing logic itself lives in
    ``query_router``). The routed answer is prefixed with ``[MCP]`` or
    ``[RAG]`` according to the source the router reports.

    The keyword checks run before the router is invoked, so a query that is
    answered directly no longer triggers MCP/RAG requests whose result would
    be discarded.

    Args:
        query: The user's question as typed into the chat box.

    Returns:
        The text to show as the assistant's reply.
    """
    query_lower = query.lower()

    # Network elements (direct API call)
    if "network" in query_lower and "element" in query_lower:
        result = call_api("/network-elements")
        if "error" in result:
            return f"Error: {result['error']}"
        if result:
            return f"Network Elements:\n{json.dumps(result, indent=2)}"
        return "No network elements found."

    # Stats/summary (direct API call)
    if "stats" in query_lower or "summary" in query_lower or "status" in query_lower:
        result = call_api("/stats/summary")
        if "error" in result:
            return f"Error: {result['error']}"
        return f"System Statistics:\n" \
               f"- Total Files: {result.get('total_files', 0)}\n" \
               f"- Processed Files: {result.get('processed_files', 0)}\n" \
               f"- Total Intervals: {result.get('total_intervals', 0)}\n" \
               f"- Network Elements: {result.get('total_network_elements', 0)}"

    # Imported here, after the shortcuts, so the direct-API answers above do
    # not depend on the router module.
    from query_router import QueryRouter

    def call_rag_wrapper(question: str) -> Dict[str, Any]:
        """Query the RAG endpoint; return its reply or an ``{"error": ...}`` dict."""
        try:
            result = call_api("/rag/query", params={"question": question}, method="GET")
            if "error" not in result and "response" in result:
                return result
            return {"error": result.get("error", "Unknown RAG error")}
        except Exception as e:
            return {"error": str(e)}

    # call_mcp already returns either the MCP result or an {"error": ...}
    # dict, so it is handed to the router as-is.
    router = QueryRouter(call_mcp, call_rag_wrapper)
    routed_result = router.route_query(query)

    response = routed_result.get("response", "No response generated")
    source = routed_result.get("source", "unknown")

    # Add source indicator (optional, for debugging)
    if source == "mcp":
        return f"[MCP] {response}"
    elif source == "rag":
        return f"[RAG] {response}"
    else:
        return response
def extract_hours(text: str, default: int = 24) -> int:
    """Extract a look-back window, in hours, from query text.

    Matching is case-insensitive. An explicit "last N hours" / "past N hours"
    phrase takes priority; otherwise the first bare "N hour(s)" is used.

    Args:
        text: Free-form query text.
        default: Value returned when no hour count is found.

    Returns:
        The number of hours mentioned in ``text``, or ``default``.
    """
    import re

    patterns = (
        # "past" is grouped with "last": as a separate pattern listed after
        # the bare form it could never be the one that matched.
        r"(?:last|past)\s+(\d+)\s+hours?",
        r"(\d+)\s+hours?",
    )
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return int(match.group(1))
    return default
def extract_interface_name(text: str) -> str:
    """Extract an interface name from query text.

    The word following a lowercase "interface" keyword is tried first; failing
    that, a GigabitEthernet or TenGigabitEthernet style name is looked for.

    Args:
        text: Free-form query text.

    Returns:
        The matched interface name, or ``""`` when none is found.
    """
    import re

    patterns = (
        r"interface\s+([\w/]+)",
        # The optional "Ten" prefix lives in the same pattern: with a separate
        # GigabitEthernet pattern tried first, "TenGigabitEthernet1/0/1" was
        # returned without its "Ten" prefix.
        r"((?:[Tt]en)?[Gg]igabit[Ee]thernet[\w/]+)",
    )
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(1)
    return ""
def extract_counter_name(text: str) -> str:
    """Extract a known PM counter name from query text.

    The comparison is case-insensitive. When several counters are mentioned,
    the one listed first in ``counters`` wins, regardless of where it appears
    in ``text``.

    Args:
        text: Free-form query text.

    Returns:
        The counter name in its canonical spelling, or ``""`` if none occurs.
    """
    counters = ("ifInOctets", "ifOutOctets", "ifInErrors", "ifOutErrors",
                "ifUtilizationIn", "ifUtilizationOut", "ifInUcastPkts", "ifOutUcastPkts")
    # Lowercase the query once instead of once per candidate.
    haystack = text.lower()
    return next((name for name in counters if name.lower() in haystack), "")
def main():
    """Render the page: title, settings sidebar, XML upload panel and chat."""
    st.title("📊 PM Counter Monitoring System")
    _render_sidebar()
    _render_upload_section()
    st.divider()
    _render_chat()


def _render_sidebar():
    """Draw the sidebar: fetch-interval setting, quick stats and API status."""
    with st.sidebar:
        st.header("⚙️ Settings")

        # Fetch interval setting in minutes
        st.subheader("Job Server")
        fetch_interval_minutes = st.number_input(
            "Fetch Interval (minutes)",
            min_value=1,
            max_value=1440,
            value=st.session_state.fetch_interval_minutes,
            step=1,
            help="How often to fetch files from SFTP (in minutes). Restart job server to apply changes."
        )
        if fetch_interval_minutes != st.session_state.fetch_interval_minutes:
            # The new value is only remembered in the session; the user is
            # told how to apply it to the job server by hand.
            st.session_state.fetch_interval_minutes = fetch_interval_minutes
            hours = fetch_interval_minutes / 60.0
            st.info(f"⚠️ Set FETCH_INTERVAL_HOURS={hours:.2f} in .env and restart job server to apply")
        st.divider()

        # Quick stats
        st.subheader("Quick Stats")
        # call_api reports failures as an {"error": ...} dict instead of
        # raising, so the failure message is driven by the returned value;
        # previously an API error left this panel silently empty.
        stats = call_api("/stats/summary")
        if isinstance(stats, dict) and "error" not in stats:
            st.metric("Processed Files", stats.get("processed_files", 0))
            st.metric("Total Intervals", stats.get("total_intervals", 0))
        else:
            st.error("Unable to fetch stats")
        st.divider()

        # API status
        st.subheader("Status")
        try:
            response = requests.get(f"{API_BASE_URL}/", timeout=5)
        except requests.RequestException:
            # Only network/HTTP failures mean "offline"; a bare except would
            # also trap BaseException subclasses such as KeyboardInterrupt.
            st.error("❌ API Offline")
        else:
            if response.status_code == 200:
                st.success("✅ API Online")
            else:
                st.error("❌ API Error")


def _render_upload_section():
    """Draw the collapsible panel for uploading and processing XML files."""
    with st.expander("📤 Upload XML Files", expanded=False):
        st.write("Upload PM counter XML files to process them immediately")
        uploaded_files = st.file_uploader(
            "Choose XML files to upload",
            type=['xml'],
            accept_multiple_files=True,
            help="Select one or more PM counter XML files to process"
        )
        if not uploaded_files:
            return

        st.write(f"**{len(uploaded_files)} file(s) selected:**")
        for f in uploaded_files:
            st.write(f"- {f.name} ({f.size:,} bytes)")

        if st.button("🚀 Upload and Process", type="primary"):
            _show_upload_results(_upload_files(uploaded_files))


def _upload_files(uploaded_files) -> List[Dict[str, Any]]:
    """POST each file to ``/upload`` and return one result dict per file."""
    progress_bar = st.progress(0)
    status_text = st.empty()
    results = []
    total_files = len(uploaded_files)

    for idx, uploaded_file in enumerate(uploaded_files, start=1):
        try:
            status_text.text(f"Processing {uploaded_file.name}... ({idx}/{total_files})")
            progress_bar.progress(idx / total_files)

            # Prepare file for upload
            files = {'file': (uploaded_file.name, uploaded_file.getvalue(), 'application/xml')}

            # Upload single file
            result = call_api("/upload", method="POST", files=files)
            if "error" in result:
                results.append({
                    "filename": uploaded_file.name,
                    "status": "error",
                    "message": result["error"]
                })
            else:
                # Fall back to the local file name when the reply omits one;
                # a non-mapping reply raises here and is recorded as an error
                # by the handler below.
                results.append({"filename": uploaded_file.name, **result})
        except Exception as e:
            # One bad file must not abort the remaining uploads.
            results.append({
                "filename": uploaded_file.name,
                "status": "error",
                "message": str(e)
            })

    progress_bar.empty()
    status_text.empty()
    return results


def _show_upload_results(results: List[Dict[str, Any]]):
    """Show summary counts followed by a per-file line for each upload result."""
    if not results:
        return

    successful = sum(1 for r in results if r.get("status") == "processed")
    skipped = sum(1 for r in results if r.get("status") == "skipped")
    errors = sum(1 for r in results if r.get("status") == "error")

    if successful > 0:
        st.success(f"✅ {successful} file(s) processed successfully")
    if skipped > 0:
        st.warning(f"⚠️ {skipped} file(s) already processed (skipped)")
    if errors > 0:
        st.error(f"❌ {errors} file(s) had errors")

    # Detailed results
    for result in results:
        if result.get("status") == "processed":
            st.success(f"✅ **{result.get('filename')}** - {result.get('intervals', 0)} intervals processed")
        elif result.get("status") == "skipped":
            st.warning(f"⚠️ **{result.get('filename')}** - Already processed")
        else:
            st.error(f"❌ **{result.get('filename')}** - {result.get('message', 'Unknown error')}")


def _render_chat():
    """Replay the stored conversation and handle a newly submitted prompt."""
    st.subheader("💬 Chat Interface")
    st.markdown("Ask questions about your PM counter data")

    # Display chat history
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Chat input
    if prompt := st.chat_input("Ask me about PM counters, interfaces, metrics, etc."):
        # Add user message
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # Get response
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                response = process_query(prompt)
            st.markdown(response)
        st.session_state.messages.append({"role": "assistant", "content": response})
# Script entry point: build the page when this file is executed directly.
# NOTE(review): presumably launched via `streamlit run` — confirm that is the
# only entry path before relying on import-time behaviour elsewhere.
if __name__ == "__main__":
    main()