Model Control Plane (MCP) Server
by dvladimirov
- scripts
#!/usr/bin/env python3
import os
import sys
import json
import argparse
import tempfile
import subprocess
from typing import Dict, List, Optional, Any, Tuple
# Add the parent directory to Python path for proper imports
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from langflow import MCPAIComponent
def compare_requirements(current_reqs: str, previous_reqs: str) -> Tuple[List[str], List[str], List[str]]:
"""
Compare two requirements.txt files and identify added, removed, and changed dependencies.
Args:
current_reqs: Content of the current requirements.txt
previous_reqs: Content of the previous requirements.txt
Returns:
Tuple of (added, removed, changed) dependencies
"""
def parse_requirements(content: str) -> Dict[str, str]:
"""Parse requirements into a dictionary of {package: version}"""
result = {}
for line in content.strip().split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
# Handle various formats like package==1.0.0, package>=1.0.0, etc.
if '==' in line:
package, version = line.split('==', 1)
result[package.strip()] = version.strip()
elif '>=' in line:
package, version = line.split('>=', 1)
result[package.strip()] = f">={version.strip()}"
elif '<=' in line:
package, version = line.split('<=', 1)
result[package.strip()] = f"<={version.strip()}"
elif '>' in line:
package, version = line.split('>', 1)
result[package.strip()] = f">{version.strip()}"
elif '<' in line:
package, version = line.split('<', 1)
result[package.strip()] = f"<{version.strip()}"
else:
# Handle packages without version specifications
result[line.strip()] = "any"
return result
current_dict = parse_requirements(current_reqs)
previous_dict = parse_requirements(previous_reqs)
# Find added, removed, and changed dependencies
added = []
for pkg in current_dict:
if pkg not in previous_dict:
added.append(f"{pkg}=={current_dict[pkg]}")
removed = []
for pkg in previous_dict:
if pkg not in current_dict:
removed.append(f"{pkg}=={previous_dict[pkg]}")
changed = []
for pkg in current_dict:
if pkg in previous_dict and current_dict[pkg] != previous_dict[pkg]:
changed.append(f"{pkg}: {previous_dict[pkg]} -> {current_dict[pkg]}")
return added, removed, changed
def get_file_from_commit(repo_url: str, commit_sha: str, file_path: str) -> Optional[str]:
"""
Get the content of a file from a specific commit in a Git repository.
Args:
repo_url: URL of the Git repository
commit_sha: Commit SHA to get the file from
file_path: Path to the file within the repository
Returns:
Content of the file or None if the file doesn't exist
"""
print(f"Retrieving {file_path} from commit {commit_sha} in repository {repo_url}")
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Clone the repository without the commit SHA
print(f"Cloning repository {repo_url}...")
clone_cmd = ["git", "clone", repo_url, temp_dir]
result = subprocess.run(clone_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Error cloning repository: {result.stderr}")
return None
# Try to get the file content directly from git show
try:
print(f"Getting file content at commit {commit_sha}...")
show_cmd = ["git", "show", f"{commit_sha}:{file_path}"]
show_result = subprocess.run(show_cmd, cwd=temp_dir, capture_output=True, text=True)
if show_result.returncode == 0:
return show_result.stdout
print(f"File not found using git show, trying checkout method...")
# If git show fails, try to checkout the commit and read the file
checkout_cmd = ["git", "checkout", commit_sha]
checkout_result = subprocess.run(checkout_cmd, cwd=temp_dir, capture_output=True, text=True)
if checkout_result.returncode != 0:
print(f"Error checking out commit: {checkout_result.stderr}")
return None
# Read the file directly from the filesystem
file_fullpath = os.path.join(temp_dir, file_path)
if os.path.exists(file_fullpath):
with open(file_fullpath, 'r') as f:
file_content = f.read()
print(f"File {file_path} found and read successfully")
return file_content
else:
print(f"File {file_path} not found in commit {commit_sha}")
# List files in the repo root to help debug
print("Listing files in repository root:")
ls_cmd = ["ls", "-la"]
ls_result = subprocess.run(ls_cmd, cwd=temp_dir, capture_output=True, text=True)
if ls_result.returncode == 0:
print(ls_result.stdout)
return None
except subprocess.CalledProcessError as e:
print(f"Error accessing file in commit: {e}")
return None
except Exception as e:
print(f"Error accessing repository: {e}")
return None
def validate_commit_sha(repo_url: str, commit_sha: str) -> bool:
"""
Validate if a commit SHA exists in the repository.
Args:
repo_url: URL of the Git repository
commit_sha: SHA of the commit to validate
Returns:
True if the commit exists, False otherwise
"""
print(f"Validating commit SHA: {commit_sha}")
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Clone the repository
print(f"Cloning repository for validation...")
clone_cmd = ["git", "clone", "--bare", repo_url, temp_dir]
result = subprocess.run(clone_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Error cloning repository: {result.stderr}")
return False
# Check if commit exists
check_cmd = ["git", "cat-file", "-e", commit_sha]
check_result = subprocess.run(check_cmd, cwd=temp_dir, capture_output=True, text=True)
if check_result.returncode == 0:
print(f"Commit {commit_sha} exists in the repository")
return True
else:
print(f"Commit {commit_sha} does not exist in the repository")
return False
except Exception as e:
print(f"Error validating commit SHA: {e}")
return False
def get_diff_between_commits(repo_url: str, base_commit: str, target_commit: str = 'HEAD') -> str:
"""
Get the diff between two commits.
Args:
repo_url: URL of the Git repository
base_commit: Base commit SHA
target_commit: Target commit SHA (defaults to HEAD)
Returns:
Diff output as a string
"""
print(f"Getting diff between {base_commit} and {target_commit}...")
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Clone the repository
clone_cmd = ["git", "clone", repo_url, temp_dir]
clone_result = subprocess.run(clone_cmd, capture_output=True, text=True)
if clone_result.returncode != 0:
print(f"Error cloning repository: {clone_result.stderr}")
return "Error cloning repository"
# Get the diff
diff_cmd = ["git", "diff", base_commit, target_commit, "--", "requirements.txt"]
diff_result = subprocess.run(diff_cmd, cwd=temp_dir, capture_output=True, text=True)
if diff_result.returncode == 0:
if diff_result.stdout:
return diff_result.stdout
else:
return "No differences found in requirements.txt"
else:
print(f"Error getting diff: {diff_result.stderr}")
return f"Error: {diff_result.stderr}"
except Exception as e:
print(f"Error: {e}")
return f"Error: {e}"
def test_git_diff(repo_url=None, compare_commit=None):
"""
Test the git diff functionality with requirements.txt compatibility check.
Args:
repo_url: URL of the Git repository
compare_commit: SHA of the commit to compare with the latest commit
"""
if not repo_url:
print("Error: Please provide a Git repository URL")
print("Usage: python test_git_diff.py <git-repo-url> [commit-sha]")
sys.exit(1)
print(f"Analyzing repository: {repo_url}")
# Validate commit SHA if provided
if compare_commit:
print(f"Comparing with commit: {compare_commit}")
if not validate_commit_sha(repo_url, compare_commit):
print(f"Invalid commit SHA: {compare_commit}")
sys.exit(1)
# Initialize the MCP component
try:
mcp = MCPAIComponent(mcp_server_url="http://localhost:8000")
except Exception as e:
print(f"Error initializing MCP component: {e}")
sys.exit(1)
try:
# Get the diff of the last commit
print("Fetching diff from the MCP server...")
diff_info = mcp.get_git_diff(repo_url)
# Extract basic commit info
current_commit_id = diff_info.get('commit_id', 'N/A')
commit_message = diff_info.get('commit_message', 'N/A')
commit_author = diff_info.get('commit_author', 'N/A')
commit_date = diff_info.get('commit_date', 'N/A')
print("\nLatest Commit Info:")
print(f"Commit ID: {current_commit_id}")
print(f"Author: {commit_author}")
print(f"Date: {commit_date}")
print(f"Message: {commit_message}")
# Print statistics
total_files = diff_info.get('total_files_changed', 0)
total_additions = diff_info.get('total_additions', 0)
total_deletions = diff_info.get('total_deletions', 0)
print(f"\nChanges: {total_files} files changed, {total_additions} insertions(+), {total_deletions} deletions(-)")
# Get the current requirements.txt directly from the repository
# We're not relying on the diff info which might be incomplete
with tempfile.TemporaryDirectory() as temp_dir:
print("\nDirectly retrieving current requirements.txt...")
clone_cmd = ["git", "clone", repo_url, temp_dir]
clone_result = subprocess.run(clone_cmd, capture_output=True, text=True)
if clone_result.returncode != 0:
print(f"Error cloning repository: {clone_result.stderr}")
return
requirements_path = os.path.join(temp_dir, 'requirements.txt')
current_requirements = None
if os.path.exists(requirements_path):
with open(requirements_path, 'r') as f:
current_requirements = f.read()
print("Found current requirements.txt")
else:
print("No requirements.txt found in the current commit")
# List the root directory to help debug
print("Listing repository root:")
ls_cmd = ["ls", "-la"]
ls_result = subprocess.run(ls_cmd, cwd=temp_dir, capture_output=True, text=True)
if ls_result.returncode == 0:
print(ls_result.stdout)
if compare_commit:
print(f"\nComparing latest commit {current_commit_id[:7]} with {compare_commit[:7]}")
# Get a direct diff between the commits for debugging purposes
manual_diff = get_diff_between_commits(repo_url, compare_commit, current_commit_id)
print("\nDirect git diff output for requirements.txt:")
print(manual_diff)
# Get requirements.txt from the specified commit
previous_requirements = get_file_from_commit(repo_url, compare_commit, 'requirements.txt')
if current_requirements is not None and previous_requirements is not None:
print("\nAnalyzing requirements.txt compatibility...")
added, removed, changed = compare_requirements(current_requirements, previous_requirements)
if not (added or removed or changed):
print("No changes to requirements.txt between commits.")
else:
if added:
print("\nAdded dependencies:")
for dep in added:
print(f" + {dep}")
if removed:
print("\nRemoved dependencies:")
for dep in removed:
print(f" - {dep}")
if changed:
print("\nChanged dependencies:")
for dep in changed:
print(f" * {dep}")
# Ask AI to analyze compatibility
print("\nAsking AI to analyze requirements compatibility...")
# Prepare the prompt
prompt = f"""
I need to analyze the compatibility between two versions of requirements.txt.
Current requirements:
```
{current_requirements}
```
Previous requirements (from commit {compare_commit[:7]}):
```
{previous_requirements}
```
Changes summary:
- Added: {', '.join(added) if added else 'None'}
- Removed: {', '.join(removed) if removed else 'None'}
- Changed: {', '.join(changed) if changed else 'None'}
Raw diff:
```
{manual_diff}
```
Please analyze these changes and provide:
1. Are these changes compatible with the existing codebase?
2. Any potential dependency conflicts or issues?
3. Security concerns with the changes (e.g., outdated versions, known vulnerabilities)?
4. Recommendations for a safe upgrade path
5. Should this change be approved for merging? (Yes/No/Need more information)
"""
try:
chat_response = mcp.chat(
model_id="openai-gpt-chat",
messages=[
{"role": "system", "content": "You are an expert Python developer with deep knowledge of Python packaging, dependencies, and security. Your task is to analyze changes to requirements.txt files and provide compatibility assessments."},
{"role": "user", "content": prompt}
],
max_tokens=800,
temperature=0.7
)
choices = chat_response.get('choices', [])
if choices:
message = choices[0].get('message', {})
content = message.get('content', '')
print(f"\nAI Analysis:\n{content}")
else:
print("\nUnable to generate AI analysis.")
except Exception as e:
print(f"\nError getting AI analysis: {e}")
else:
if current_requirements is None:
print("No requirements.txt found in the current commit.")
if previous_requirements is None:
print(f"No requirements.txt found in commit {compare_commit[:7]}.")
else:
# If no comparison commit specified, show changes by file
if files_changed:
print("\nChanged Files:")
for i, file_info in enumerate(files_changed, 1):
path = file_info.get('path', 'N/A')
change_type = file_info.get('change_type', 'N/A')
additions = file_info.get('additions', 0)
deletions = file_info.get('deletions', 0)
print(f"{i}. {path} ({change_type}): +{additions} -{deletions}")
# Ask if user wants to see the diff for this file
if i < len(files_changed): # Don't ask for the last file
show_diff = input(f"\nShow diff for {path}? (y/n, default: n): ").lower() == 'y'
if show_diff:
diff = file_info.get('diff', 'No diff available')
print(f"\nDiff for {path}:\n{diff}\n")
input("Press Enter to continue...")
else:
# Always show the diff for the last file without asking
diff = file_info.get('diff', 'No diff available')
print(f"\nDiff for {path}:\n{diff}\n")
else:
print("\nNo files changed in the last commit.")
# Allow the user to request generic AI analysis for any commit
print("\nWould you like an AI analysis of this commit? (y/n, default: y): ", end="")
analyze = input().lower() != 'n'
if analyze:
print("\nAsking AI to analyze the commit...")
chat_response = mcp.chat(
model_id="openai-gpt-chat",
messages=[
{"role": "system", "content": "You are an expert code reviewer and developer. Your task is to analyze git commit diffs and provide insights about the changes."},
{"role": "user", "content": f"Please analyze this commit and provide:\n1. A summary of what changed\n2. The purpose of these changes\n3. Potential impact on the codebase\n4. Any potential issues or improvements\n\nHere's the information about the commit:\n\nCommit Message: {commit_message}\nTotal Changes: {total_files} files changed, {total_additions} insertions(+), {total_deletions} deletions(-)"}
],
max_tokens=500,
temperature=0.7
)
choices = chat_response.get('choices', [])
if choices:
message = choices[0].get('message', {})
content = message.get('content', '')
print(f"\nAI Analysis:\n{content}")
else:
print("\nUnable to generate AI analysis.")
except Exception as e:
print(f"Error analyzing diff: {e}")
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test git diff functionality with requirements.txt compatibility check")
parser.add_argument("repo_url", nargs="?", help="URL of the Git repository")
parser.add_argument("commit_sha", nargs="?", help="SHA of the commit to compare with the latest commit")
parser.add_argument("--help-more", action="store_true", help="Show more detailed help information")
args = parser.parse_args()
if args.help_more:
print("Git Diff Analyzer with Requirements.txt Compatibility Check")
print("=========================================================")
print("\nThis tool analyzes a Git repository to compare requirements.txt files between commits.")
print("It uses AI to assess compatibility, detect potential issues, and provide upgrade recommendations.")
print("\nUsage:")
print(" python test_git_diff.py <repository-url> [commit-sha]")
print("\nExamples:")
print(" python test_git_diff.py https://github.com/username/repo")
print(" - Analyzes the latest commit in the repository")
print(" python test_git_diff.py https://github.com/username/repo abc123")
print(" - Compares the latest commit with commit 'abc123', focusing on requirements.txt")
sys.exit(0)
if not args.repo_url:
parser.print_help()
sys.exit(1)
test_git_diff(args.repo_url, args.commit_sha)