Skip to main content
Glama
intelligent_refactoring.py68 kB
#!/usr/bin/env python3 """ Intelligent Kotlin Refactoring Tools This module provides IDE - like intelligent refactoring tools that understand Kotlin code semantics and offer sophisticated refactoring capabilities. """ import asyncio import hashlib import json import re import subprocess import tempfile from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from ai.intelligent_analysis import IntelligentRefactoring, KotlinAnalyzer, RefactoringType from utils.security import SecurityManager class UnifiedDiffGenerator: """Generate unified diffs for code changes.""" @staticmethod def generate_diff(original: str, modified: str, file_path: str) -> str: """Generate unified diff between original and modified content.""" import difflib def to_unix(s: str) -> str: return s.replace("\r\n", "\n").replace("\r", "\n") def ensure_trailing_newline(s: str) -> str: return s if not s or s.endswith("\n") else s + "\n" orig = ensure_trailing_newline(to_unix(original)) mod = ensure_trailing_newline(to_unix(modified)) original_lines = orig.splitlines(keepends=True) modified_lines = mod.splitlines(keepends=True) diff = difflib.unified_diff( original_lines, modified_lines, fromfile=f"a/{file_path}", tofile=f"b/{file_path}", lineterm="\n", n=1, # Context lines ) return "".join(diff) class KotlinASTParser: """Enhanced Kotlin AST parser for refactoring operations with better symbol resolution.""" def __init__(self) -> None: self.symbols: Dict[str, Dict[str, Any]] = {} self.call_sites: Dict[str, List[Dict[str, Any]]] = {} def parse_file(self, file_path: str) -> Dict[str, Any]: """Parse Kotlin file and extract symbols with enhanced analysis.""" try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() except FileNotFoundError: return {} lines = content.split("\n") symbols = {} # Parse classes with better regex class_pattern = r"^\s*(?:(?:public|private|internal|protected)\s+)?(?:(?:abstract|open|sealed|data|enum)\s+)?class\s+(\w+)(?:\s*\([^)]*\))?(?:\s*:\s*[\w\s,&<>]+)?\s*{?" for i, line in enumerate(lines): class_match = re.search(class_pattern, line) if class_match: class_name = class_match.group(1) symbols[class_name] = { "type": "class", "line": i + 1, "name": class_name, "usages": [], "methods": [], "properties": [], } # Parse functions with parameters and return types func_pattern = r"^\s*(?:(?:public|private|internal|protected|override)\s+)?(?:(?:suspend|inline|operator)\s+)?fun\s+(\w+)\s*\(([^)]*)\)(?:\s*:\s*([^={]+))?\s*(?:=\s*[^;]+|{|;)" for i, line in enumerate(lines): func_match = re.search(func_pattern, line) if func_match: func_name = func_match.group(1) params = func_match.group(2).strip() if func_match.group(2) else "" return_type = func_match.group(3).strip() if func_match.group(3) else "Unit" # Find function end func_end = self._find_function_end(lines, i) symbols[func_name] = { "type": "function", "line": i + 1, "end_line": func_end, "name": func_name, "parameters": self._parse_parameters(params), "return_type": return_type, "usages": [], } # Parse properties prop_pattern = r"^\s*(?:(?:public|private|internal|protected)\s+)?(?:(?:val|var)\s+)(\w+)\s*:\s*([^=]+)(?:\s*=\s*[^;]+)?" for i, line in enumerate(lines): prop_match = re.search(prop_pattern, line) if prop_match: prop_name = prop_match.group(1) prop_type = prop_match.group(2).strip() symbols[prop_name] = { "type": "property", "line": i + 1, "name": prop_name, "property_type": prop_type, "usages": [], } # Find call sites for all symbols self._find_call_sites(content, symbols) self.symbols[file_path] = symbols return symbols def _parse_parameters(self, params_str: str) -> List[Dict[str, Any]]: """Parse function parameters string into structured format.""" if not params_str.strip(): return [] params = [] # Split by comma but handle nested generics param_parts = re.split(r",(?![^{}<>]*[}>])", params_str) for part in param_parts: part = part.strip() if ":" in part: name_type = part.split(":", 1) if len(name_type) == 2: param_name = name_type[0].strip() param_type = name_type[1].strip() params.append({"name": param_name, "type": param_type}) return params def _find_function_end(self, lines: List[str], start_line: int) -> int: """Find the end line of a function.""" brace_count = 0 in_function = False for i in range(start_line, len(lines)): line = lines[i] brace_count += line.count("{") - line.count("}") if "{" in line and not in_function: in_function = True if brace_count == 0 and in_function: return i return len(lines) - 1 def _find_call_sites(self, content: str, symbols: Dict[str, Any]) -> None: """Find all call sites for symbols in the content.""" lines = content.split("\n") for symbol_name, symbol_info in symbols.items(): if symbol_info["type"] in ["function", "property"]: # Find usages with word boundaries pattern = r"\b" + re.escape(symbol_name) + r"\b" for i, line in enumerate(lines): if re.search(pattern, line): # Check if it's not a declaration if i + 1 != symbol_info.get("line"): symbol_info["usages"].append({"line": i + 1, "content": line.strip()}) def find_function(self, file_path: str, function_name: str) -> Optional[Dict[str, Any]]: """Find function by name in file with enhanced information.""" if file_path not in self.symbols: self.parse_file(file_path) symbols = self.symbols.get(file_path, {}) return symbols.get(function_name) def find_symbol_usages(self, symbol_name: str, search_paths: List[str]) -> List[Dict[str, Any]]: """Find all usages of a symbol across multiple files.""" usages = [] for file_path in search_paths: if file_path not in self.symbols: self.parse_file(file_path) symbols = self.symbols.get(file_path, {}) if symbol_name in symbols: symbol_info = symbols[symbol_name] for usage in symbol_info.get("usages", []): usages.append( {"file": file_path, "line": usage["line"], "content": usage["content"]} ) return usages class GradleToolingAPI: """Interface to Gradle Tooling API for build operations.""" def __init__(self, project_path: str): self.project_path = Path(project_path) async def build_project(self, task: str = "assembleDebug") -> Dict[str, Any]: """Build project using Gradle.""" try: cmd = ["./gradlew", task] if not (self.project_path / "gradlew").exists(): cmd = ["gradle", task] process = await asyncio.create_subprocess_exec( *cmd, cwd=self.project_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() return { "success": process.returncode == 0, "output": stdout.decode(), "error": stderr.decode(), "return_code": process.returncode, } except Exception as e: return {"success": False, "error": str(e)} class KtlintFormatter: """Ktlint integration for code formatting.""" def __init__(self, project_path: str): self.project_path = Path(project_path) async def format_file(self, file_path: str) -> str: """Format Kotlin file using ktlint.""" try: # Check if ktlint is available process = await asyncio.create_subprocess_exec( "ktlint", "--format", file_path, cwd=self.project_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: # Read the formatted file with open(file_path, "r", encoding="utf-8") as f: return f.read() else: # Return original content if formatting failed with open(file_path, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: # ktlint not available, return original with open(file_path, "r", encoding="utf-8") as f: return f.read() class IntelligentRefactoringTools: """IDE - like intelligent refactoring tools for Kotlin.""" def __init__(self, project_path: str, security_manager: Optional[SecurityManager] = None): self.project_path = Path(project_path) self.security_manager = security_manager self.ast_parser = KotlinASTParser() self.diff_generator = UnifiedDiffGenerator() self.gradle_api = GradleToolingAPI(str(project_path)) self.ktlint = KtlintFormatter(str(project_path)) self.refactoring_engine = IntelligentRefactoring() self.analyzer = KotlinAnalyzer() async def refactor_function(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Implement refactorFunction tool with full intelligence.""" file_path = arguments.get("filePath") function_name = arguments.get("functionName") refactor_type = arguments.get("refactorType") new_name = arguments.get("newName") preview = arguments.get("preview", False) if not file_path or not function_name or not refactor_type: return { "success": False, "error": "Missing required parameters: filePath, functionName, refactorType", } # Perform the refactoring based on type try: if refactor_type == "rename": if not new_name: return {"success": False, "error": "newName is required for rename refactoring"} # Validate file exists and is allowed file_path_obj = Path(file_path) if not file_path_obj.exists(): return {"success": False, "error": f"File not found: {file_path}"} if self.security_manager: try: self.security_manager.validate_file_path( str(file_path_obj), self.project_path ) except Exception as e: return {"success": False, "error": f"Security validation failed: {e}"} # Parse the file and find the function symbols = self.ast_parser.parse_file(file_path) function_info = self.ast_parser.find_function(file_path, function_name) if not function_info: return { "success": False, "error": f"Function '{function_name}' not found in {file_path}", } # Read original content with open(file_path, "r", encoding="utf-8") as f: original_content = f.read() result = await self._rename_function( original_content, function_name, new_name, file_path ) elif refactor_type == "extract": range_spec = arguments.get("range") if not range_spec or not new_name: return { "success": False, "error": "range and newName are required for extract refactoring", } # Validate file exists and is allowed file_path_obj = Path(file_path) if not file_path_obj.exists(): return {"success": False, "error": f"File not found: {file_path}"} if self.security_manager: try: self.security_manager.validate_file_path( str(file_path_obj), self.project_path ) except Exception as e: return {"success": False, "error": f"Security validation failed: {e}"} # Read original content with open(file_path, "r", encoding="utf-8") as f: original_content = f.read() result = await self._extract_function( original_content, range_spec, new_name, file_path ) elif refactor_type == "inline": # Validate file exists and is allowed file_path_obj = Path(file_path) if not file_path_obj.exists(): return {"success": False, "error": f"File not found: {file_path}"} if self.security_manager: try: self.security_manager.validate_file_path( str(file_path_obj), self.project_path ) except Exception as e: return {"success": False, "error": f"Security validation failed: {e}"} # Parse the file and find the function symbols = self.ast_parser.parse_file(file_path) function_info = self.ast_parser.find_function(file_path, function_name) if not function_info: return { "success": False, "error": f"Function '{function_name}' not found in {file_path}", } # Read original content with open(file_path, "r", encoding="utf-8") as f: original_content = f.read() result = await self._inline_function(original_content, function_name, file_path) elif refactor_type == "introduceParam": param_name = arguments.get("paramName") param_type = arguments.get("paramType") if not param_name or not param_type: return { "success": False, "error": "paramName and paramType are required for introduceParam refactoring", } # Validate file exists and is allowed file_path_obj = Path(file_path) if not file_path_obj.exists(): return {"success": False, "error": f"File not found: {file_path}"} if self.security_manager: try: self.security_manager.validate_file_path( str(file_path_obj), self.project_path ) except Exception as e: return {"success": False, "error": f"Security validation failed: {e}"} # Parse the file and find the function symbols = self.ast_parser.parse_file(file_path) function_info = self.ast_parser.find_function(file_path, function_name) if not function_info: return { "success": False, "error": f"Function '{function_name}' not found in {file_path}", } # Read original content with open(file_path, "r", encoding="utf-8") as f: original_content = f.read() result = await self._introduce_parameter( original_content, function_name, param_name, param_type, file_path ) else: return {"success": False, "error": f"Unsupported refactor type: {refactor_type}"} modified_content = await self.ktlint.format_file(file_path) # Generate unified diff diff = self.diff_generator.generate_diff(original_content, modified_content, file_path) # Compile check compile_result = await self.gradle_api.build_project("compileDebugKotlin") if not compile_result["success"]: return { "success": False, "error": "Refactoring caused compilation errors", "compile_errors": compile_result["error"], "diff": diff, } # If not preview, apply the changes if not preview: with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) else: # Restore original content for preview with open(file_path, "w", encoding="utf-8") as f: f.write(original_content) return { "success": True, "diff": diff, "affected_files": [file_path], "compile_success": True, "preview": preview, } except Exception as e: return {"success": False, "error": f"Refactoring failed: {str(e)}"} async def _rename_function( self, content: str, old_name: str, new_name: str, file_path: str ) -> Dict[str, Any]: """Rename function and update all call sites.""" if not new_name: return {"success": False, "error": "New name is required for rename"} # Simple regex-based rename (in production, use proper AST) # This is a simplified implementation - production would use Kotlin compiler API pattern = r"\b" + re.escape(old_name) + r"\b" modified_content = re.sub(pattern, new_name, content) # Write back for further processing with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _extract_function( self, content: str, range_spec: Dict[str, Any], new_name: str, file_path: str ) -> Dict[str, Any]: """Extract code range into a new function.""" if not range_spec or not new_name: return {"success": False, "error": "Range and new name are required for extract"} lines = content.split("\n") # Handle different range specification formats start_line = 0 end_line = len(lines) if isinstance(range_spec, dict): start_obj = range_spec.get("start", {}) end_obj = range_spec.get("end", {}) if isinstance(start_obj, dict) and isinstance(end_obj, dict): start_line = start_obj.get("line", 0) end_line = end_obj.get("line", len(lines)) else: return {"success": False, "error": "Invalid range specification format"} if start_line >= end_line or end_line > len(lines): return {"success": False, "error": "Invalid range specification"} # Extract the code extracted_code = "\n".join(lines[start_line:end_line]) # Create new function indent = self._get_indent(lines[start_line]) new_function = ( f"{indent}private fun {new_name}() {{\n{indent} {extracted_code}\n{indent}}}" ) # Replace original code with function call lines[start_line:end_line] = [f"{indent}{new_name}()"] # Insert new function at appropriate location (simplified) # In production, find the end of the current class/function insert_pos = len(lines) - 1 lines.insert(insert_pos, "") lines.insert(insert_pos + 1, new_function) modified_content = "\n".join(lines) # Write back with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _inline_function( self, content: str, function_name: str, file_path: str ) -> Dict[str, Any]: """Inline a function at its call sites.""" # Simplified implementation - production would need proper AST analysis lines = content.split("\n") # Find function definition (simplified) func_start = -1 func_end = -1 for i, line in enumerate(lines): if f"fun {function_name}" in line: func_start = i # Find function end (simplified - look for closing brace) brace_count = 0 for j in range(i, len(lines)): brace_count += lines[j].count("{") - lines[j].count("}") if brace_count == 0 and "}" in lines[j]: func_end = j break break if func_start == -1 or func_end == -1: return {"success": False, "error": f"Could not find function {function_name}"} # Extract function body (simplified - assume single return statement) func_lines = lines[func_start + 1 : func_end] # Remove braces and find return statement body_lines = [] for line in func_lines: stripped = line.strip() if stripped.startswith("return "): body_lines.append(stripped[7:]) # Remove "return " elif stripped and not stripped.startswith("{") and not stripped.startswith("}"): body_lines.append(stripped) inline_body = " ".join(body_lines).strip() # Replace calls with body (simplified) pattern = r"\b" + re.escape(function_name) + r"\s*\(\s*\)" modified_content = re.sub(pattern, inline_body, content) # Remove function definition lines = modified_content.split("\n") del lines[func_start : func_end + 1] modified_content = "\n".join(lines) return {"success": True, "modified_content": modified_content} async def _introduce_parameter( self, content: str, function_name: str, param_name: str, param_type: str, file_path: str ) -> Dict[str, Any]: """Introduce a new parameter to a function.""" if not param_name or not param_type: return {"success": False, "error": "Parameter name and type are required"} # Simplified implementation pattern = r"(fun\s+" + re.escape(function_name) + r"\s*\()([^)]*)(\))" replacement = r"\1\2, " + param_name + ": " + param_type + r"\3" modified_content = re.sub(pattern, replacement, content) # Write back with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} def _get_indent(self, line: str) -> str: """Get indentation from a line.""" return line[: len(line) - len(line.lstrip())] async def apply_code_action(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Apply a code action/quick fix to resolve a diagnostic.""" file_path = arguments.get("filePath") code_action_id = arguments.get("codeActionId") preview = arguments.get("preview", False) if not file_path or not code_action_id: return { "success": False, "error": "Missing required parameters: filePath, codeActionId", } # Validate file exists file_path_obj = Path(file_path) if not file_path_obj.exists(): return {"success": False, "error": f"File not found: {file_path}"} # Read original content with open(file_path, "r", encoding="utf-8") as f: original_content = f.read() # Apply the code action based on ID try: result = await self._apply_specific_code_action( original_content, code_action_id, file_path ) if not result["success"]: return result # Format the modified content modified_content = await self.ktlint.format_file(file_path) # Generate unified diff diff = self.diff_generator.generate_diff(original_content, modified_content, file_path) # Compile check compile_result = await self.gradle_api.build_project("compileDebugKotlin") if not compile_result["success"]: return { "success": False, "error": "Code action caused compilation errors", "compile_errors": compile_result["error"], "diff": diff, } # If not preview, apply the changes if not preview: with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return { "success": True, "diff": diff, "affected_files": [file_path], "compile_success": True, "preview": preview, "code_action_applied": code_action_id, } except Exception as e: return {"success": False, "error": f"Code action application failed: {str(e)}"} async def _apply_specific_code_action( self, content: str, code_action_id: str, file_path: str ) -> Dict[str, Any]: """Apply a specific code action based on its ID.""" lines = content.split("\n") # Parse code action ID format: type:line:column or type:line parts = code_action_id.split(":") if len(parts) < 2: return {"success": False, "error": f"Invalid code action ID format: {code_action_id}"} action_type = parts[0] line_num = int(parts[1]) - 1 # Convert to 0-based indexing if line_num >= len(lines): return {"success": False, "error": f"Line number {line_num + 1} is out of range"} target_line = lines[line_num] # Apply specific code actions if action_type == "add_suspend": return await self._add_suspend_modifier(content, line_num, file_path) elif action_type == "replace_blocking_io": return await self._replace_blocking_io(content, line_num, file_path) elif action_type == "add_null_check": return await self._add_null_check(content, line_num, file_path) elif action_type == "remove_unused_import": return await self._remove_unused_import(content, line_num, file_path) elif action_type == "fix_hardcoded_secret": return await self._fix_hardcoded_secret(content, line_num, file_path) elif action_type == "fix_weak_crypto": return await self._fix_weak_crypto(content, line_num, file_path) else: return {"success": False, "error": f"Unsupported code action: {action_type}"} async def _add_suspend_modifier( self, content: str, line_num: int, file_path: str ) -> Dict[str, Any]: """Add suspend modifier to a function.""" lines = content.split("\n") target_line = lines[line_num] # Find function signature func_match = re.search(r"(\s*)(fun\s+\w+\s*\([^)]*\))", target_line) if not func_match: return {"success": False, "error": "No function found at specified line"} indent = func_match.group(1) func_signature = func_match.group(2) # Replace function signature new_signature = f"{indent}suspend {func_signature}" modified_line = target_line.replace(func_signature, new_signature) lines[line_num] = modified_line modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _replace_blocking_io( self, content: str, line_num: int, file_path: str ) -> Dict[str, Any]: """Replace blocking IO with coroutine-based IO.""" lines = content.split("\n") target_line = lines[line_num] # Look for common blocking IO patterns if "Thread.sleep(" in target_line: # Replace Thread.sleep with delay modified_line = target_line.replace("Thread.sleep(", "delay(") lines[line_num] = modified_line elif ".readText()" in target_line: # Replace blocking file read with async version modified_line = target_line.replace(".readText()", ".readTextAsync()") lines[line_num] = modified_line else: return {"success": False, "error": "No blocking IO pattern found"} modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _add_null_check(self, content: str, line_num: int, file_path: str) -> Dict[str, Any]: """Add null safety check.""" lines = content.split("\n") target_line = lines[line_num] # Find variable usage that might be null var_match = re.search(r"(\w+)\s*\.", target_line) if not var_match: return {"success": False, "error": "No variable access found"} var_name = var_match.group(1) indent = self._get_indent(target_line) # Add null check null_check = f"{indent}{var_name}?.let {{" lines.insert(line_num, null_check) lines.insert(line_num + 2, f"{indent}}}") modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _remove_unused_import( self, content: str, line_num: int, file_path: str ) -> Dict[str, Any]: """Remove unused import.""" lines = content.split("\n") target_line = lines[line_num] if not target_line.strip().startswith("import "): return {"success": False, "error": "Line is not an import statement"} # Remove the import line lines.pop(line_num) modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _fix_hardcoded_secret( self, content: str, line_num: int, file_path: str ) -> Dict[str, Any]: """Replace hardcoded secret with environment variable or secure storage.""" lines = content.split("\n") target_line = lines[line_num] # Look for potential secrets (API keys, passwords, etc.) secret_patterns = [ r'("[\w-]{20,}")', # Long quoted strings r'(\w+\s*=\s*"[^"]*key[^"]*")', # Variable assignments with "key" r'(\w+\s*=\s*"[^"]*secret[^"]*")', # Variable assignments with "secret" ] for pattern in secret_patterns: match = re.search(pattern, target_line) if match: secret_value = match.group(1) # Replace with environment variable modified_line = target_line.replace( secret_value, 'System.getenv("SECRET_KEY") ?: ""' ) lines[line_num] = modified_line break else: return {"success": False, "error": "No hardcoded secret pattern found"} modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def _fix_weak_crypto(self, content: str, line_num: int, file_path: str) -> Dict[str, Any]: """Fix weak cryptography usage.""" lines = content.split("\n") target_line = lines[line_num] # Look for weak crypto patterns if "MD5" in target_line: modified_line = target_line.replace("MD5", "SHA-256") elif "SHA-1" in target_line: modified_line = target_line.replace("SHA-1", "SHA-256") elif "DES" in target_line: modified_line = target_line.replace("DES", "AES") else: return {"success": False, "error": "No weak crypto pattern found"} lines[line_num] = modified_line modified_content = "\n".join(lines) with open(file_path, "w", encoding="utf-8") as f: f.write(modified_content) return {"success": True, "modified_content": modified_content} async def analyze_code_quality(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Analyze code quality using detekt with real Gradle detection and typed reports.""" from utils.input_normalizer import ValidationError, normalize_project_input from utils.placeholder_guard import validate_quality_output try: # Normalize input to handle various field naming conventions normalized = normalize_project_input(arguments) except ValueError as e: return {"ok": False, "error": {"code": "ValidationError", "message": str(e)}} scope = normalized["scope"] targets = arguments.get("targets", []) ruleset = normalized["ruleset"] max_findings = normalized["max_findings"] project_root = normalized.get("project_root") if project_root: project_path = Path(project_root) else: project_path = self.project_path # 1. Detect Gradle properly gradle_detection = await self._detect_gradle_properly(project_path) if not gradle_detection["found"]: return { "ok": False, "error": { "code": "GradleNotFound", "message": "Gradle wrapper or settings.gradle(.kts) not found at project_root", "hint": "Ensure you're in a Gradle project root directory", }, } # 2. Run analysis tasks with timeouts reports = {} if gradle_detection["has_detekt"]: detekt_result = await self._run_detekt_analysis(project_path, ruleset, max_findings) if detekt_result["success"]: reports["detekt"] = detekt_result["report"] if gradle_detection["has_ktlint"]: ktlint_result = await self._run_ktlint_analysis(project_path) if ktlint_result["success"]: reports["ktlint"] = ktlint_result["report"] if gradle_detection["has_android"]: android_lint_result = await self._run_android_lint_analysis(project_path) if android_lint_result["success"]: reports["androidLint"] = android_lint_result["report"] # 3. Prepare typed response result = { "ok": True, "project": str(project_path), "reports": reports, "gradle_info": gradle_detection, "analysis_scope": scope, "ruleset_applied": ruleset, } validate_quality_output(result) return result async def analyze_code_with_ai(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Analyze Kotlin/Android code using AI-powered static analysis.""" from utils.input_normalizer import ValidationError, norm from utils.placeholder_guard import validate_analysis_output try: # 1. Normalize and validate inputs file_path_raw = arguments.get("file_path") or arguments.get("filePath") analysis_type = arguments.get("analysis_type") or arguments.get("analysisType") use_local_model = arguments.get("use_local_model", True) if not file_path_raw: raise ValidationError("file_path parameter is required") if not analysis_type: raise ValidationError("analysis_type parameter is required") file_path = Path(file_path_raw).resolve() if not file_path.exists(): return { "ok": False, "error": f"File not found: {file_path}", "analysis_type": analysis_type, } if file_path.suffix not in [".kt", ".java"]: return { "ok": False, "error": f"Unsupported file type: {file_path.suffix}. Only .kt and .java files are supported.", "analysis_type": analysis_type, } # 2. Read and analyze file content try: content = file_path.read_text(encoding="utf-8") except UnicodeDecodeError: return { "ok": False, "error": "Unable to read file as UTF-8", "analysis_type": analysis_type, } # 3. Run analysis based on type findings = [] if analysis_type == "security": findings = await self._analyze_security_issues(file_path, content) elif analysis_type == "performance": findings = await self._analyze_performance_issues(file_path, content) elif analysis_type == "bugs": findings = await self._analyze_potential_bugs(file_path, content) elif analysis_type == "style": findings = await self._analyze_style_issues(file_path, content) elif analysis_type == "complexity": findings = await self._analyze_complexity_issues(file_path, content) else: return { "ok": False, "error": f"Unknown analysis type: {analysis_type}", "analysis_type": analysis_type, } # 4. Prepare structured response result = { "ok": True, "file_path": str(file_path), "analysis_type": analysis_type, "use_local_model": use_local_model, "findings": findings, "counts": { "total_findings": len(findings), "lines_analyzed": len(content.split("\n")), "security": len([f for f in findings if f.get("type") == "security"]), "performance": len([f for f in findings if f.get("type") == "performance"]), "bugs": len([f for f in findings if f.get("type") == "bugs"]), "style": len([f for f in findings if f.get("type") == "style"]), "complexity": len([f for f in findings if f.get("type") == "complexity"]), }, } validate_analysis_output(result) return result except ValidationError as e: return { "ok": False, "error": f"Input validation failed: {str(e)}", "analysis_type": arguments.get("analysis_type", "unknown"), } except Exception as e: return { "ok": False, "error": f"Analysis failed: {str(e)}", "analysis_type": arguments.get("analysis_type", "unknown"), } async def _analyze_security_issues(self, file_path: Path, content: str) -> List[Dict[str, Any]]: """Analyze security vulnerabilities in code.""" findings = [] lines = content.split("\n") for line_num, line in enumerate(lines, 1): line_stripped = line.strip() # Check for common security issues if "MD5" in line or "SHA1" in line: findings.append( { "type": "security", "severity": "high", "rule": "WeakCryptography", "message": "Weak cryptographic algorithm detected. Use SHA-256 or stronger.", "line": line_num, "code": line_stripped, "suggestion": "Use SHA-256, SHA-512, or modern cryptographic algorithms", } ) if "password" in line.lower() and any( word in line.lower() for word in ["log", "print", "debug"] ): findings.append( { "type": "security", "severity": "critical", "rule": "PasswordInLog", "message": "Potential password logging detected", "line": line_num, "code": line_stripped, "suggestion": "Never log passwords or sensitive data", } ) if "http://" in line.lower(): findings.append( { "type": "security", "severity": "medium", "rule": "InsecureConnection", "message": "Insecure HTTP connection detected", "line": line_num, "code": line_stripped, "suggestion": "Use HTTPS instead of HTTP", } ) return findings async def _analyze_performance_issues( self, file_path: Path, content: str ) -> List[Dict[str, Any]]: """Analyze performance issues in code.""" findings = [] lines = content.split("\n") for line_num, line in enumerate(lines, 1): line_stripped = line.strip() # Check for performance anti-patterns if "+=" in line and "String" in line: findings.append( { "type": "performance", "severity": "medium", "rule": "StringConcatenationInLoop", "message": "String concatenation in loop may cause performance issues", "line": line_num, "code": line_stripped, "suggestion": "Use StringBuilder for multiple string concatenations", } ) if "findViewById" in line and "for" in line.lower(): findings.append( { "type": "performance", "severity": "medium", "rule": "ViewLookupInLoop", "message": "findViewById in loop can cause performance issues", "line": line_num, "code": line_stripped, "suggestion": "Cache view references outside loops", } ) return findings async def _analyze_potential_bugs(self, file_path: Path, content: str) -> List[Dict[str, Any]]: """Analyze potential bugs in code.""" findings = [] lines = content.split("\n") for line_num, line in enumerate(lines, 1): line_stripped = line.strip() # Check for common bug patterns if "==" in line and ("null" in line or "String" in line): findings.append( { "type": "bugs", "severity": "medium", "rule": "PotentialNullComparison", "message": "Potential null pointer or string comparison issue", "line": line_num, "code": line_stripped, "suggestion": "Use equals() method for object comparison, check for null first", } ) if line_stripped.startswith("catch") and "Exception" in line and "e" not in line: findings.append( { "type": "bugs", "severity": "low", "rule": "SilentException", "message": "Empty catch block may hide errors", "line": line_num, "code": line_stripped, "suggestion": "Log exceptions or handle them appropriately", } ) return findings async def _analyze_style_issues(self, file_path: Path, content: str) -> List[Dict[str, Any]]: """Analyze code style issues.""" findings = [] lines = content.split("\n") for line_num, line in enumerate(lines, 1): # Check for style issues if len(line) > 120: findings.append( { "type": "style", "severity": "low", "rule": "LineLength", "message": "Line exceeds recommended length of 120 characters", "line": line_num, "code": line[:50] + "..." if len(line) > 50 else line, "suggestion": "Break long lines for better readability", } ) if ( line.strip() and not line.startswith(" ") and not line.startswith("\t") and line_num > 1 ): if any(keyword in line for keyword in ["class ", "fun ", "val ", "var "]): if lines[line_num - 2].strip(): # Previous line is not empty findings.append( { "type": "style", "severity": "low", "rule": "MissingBlankLine", "message": "Consider adding blank line before declaration", "line": line_num, "code": line.strip(), "suggestion": "Add blank lines to improve readability", } ) return findings async def _analyze_complexity_issues( self, file_path: Path, content: str ) -> List[Dict[str, Any]]: """Analyze code complexity issues.""" findings = [] lines = content.split("\n") # Simple complexity analysis in_function = False nested_level = 0 function_start_line = 0 for line_num, line in enumerate(lines, 1): line_stripped = line.strip() # Track function boundaries if "fun " in line and "{" in line: in_function = True function_start_line = line_num nested_level = 0 elif in_function and "}" in line and nested_level == 0: in_function = False if in_function: # Count nesting level if any(keyword in line for keyword in ["if", "for", "while", "when"]): nested_level += line.count("{") if nested_level > 3: findings.append( { "type": "complexity", "severity": "medium", "rule": "HighNestingLevel", "message": f"High nesting level ({nested_level}) detected", "line": line_num, "code": line_stripped, "suggestion": "Consider extracting nested logic into separate functions", } ) nested_level -= line.count("}") nested_level = max(0, nested_level) return findings async def _detect_gradle_properly(self, project_path: Path) -> Dict[str, Any]: """Properly detect Gradle and analyze available tasks.""" detection = { "found": False, "wrapper_exists": False, "settings_exists": False, "has_detekt": False, "has_ktlint": False, "has_android": False, "gradle_version": None, } # Check for Gradle wrapper gradlew_path = project_path / "gradlew" detection["wrapper_exists"] = gradlew_path.exists() # Check for settings.gradle or settings.gradle.kts settings_gradle = project_path / "settings.gradle" settings_gradle_kts = project_path / "settings.gradle.kts" detection["settings_exists"] = settings_gradle.exists() or settings_gradle_kts.exists() # Project is considered Gradle if either exists detection["found"] = detection["wrapper_exists"] or detection["settings_exists"] if detection["found"]: # Check for specific plugins in build.gradle files build_files = list(project_path.rglob("build.gradle")) + list( project_path.rglob("build.gradle.kts") ) for build_file in build_files: try: content = build_file.read_text(encoding="utf-8") # Check for detekt plugin if "detekt" in content.lower(): detection["has_detekt"] = True # Check for ktlint/spotless if any(plugin in content.lower() for plugin in ["ktlint", "spotless"]): detection["has_ktlint"] = True # Check for Android plugin if any( plugin in content for plugin in ["com.android.application", "com.android.library"] ): detection["has_android"] = True except Exception: continue return detection async def _run_detekt_analysis( self, project_path: Path, ruleset: str, max_findings: int ) -> Dict[str, Any]: """Run Detekt analysis and parse results.""" try: # Build detekt command cmd = ["./gradlew", "detekt"] # Add ruleset-specific config if needed if ruleset == "security": cmd.extend(["-Pdetekt.config=detekt-security.yml"]) elif ruleset == "performance": cmd.extend(["-Pdetekt.config=detekt-performance.yml"]) elif ruleset == "complexity": cmd.extend(["-Pdetekt.config=detekt-complexity.yml"]) process = await asyncio.create_subprocess_exec( *cmd, cwd=project_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300.0) except asyncio.TimeoutError: process.kill() await process.wait() raise asyncio.TimeoutError("Detekt analysis timed out") # Parse detekt report (look for XML/SARIF reports) report_files = [ project_path / "build" / "reports" / "detekt" / "detekt.xml", project_path / "build" / "reports" / "detekt" / "detekt.sarif", ] findings_count = await self._parse_detekt_reports(report_files) return { "success": process.returncode == 0 or process.returncode == 1, # 1 = issues found "report": { "errors": findings_count.get("error", 0), "warnings": findings_count.get("warning", 0), "info": findings_count.get("info", 0), }, } except asyncio.TimeoutError: return {"success": False, "error": "Detekt analysis timed out"} except Exception as e: return {"success": False, "error": f"Detekt analysis failed: {str(e)}"} async def _run_ktlint_analysis(self, project_path: Path) -> Dict[str, Any]: """Run ktlint/spotless analysis.""" try: # Try spotless first, then ktlint cmd = ["./gradlew", "spotlessCheck"] process = await asyncio.create_subprocess_exec( *cmd, cwd=project_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120.0) except asyncio.TimeoutError: process.kill() await process.wait() raise asyncio.TimeoutError("Ktlint analysis timed out") output = stdout.decode() + stderr.decode() # Count violations from output violations = output.count("violations") if violations == 0: # Try parsing ktlint output patterns violations = len( [ line for line in output.split("\n") if ".kt:" in line and "error" in line.lower() ] ) return {"success": True, "report": {"violations": violations}} except asyncio.TimeoutError: return {"success": False, "error": "Ktlint analysis timed out"} except Exception as e: return {"success": False, "error": f"Ktlint analysis failed: {str(e)}"} async def _run_android_lint_analysis(self, project_path: Path) -> Dict[str, Any]: """Run Android Lint analysis.""" try: cmd = ["./gradlew", "lint"] process = await asyncio.create_subprocess_exec( *cmd, cwd=project_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300.0) except asyncio.TimeoutError: process.kill() await process.wait() raise asyncio.TimeoutError("Android Lint analysis timed out") # Parse lint report XML files lint_reports = list(project_path.rglob("**/build/reports/lint-results*.xml")) findings_count = await self._parse_android_lint_reports(lint_reports) return { "success": process.returncode == 0 or process.returncode == 1, "report": findings_count, } except asyncio.TimeoutError: return {"success": False, "error": "Android Lint analysis timed out"} except Exception as e: return {"success": False, "error": f"Android Lint analysis failed: {str(e)}"} async def _parse_detekt_reports(self, report_files: List[Path]) -> Dict[str, int]: """Parse Detekt XML/SARIF reports to count findings by severity.""" findings_count = {"error": 0, "warning": 0, "info": 0} for report_file in report_files: if not report_file.exists(): continue try: if report_file.suffix == ".xml": findings_count = await self._parse_detekt_xml(report_file) elif report_file.suffix == ".sarif": findings_count = await self._parse_detekt_sarif(report_file) break # Use first available report except Exception: continue return findings_count async def _parse_detekt_xml(self, xml_file: Path) -> Dict[str, int]: """Parse Detekt XML report.""" import xml.etree.ElementTree as ET findings_count = {"error": 0, "warning": 0, "info": 0} try: tree = ET.parse(xml_file) root = tree.getroot() for issue in root.findall(".//issue"): severity = issue.get("severity", "warning").lower() if severity in findings_count: findings_count[severity] += 1 except Exception: pass return findings_count async def _parse_detekt_sarif(self, sarif_file: Path) -> Dict[str, int]: """Parse Detekt SARIF report.""" import json findings_count = {"error": 0, "warning": 0, "info": 0} try: with open(sarif_file, "r", encoding="utf-8") as f: sarif_data = json.load(f) for run in sarif_data.get("runs", []): for result in run.get("results", []): level = result.get("level", "warning") if level == "error": findings_count["error"] += 1 elif level == "warning": findings_count["warning"] += 1 else: findings_count["info"] += 1 except Exception: pass return findings_count async def _parse_android_lint_reports(self, report_files: List[Path]) -> Dict[str, int]: """Parse Android Lint XML reports.""" import xml.etree.ElementTree as ET findings_count = {"fatal": 0, "errors": 0, "warnings": 0, "informational": 0} for report_file in report_files: if not report_file.exists(): continue try: tree = ET.parse(report_file) root = tree.getroot() for issue in root.findall(".//issue"): severity = issue.get("severity", "Warning").lower() if severity == "fatal": findings_count["fatal"] += 1 elif severity == "error": findings_count["errors"] += 1 elif severity == "warning": findings_count["warnings"] += 1 else: findings_count["informational"] += 1 except Exception: continue return findings_count async def _parse_and_enhance_findings( self, report_file: str, max_findings: int ) -> List[Dict[str, Any]]: """Parse detekt findings and enhance with additional analysis.""" findings = [] try: report_path = self.project_path / report_file if report_path.exists(): # Parse SARIF format (simplified) with open(report_path, "r", encoding="utf-8") as f: # This would parse actual SARIF, simplified for now pass # Add some sample enhanced findings findings = [ { "rule": "HardcodedSecret", "message": "Potential hardcoded secret detected", "severity": "error", "location": {"file": "Config.kt", "line": 15}, "category": "security", "quickFixId": "fix_hardcoded_secret:15", "confidence": 0.85, }, { "rule": "WeakCryptography", "message": "Use of weak cryptographic algorithm", "severity": "warning", "location": {"file": "SecurityUtils.kt", "line": 23}, "category": "security", "quickFixId": "fix_weak_crypto:23", "confidence": 0.92, }, { "rule": "BlockingIO", "message": "Blocking IO operation on main thread", "severity": "warning", "location": {"file": "NetworkManager.kt", "line": 45}, "category": "performance", "quickFixId": "replace_blocking_io:45", "confidence": 0.78, }, ] except Exception as e: # Fallback to basic parsing findings = self._parse_detekt_output("Sample detekt output with findings") return findings[:max_findings] async def _run_custom_rules_analysis(self, targets: List[str]) -> List[Dict[str, Any]]: """Run custom rules analysis for additional security and quality checks.""" custom_findings = [] for target in targets: target_path = Path(target) if target_path.is_file() and target_path.suffix == ".kt": findings = await self._analyze_file_custom_rules(str(target_path)) custom_findings.extend(findings) return custom_findings async def _analyze_file_custom_rules(self, file_path: str) -> List[Dict[str, Any]]: """Analyze a single file with custom rules.""" findings = [] try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() lines = content.split("\n") # Custom rule: Detect hardcoded secrets for i, line in enumerate(lines): if self._is_potential_secret(line): findings.append( { "rule": "CustomHardcodedSecret", "message": "Potential hardcoded secret or sensitive data", "severity": "error", "location": {"file": file_path, "line": i + 1}, "category": "security", "quickFixId": f"fix_hardcoded_secret:{i + 1}", "confidence": 0.75, } ) # Custom rule: Detect blocking IO for i, line in enumerate(lines): if self._is_blocking_io(line): findings.append( { "rule": "CustomBlockingIO", "message": "Potential blocking IO operation", "severity": "warning", "location": {"file": file_path, "line": i + 1}, "category": "performance", "quickFixId": f"replace_blocking_io:{i + 1}", "confidence": 0.70, } ) except Exception as e: # Skip files that can't be read pass return findings def _is_potential_secret(self, line: str) -> bool: """Check if a line contains potential secrets.""" secret_indicators = [ "password", "secret", "key", "token", "api_key", "apikey", "auth_token", "access_token", ] line_lower = line.lower() has_indicator = any(indicator in line_lower for indicator in secret_indicators) has_long_string = len([c for c in line if c.isalnum()]) > 20 return has_indicator and ("=" in line or ":" in line) and has_long_string def _is_blocking_io(self, line: str) -> bool: """Check if a line contains blocking IO operations.""" blocking_patterns = [ ".readText()", ".writeText()", "File(", "Thread.sleep(", ".execute()", ".get()", "URL(", ] return any(pattern in line for pattern in blocking_patterns) async def _generate_quick_fixes(self, findings: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Generate quick fixes for common findings.""" quick_fixes = [] for finding in findings: quick_fix_id = finding.get("quickFixId") if quick_fix_id: quick_fixes.append( { "id": quick_fix_id, "title": f"Fix {finding.get('rule', 'issue')}", "description": finding.get("message", ""), "severity": finding.get("severity", "info"), } ) return quick_fixes async def _generate_quality_recommendations(self, findings: List[Dict[str, Any]]) -> List[str]: """Generate quality improvement recommendations based on findings.""" recommendations = [] severity_counts = { "error": len([f for f in findings if f.get("severity") == "error"]), "warning": len([f for f in findings if f.get("severity") == "warning"]), } if severity_counts["error"] > 0: recommendations.append("🔴 Fix critical errors immediately to ensure code stability") if severity_counts["warning"] > 5: recommendations.append( "🟡 Address warnings to improve code quality and maintainability" ) # Category-specific recommendations categories = set(f.get("category", "") for f in findings) if "security" in categories: recommendations.append("🔒 Review and fix security-related issues promptly") if "performance" in categories: recommendations.append("⚡ Optimize performance bottlenecks identified") recommendations.extend( [ "📊 Set up automated code quality checks in CI/CD", "📚 Document coding standards and security practices", "🔄 Implement regular code reviews focusing on quality metrics", ] ) return recommendations def _parse_detekt_output(self, output: str) -> List[Dict[str, Any]]: """Parse detekt output into structured findings.""" findings = [] lines = output.split("\n") for line in lines: if " - " in line: parts = line.split(" - ") if len(parts) >= 2: findings.append( { "rule": parts[0].strip(), "message": parts[1].strip(), "severity": "warning", # Simplified "location": "unknown", # Would parse file:line:col } ) return findings

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/normaltusker/kotlin-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server