Model Control Plane (MCP) Server

#!/usr/bin/env python3 import os import sys # Add the parent directory to Python path for proper imports script_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(script_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) import json import tempfile from pathlib import Path from datetime import datetime from langflow import MCPAIComponent from typing import Dict, List, Any, Optional import mimetypes def analyze_filesystem(base_path: str = None, capture_output: bool = False) -> Dict[str, Any]: """ Analyze the filesystem structure and provide statistics about file types, sizes, etc. Args: base_path (str): The base path to analyze. Defaults to the current directory. capture_output (bool): If True, return results without printing. If False, print results. Returns: Dict[str, Any]: A dictionary containing filesystem analysis data """ if base_path is None: base_path = os.getcwd() # Initialize result data result = { "timestamp": datetime.now().isoformat(), "base_path": base_path, "files": [], "directories": [], "file_types": {}, "total_size_bytes": 0, "largest_files": [] } # Initialize mimetypes mimetypes.init() # Helper function to log information def log_info(message: str): if not capture_output: print(message) log_info(f"=== Analyzing Filesystem at {base_path} ===") # Walk the directory tree for root, dirs, files in os.walk(base_path): # Add directories to the result rel_root = os.path.relpath(root, base_path) if rel_root != ".": result["directories"].append(rel_root) # Process each file for file in files: file_path = os.path.join(root, file) rel_path = os.path.relpath(file_path, base_path) try: # Get file stats file_stats = os.stat(file_path) size_bytes = file_stats.st_size result["total_size_bytes"] += size_bytes # Get file extension and mime type _, ext = os.path.splitext(file) ext = ext.lower() if ext: # Remove the dot from extension ext = ext[1:] else: ext = "no extension" mime_type, _ = mimetypes.guess_type(file_path) if not mime_type: mime_type = "application/octet-stream" # Update file type counts if ext in result["file_types"]: result["file_types"][ext] += 1 else: result["file_types"][ext] = 1 # Create file info file_info = { "path": rel_path, "size_bytes": size_bytes, "modified": datetime.fromtimestamp(file_stats.st_mtime).isoformat(), "extension": ext, "mime_type": mime_type } # Add to files list result["files"].append(file_info) # Check if it's one of the largest files if len(result["largest_files"]) < 10: # Add the file to the list and sort by size result["largest_files"].append(file_info) result["largest_files"] = sorted( result["largest_files"], key=lambda x: x["size_bytes"], reverse=True ) elif size_bytes > result["largest_files"][-1]["size_bytes"]: # Replace the smallest file in the largest files list result["largest_files"][-1] = file_info result["largest_files"] = sorted( result["largest_files"], key=lambda x: x["size_bytes"], reverse=True ) except (PermissionError, FileNotFoundError) as e: # Skip files we can't access if not capture_output: print(f"Warning: Could not access {file_path}: {str(e)}") continue # Calculate additional statistics result["total_files"] = len(result["files"]) result["total_directories"] = len(result["directories"]) result["total_size_mb"] = result["total_size_bytes"] / (1024 * 1024) # Generate summary summary = ( f"Found {result['total_files']} files in {result['total_directories']} directories\n" f"Total size: {result['total_size_mb']:.2f} MB\n" f"File types: {len(result['file_types'])} different extensions" ) result["summary"] = summary # Try to get AI analysis of the filesystem try: mcp = MCPAIComponent(mcp_server_url="http://localhost:8000") models = mcp.list_models() # Check if we have a chat model available chat_models = [model for model in models if model.get('id', '').endswith('-chat')] if chat_models: log_info("Generating AI analysis of filesystem structure...") model_id = chat_models[0]['id'] # Prepare file type summary for prompt file_types_summary = "\n".join([ f"- {ext}: {count} files" for ext, count in sorted( result['file_types'].items(), key=lambda x: x[1], reverse=True )[:10] ]) # Prepare largest files summary largest_files_summary = "\n".join([ f"- {file_info['path']}: {file_info['size_bytes'] / (1024 * 1024):.2f} MB" for file_info in result['largest_files'][:5] ]) # Create prompt prompt = f""" Analyze this filesystem structure and provide insights: Base path: {base_path} Total files: {result['total_files']} Total directories: {result['total_directories']} Total size: {result['total_size_mb']:.2f} MB Top file types: {file_types_summary} Largest files: {largest_files_summary} Please provide: 1. A brief analysis of what type of project/codebase this appears to be 2. Any anomalies or interesting patterns in the file structure 3. Best practices or recommendations for organizing this filesystem Keep your response concise and focused on actionable insights. """ # Get AI analysis response = mcp.chat( model_id=model_id, messages=[ {"role": "system", "content": "You are an expert filesystem analyst."}, {"role": "user", "content": prompt} ], max_tokens=500 ) if response and 'choices' in response and len(response['choices']) > 0: result["ai_analysis"] = response['choices'][0]['message']['content'] log_info(f"AI Analysis generated successfully using model {model_id}") else: log_info("AI Analysis: No valid response received from the model.") else: log_info("AI Analysis: No chat models available.") except Exception as e: log_info(f"AI Analysis: Error - {str(e)}") # Print results if not capturing output if not capture_output: print(f"\n=== Filesystem Analysis Results ===") print(summary) print("\nTop 5 file types:") for i, (ext, count) in enumerate( sorted(result['file_types'].items(), key=lambda x: x[1], reverse=True)[:5] ): print(f"{i+1}. {ext}: {count} files") print("\nTop 5 largest files:") for i, file_info in enumerate(result['largest_files'][:5]): size_mb = file_info['size_bytes'] / (1024 * 1024) print(f"{i+1}. {file_info['path']} ({size_mb:.2f} MB)") if "ai_analysis" in result: print("\n=== AI Analysis ===") print(result["ai_analysis"]) return result def file_system_demo(capture_output=False): """ Demonstrate filesystem operations with the option to capture results. Args: capture_output (bool): If True, return results instead of printing them Returns: list: List of operation results if capture_output is True, None otherwise """ results = [] def log_operation(operation, details): """Log an operation to results or print it""" result = {"operation": operation, "details": details, "timestamp": datetime.now().isoformat()} if capture_output: results.append(result) else: print(f"\n=== {operation} ===") for key, value in details.items(): print(f"{key}: {value}") # Create a temporary directory for the demo temp_dir = tempfile.mkdtemp(prefix="mcp_fs_demo_") log_operation("Create Temporary Directory", {"path": temp_dir}) # Create and write to a file test_file_path = os.path.join(temp_dir, "test_file.txt") with open(test_file_path, "w") as f: f.write("Hello from MCP filesystem test!\n") f.write("This file was created at: " + datetime.now().isoformat() + "\n") file_size = os.path.getsize(test_file_path) log_operation("Create File", {"path": test_file_path, "size_bytes": file_size}) # Read the file with open(test_file_path, "r") as f: content = f.read() log_operation("Read File", {"path": test_file_path, "content": content, "length": len(content)}) # Get file metadata file_stats = os.stat(test_file_path) metadata = { "size_bytes": file_stats.st_size, "created": datetime.fromtimestamp(file_stats.st_ctime).isoformat(), "modified": datetime.fromtimestamp(file_stats.st_mtime).isoformat(), "accessed": datetime.fromtimestamp(file_stats.st_atime).isoformat(), } log_operation("File Metadata", {"path": test_file_path, "metadata": metadata}) # Create a subdirectory subdir_path = os.path.join(temp_dir, "subdir") os.makedirs(subdir_path, exist_ok=True) log_operation("Create Directory", {"path": subdir_path}) # Create multiple files for i in range(3): file_path = os.path.join(subdir_path, f"file_{i}.txt") with open(file_path, "w") as f: f.write(f"This is file {i} in the subdirectory.\n") log_operation("Create Subdirectory File", {"path": file_path, "index": i}) # List directory contents dir_contents = os.listdir(temp_dir) log_operation("List Directory", {"path": temp_dir, "contents": dir_contents}) subdir_contents = os.listdir(subdir_path) log_operation("List Subdirectory", {"path": subdir_path, "contents": subdir_contents}) # Recursive directory scan using Path all_files = list(Path(temp_dir).rglob("*")) all_files_str = [str(p.relative_to(temp_dir)) for p in all_files if p.is_file()] log_operation("Recursive Directory Scan", {"base_path": temp_dir, "files": all_files_str}) # Update a file with open(test_file_path, "a") as f: f.write("\nThis line was appended to the file.\n") # Read updated file with open(test_file_path, "r") as f: updated_content = f.read() log_operation("Update File", { "path": test_file_path, "updated_content": updated_content, "new_size_bytes": os.path.getsize(test_file_path) }) # Rename a file renamed_path = os.path.join(temp_dir, "renamed_file.txt") os.rename(test_file_path, renamed_path) log_operation("Rename File", {"old_path": test_file_path, "new_path": renamed_path}) # Delete files os.remove(renamed_path) log_operation("Delete File", {"path": renamed_path}) for i in range(3): file_path = os.path.join(subdir_path, f"file_{i}.txt") os.remove(file_path) log_operation("Delete Subdirectory File", {"path": file_path}) # Delete directories os.rmdir(subdir_path) log_operation("Delete Directory", {"path": subdir_path}) os.rmdir(temp_dir) log_operation("Delete Temporary Directory", {"path": temp_dir}) # If capturing output, return the results if capture_output: return results def test_filesystem(): """Test the filesystem functionality.""" print("Initializing MCPAIComponent...") mcp = MCPAIComponent(mcp_server_url="http://localhost:8000") # Get current directory cwd = os.getcwd() print(f"Current working directory: {cwd}") # List current directory contents print("\nContents of current directory:") contents = os.listdir(cwd) for item in contents[:10]: # Show only first 10 items to avoid spam if os.path.isdir(os.path.join(cwd, item)): print(f" - 📁 {item}/") else: print(f" - 📄 {item}") if len(contents) > 10: print(f" ... and {len(contents) - 10} more items") # Run the filesystem demo with printing print("\nRunning filesystem operations demo...") file_system_demo(capture_output=False) print("\nFilesystem test completed successfully.") if __name__ == "__main__": test_filesystem()