Model Control Plane (MCP) Server
by dvladimirov
- MCP
- scripts
#!/usr/bin/env python3
"""
MCP Feature Showcase
This script demonstrates the key features of the MCP (Mission Control Platform),
making it easy to test and showcase functionality through langflow.
Current demonstrations:
- Filesystem operations
- Git repository analysis
Usage:
python showcase_mcp_features.py --feature filesystem
python showcase_mcp_features.py --feature git --repo-url https://github.com/username/repo
Reports are generated and can be viewed in Langflow (http://localhost:7860)
"""
import os
import sys
import json
import argparse
import tempfile
import http.server
import socketserver
import threading
import webbrowser
import signal
import time
from datetime import datetime
from typing import Dict, List, Any, Optional
from pathlib import Path
# Add the parent directory to Python path for proper imports
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
# Import MCP-specific modules
from langflow import MCPAIComponent
import scripts.test_filesystem as filesystem_module
import scripts.test_git_integration as git_module
import scripts.test_git_diff as git_diff_module
# Define the reports directory
REPORTS_DIR = os.path.join(parent_dir, "reports")
os.makedirs(REPORTS_DIR, exist_ok=True)
# Global variable to track if we should keep running
keep_running = True
def signal_handler(sig, frame):
"""Handle Ctrl+C by setting the keep_running flag to False."""
global keep_running
print("\nShutting down server. Please wait...")
keep_running = False
sys.exit(0)
# Set up signal handler for graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
def generate_html_report(title: str, results: Dict[str, Any], feature_type: str) -> str:
"""Generate an HTML report from the showcase results."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_filename = f"{feature_type}_showcase_{timestamp}.html"
report_path = os.path.join(REPORTS_DIR, report_filename)
# Start building the HTML content
html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title}</title>
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
margin: 0;
padding: 20px;
color: #333;
max-width: 1200px;
margin: 0 auto;
}}
h1, h2, h3 {{
color: #2c3e50;
}}
h1 {{
border-bottom: 2px solid #3498db;
padding-bottom: 10px;
}}
.card {{
background-color: #f9f9f9;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
padding: 20px;
margin-bottom: 20px;
}}
.success {{
color: #27ae60;
font-weight: bold;
}}
.error {{
color: #e74c3c;
font-weight: bold;
}}
.timestamp {{
color: #7f8c8d;
font-style: italic;
}}
pre {{
background-color: #f1f1f1;
padding: 10px;
border-radius: 5px;
overflow-x: auto;
}}
table {{
width: 100%;
border-collapse: collapse;
margin-bottom: 20px;
}}
th, td {{
padding: 8px 12px;
text-align: left;
border-bottom: 1px solid #ddd;
}}
th {{
background-color: #f2f2f2;
}}
.highlight {{
background-color: #e8f4fc;
}}
.ai-analysis {{
background-color: #f0f8ff;
border-left: 4px solid #3498db;
padding: 10px 20px;
margin: 20px 0;
}}
</style>
</head>
<body>
<h1>{title}</h1>
<p class="timestamp">Generated on: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
"""
# Add content based on feature type
if feature_type == "filesystem":
html_content += generate_filesystem_report_content(results)
elif feature_type == "git":
html_content += generate_git_report_content(results)
# Add link back to Langflow
html_content += """
<div class="card">
<h3>Return to Langflow</h3>
<p><a href="http://localhost:7860" target="_blank">Go back to Langflow UI</a></p>
</div>
"""
# Close the HTML document
html_content += """
</body>
</html>
"""
# Write the HTML content to the file
with open(report_path, "w", encoding="utf-8") as f:
f.write(html_content)
return report_path
def generate_filesystem_report_content(results: Dict[str, Any]) -> str:
"""Generate the HTML content for filesystem analysis."""
content = """
<div class="card">
<h2>Filesystem Analysis Summary</h2>
"""
# Add summary statistics
content += f"""
<p><strong>Total Files:</strong> {len(results.get('files', []))}</p>
<p><strong>Total Directories:</strong> {len(results.get('directories', []))}</p>
<p><strong>Total Size:</strong> {results.get('total_size_bytes', 0) / (1024 * 1024):.2f} MB</p>
</div>
"""
# Add file type distribution
content += """
<div class="card">
<h2>File Type Distribution</h2>
<table>
<tr>
<th>Extension</th>
<th>Count</th>
<th>Percentage</th>
</tr>
"""
file_types = results.get('file_types', {})
total_files = sum(file_types.values())
for ext, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True)[:20]:
percentage = (count / total_files * 100) if total_files > 0 else 0
content += f"""
<tr>
<td>{ext}</td>
<td>{count}</td>
<td>{percentage:.1f}%</td>
</tr>
"""
content += """
</table>
</div>
"""
# Add largest files
content += """
<div class="card">
<h2>Largest Files</h2>
<table>
<tr>
<th>Path</th>
<th>Size (MB)</th>
<th>Modified</th>
</tr>
"""
for file_info in results.get('largest_files', [])[:10]:
size_mb = file_info.get('size_bytes', 0) / (1024 * 1024)
modified = file_info.get('modified', 'Unknown')
path = file_info.get('path', 'Unknown')
content += f"""
<tr>
<td>{path}</td>
<td>{size_mb:.2f} MB</td>
<td>{modified}</td>
</tr>
"""
content += """
</table>
</div>
"""
# Add AI analysis if available
if 'ai_analysis' in results:
content += """
<div class="card ai-analysis">
<h2>AI-Generated Analysis</h2>
"""
# Replace newlines with <br> tags for proper HTML rendering
ai_analysis = results.get('ai_analysis', 'No AI analysis available').replace('\n', '<br>')
content += f"<p>{ai_analysis}</p>"
content += """
</div>
"""
return content
def generate_git_report_content(results: Dict[str, Any]) -> str:
"""Generate the HTML content for git repository analysis."""
repo_url = results.get('repository', 'Unknown')
content = f"""
<div class="card">
<h2>Git Repository Analysis</h2>
<p><strong>Repository:</strong> {repo_url}</p>
<p><strong>Analysis Time:</strong> {results.get('timestamp', 'Unknown')}</p>
</div>
"""
# Add repository statistics
repo_stats = results.get('repository_stats', {})
if repo_stats:
content += """
<div class="card">
<h2>Repository Statistics</h2>
"""
for key, value in repo_stats.items():
# Convert key from snake_case to Title Case for display
display_key = ' '.join(word.capitalize() for word in key.split('_'))
content += f"<p><strong>{display_key}:</strong> {value}</p>"
content += """
</div>
"""
# Add file type breakdown
file_types = results.get('file_types', {})
if file_types:
content += """
<div class="card">
<h2>File Type Distribution</h2>
<table>
<tr>
<th>Extension</th>
<th>Count</th>
<th>Percentage</th>
</tr>
"""
total_files = sum(file_types.values())
for ext, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True)[:15]:
percentage = (count / total_files * 100) if total_files > 0 else 0
content += f"""
<tr>
<td>{ext}</td>
<td>{count}</td>
<td>{percentage:.1f}%</td>
</tr>
"""
content += """
</table>
</div>
"""
# Add recent commits
commit_history = results.get('recent_commits', [])
if commit_history:
content += """
<div class="card">
<h2>Recent Commits</h2>
<table>
<tr>
<th>SHA</th>
<th>Author</th>
<th>Date</th>
<th>Message</th>
</tr>
"""
for commit in commit_history[:10]:
sha = commit.get('sha', 'Unknown')[:7] # First 7 chars of SHA
author = commit.get('author', 'Unknown')
date = commit.get('date', 'Unknown')
message = commit.get('message', 'No message')
content += f"""
<tr>
<td>{sha}</td>
<td>{author}</td>
<td>{date}</td>
<td>{message}</td>
</tr>
"""
content += """
</table>
</div>
"""
# Add diff analysis if available
diff_analysis = results.get('diff_analysis', {})
if diff_analysis:
content += """
<div class="card">
<h2>Diff Analysis</h2>
"""
diff_stats = diff_analysis.get('diff_stats', {})
if diff_stats:
content += "<h3>Change Statistics</h3>"
content += "<ul>"
content += f"<li><strong>Files Changed:</strong> {diff_stats.get('files_changed', 0)}</li>"
content += f"<li><strong>Additions:</strong> {diff_stats.get('additions', 0)}</li>"
content += f"<li><strong>Deletions:</strong> {diff_stats.get('deletions', 0)}</li>"
content += f"<li><strong>Total Lines:</strong> {diff_stats.get('total_lines', 0)}</li>"
content += "</ul>"
# Add requirements changes if available
req_changes = diff_analysis.get('requirements_changes', {})
if req_changes:
content += "<h3>Requirements.txt Changes</h3>"
if req_changes.get('added'):
content += "<h4>Added Dependencies</h4>"
content += "<ul>"
for dep in req_changes.get('added', []):
content += f"<li>{dep}</li>"
content += "</ul>"
if req_changes.get('removed'):
content += "<h4>Removed Dependencies</h4>"
content += "<ul>"
for dep in req_changes.get('removed', []):
content += f"<li>{dep}</li>"
content += "</ul>"
if req_changes.get('changed'):
content += "<h4>Changed Dependencies</h4>"
content += "<ul>"
for dep in req_changes.get('changed', []):
content += f"<li>{dep}</li>"
content += "</ul>"
content += """
</div>
"""
# Add AI analysis if available
if 'ai_analysis' in results:
content += """
<div class="card ai-analysis">
<h2>AI-Generated Repository Analysis</h2>
"""
# Replace newlines with <br> tags for proper HTML rendering
ai_analysis = results.get('ai_analysis', 'No AI analysis available').replace('\n', '<br>')
content += f"<p>{ai_analysis}</p>"
content += """
</div>
"""
return content
class ReportHTTPHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=REPORTS_DIR, **kwargs)
def log_message(self, format, *args):
# Silence the HTTP server logs
pass
def serve_reports(port=8765):
"""Start a simple HTTP server to serve the reports."""
try:
with socketserver.TCPServer(("", port), ReportHTTPHandler) as httpd:
print(f"Report server started at http://localhost:{port}")
global keep_running
while keep_running:
httpd.handle_request()
except OSError:
# If the port is in use, try another one
if port < 8775:
serve_reports(port + 1)
else:
print("Could not start report server - all ports in range are busy")
def start_report_server():
"""Start the report server in a background thread."""
server_thread = threading.Thread(target=serve_reports)
server_thread.daemon = True
server_thread.start()
return server_thread
def showcase_filesystem() -> Dict[str, Any]:
"""Showcase filesystem operations and analysis."""
print("=== Showcasing Filesystem Operations ===")
# Use the capture_output parameter to get results instead of printing
results = filesystem_module.analyze_filesystem(capture_output=True)
# Print a summary of the results
print(f"\nAnalyzed {len(results.get('files', []))} files across {len(results.get('directories', []))} directories")
print(f"Found {len(results.get('file_types', {}))} different file types")
# Extract and display the top file types by count
file_types = results.get('file_types', {})
sorted_types = sorted(file_types.items(), key=lambda x: x[1], reverse=True)
print("\nTop file types by count:")
for file_type, count in sorted_types[:5]:
print(f" - {file_type}: {count} files")
# Show largest files
largest_files = results.get('largest_files', [])
if largest_files:
print("\nLargest files in the workspace:")
for file_info in largest_files[:3]:
size_mb = file_info.get('size_bytes', 0) / (1024 * 1024)
print(f" - {file_info.get('path')}: {size_mb:.2f} MB")
# Generate AI analysis if available
if 'ai_analysis' in results:
print("\n=== AI-Generated Filesystem Analysis ===")
print(results.get('ai_analysis', 'No AI analysis available'))
# Generate HTML report
report_path = generate_html_report(
title="MCP Filesystem Analysis Report",
results=results,
feature_type="filesystem"
)
# Get the relative path to display
rel_report_path = os.path.relpath(report_path, parent_dir)
# Add report path to results
results['report_path'] = report_path
results['report_url'] = f"http://localhost:8765/{os.path.basename(report_path)}"
print(f"\n✅ Report generated: {rel_report_path}")
print(f"📊 View in browser: {results['report_url']}")
print(" (You can view this URL in the Langflow interface)")
return results
def normalize_repo_url(repo_url: str) -> str:
"""Normalize the repository URL to ensure compatibility."""
if not repo_url:
return repo_url
# Remove trailing slashes
repo_url = repo_url.rstrip('/')
# Ensure .git extension for GitHub URLs
if 'github.com' in repo_url and not repo_url.endswith('.git'):
return f"{repo_url}.git"
return repo_url
def analyze_git_repo(repo_url, commit_sha=None):
"""
Analyze a Git repository and generate a report with key insights.
Args:
repo_url: URL of the Git repository to analyze
commit_sha: Optional specific commit SHA to analyze
Returns:
Dict containing analysis results
"""
print(f"\n🔍 Analyzing Git repository: {repo_url}")
if commit_sha:
print(f"📋 Focusing on commit: {commit_sha}")
# Get main repository analysis using the git integration module
results = git_module.analyze_repo(repo_url, capture_output=True)
# If a specific commit is provided, do a diff analysis on it
if commit_sha:
print(f"\n📊 Analyzing commit diff for: {commit_sha}")
try:
diff_results = git_diff_module.analyze_git_diff(
repo_url=repo_url,
commit_sha=commit_sha,
capture_output=True
)
# Merge the diff results with the main results
if diff_results:
results['diff_analysis'] = diff_results
# Extract key metrics for display
if 'diff_stats' in diff_results:
print("\n=== Diff Analysis ===")
stats = diff_results['diff_stats']
print(f"Files changed: {stats.get('files_changed', 'Unknown')}")
print(f"Additions: {stats.get('additions', 'Unknown')}")
print(f"Deletions: {stats.get('deletions', 'Unknown')}")
if 'requirements_changes' in diff_results:
changes = diff_results['requirements_changes']
if any(changes.values()):
print("\n=== Requirements Changes ===")
if changes.get('added'):
print(f"Added: {', '.join(changes['added'])}")
if changes.get('removed'):
print(f"Removed: {', '.join(changes['removed'])}")
if changes.get('changed'):
print(f"Changed: {', '.join(changes['changed'])}")
except Exception as e:
print(f"Error in diff analysis: {str(e)}")
# Try to perform a Git diff analysis on the most recent commit if no specific commit was provided
if not commit_sha:
try:
# Get most recent commit if we have one
recent_commit = None
commit_history = results.get('commit_history', [])
if 'latest_commit_sha' in results:
recent_commit = results['latest_commit_sha']
elif commit_history and len(commit_history) > 0:
recent_commit = commit_history[0].get('hash')
# If we have a recent commit SHA and at least two commits, try to analyze the diff
if recent_commit and len(commit_history) > 1:
print(f"\n=== Analyzing diff for most recent commit {recent_commit[:7]} ===")
# Get the parent commit
parent_commit = commit_history[1].get('hash')
if parent_commit:
try:
# Use the analyze_git_diff function to get proper diff analysis
diff_results = git_diff_module.analyze_git_diff(
repo_url=repo_url,
commit_sha=parent_commit,
target_commit=recent_commit,
capture_output=True
)
if diff_results:
results['diff_analysis'] = diff_results
# Extract key metrics for display
if 'diff_stats' in diff_results:
print("\n=== Diff Analysis ===")
stats = diff_results['diff_stats']
print(f"Files changed: {stats.get('files_changed', 'Unknown')}")
print(f"Additions: {stats.get('additions', 'Unknown')}")
print(f"Deletions: {stats.get('deletions', 'Unknown')}")
except Exception as e:
print(f"Error analyzing commit diff: {str(e)}")
else:
# Fallback to single commit analysis
if recent_commit:
print(f"\n=== Analyzing repository at commit {recent_commit[:7]} ===")
repo_info = {
"repository": repo_url,
"commit_sha": recent_commit,
"single_commit": True
}
results['single_commit_analysis'] = repo_info
print(f" - Repository: {repo_url}")
print(f" - Commit: {recent_commit[:7]}")
except Exception as e:
print(f"Error retrieving commit information: {str(e)}")
# Generate HTML report
report_path = generate_html_report(
title=f"MCP Git Analysis Report - {os.path.basename(repo_url)}",
results=results,
feature_type="git"
)
# Get the relative path to display
rel_report_path = os.path.relpath(report_path, parent_dir)
# Add report path to results
results['report_path'] = report_path
results['report_url'] = f"http://localhost:8765/{os.path.basename(report_path)}"
print(f"\n✅ Report generated: {rel_report_path}")
print(f"📊 View in browser: {results['report_url']}")
print(" (You can view this URL in the Langflow interface)")
return results
def showcase_git(repo_url: str) -> Dict[str, Any]:
"""Showcase Git repository analysis."""
if not repo_url:
print("Error: Repository URL is required for Git showcase.")
print("Usage: python showcase_mcp_features.py --feature git --repo-url https://github.com/username/repo")
return {"error": "Repository URL is required"}
# Normalize the repository URL to ensure consistent format
repo_url = normalize_repo_url(repo_url)
print(f"=== Showcasing Git Analysis for {repo_url} ===")
print("Please wait, cloning repository and analyzing data...")
# Call the analyze_git_repo function to perform the analysis
return analyze_git_repo(repo_url)
def wait_for_user_exit():
"""Keep the server running until the user presses Ctrl+C."""
global keep_running
try:
print("\nServer is running. Press Ctrl+C to exit.")
while keep_running:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down server...")
keep_running = False
def main():
parser = argparse.ArgumentParser(description="MCP Feature Showcase Tool")
parser.add_argument("--feature", choices=["filesystem", "git"], required=True,
help="Feature to showcase (filesystem or git)")
parser.add_argument("--repo-url", help="Git repository URL (required for git feature)")
parser.add_argument("--commit-sha", help="Optional: Specific Git commit SHA to analyze")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
parser.add_argument("--open-browser", action="store_true", help="Open the report in a browser automatically")
parser.add_argument("--exit-when-done", action="store_true", help="Exit after generating the report (default: keep running)")
args = parser.parse_args()
# Start the report server in a background thread
server_thread = start_report_server()
print("🌐 Started report server at http://localhost:8765")
# Call the appropriate showcase function based on the selected feature
results = {}
if args.feature == "filesystem":
results = showcase_filesystem()
elif args.feature == "git":
repo_url = args.repo_url
if not repo_url:
parser.error("--repo-url is required for git feature")
# Validate and normalize Git repository URL
repo_url = normalize_repo_url(repo_url)
# Run the Git showcase with optional commit_sha
if args.commit_sha:
print(f"Analyzing specific commit: {args.commit_sha}")
results = analyze_git_repo(repo_url, args.commit_sha)
else:
results = showcase_git(repo_url)
# Output as JSON if requested
if args.json:
print("\n=== JSON Output ===")
# We exclude the large file and directory lists to keep the output manageable
output_results = {k: v for k, v in results.items() if k not in ['files', 'directories']}
print(json.dumps(output_results, indent=2))
# Open the report in the browser if requested
if args.open_browser and 'report_url' in results:
print("\nOpening report in browser...")
webbrowser.open(results['report_url'])
# Display tips for viewing in Langflow
print("\n💡 Tips for viewing in Langflow:")
print("1. Copy the report URL shown above")
print("2. In Langflow, create a custom component that displays the URL in an iframe")
print("3. Or use a 'Text' component to share the URL with users")
print("\n📋 Full report URL for copying into Langflow:")
if 'report_url' in results:
print(results['report_url'])
# Keep the server running unless explicitly told to exit
if not args.exit_when_done:
print("\n⚠️ Keep this terminal running to view the report.")
print("The report server will continue running until you press Ctrl+C.")
wait_for_user_exit()
else:
print("\nExiting as requested with --exit-when-done flag.")
if __name__ == "__main__":
main()