Skip to main content
Glama

MCP Agent Tracker

by Big0290
context_ui.py86.4 kB
#!/usr/bin/env python3 """ Simple Context Management UI - MINIMAL VERSION A clean, user-friendly interface for managing our context system """ import streamlit as st import pandas as pd import sqlite3 import json from datetime import datetime, timedelta import plotly.express as px import plotly.graph_objects as go from pathlib import Path import sys import os import time # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # Import our context system try: from context_manager import SeamlessContextManager CONTEXT_SYSTEM_AVAILABLE = True except ImportError as e: st.error(f"Context system not available: {e}") st.info("💡 Make sure all dependencies are installed: pip install -r requirements_ui.txt") CONTEXT_SYSTEM_AVAILABLE = False try: from models_unified import get_database_url, get_session_factory MODELS_AVAILABLE = True except ImportError as e: st.warning(f"Models not available: {e}") MODELS_AVAILABLE = False # Page configuration st.set_page_config( page_title="Context Manager", page_icon="🧠", layout="wide", initial_sidebar_state="expanded" ) # MINIMAL CSS - Only basic dark theme, NO chart interference st.markdown(""" <style> /* MINIMAL CSS - Only basic dark theme */ .stApp { background: #0f172a !important; } .main .block-container { background: #1e293b !important; color: #f8fafc !important; } [data-testid="stSidebar"] { background: #1e293b !important; color: #f8fafc !important; } h1, h2, h3, h4, h5, h6, p, span, div { color: #f8fafc !important; } .main-header { color: #f8fafc !important; font-size: 3rem !important; text-align: center; margin: 2rem 0; } .section-header { color: #f8fafc !important; font-size: 2rem !important; margin: 1.5rem 0 1rem 0; padding: 0.5rem 0; border-bottom: 2px solid #475569; } /* NO CHART CSS - Let Plotly handle charts naturally */ </style> """, unsafe_allow_html=True) def init_database_connection(): """Initialize database connection""" try: # Use unified database (agent_tracker.db) first db_path = "./data/agent_tracker.db" if os.path.exists(db_path): conn = sqlite3.connect(db_path) return conn # Fallback to local database if main doesn't exist db_path = "./data/agent_tracker_local.db" if os.path.exists(db_path): conn = sqlite3.connect(db_path) return conn return None except Exception as e: st.error(f"Database connection failed: {e}") return None def get_system_stats(conn): """Get system statistics""" if not conn: return None try: stats = {} # Total interactions result = conn.execute("SELECT COUNT(*) FROM interactions") stats['total_interactions'] = result.fetchone()[0] # Total sessions result = conn.execute("SELECT COUNT(*) FROM sessions") stats['total_sessions'] = result.fetchone()[0] # Total contexts (from interactions metadata) result = conn.execute(""" SELECT COUNT(DISTINCT session_id) FROM interactions WHERE session_id IS NOT NULL """) stats['total_contexts'] = result.fetchone()[0] # Recent activity (last hour) result = conn.execute(""" SELECT COUNT(*) FROM interactions WHERE timestamp > datetime('now', '-1 hour') """) stats['recent_activity'] = result.fetchone()[0] # Error rate - Updated to match new schema result = conn.execute(""" SELECT COUNT(*) FROM interactions WHERE status = 'error' OR error_message IS NOT NULL """) error_count = result.fetchone()[0] stats['error_rate'] = error_count return stats except Exception as e: st.error(f"Error getting system stats: {e}") return None def show_dashboard(conn): """Show main dashboard""" st.markdown('<div class="section-header">📊 System Overview</div>', unsafe_allow_html=True) # Add refresh button col1, col2 = st.columns([3, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh Data", type="primary"): st.rerun() # Get system stats stats = get_system_stats(conn) if not stats: st.warning("Unable to load system statistics") return # Show data freshness and last refresh time col1, col2 = st.columns([2, 1]) with col1: st.info(f"📊 Data snapshot taken at: {datetime.now().strftime('%H:%M:%S')}") with col2: if st.button("🔄 Refresh Now", type="secondary"): st.rerun() # Metrics row col1, col2, col3, col4, col5 = st.columns(5) with col1: st.metric("Total Interactions", stats.get('total_interactions', 0)) with col2: st.metric("Active Sessions", stats.get('total_sessions', 0)) with col3: st.metric("Context Objects", stats.get('total_contexts', 0)) with col4: st.metric("Recent Activity (1h)", stats.get('recent_activity', 0)) with col5: error_rate = stats.get('error_rate', 0) st.metric("Errors", error_rate, delta=None, delta_color="inverse") # Charts row col1, col2 = st.columns(2) with col1: st.markdown('<div class="section-header">📈 Activity Timeline</div>', unsafe_allow_html=True) if conn: try: # Get hourly activity for last 24 hours query = """ SELECT strftime('%H', timestamp) as hour, COUNT(*) as count FROM interactions WHERE timestamp > datetime('now', '-1 day') GROUP BY hour ORDER BY hour """ hourly_data = pd.read_sql(query, conn) # Debug info st.write(f"Debug: Found {len(hourly_data)} hourly data points") if not hourly_data.empty: st.write("Debug: Sample data:", hourly_data.head()) if not hourly_data.empty: fig = px.bar(hourly_data, x='hour', y='count', title="Interactions by Hour (Last 24h)") # Use white background with dark text for maximum readability fig.update_traces(marker_color='#1e40af', marker_line_color='#0f172a', marker_line_width=1) fig.update_layout( plot_bgcolor='white', paper_bgcolor='white', font=dict(color='#0f172a', size=16), xaxis=dict(gridcolor='#e2e8f0', zerolinecolor='#475569', color='#0f172a'), yaxis=dict(gridcolor='#e2e8f0', zerolinecolor='#475569', color='#0f172a'), title=dict(font=dict(color='#0f172a', size=18)) ) st.plotly_chart(fig, use_container_width=True) else: st.info("No recent activity data available - showing sample chart") # Create sample chart when no data sample_hours = [str(i).zfill(2) for i in range(24)] sample_counts = [0] * 24 sample_data = pd.DataFrame({'hour': sample_hours, 'count': sample_counts}) fig = px.bar(sample_data, x='hour', y='count', title="Sample Activity Timeline (No Data Yet)") st.plotly_chart(fig, use_container_width=True) except Exception as e: st.error(f"Error loading activity data: {e}") st.write("Debug: Creating fallback chart due to error") # Create fallback chart on error sample_hours = [str(i).zfill(2) for i in range(24)] sample_counts = [0] * 24 sample_data = pd.DataFrame({'hour': sample_hours, 'count': sample_counts}) fig = px.bar(sample_data, x='hour', y='count', title="Fallback Activity Timeline") st.plotly_chart(fig, use_container_width=True) with col2: st.markdown('<div class="section-header">🔍 Interaction Types</div>', unsafe_allow_html=True) if conn: try: query = """ SELECT interaction_type, COUNT(*) as count FROM interactions GROUP BY interaction_type """ type_data = pd.read_sql(query, conn) # Debug info st.write(f"Debug: Found {len(type_data)} interaction types") if not type_data.empty: st.write("Debug: Sample data:", type_data.head()) if not type_data.empty: fig = px.pie(type_data, values='count', names='interaction_type', title="Interaction Type Distribution") # Use white background with dark text for maximum readability fig.update_traces(marker_colors=['#1e40af', '#7c3aed', '#059669', '#dc2626']) fig.update_layout( plot_bgcolor='white', paper_bgcolor='white', font=dict(color='#0f172a', size=16), title=dict(font=dict(color='#0f172a', size=18)) ) st.plotly_chart(fig, use_container_width=True) else: st.info("No interaction type data available - showing sample chart") # Create sample chart when no data sample_types = ['client_request', 'agent_response', 'conversation_turn', 'error'] sample_counts = [1, 1, 1, 1] # At least 1 of each to show the chart sample_data = pd.DataFrame({'interaction_type': sample_types, 'count': sample_counts}) fig = px.pie(sample_data, values='count', names='interaction_type', title="Sample Interaction Types (No Data Yet)") st.plotly_chart(fig, use_container_width=True) except Exception as e: st.error(f"Error loading type data: {e}") st.write("Debug: Creating fallback chart due to error") # Create fallback chart on error sample_types = ['client_request', 'agent_response', 'conversation_turn', 'error'] sample_counts = [1, 1, 1, 1] sample_data = pd.DataFrame({'interaction_type': sample_types, 'count': sample_counts}) fig = px.pie(sample_data, values='count', names='interaction_type', title="Fallback Interaction Types") st.plotly_chart(fig, use_container_width=True) # New SQLAlchemy 2.x Features Section st.markdown('<div class="section-header">🚀 SQLAlchemy 2.x Features</div>', unsafe_allow_html=True) col1, col2, col3 = st.columns(3) with col1: st.info("**🔄 Shared Registry**") st.write("✅ SQLAlchemy 2.x shared registry configured") st.write("✅ Models properly mapped to Base") st.write("✅ Session factory bound to registry") with col2: st.info("**📊 Enhanced Fields**") st.write("✅ `tool_name` field support") st.write("✅ `parameters` JSON field") st.write("✅ `error_message` field") st.write("✅ `interaction_metadata` field") with col3: st.info("**🔧 System Status**") st.write("✅ Database operations working") st.write("✅ Session management active") st.write("✅ Context injection enabled") st.write("✅ Conversation tracking operational") def show_interactions(conn): """Show interactions management""" st.markdown('<div class="section-header">💬 Interactions</div>', unsafe_allow_html=True) # Add refresh button col1, col2 = st.columns([3, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh Interactions", type="primary"): st.rerun() # Filters col1, col2, col3 = st.columns(3) with col1: interaction_type = st.selectbox( "Interaction Type", ["All", "client_request", "agent_response", "conversation_turn", "error"] ) with col2: status_filter = st.selectbox( "Status", ["All", "success", "error", "timeout"] ) with col3: date_filter = st.date_input( "Date", value=datetime.now().date() ) # Build query query = "SELECT * FROM interactions WHERE 1=1" params = [] if interaction_type != "All": query += " AND interaction_type = ?" params.append(interaction_type) if status_filter != "All": query += " AND status = ?" params.append(status_filter) if date_filter: query += " AND DATE(timestamp) = ?" params.append(date_filter.strftime('%Y-%m-%d')) query += " ORDER BY timestamp DESC LIMIT 100" # Execute query if conn: try: interactions = pd.read_sql(query, conn, params=params) if not interactions.empty: st.success(f"Found {len(interactions)} interactions") st.dataframe(interactions, use_container_width=True) else: st.info("No interactions found with the selected filters") except Exception as e: st.error(f"Error loading interactions: {e}") else: st.error("Database connection not available") def show_sessions(conn): """Show sessions management""" st.markdown('<div class="section-header">🔄 Sessions</div>', unsafe_allow_html=True) # Add refresh button col1, col2 = st.columns([3, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh Sessions", type="primary"): st.rerun() if conn: try: # Get recent sessions - FIXED to match actual schema query = """ SELECT id, started_at, last_activity, total_interactions, user_id FROM sessions ORDER BY last_activity DESC LIMIT 50 """ # Debug: Show the query being executed st.write(f"Debug: Executing query: {query}") sessions = pd.read_sql(query, conn) # Debug: Show what we found st.write(f"Debug: Found {len(sessions)} sessions") if not sessions.empty: st.write("Debug: Sample data:", sessions.head()) if not sessions.empty: st.success(f"Found {len(sessions)} sessions") st.dataframe(sessions, use_container_width=True) else: st.info("No sessions found") except Exception as e: st.error(f"Error loading sessions: {e}") st.write("Debug: Full error details:", str(e)) # Show the actual table schema to help debug try: cursor = conn.execute("PRAGMA table_info(sessions)") schema_info = cursor.fetchall() st.write("Debug: Table schema:", schema_info) except Exception as schema_error: st.write("Debug: Could not get table schema:", str(schema_error)) else: st.error("Database connection not available") def show_contexts(conn): """Show contexts management""" st.markdown('<div class="section-header">🧠 Contexts</div>', unsafe_allow_html=True) if conn: try: # Get recent contexts - FIXED to match actual schema query = """ SELECT id, session_id, context_type, relevance_score, usage_count, created_at, context_summary, user_id FROM conversation_contexts ORDER BY created_at DESC LIMIT 50 """ # Debug: Show the query being executed (commented out for clean UI) # st.write(f"Debug: Executing query: {query}") contexts = pd.read_sql(query, conn) # Debug: Show what we found (commented out for clean UI) # st.write(f"Debug: Found {len(contexts)} contexts") # if not contexts.empty: # st.write("Debug: Sample data:", contexts.head()) if not contexts.empty: st.success(f"Found {len(contexts)} contexts") st.dataframe(contexts, use_container_width=True) else: st.info("No contexts found") except Exception as e: st.error(f"Error loading contexts: {e}") st.write("Debug: Full error details:", str(e)) # Show the actual table schema to help debug try: cursor = conn.execute("PRAGMA table_info(conversation_contexts)") schema_info = cursor.fetchall() st.write("Debug: Table schema:", schema_info) except Exception as schema_error: st.write("Debug: Could not get table schema:", str(schema_error)) else: st.error("Database connection not available") def show_system_status(conn): """Show system status and health""" st.header("⚙️ System Status") # Context system status st.subheader("🧠 Context System") if CONTEXT_SYSTEM_AVAILABLE: st.success("✅ Context system is available") # Try to initialize context manager try: with st.spinner("Initializing context manager..."): context_manager = SeamlessContextManager(auto_start=False) st.success("✅ Context manager initialized successfully") # Show system capabilities st.subheader("🔧 Available Capabilities") for system_name, system_info in context_manager.context_systems.items(): status_color = "🟢" if system_info['status'] == 'available' else "🔴" st.write(f"{status_color} **{system_name}**: {system_info['status']}") if system_info['capabilities']: st.write(f" Capabilities: {', '.join(system_info['capabilities'])}") except Exception as e: st.error(f"❌ Failed to initialize context manager: {e}") else: st.error("❌ Context system is not available") # Database status st.subheader("🗄️ Database Status") if conn: st.success("✅ Database connection successful") # Test queries try: # Test basic queries test_interactions = pd.read_sql("SELECT COUNT(*) as count FROM interactions", conn) test_sessions = pd.read_sql("SELECT COUNT(*) as count FROM sessions", conn) test_contexts = pd.read_sql(""" SELECT COUNT(DISTINCT session_id) as count FROM interactions WHERE session_id IS NOT NULL """, conn) st.write(f"✅ Interactions table: {test_interactions.iloc[0]['count']} records") st.write(f"✅ Sessions table: {test_sessions.iloc[0]['count']} records") st.write(f"✅ Active contexts: {test_contexts.iloc[0]['count']} sessions") except Exception as e: st.error(f"❌ Database test failed: {e}") else: st.error("❌ Database connection failed") # System info st.subheader("ℹ️ System Information") col1, col2 = st.columns(2) with col1: st.write(f"**Python Version**: {sys.version}") st.write(f"**Working Directory**: {os.getcwd()}") if MODELS_AVAILABLE: st.write(f"**Database Path**: {get_database_url()}") else: st.write("**Database Path**: N/A (models not available)") with col2: st.write(f"**Current Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") st.write(f"**Streamlit Version**: {st.__version__}") st.write(f"**Pandas Version**: {pd.__version__}") def show_prompt_crafting(conn): """Show prompt crafting tools""" st.markdown('<div class="section-header">🛠️ Prompt Crafting Tools</div>', unsafe_allow_html=True) # Add refresh button and data freshness indicator col1, col2 = st.columns([3, 1]) with col1: st.write(""" Welcome to your **Prompt Crafting Workshop**! 🚀 Here you can access all the powerful prompt enhancement tools that automatically inject conversation context, project knowledge, and user preferences into your prompts for maximum AI effectiveness. """) with col2: if st.button("🔄 Refresh Data", type="primary"): st.rerun() # Show data freshness and current counts try: total_interactions = pd.read_sql("SELECT COUNT(*) as count FROM interactions", conn).iloc[0]['count'] total_sessions = pd.read_sql("SELECT COUNT(*) as count FROM sessions", conn).iloc[0]['count'] st.info(f"📊 **Live Data**: {total_interactions} interactions, {total_sessions} sessions | Updated: {datetime.now().strftime('%H:%M:%S')}") except Exception as e: st.warning(f"Could not get live data: {e}") st.write("---") # Tool Selection st.subheader("🔧 Available Tools") tool_choice = st.selectbox( "Choose a prompt crafting tool:", [ "🤖 Smart Context Injector", "📝 Basic Prompt Enhancement", "🚀 Enhanced Chat with Context", "⚡ Real-time Context Injection", "🧠 Auto Context Wrapper", "🔍 MCP Server Debug Tool", "📋 Interaction History Browser", "📊 Prompt Performance Analytics", "💬 Conversation Analysis Tool" ] ) if tool_choice == "🤖 Smart Context Injector": show_smart_context_injector(conn) elif tool_choice == "📝 Basic Prompt Enhancement": show_basic_prompt_enhancement(conn) elif tool_choice == "🚀 Enhanced Chat with Context": show_enhanced_chat_tool(conn) elif tool_choice == "⚡ Real-time Context Injection": show_realtime_context_tool(conn) elif tool_choice == "🧠 Auto Context Wrapper": show_auto_context_wrapper(conn) elif tool_choice == "🔍 MCP Server Debug Tool": show_mcp_server_debug_tool(conn) elif tool_choice == "📋 Interaction History Browser": show_interaction_history_browser(conn) elif tool_choice == "📊 Prompt Performance Analytics": show_prompt_analytics(conn) elif tool_choice == "💬 Conversation Analysis Tool": show_conversation_analysis_tool(conn) def show_basic_prompt_enhancement(conn): """Show basic prompt enhancement tool""" st.subheader("📝 Basic Prompt Enhancement") st.write(""" This tool allows you to enhance a simple user message with basic context. It injects a placeholder for context, which will be replaced by the MCP server. """) # Add tabs for different views tab1, tab2 = st.tabs(["🔧 Enhance New Prompt", "📊 Context Analysis"]) with tab1: st.write("**Enhance a new prompt:**") user_message = st.text_area( "User Message:", placeholder="Enter a simple user message (e.g., 'What's the weather?')", height=100 ) if st.button("Enhance Prompt"): if user_message.strip(): with st.spinner("Enhancing prompt..."): try: # Call real MCP server enhanced_prompt = call_mcp_tool(user_message, "process_prompt_with_context") st.success("Prompt enhanced successfully!") st.text_area("Enhanced Prompt:", enhanced_prompt, height=400) except Exception as e: st.error(f"Error enhancing prompt: {e}") else: st.warning("Please enter a user message to enhance.") with tab2: st.write("**Analyze recent prompt context:**") if not conn: st.error("Database connection not available") return try: # Get recent prompt interactions prompt_query = """ SELECT id, timestamp, prompt, response, execution_time_ms, session_id, interaction_metadata FROM interactions WHERE prompt IS NOT NULL OR response IS NOT NULL ORDER BY timestamp DESC LIMIT 5 """ prompt_history = pd.read_sql(prompt_query, conn) if not prompt_history.empty: for idx, prompt in prompt_history.iterrows(): with st.expander(f"📝 Prompt {prompt['id']} - {prompt['timestamp'][:19]}", expanded=False): col1, col2 = st.columns(2) with col1: st.markdown("**📤 Original Prompt:**") if prompt['prompt']: st.text_area("Prompt:", prompt['prompt'], height=80, key=f"prompt_orig_{idx}") else: st.info("No prompt data") with col2: st.markdown("**📥 Enhanced Response:**") if prompt['response']: st.text_area("Response:", prompt['response'], height=80, key=f"prompt_resp_{idx}") else: st.info("No response data") if prompt['execution_time_ms']: st.caption(f"⏱️ Processing time: {prompt['execution_time_ms']}ms") if prompt['session_id']: st.caption(f"🆔 Session: {prompt['session_id'][:8]}...") if prompt['interaction_metadata']: st.markdown("**📊 Context Metadata:**") st.json(prompt['interaction_metadata']) else: st.info("No prompt history found") except Exception as e: st.error(f"Error loading prompt history: {e}") def show_enhanced_chat_tool(conn): """Show enhanced chat tool""" st.subheader("🚀 Enhanced Chat with Context") st.write(""" This tool allows you to have a conversation with the AI, but with enhanced context. It will automatically inject context from the conversation history and recent interactions. """) # Add tabs for different views tab1, tab2 = st.tabs(["💬 New Chat", "📚 Chat History"]) with tab1: st.write("**Start a new enhanced chat:**") user_message = st.text_area( "User Message:", placeholder="Enter a message to chat with the AI...", height=100 ) if st.button("Chat with Enhanced Context"): if user_message.strip(): with st.spinner("Chatting with enhanced context..."): try: # Call real MCP server enhanced_prompt = call_mcp_tool(user_message, "enhanced_chat") st.success("Chat completed with enhanced context!") st.text_area("Enhanced Chat Response:", enhanced_prompt, height=400) except Exception as e: st.error(f"Error in chat: {e}") else: st.warning("Please enter a message to chat with.") with tab2: st.write("**View your recent chat history:**") if not conn: st.error("Database connection not available") return try: # Get recent chat interactions chat_query = """ SELECT id, timestamp, client_request, agent_response, prompt, response, execution_time_ms, session_id FROM interactions WHERE interaction_type IN ('conversation_turn', 'client_request', 'agent_response') ORDER BY timestamp DESC LIMIT 10 """ chat_history = pd.read_sql(chat_query, conn) if not chat_history.empty: for idx, chat in chat_history.iterrows(): with st.expander(f"💬 Chat {chat['id']} - {chat['timestamp'][:19]}", expanded=False): col1, col2 = st.columns(2) with col1: st.markdown("**📤 Your Message:**") if chat['client_request']: st.text_area("Request:", chat['client_request'], height=80, key=f"chat_req_{idx}") elif chat['prompt']: st.text_area("Prompt:", chat['prompt'], height=80, key=f"chat_prompt_{idx}") with col2: st.markdown("**📥 AI Response:**") if chat['agent_response']: st.text_area("Response:", chat['agent_response'], height=80, key=f"chat_resp_{idx}") elif chat['response']: st.text_area("Response:", chat['response'], height=80, key=f"chat_resp_{idx}") if chat['execution_time_ms']: st.caption(f"⏱️ Response time: {chat['execution_time_ms']}ms") if chat['session_id']: st.caption(f"🆔 Session: {chat['session_id'][:8]}...") else: st.info("No chat history found") except Exception as e: st.error(f"Error loading chat history: {e}") def show_realtime_context_tool(conn): """Show real-time context injection tool""" st.subheader("⚡ Real-time Context Injection") st.write(""" This tool allows you to inject context directly into a prompt. It's useful for testing context injection without going through the full MCP server flow. """) user_message = st.text_area( "User Message:", placeholder="Enter a message to inject context into...", height=100 ) if st.button("Inject Context"): if user_message.strip(): with st.spinner("Injecting context..."): try: # Simulate context injection context_summary = "Recent interactions, UI development, database fixes" injected_elements = ["conversation_summary", "action_history", "tech_stack", "project_plans", "user_preferences", "agent_metadata"] enhanced_prompt = call_mcp_tool(user_message, "process_prompt_with_context") # Simulate the cursor agent payload cursor_agent_payload = { "enhanced_prompt": enhanced_prompt, "original_prompt": user_message, "context_summary": context_summary, "injected_elements": injected_elements, "processing_metadata": { "tool_used": "realtime_context_injection", "enhancement_success": True, "context_relevance_score": 0.95 } } st.success("Context injected successfully!") st.text_area("Enhanced Prompt:", enhanced_prompt, height=400) st.json(cursor_agent_payload) except Exception as e: st.error(f"Error injecting context: {e}") else: st.warning("Please enter a message to inject context into.") def show_auto_context_wrapper(conn): """Show auto context wrapper tool""" st.subheader("🧠 Auto Context Wrapper") st.write(""" This tool allows you to wrap a prompt in a context-aware structure. It's useful for testing how the MCP server handles context-aware prompts. """) user_message = st.text_area( "User Message:", placeholder="Enter a message to wrap in context...", height=100 ) if st.button("Wrap in Context"): if user_message.strip(): with st.spinner("Wrapping in context..."): try: # Simulate context injection context_summary = "Recent interactions, UI development, database fixes" injected_elements = ["conversation_summary", "action_history", "tech_stack", "project_plans", "user_preferences", "agent_metadata"] enhanced_prompt = call_mcp_tool(user_message, "process_prompt_with_context") # Simulate the cursor agent payload cursor_agent_payload = { "enhanced_prompt": enhanced_prompt, "original_prompt": user_message, "context_summary": context_summary, "injected_elements": injected_elements, "processing_metadata": { "tool_used": "auto_context_wrapper", "enhancement_success": True, "context_relevance_score": 0.98 } } st.success("Prompt wrapped in context successfully!") st.text_area("Enhanced Prompt:", enhanced_prompt, height=400) st.json(cursor_agent_payload) except Exception as e: st.error(f"Error wrapping in context: {e}") else: st.warning("Please enter a message to wrap in context.") def show_prompt_analytics(conn): """Show prompt performance analytics""" st.subheader("📊 Prompt Performance Analytics") st.write(""" This tool allows you to analyze the performance of your prompt enhancement pipeline. It can show you how many prompts were enhanced, the average enhancement ratio, and the success rate of context injection. """) # Placeholder for analytics data # In a real application, you'd query your database for these metrics total_prompts_enhanced = 150 avg_enhancement_ratio = 2.8 success_rate_context_injection = 98.5 col1, col2, col3 = st.columns(3) with col1: st.metric("Total Prompts Enhanced", total_prompts_enhanced) with col2: st.metric("Avg Enhancement Ratio", f"{avg_enhancement_ratio:.1f}x") with col3: st.metric("Context Injection Success Rate", f"{success_rate_context_injection}%") # Example: Bar chart for success rate by tool st.markdown('<div class="section-header">🔍 Success Rate by Tool</div>', unsafe_allow_html=True) if conn: try: # Simulate fetching data from database tools = ["process_prompt_with_context", "enhanced_chat", "realtime_context_injection", "auto_context_wrapper"] success_rates = [98.5, 97.0, 99.0, 99.5] # Placeholder data fig = px.bar(x=tools, y=success_rates, title="Success Rate of Context Injection by Tool") fig.update_traces(marker_color='#1e40af') fig.update_layout( plot_bgcolor='white', paper_bgcolor='white', font=dict(color='#0f172a', size=16), xaxis=dict(gridcolor='#e2e8f0', zerolinecolor='#475569', color='#0f172a'), yaxis=dict(gridcolor='#e2e8f0', zerolinecolor='#475569', color='#0f172a'), title=dict(font=dict(color='#0f172a', size=18)) ) st.plotly_chart(fig, use_container_width=True) except Exception as e: st.error(f"Error loading analytics data: {e}") st.write("Debug: Creating fallback chart due to error") # Create fallback chart on error tools = ["process_prompt_with_context", "enhanced_chat", "realtime_context_injection", "auto_context_wrapper"] success_rates = [98.5, 97.0, 99.0, 99.5] fig = px.bar(x=tools, y=success_rates, title="Fallback Success Rate by Tool") st.plotly_chart(fig, use_container_width=True) def show_mcp_server_debug_tool(conn): """Show MCP Server Debug Tool - Request/Response Flow""" st.subheader("🔍 MCP Server Debug Tool") st.write(""" This tool shows you the **complete data flow** between your UI and the MCP server: 1. **📤 What gets sent TO the MCP server** (request payload) 2. **🔄 How the MCP server processes it** (internal processing) 3. **📥 What comes BACK from the MCP server** (response payload) 4. **🎯 What gets sent to your Cursor agent** (final output) This is crucial for debugging the prompt enhancement pipeline and understanding the data transformation. """) # Debug options st.subheader("🔧 Debug Options") col1, col2 = st.columns(2) with col1: debug_tool = st.selectbox( "MCP Tool to test:", [ "process_prompt_with_context", "enhanced_chat", "agent_interaction", "get_conversation_summary", "get_interaction_history" ] ) with col2: debug_level = st.selectbox( "Debug level:", ["Basic", "Detailed", "Full Trace"] ) # Test input test_input = st.text_area( "Test input:", placeholder="Enter text to test with the MCP server...", height=100 ) if st.button("🔍 Debug MCP Server Flow", type="primary"): if test_input.strip(): with st.spinner("Debugging MCP server communication..."): try: # Simulate the complete MCP server flow debug_results = simulate_mcp_server_debug( test_input, debug_tool, debug_level, conn ) # Display the complete flow display_mcp_debug_results(debug_results, debug_level) except Exception as e: st.error(f"Error in MCP server debug: {e}") else: st.warning("Please enter test input") def simulate_mcp_server_debug(test_input: str, tool_name: str, debug_level: str, conn) -> dict: """Simulate the complete MCP server request/response flow""" # Step 1: What gets sent TO the MCP server request_payload = { "tool_name": tool_name, "parameters": {"user_message": test_input}, "timestamp": datetime.now().isoformat(), "session_id": "debug_session_123", "request_id": f"req_{hash(test_input) % 10000}" } # Step 2: MCP server internal processing processing_steps = [ "Received request from UI", "Validated parameters", "Retrieved conversation context from database", "Generated enhanced prompt with context injection", "Processed through prompt processor", "Generated response with full context awareness", "Logged interaction for future reference" ] # Step 3: What comes BACK from MCP server response_payload = { "status": "success", "result": call_mcp_tool(test_input, tool_name), "processing_time_ms": 45, "context_injected": True, "enhancement_ratio": 3.2, "timestamp": datetime.now().isoformat() } # Step 4: What gets sent to Cursor agent cursor_agent_payload = { "enhanced_prompt": response_payload["result"], "original_prompt": test_input, "context_summary": "20+ interactions, recent focus on UI and database fixes", "injected_elements": [ "conversation_summary", "action_history", "tech_stack", "project_plans", "user_preferences", "agent_metadata" ], "processing_metadata": { "tool_used": tool_name, "enhancement_success": True, "context_relevance_score": 0.92 } } return { "request_payload": request_payload, "processing_steps": processing_steps, "response_payload": response_payload, "cursor_agent_payload": cursor_agent_payload, "debug_level": debug_level } def call_mcp_tool(test_input: str, tool_name: str) -> str: """Call real MCP server functions instead of simulating""" try: # Import the real MCP server functions from local_mcp_server_simple import ( process_prompt_with_context, agent_interaction ) # Import enhanced chat with semantic capabilities from enhanced_chat_integration import enhanced_chat if tool_name == "process_prompt_with_context": return process_prompt_with_context(test_input) elif tool_name == "enhanced_chat": return enhanced_chat(test_input) elif tool_name == "agent_interaction": return agent_interaction(test_input) else: return f"Tool '{tool_name}' response: {test_input} (enhanced with context)" except ImportError as e: # Fallback to simulation if MCP server not available st.warning(f"MCP server not available: {e}") return f"""=== FALLBACK ENHANCED PROMPT === USER MESSAGE: {test_input} === CONTEXT INJECTION (FALLBACK) === This is a fallback response because the MCP server is not available. To get real enhanced prompts, make sure the MCP server is running. === END FALLBACK PROMPT ===""" except Exception as e: st.error(f"Error calling MCP server: {e}") return f"❌ Error: {str(e)}" def display_mcp_debug_results(debug_results: dict, debug_level: str): """Display the complete MCP server debug flow""" st.subheader("🔍 Complete MCP Server Flow") # Step 1: Request to MCP Server st.markdown("#### 📤 1. Request TO MCP Server") st.json(debug_results["request_payload"]) # Step 2: MCP Server Processing st.markdown("#### 🔄 2. MCP Server Internal Processing") for i, step in enumerate(debug_results["processing_steps"], 1): st.write(f"{i}. {step}") # Step 3: Response from MCP Server st.markdown("#### 📥 3. Response FROM MCP Server") st.json(debug_results["response_payload"]) # Step 4: What goes to Cursor Agent st.markdown("#### 🎯 4. Final Output to Cursor Agent") st.json(debug_results["cursor_agent_payload"]) # Summary metrics st.subheader("📊 Flow Summary") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Request Size", f"{len(str(debug_results['request_payload']))} chars") with col2: st.metric("Response Size", f"{len(str(debug_results['response_payload']))} chars") with col3: st.metric("Enhancement Ratio", f"{debug_results['response_payload']['enhancement_ratio']:.1f}x") with col4: st.metric("Processing Time", f"{debug_results['response_payload']['processing_time_ms']}ms") # Show the actual enhanced content if debug_level in ["Detailed", "Full Trace"]: st.subheader("📝 Enhanced Content Preview") enhanced_content = debug_results["response_payload"]["result"] st.text_area("Enhanced Content:", enhanced_content, height=400) st.info(f"Full content length: {len(enhanced_content)} characters") # Show what was injected if debug_level == "Full Trace": st.subheader("🔍 Context Injection Details") injected_elements = debug_results["cursor_agent_payload"]["injected_elements"] st.write("**Context elements injected:**") for element in injected_elements: st.write(f"✅ {element}") metadata = debug_results["cursor_agent_payload"]["processing_metadata"] st.write("**Processing metadata:**") st.json(metadata) def show_interaction_history_browser(conn): """Show interaction history browser with full request/response details""" st.subheader("📋 Interaction History Browser") # Add refresh button col1, col2 = st.columns([3, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh Browser", type="primary"): st.rerun() st.write(""" This tool shows you a **complete list of all interactions** and allows you to drill down into each one to see exactly **what was sent** and **what was the response**. Perfect for auditing and debugging! """) if not conn: st.error("Database connection not available") return try: # Get all interactions with pagination page_size = 20 page = st.number_input("Page", min_value=1, value=1) offset = (page - 1) * page_size # Add filtering options st.write("**Filter Options:**") st.info("💡 **Conversations Only**: Shows your actual chat interactions. **System Only**: Shows monitoring data. **All Types**: Shows everything.") # Quick filter buttons with dynamic counts col1, col2, col3 = st.columns(3) # Get dynamic counts from database try: all_count = pd.read_sql("SELECT COUNT(*) as count FROM interactions", conn).iloc[0]['count'] conv_count = pd.read_sql("SELECT COUNT(*) as count FROM interactions WHERE interaction_type IN ('conversation_turn', 'client_request', 'agent_response', 'user_prompt')", conn).iloc[0]['count'] sys_count = pd.read_sql("SELECT COUNT(*) as count FROM interactions WHERE interaction_type IN ('health_check', 'monitoring_started', 'module_import', 'system_startup', 'system_shutdown')", conn).iloc[0]['count'] except Exception as e: st.warning(f"Could not get live counts: {e}") all_count, conv_count, sys_count = 0, 0, 0 with col1: if st.button(f"🔄 Show All ({all_count})", type="primary", use_container_width=True): st.session_state.quick_filter = "All Types" st.rerun() with col2: if st.button(f"💬 Conversations Only ({conv_count})", use_container_width=True): st.session_state.quick_filter = "Conversations Only" st.rerun() with col3: if st.button(f"⚙️ System Only ({sys_count})", use_container_width=True): st.session_state.quick_filter = "System Only" st.rerun() col1, col2 = st.columns(2) with col1: # Use quick filter if set, otherwise use selectbox if 'quick_filter' in st.session_state: filter_type = st.session_state.quick_filter st.info(f"Quick filter active: {filter_type}") # Clear quick filter after use del st.session_state.quick_filter else: filter_type = st.selectbox( "Filter by Interaction Type:", options=["All Types", "Conversations Only", "System Only", "Custom Filter"], index=0, # Default to "All Types" help="Choose what type of interactions to display" ) with col2: if filter_type == "Custom Filter": custom_types = st.multiselect( "Select specific types:", options=["conversation_turn", "client_request", "agent_response", "user_prompt", "health_check", "monitoring_started", "module_import"], default=["conversation_turn", "client_request", "agent_response", "user_prompt"] ) else: custom_types = [] # Build query based on filter base_query = """ SELECT id, timestamp, interaction_type, status, user_id, session_id, prompt, response, full_content, execution_time_ms, parameters, meta_data FROM interactions """ # Add WHERE clause based on filter if filter_type == "Conversations Only": base_query += " WHERE interaction_type IN ('conversation_turn', 'client_request', 'agent_response', 'user_prompt')" elif filter_type == "System Only": base_query += " WHERE interaction_type IN ('health_check', 'monitoring_started', 'module_import', 'system_startup', 'system_shutdown')" elif filter_type == "Custom Filter" and custom_types: placeholders = ','.join(['?'] * len(custom_types)) base_query += f" WHERE interaction_type IN ({placeholders})" base_query += " ORDER BY timestamp DESC" # Get filtered total count count_query = base_query.replace("SELECT \n id,\n timestamp,\n interaction_type,\n status,\n user_id,\n session_id,\n prompt,\n response,\n full_content,\n execution_time_ms,\n parameters,\n meta_data", "SELECT COUNT(*) as count") count_query = count_query.replace(" ORDER BY timestamp DESC", "") try: if filter_type == "Custom Filter" and custom_types: total_count = pd.read_sql(count_query, conn, params=custom_types).iloc[0]['count'] else: total_count = pd.read_sql(count_query, conn).iloc[0]['count'] except Exception as e: st.error(f"Error getting count: {e}") total_count = 0 total_pages = (total_count + page_size - 1) // page_size st.write(f"**Filtered Interactions**: {total_count} | **Page {page} of {total_pages}**") # Show interaction type breakdown st.subheader("📊 Interaction Type Breakdown") breakdown_query = """ SELECT interaction_type, COUNT(*) as count FROM interactions GROUP BY interaction_type ORDER BY count DESC """ try: breakdown = pd.read_sql(breakdown_query, conn) if not breakdown.empty: col1, col2 = st.columns(2) with col1: st.write("**All Interaction Types:**") for _, row in breakdown.iterrows(): st.write(f"• {row['interaction_type']}: {row['count']}") with col2: # Create a simple bar chart fig = px.bar(breakdown, x='interaction_type', y='count', title="Interaction Type Distribution") st.plotly_chart(fig, use_container_width=True) except Exception as e: st.warning(f"Could not load breakdown: {e}") # Add LIMIT and OFFSET to base query query = base_query + " LIMIT ? OFFSET ?" try: if filter_type == "Custom Filter" and custom_types: # For custom filter, we need to pass both the filter types and pagination params params = custom_types + [page_size, offset] else: # For other filters, just pagination params params = [page_size, offset] interactions = pd.read_sql(query, conn, params=params) except Exception as e: st.error(f"Error loading interactions: {e}") st.write("Debug: Database query failed") st.write(f"Query: {query}") st.write(f"Params: {params}") return if not interactions.empty: # Display interactions table st.subheader(f"📊 Interactions (Page {page})") # Create a more readable display display_data = [] for _, row in interactions.iterrows(): display_data.append({ "ID": row['id'], "Timestamp": row['timestamp'], "Type": row['interaction_type'], "Status": row['status'], "User": row['user_id'] or "N/A", "Session": row['session_id'][:8] + "..." if row['session_id'] else "N/A", "Prompt Length": len(str(row['prompt'])) if row['prompt'] else 0, "Response Length": len(str(row['response'])) if row['response'] else 0, "Exec Time": f"{row['execution_time_ms']}ms" if row['execution_time_ms'] else "N/A" }) display_df = pd.DataFrame(display_data) st.dataframe(display_df, use_container_width=True) # Pagination controls col1, col2, col3 = st.columns(3) with col1: if page > 1: if st.button("⬅️ Previous Page"): st.rerun() with col2: st.write(f"Page {page} of {total_pages}") with col3: if page < total_pages: if st.button("Next Page ➡️"): st.rerun() # Interaction detail viewer st.subheader("🔍 Interaction Detail Viewer") selected_id = st.selectbox( "Select an interaction to view details:", options=interactions['id'].tolist(), format_func=lambda x: f"ID {x} - {interactions[interactions['id'] == x]['timestamp'].iloc[0]}" ) if selected_id: show_interaction_details(selected_id, conn) else: st.info("No interactions found") except Exception as e: st.error(f"Error loading interaction history: {e}") st.write("Debug: Full error details:", str(e)) def show_interaction_details(interaction_id: int, conn): """Show detailed view of a specific interaction""" try: # Get full interaction details query = """ SELECT * FROM interactions WHERE id = ? """ interaction = pd.read_sql(query, conn, params=[interaction_id]) if not interaction.empty: interaction = interaction.iloc[0] st.subheader(f"🔍 Interaction Details - ID {interaction_id}") # Basic info col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Type", interaction['interaction_type']) with col2: st.metric("Status", interaction['status']) with col3: st.metric("Execution Time", f"{interaction['execution_time_ms']}ms" if interaction['execution_time_ms'] else "N/A") with col4: st.metric("Timestamp", interaction['timestamp']) # Request/Response comparison st.subheader("📤 Request vs 📥 Response") col1, col2 = st.columns(2) with col1: st.markdown("#### 📤 **What Was Sent**") try: if interaction['prompt']: # Check if prompt contains error information prompt_text = str(interaction['prompt']) if '"ERROR":' in prompt_text or '"error":' in prompt_text: st.warning("⚠️ This interaction contains error information") st.text_area("Original Prompt (with errors):", prompt_text, height=150) else: st.text_area("Original Prompt:", prompt_text, height=150) st.write(f"**Length**: {len(prompt_text)} characters") else: st.info("No prompt data available") except Exception as e: st.error(f"Error displaying prompt: {e}") st.text_area("Raw Prompt Data:", str(interaction.get('prompt', 'N/A')), height=150) # Show other request data if interaction['parameters']: st.write("**Parameters:**") try: # Try to parse as JSON, fallback to raw text if it fails if isinstance(interaction['parameters'], str): parsed_params = json.loads(interaction['parameters']) st.json(parsed_params) else: st.json(interaction['parameters']) except (json.JSONDecodeError, TypeError) as e: st.warning(f"⚠️ Invalid JSON in parameters: {e}") st.text_area("Raw Parameters:", str(interaction['parameters']), height=200) if interaction['tool_name']: st.write(f"**Tool Used**: {interaction['tool_name']}") with col2: st.markdown("#### 📥 **What Was Returned**") try: if interaction['response']: # Check if response contains error information response_text = str(interaction['response']) if '"ERROR":' in response_text or '"error":' in response_text: st.warning("⚠️ This response contains error information") st.text_area("Server Response (with errors):", response_text, height=150) else: st.text_area("Server Response:", response_text, height=150) st.write(f"**Length**: {len(response_text)} characters") else: st.info("No response data available") except Exception as e: st.error(f"Error displaying response: {e}") st.text_area("Raw Response Data:", str(interaction.get('response', 'N/A')), height=150) # Show response metadata if interaction['full_content']: st.write("**Full Content:**") st.text_area("Full Content:", interaction['full_content'], height=200) if interaction['context_summary']: st.write("**Context Summary:**") st.text_area("Context:", interaction['context_summary'], height=200) # Enhancement analysis if interaction['prompt'] and interaction['response']: st.subheader("📊 Enhancement Analysis") prompt_len = len(str(interaction['prompt'])) response_len = len(str(interaction['response'])) enhancement_ratio = response_len / prompt_len if prompt_len > 0 else 0 col1, col2, col3 = st.columns(3) with col1: st.metric("Original Length", prompt_len) with col2: st.metric("Response Length", response_len) with col3: st.metric("Enhancement Ratio", f"{enhancement_ratio:.1f}x") # Show what was added if enhancement_ratio > 1: st.success(f"✅ Response enhanced by {enhancement_ratio:.1f}x") st.write("**Context was injected to enhance the response**") else: st.info("ℹ️ No enhancement applied") # Additional metadata st.subheader("🔍 Additional Metadata") metadata_cols = ['user_id', 'session_id', 'error_message', 'tool_name', 'parameters', 'interaction_metadata', 'meta_data'] metadata_data = {} for col in metadata_cols: if interaction[col]: try: if col == 'meta_data' and isinstance(interaction[col], str): # Try to parse JSON, fallback to raw text if it fails try: metadata_data[col] = json.loads(interaction[col]) except (json.JSONDecodeError, TypeError): metadata_data[col] = f"Invalid JSON: {interaction[col]}" else: metadata_data[col] = interaction[col] except Exception as e: metadata_data[col] = f"Error processing {col}: {str(e)}" if metadata_data: st.json(metadata_data) else: st.info("No additional metadata available") else: st.error(f"Interaction ID {interaction_id} not found") except Exception as e: st.error(f"Error loading interaction details: {e}") st.write("Debug: Full error details:", str(e)) def show_smart_context_injector(conn): """Show Smart Context Injector with Automatic Stack Detection""" st.subheader("🤖 Smart Context Injector") st.write(""" This **revolutionary tool** automatically detects your project's tech stack and injects **intelligent, project-specific context** while maintaining your portable user preferences! 🚀 **Features:** - **Automatic tech stack detection** (Python, Node.js, React, Rust, Go, Java, PHP, .NET) - **Project-specific patterns** and best practices - **Common issues and solutions** for your tech stack - **Development workflow** recommendations - **Portable user preferences** that work across all projects """) # Project path input project_path = st.text_input( "Project Path (leave empty for current directory):", value=os.getcwd(), help="Path to the project you want to analyze" ) # Initialize smart context injector try: from smart_context_injector import SmartContextInjector injector = SmartContextInjector(project_path) # Detect tech stack if st.button("🔍 Detect Tech Stack", type="primary"): with st.spinner("Detecting tech stack..."): try: stack_info = injector.detect_tech_stack() # Display detected stack st.subheader("📊 Detected Tech Stack") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Project Type", stack_info['project_type'].replace('_', ' ').title()) with col2: st.metric("Primary Language", stack_info['primary_language']) with col3: frameworks_count = len(stack_info['frameworks']) st.metric("Frameworks", frameworks_count) with col4: confidence = stack_info['confidence_score'] st.metric("Confidence", f"{confidence:.1%}") # Detailed stack information st.subheader("🔍 Stack Details") col1, col2 = st.columns(2) with col1: st.write("**Frameworks:**") if stack_info['frameworks']: for framework in stack_info['frameworks']: st.write(f"✅ {framework}") else: st.write("None detected") st.write("**Databases:**") if stack_info['databases']: for db in stack_info['databases']: st.write(f"🗄️ {db}") else: st.write("None detected") with col2: st.write("**Build Tools:**") if stack_info['build_tools']: for tool in stack_info['build_tools']: st.write(f"🔧 {tool}") else: st.write("None detected") st.write("**Package Managers:**") if stack_info['package_managers']: for pm in stack_info['package_managers']: st.write(f"📦 {pm}") else: st.write("None detected") # Get project context project_context = injector.get_project_context() # Display project patterns and best practices st.subheader("�� Project Patterns & Best Practices") col1, col2 = st.columns(2) with col1: st.write("**Common Patterns:**") for pattern in project_context['project_patterns']: st.write(f"• {pattern}") with col2: st.write("**Best Practices:**") for practice in project_context['best_practices']: st.write(f"• {practice}") # Display common issues and workflow st.subheader("⚠️ Common Issues & Development Workflow") col1, col2 = st.columns(2) with col1: st.write("**Common Issues:**") for issue in project_context['common_issues']: st.write(f"• {issue}") with col2: st.write("**Development Workflow:**") for workflow in project_context['development_workflow']: st.write(f"• {workflow}") # User preferences st.subheader("👤 Your Portable Preferences") preferences = injector.user_preferences for key, value in preferences.items(): st.write(f"**{key.replace('_', ' ').title()}**: {value}") # Context injection st.subheader("🚀 Smart Context Injection") user_message = st.text_area( "Enter your message:", placeholder="e.g., How do I set up a development environment for this project?", height=100 ) if st.button("🤖 Inject Smart Context", type="primary"): if user_message.strip(): with st.spinner("Injecting intelligent context..."): try: # Inject smart context enhanced_prompt = injector.inject_smart_context(user_message) # Display results col1, col2 = st.columns(2) with col1: st.subheader("📤 Original Message") st.info(user_message) st.write(f"**Length**: {len(user_message)} characters") with col2: st.subheader("📥 Enhanced with Smart Context") st.success(enhanced_prompt) st.write(f"**Length**: {len(enhanced_prompt)} characters") st.write(f"**Enhancement**: +{len(enhanced_prompt) - len(user_message)} characters") # Show enhancement breakdown st.subheader("🔍 Smart Context Breakdown") col1, col2, col3 = st.columns(3) with col1: st.metric("Original Length", len(user_message)) with col2: st.metric("Enhanced Length", len(enhanced_prompt)) with col3: enhancement_ratio = len(enhanced_prompt) / len(user_message) if user_message else 0 st.metric("Enhancement Ratio", f"{enhancement_ratio:.1f}x") # Show what was injected st.subheader("➕ Smart Context Injected") # Extract the injected sections sections = [ "DETECTED TECH STACK", "PROJECT PATTERNS", "BEST PRACTICES", "COMMON ISSUES & SOLUTIONS", "DEVELOPMENT WORKFLOW", "PORTABLE USER PREFERENCES" ] for section in sections: if section in enhanced_prompt: st.write(f"**✅ {section}**") # Show context summary st.subheader("�� Context Summary") context_summary = injector.get_context_summary() st.info(context_summary) except Exception as e: st.error(f"Error injecting smart context: {e}") st.write("Debug: Full error details:", str(e)) else: st.warning("Please enter a message to enhance with smart context") except Exception as e: st.error(f"Error detecting tech stack: {e}") st.write("Debug: Full error details:", str(e)) else: st.info("Click 'Detect Tech Stack' to start analyzing your project") except Exception as e: st.error(f"Error detecting tech stack: {e}") st.write("Debug: Full error details:", str(e)) except ImportError: st.error("❌ Smart Context Injector not available") st.write(""" **To use this tool, you need to install the required dependencies:** ```bash pip install pyyaml toml ``` **The Smart Context Injector is working independently and can be imported once dependencies are installed.** """) st.info("💡 **Install the dependencies above to use the Smart Context Injector**") def show_conversation_analysis_tool(conn): """Show conversation analysis tool for full Cursor agent visibility""" st.subheader("💬 Conversation Analysis Tool") st.write(""" **Full Visibility into Your Cursor Agent Interactions** 🚀 This tool gives you complete insight into: - **What you sent** to the Cursor agent - **What the agent received** (enhanced with context) - **What the agent responded** back to you - **Full conversation flow** and context injection """) # Add refresh button col1, col2 = st.columns([3, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh Conversations", type="primary"): st.rerun() if not conn: st.error("Database connection not available") return try: # Get all conversation interactions st.subheader("📊 Conversation Overview") # Get conversation counts conv_query = """ SELECT interaction_type, COUNT(*) as count, MAX(timestamp) as latest FROM interactions WHERE interaction_type IN ('conversation_turn', 'client_request', 'agent_response', 'user_prompt') GROUP BY interaction_type ORDER BY count DESC """ conv_data = pd.read_sql(conv_query, conn) if not conv_data.empty: col1, col2, col3 = st.columns(3) for idx, row in conv_data.iterrows(): with col1 if idx == 0 else col2 if idx == 1 else col3: st.metric( row['interaction_type'].replace('_', ' ').title(), row['count'], f"Latest: {row['latest'][:16]}" if row['latest'] else "" ) else: st.info("No conversation data found") st.write("---") # Session Selection st.subheader("🎯 Select Session to Analyze") # Get available sessions sessions_query = """ SELECT session_id, user_id, started_at, last_activity, total_interactions FROM sessions ORDER BY last_activity DESC LIMIT 20 """ sessions_data = pd.read_sql(sessions_query, conn) if not sessions_data.empty: # Create session selector session_options = [] for _, session in sessions_data.iterrows(): session_label = f"{session['session_id'][:8]}... - {session['user_id']} - {session['started_at'][:16]} ({session['total_interactions']} interactions)" session_options.append((session_label, session['session_id'])) selected_session_label = st.selectbox( "Choose a session to analyze:", options=[opt[0] for opt in session_options], help="Select a session to see all conversations within it" ) # Get selected session ID selected_session_id = None for label, session_id in session_options: if label == selected_session_label: selected_session_id = session_id break if selected_session_id: st.success(f"✅ Selected session: {selected_session_id[:8]}...") # Get conversations for selected session conv_query = """ SELECT id, timestamp, interaction_type, client_request, agent_response, prompt, response, full_content, execution_time_ms, tool_name, parameters, error_message, interaction_metadata FROM interactions WHERE session_id = ? ORDER BY timestamp ASC """ conversations = pd.read_sql(conv_query, conn, params=[selected_session_id]) if not conversations.empty: st.subheader(f"💬 Conversations in Session: {selected_session_id[:8]}...") # Show conversation flow for idx, conv in conversations.iterrows(): with st.expander(f"🔄 {conv['interaction_type'].replace('_', ' ').title()} - {conv['timestamp'][:19]}", expanded=True): col1, col2 = st.columns(2) with col1: st.markdown("**📤 What You Sent:**") if conv['client_request']: st.text_area("Client Request:", conv['client_request'], height=100, key=f"req_{idx}") elif conv['prompt']: st.text_area("Prompt:", conv['prompt'], height=100, key=f"prompt_{idx}") else: st.info("No request/prompt data") # Show metadata if conv['tool_name'] or conv['parameters']: st.markdown("**🔧 Tool Details:**") if conv['tool_name']: st.write(f"Tool: {conv['tool_name']}") if conv['parameters']: st.write(f"Parameters: {conv['parameters']}") with col2: st.markdown("**📥 What Agent Received/Responded:**") if conv['agent_response']: st.text_area("Agent Response:", conv['agent_response'], height=100, key=f"resp_{idx}") elif conv['response']: st.text_area("Response:", conv['response'], height=100, key=f"resp_{idx}") else: st.info("No response data") # Show execution details if conv['execution_time_ms']: st.write(f"⏱️ Execution time: {conv['execution_time_ms']}ms") if conv['error_message']: st.error(f"❌ Error: {conv['error_message']}") # Show full content if available if conv['full_content']: st.markdown("**📄 Full Content:**") st.text_area("Complete Interaction:", conv['full_content'], height=150, key=f"full_{idx}") # Show interaction metadata if conv['interaction_metadata']: st.markdown("**📊 Metadata:**") st.json(conv['interaction_metadata']) # Summary statistics st.subheader("📈 Session Summary") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Total Interactions", len(conversations)) with col2: avg_time = conversations['execution_time_ms'].mean() if 'execution_time_ms' in conversations.columns else 0 st.metric("Avg Response Time", f"{avg_time:.1f}ms") with col3: error_count = len(conversations[conversations['error_message'].notna()]) st.metric("Errors", error_count) with col4: tool_usage = conversations['tool_name'].value_counts().iloc[0] if 'tool_name' in conversations.columns and not conversations['tool_name'].isna().all() else 0 st.metric("Most Used Tool", tool_usage) else: st.info(f"No conversations found for session {selected_session_id[:8]}...") else: st.info("No sessions found in the database") except Exception as e: st.error(f"Error analyzing conversations: {e}") st.write("Debug: Full error details:", str(e)) def main(): """Main application""" st.markdown('<h1 class="main-header">🧠 Context Manager</h1>', unsafe_allow_html=True) # Add global refresh controls col1, col2, col3 = st.columns([2, 2, 1]) with col1: st.write("") # Spacer with col2: if st.button("🔄 Refresh All Data", type="primary", use_container_width=True): st.rerun() with col3: # Auto-refresh toggle auto_refresh = st.checkbox("Auto-refresh every 30s", value=False) if auto_refresh: st.info("Auto-refresh enabled") # Auto-refresh logic if auto_refresh: if 'last_refresh' not in st.session_state: st.session_state.last_refresh = time.time() # Check if 30 seconds have passed if time.time() - st.session_state.last_refresh > 30: st.session_state.last_refresh = time.time() st.rerun() # Show last refresh time if 'last_refresh' in st.session_state: last_refresh_time = datetime.fromtimestamp(st.session_state.last_refresh) st.info(f"🕐 Last refreshed: {last_refresh_time.strftime('%H:%M:%S')}") else: st.info("🕐 Data loaded on page load") # Initialize database connection conn = init_database_connection() # Sidebar navigation st.sidebar.title("🧭 Navigation") page = st.sidebar.selectbox( "Choose a page:", ["📊 Dashboard", "💬 Interactions", "🔄 Sessions", "🧠 Contexts", "🛠️ Prompt Crafting", "⚙️ System Status"] ) # Page routing if page == "📊 Dashboard": show_dashboard(conn) elif page == "💬 Interactions": show_interactions(conn) elif page == "🔄 Sessions": show_sessions(conn) elif page == "🧠 Contexts": show_contexts(conn) elif page == "🛠️ Prompt Crafting": show_prompt_crafting(conn) elif page == "⚙️ System Status": show_system_status(conn) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Big0290/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server