Skip to main content
Glama
cbunting99

MCP Code Analysis & Quality Server

by cbunting99
PythonAnalyzer.ts (19.6 kB)
// Copyright 2025 Chris Bunting
// Brief: Python analyzer for Static Analysis MCP Server
// Scope: Provides Pylint, Flake8, Black, and mypy integration

import { execSync, exec, spawnSync } from 'child_process';
import { readFileSync, existsSync, writeFileSync } from 'fs';
import { join, dirname, extname } from 'path';
import {
  AnalysisResult,
  AnalysisIssue,
  AnalysisMetrics,
  AnalysisOptions,
  SeverityLevel,
  IssueCategory,
  Language,
  FixSuggestion,
  AnalyzerInterface
} from '@mcp-code-analysis/shared-types';
import { Logger } from '../utils/Logger.js';
import { ConfigParser } from '../services/ConfigParser.js';
import { ErrorHandler, AnalysisError, ErrorCode } from '../utils/ErrorHandler.js';

/** Metadata describing one rule this analyzer advertises through `getRules`. */
export interface PythonRule {
  ruleId: string;
  description: string;
  category: IssueCategory;
  severity: SeverityLevel;
  tool: 'pylint' | 'flake8' | 'mypy' | 'black';
  fixable: boolean;
}

/** Captured outcome of a single external tool invocation. */
interface ToolRunResult {
  /** Process exit status, or `null` when the process was ended by a signal. */
  status: number | null;
  stdout: string;
  stderr: string;
}

/** The fields this analyzer reads from one entry of Pylint's JSON report. */
interface PylintJsonMessage {
  type?: string;
  'message-id'?: string;
  message?: string;
  line?: number;
  column?: number;
}

/** Upper bound, in milliseconds, for one run of an external tool. */
const TOOL_TIMEOUT_MS = 30000;

// Accept both LF and CRLF: `.` never matches `\r`, so a stray carriage return
// would otherwise defeat the `$`-anchored line patterns below.
const LINE_BREAK = /\r?\n/;

// `path:line:col: CODE: message`
const PYLINT_TEXT_LINE = /^(.+):(\d+):(\d+):\s(.+):\s(.+)$/;
// `path:line:col: CODE message`
const FLAKE8_TEXT_LINE = /^(.+):(\d+):(\d+):\s(.+)$/;
// `path:line:col: severity: message`
const MYPY_TEXT_LINE = /^(.+):(\d+):(\d+):\s(error|warning|note):\s(.+)$/;
// Pylint message ids are one category letter followed by four digits (e.g. `e0602`).
const PYLINT_MESSAGE_ID = /^[a-z]\d{4}$/;

/**
 * Static analyzer for Python sources that shells out to Pylint, Flake8, mypy
 * and Black and converts their reports into `AnalysisIssue` records.
 *
 * A tool that is missing, times out, or fails outright is logged and treated
 * as having produced no issues, so one broken tool does not fail the analysis.
 */
export class PythonAnalyzer implements AnalyzerInterface {
  name = 'PythonAnalyzer';
  language = Language.PYTHON;
  version = '1.0.0';

  private logger: Logger;
  private configParser: ConfigParser;
  private supportedRules: Map<string, PythonRule> = new Map();
  private _projectConfig: any = null;

  constructor(logger: Logger, configParser: ConfigParser) {
    this.logger = logger;
    this.configParser = configParser;
    this.initializeRules();
  }

  /** Registers the built-in rule metadata, keyed by rule id. */
  private initializeRules(): void {
    // Initialize common Python rules with metadata
    const rules: PythonRule[] = [
      {
        ruleId: 'unused-variable',
        description: 'Unused variable',
        category: IssueCategory.MAINTAINABILITY,
        severity: SeverityLevel.WARNING,
        tool: 'pylint',
        fixable: true
      },
      {
        ruleId: 'undefined-variable',
        description: 'Undefined variable',
        category: IssueCategory.SYNTAX,
        severity: SeverityLevel.ERROR,
        tool: 'pylint',
        fixable: false
      },
      {
        ruleId: 'missing-docstring',
        description: 'Missing docstring',
        category: IssueCategory.MAINTAINABILITY,
        severity: SeverityLevel.WARNING,
        tool: 'pylint',
        fixable: true
      },
      {
        ruleId: 'invalid-name',
        description: 'Invalid name',
        category: IssueCategory.STYLE,
        severity: SeverityLevel.WARNING,
        tool: 'pylint',
        fixable: true
      },
      {
        ruleId: 'line-too-long',
        description: 'Line too long',
        category: IssueCategory.STYLE,
        severity: SeverityLevel.WARNING,
        tool: 'flake8',
        fixable: true
      },
      {
        ruleId: 'trailing-whitespace',
        description: 'Trailing whitespace',
        category: IssueCategory.STYLE,
        severity: SeverityLevel.WARNING,
        tool: 'flake8',
        fixable: true
      },
      {
        ruleId: 'import-error',
        description: 'Import error',
        category: IssueCategory.SYNTAX,
        severity: SeverityLevel.ERROR,
        tool: 'flake8',
        fixable: false
      },
      {
        ruleId: 'type-var',
        description: 'Type variable',
        category: IssueCategory.MAINTAINABILITY,
        severity: SeverityLevel.WARNING,
        tool: 'mypy',
        fixable: false
      },
      {
        ruleId: 'syntax-error',
        description: 'Syntax error',
        category: IssueCategory.SYNTAX,
        severity: SeverityLevel.ERROR,
        tool: 'black',
        fixable: true
      }
    ];

    for (const rule of rules) {
      this.supportedRules.set(rule.ruleId, rule);
    }
  }

  /**
   * Loads and merges the Python configuration files found under `projectPath`.
   * Falls back to an empty configuration when discovery or merging throws.
   */
  async initializeProject(projectPath: string): Promise<void> {
    try {
      const configFiles = await this.configParser.findConfigFiles(projectPath, Language.PYTHON);
      this._projectConfig = this.configParser.mergeConfigs(configFiles);
      this.logger.info('Python project configuration loaded successfully');
    } catch (error) {
      // Deliberate best-effort: a missing or unreadable config is not fatal.
      this._projectConfig = {};
      this.logger.info('Using default Python configuration');
    }
  }

  /**
   * Analyzes one Python file with every tool that is not listed in
   * `options.exclude`. mypy only runs when `options.rules` names it or the
   * file is a `.pyi` stub.
   *
   * @throws AnalysisError when the file does not exist or the analysis fails.
   */
  async analyzeFile(filePath: string, options: AnalysisOptions = {}): Promise<AnalysisResult> {
    try {
      if (!existsSync(filePath)) {
        throw ErrorHandler.createFileNotFoundError(filePath);
      }

      const content = readFileSync(filePath, 'utf-8');
      const issues: AnalysisIssue[] = [];
      const metrics = this.calculateMetrics(content);

      // Run Pylint analysis
      if (!options.exclude?.includes('pylint')) {
        const pylintIssues = await this.runPylintAnalysis(filePath, options);
        issues.push(...pylintIssues);
      }

      // Run Flake8 analysis
      if (!options.exclude?.includes('flake8')) {
        const flake8Issues = await this.runFlake8Analysis(filePath, options);
        issues.push(...flake8Issues);
      }

      // Run mypy analysis
      if (
        !options.exclude?.includes('mypy') &&
        (options.rules?.includes('mypy') || filePath.endsWith('.pyi'))
      ) {
        const mypyIssues = await this.runMypyAnalysis(filePath, options);
        issues.push(...mypyIssues);
      }

      // Run Black formatting check
      if (!options.exclude?.includes('black')) {
        const blackIssues = await this.runBlackAnalysis(filePath, options);
        issues.push(...blackIssues);
      }

      return this.createAnalysisResult(filePath, Language.PYTHON, issues, metrics);
    } catch (error) {
      if (error instanceof AnalysisError) {
        throw error;
      }
      throw ErrorHandler.createAnalysisFailedError(filePath, Language.PYTHON, error as Error);
    }
  }

  /**
   * Analyzes every file under `projectPath` that matches `patterns`
   * (defaulting to `*.py` / `*.pyi` outside common virtualenv and cache
   * directories). A file whose analysis throws is logged and skipped.
   *
   * @throws AnalysisError when project setup or file discovery fails.
   */
  async analyzeProject(
    projectPath: string,
    patterns?: string[],
    excludePatterns?: string[],
    options: AnalysisOptions = {}
  ): Promise<AnalysisResult[]> {
    try {
      await this.initializeProject(projectPath);

      const defaultPatterns = [
        '**/*.py',
        '**/*.pyi',
        '!**/venv/**',
        '!**/env/**',
        '!**/__pycache__/**',
        '!**/.pytest_cache/**',
        '!**/node_modules/**'
      ];

      const filePatterns = patterns || defaultPatterns;
      const exclude = excludePatterns || [];

      // Find Python files
      const { glob } = await import('fast-glob');
      const files = await glob(filePatterns, {
        cwd: projectPath,
        ignore: exclude,
        absolute: true,
        onlyFiles: true
      });

      const results: AnalysisResult[] = [];
      for (const file of files) {
        try {
          const result = await this.analyzeFile(file, options);
          results.push(result);
        } catch (error) {
          this.logger.warn(`Failed to analyze file ${file}:`, error);
        }
      }

      return results;
    } catch (error) {
      throw ErrorHandler.createAnalysisFailedError(projectPath, Language.PYTHON, error as Error);
    }
  }

  /**
   * Returns the predefined rules (all marked enabled) together with summary
   * counts and the list of integrated tools. `_configFile` is currently unused.
   */
  async getRules(_configFile?: string): Promise<any> {
    try {
      // Add our predefined rules; all rules are enabled by default
      const allRules = Array.from(this.supportedRules.values()).map(rule => ({
        ...rule,
        enabled: true
      }));

      return {
        rules: allRules,
        total: allRules.length,
        enabled: allRules.filter(r => r.enabled).length,
        tools: ['pylint', 'flake8', 'mypy', 'black']
      };
    } catch (error) {
      throw ErrorHandler.createExecutionError('Python tools', 'getting rules', error as Error);
    }
  }

  /** Applies runtime configuration; only `config.projectPath` is consulted. */
  async configure(config: any): Promise<void> {
    try {
      if (config.projectPath) {
        await this.initializeProject(config.projectPath);
      }
      this.logger.info('Python analyzer configured successfully');
    } catch (error) {
      throw ErrorHandler.createInvalidConfigError('python config', error as Error);
    }
  }

  /**
   * Runs an external tool without a shell and returns its captured output.
   *
   * Arguments are passed as an array so that file paths containing spaces or
   * shell metacharacters are delivered verbatim. A non-zero exit status is
   * NOT treated as a failure here: the linters use it to signal "issues
   * found", and their report still has to be read from stdout/stderr.
   *
   * @throws Error when the process could not be spawned or timed out.
   */
  private runTool(tool: string, args: string[]): ToolRunResult {
    const result = spawnSync(tool, args, { encoding: 'utf-8', timeout: TOOL_TIMEOUT_MS });
    if (result.error) {
      throw result.error;
    }
    return {
      status: result.status,
      stdout: result.stdout ?? '',
      stderr: result.stderr ?? ''
    };
  }

  /** Logs a warning when a tool exited non-zero without reporting anything on stdout. */
  private warnIfSilentFailure(tool: string, filePath: string, run: ToolRunResult): void {
    if (run.status !== 0 && run.stdout.trim() === '') {
      this.logger.warn(
        `${tool} exited with status ${run.status} and no report for ${filePath}:`,
        run.stderr
      );
    }
  }

  /** Builds a reasonably unique identifier of the form `prefix_<millis>_<random>`. */
  private generateId(prefix: string): string {
    return `${prefix}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Runs Pylint with JSON output and converts each reported message.
   * Falls back to text parsing when stdout is not a JSON array.
   */
  private async runPylintAnalysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
    try {
      const run = this.runTool('pylint', ['--output-format=json', filePath]);

      if (run.stdout.trim() === '') {
        this.warnIfSilentFailure('Pylint', filePath, run);
        return [];
      }

      try {
        const parsed: unknown = JSON.parse(run.stdout);
        if (!Array.isArray(parsed)) {
          throw new TypeError('Pylint JSON output is not an array');
        }

        const issues: AnalysisIssue[] = [];
        for (const message of parsed as PylintJsonMessage[]) {
          const issue = this.convertPylintMessageToIssue(message, filePath);
          if (this.shouldIncludeIssue(issue, options)) {
            issues.push(issue);
          }
        }
        return issues;
      } catch (parseError) {
        // If JSON parsing fails, try to parse text output
        this.logger.warn('Pylint JSON output parsing failed, attempting text parsing');
        return this.parsePylintTextOutput(run.stdout, filePath, options);
      }
    } catch (error) {
      this.logger.warn(`Pylint analysis failed for ${filePath}:`, error);
      return [];
    }
  }

  /**
   * Runs Flake8 and parses its line-oriented report.
   *
   * The built-in `default` format is requested explicitly because `json` is
   * not a formatter Flake8 ships with (it needs a plugin), so relying on it
   * produced unparseable output on a stock installation.
   */
  private async runFlake8Analysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
    try {
      const run = this.runTool('flake8', ['--format=default', filePath]);
      this.warnIfSilentFailure('Flake8', filePath, run);
      return this.parseFlake8TextOutput(run.stdout, filePath, options);
    } catch (error) {
      this.logger.warn(`Flake8 analysis failed for ${filePath}:`, error);
      return [];
    }
  }

  /** Runs mypy and converts every `path:line:col: severity: message` line it prints. */
  private async runMypyAnalysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
    try {
      const run = this.runTool('mypy', ['--show-column-numbers', '--no-error-summary', filePath]);
      this.warnIfSilentFailure('MyPy', filePath, run);

      const issues: AnalysisIssue[] = [];
      for (const line of run.stdout.split(LINE_BREAK)) {
        if (line.trim() === '' || line.startsWith('Success:')) {
          continue;
        }
        const issue = this.parseMypyLine(line, filePath);
        if (issue && this.shouldIncludeIssue(issue, options)) {
          issues.push(issue);
        }
      }
      return issues;
    } catch (error) {
      this.logger.warn(`MyPy analysis failed for ${filePath}:`, error);
      return [];
    }
  }

  /**
   * Runs `black --check` and reports a single file-level issue when Black
   * would reformat the file.
   *
   * The decision is taken from the exit status (1 = would reformat) rather
   * than from stdout, because Black prints its status messages to stderr.
   */
  private async runBlackAnalysis(filePath: string, _options: AnalysisOptions): Promise<AnalysisIssue[]> {
    try {
      const run = this.runTool('black', ['--check', '--diff', filePath]);

      if (run.status === 0) {
        return [];
      }

      if (run.status !== 1) {
        // Anything other than 0/1 means Black itself failed (e.g. unparseable source).
        this.logger.warn(
          `Black exited with status ${run.status} for ${filePath}:`,
          run.stderr
        );
        return [];
      }

      // Black found formatting issues
      const issue: AnalysisIssue = {
        id: this.generateId('black'),
        ruleId: 'formatting',
        message: 'File would be reformatted by Black',
        severity: SeverityLevel.WARNING,
        line: 1,
        column: 1,
        category: IssueCategory.STYLE,
        fix: {
          description: 'Format file with Black',
          replacement: 'Run: black ' + filePath,
          range: {
            start: { line: 1, column: 1 },
            end: { line: 1, column: 1 }
          }
        },
        tags: ['black', 'formatting']
      };
      return [issue];
    } catch (error) {
      this.logger.warn(`Black analysis failed for ${filePath}:`, error);
      return [];
    }
  }

  /** Parses Pylint's `path:line:col: CODE: message` text lines; other lines are ignored. */
  private parsePylintTextOutput(output: string, _filePath: string, options: AnalysisOptions): AnalysisIssue[] {
    const issues: AnalysisIssue[] = [];

    for (const line of output.split(LINE_BREAK)) {
      const match = line.match(PYLINT_TEXT_LINE);
      if (!match) {
        continue;
      }

      const [, , lineNum, colNum, code, message] = match;
      const issue: AnalysisIssue = {
        id: this.generateId('pylint'),
        ruleId: code,
        message: message.trim(),
        severity: this.mapPylintSeverity(code),
        line: parseInt(lineNum, 10),
        column: parseInt(colNum, 10),
        category: this.categorizePylintRule(code),
        tags: ['pylint']
      };

      if (this.shouldIncludeIssue(issue, options)) {
        issues.push(issue);
      }
    }

    return issues;
  }

  /** Parses Flake8's `path:line:col: CODE message` text lines; other lines are ignored. */
  private parseFlake8TextOutput(output: string, _filePath: string, options: AnalysisOptions): AnalysisIssue[] {
    const issues: AnalysisIssue[] = [];

    for (const line of output.split(LINE_BREAK)) {
      const match = line.match(FLAKE8_TEXT_LINE);
      if (!match) {
        continue;
      }

      const [, , lineNum, colNum, codeMessage] = match;
      // The first space-separated token is the code; the rest is the message.
      const [code, ...messageParts] = codeMessage.split(' ');
      const message = messageParts.join(' ');

      const issue: AnalysisIssue = {
        id: this.generateId('flake8'),
        ruleId: code,
        message: message.trim(),
        severity: this.mapFlake8Severity(code),
        line: parseInt(lineNum, 10),
        column: parseInt(colNum, 10),
        category: this.categorizeFlake8Rule(code),
        tags: ['flake8']
      };

      if (this.shouldIncludeIssue(issue, options)) {
        issues.push(issue);
      }
    }

    return issues;
  }

  /** Converts one mypy report line, or returns `null` when the line is not a diagnostic. */
  private parseMypyLine(line: string, _filePath: string): AnalysisIssue | null {
    const match = line.match(MYPY_TEXT_LINE);
    if (!match) {
      return null;
    }

    const [, , lineNum, colNum, severity, message] = match;
    return {
      id: this.generateId('mypy'),
      ruleId: 'mypy',
      message: message.trim(),
      severity: this.mapMypySeverity(severity),
      line: parseInt(lineNum, 10),
      column: parseInt(colNum, 10),
      category: IssueCategory.MAINTAINABILITY,
      tags: ['mypy']
    };
  }

  /**
   * Converts one entry of Pylint's JSON report. Missing fields fall back to
   * neutral defaults instead of throwing, so a single malformed entry does
   * not discard the rest of the report.
   */
  private convertPylintMessageToIssue(message: PylintJsonMessage, _filePath: string): AnalysisIssue {
    const ruleId = message['message-id'] || 'pylint';
    return {
      id: this.generateId('pylint'),
      ruleId,
      message: message.message ?? '',
      severity: this.mapPylintSeverity(message.type ?? ''),
      line: message.line ?? 1,
      column: message.column ?? 0,
      category: this.categorizePylintRule(ruleId),
      tags: ['pylint']
    };
  }

  /**
   * Maps a Pylint message type (`error`, `warning`, ...) to a severity.
   * Also accepts a message id such as `E0602` — which is what the text
   * fallback parser supplies — and maps it by its leading category letter.
   */
  private mapPylintSeverity(type: string): SeverityLevel {
    const normalized = (type ?? '').toLowerCase();

    switch (normalized) {
      case 'error':
      case 'fatal':
        return SeverityLevel.ERROR;
      case 'warning':
      case 'convention':
      case 'refactor':
        return SeverityLevel.WARNING;
      default:
        break;
    }

    if (PYLINT_MESSAGE_ID.test(normalized)) {
      switch (normalized.charAt(0)) {
        case 'e':
        case 'f':
          return SeverityLevel.ERROR;
        case 'w':
        case 'c':
        case 'r':
          return SeverityLevel.WARNING;
        default:
          break;
      }
    }

    return SeverityLevel.INFO;
  }

  /** Maps a Flake8 code to a severity: `E*` is an error, `W*` a warning, anything else info. */
  private mapFlake8Severity(code: string): SeverityLevel {
    if (code.startsWith('E')) {
      return SeverityLevel.ERROR;
    }
    if (code.startsWith('W')) {
      return SeverityLevel.WARNING;
    }
    return SeverityLevel.INFO;
  }

  /** Maps a mypy severity keyword to a severity; unknown keywords become info. */
  private mapMypySeverity(severity: string): SeverityLevel {
    switch (severity.toLowerCase()) {
      case 'error':
        return SeverityLevel.ERROR;
      case 'warning':
        return SeverityLevel.WARNING;
      case 'note':
      default:
        return SeverityLevel.INFO;
    }
  }

  /** Categorizes a Pylint message id by its leading letter. */
  private categorizePylintRule(ruleId: string): IssueCategory {
    if (ruleId.startsWith('E')) {
      return IssueCategory.SYNTAX;
    }
    if (ruleId.startsWith('R') || ruleId.startsWith('C')) {
      return IssueCategory.MAINTAINABILITY;
    }
    // `W*` and anything unrecognized are treated as style findings.
    return IssueCategory.STYLE;
  }

  /** Categorizes a Flake8 code: `E9*` and `F*` are syntax findings, everything else style. */
  private categorizeFlake8Rule(code: string): IssueCategory {
    if (code.startsWith('E9') || code.startsWith('F')) {
      return IssueCategory.SYNTAX;
    }
    return IssueCategory.STYLE;
  }

  /**
   * Applies the caller's filters: an allow-list (`options.rules`), a deny-list
   * (`options.exclude`), and `options.fixable`, which keeps only issues that
   * carry a fix.
   */
  private shouldIncludeIssue(issue: AnalysisIssue, options: AnalysisOptions): boolean {
    // Filter by rules
    if (options.rules && options.rules.length > 0 && !options.rules.includes(issue.ruleId)) {
      return false;
    }

    // Filter by excluded rules
    if (options.exclude && options.exclude.length > 0 && options.exclude.includes(issue.ruleId)) {
      return false;
    }

    // Filter by fixable
    if (options.fixable && !issue.fix) {
      return false;
    }

    return true;
  }

  /**
   * Computes line-based metrics. `linesOfCode` is the total number of lines;
   * `commentLines` counts lines that start with `#` or a triple quote after
   * trimming. The complexity metrics are not computed and are reported as 0.
   */
  private calculateMetrics(content: string): AnalysisMetrics {
    const lines = content.split('\n');
    const linesOfCode = lines.length;
    const commentLines = lines.filter(line => {
      const trimmed = line.trim();
      return trimmed.startsWith('#') || trimmed.startsWith('"""') || trimmed.startsWith("'''");
    }).length;

    return {
      linesOfCode,
      commentLines,
      cyclomaticComplexity: 0, // Would need more sophisticated analysis
      cognitiveComplexity: 0, // Would need more sophisticated analysis
      maintainabilityIndex: 0 // Would need more sophisticated analysis
    };
  }

  /** Wraps issues and metrics into an `AnalysisResult` stamped with the current time. */
  private createAnalysisResult(
    filePath: string,
    language: Language,
    issues: AnalysisIssue[],
    metrics: AnalysisMetrics
  ): AnalysisResult {
    return {
      id: this.generateId('analysis'),
      timestamp: new Date(),
      filePath,
      language,
      issues,
      metrics,
      severity: this.calculateOverallSeverity(issues)
    };
  }

  /** Returns the highest severity present among `issues`, or info when there are none. */
  private calculateOverallSeverity(issues: AnalysisIssue[]): SeverityLevel {
    if (issues.some(issue => issue.severity === SeverityLevel.ERROR)) {
      return SeverityLevel.ERROR;
    }
    if (issues.some(issue => issue.severity === SeverityLevel.WARNING)) {
      return SeverityLevel.WARNING;
    }
    return SeverityLevel.INFO;
  }

  /**
   * Placeholder for in-memory analysis: it currently ignores `_content` and
   * `_options` and always returns an empty result for `filePath`.
   */
  async analyze(filePath: string, _content: string, _options?: AnalysisOptions): Promise<AnalysisResult> {
    // Implementation will be added
    return {
      id: `analysis-${Date.now()}`,
      timestamp: new Date(),
      filePath,
      language: this.language,
      issues: [],
      metrics: {
        cyclomaticComplexity: 0,
        cognitiveComplexity: 0,
        linesOfCode: 0,
        commentLines: 0,
        maintainabilityIndex: 0
      },
      severity: SeverityLevel.INFO
    };
  }

  /**
   * File extensions this analyzer claims.
   * NOTE(review): `analyzeProject` also picks up `.pyi` stubs, which are not
   * listed here — confirm whether that asymmetry is intended.
   */
  getSupportedExtensions(): string[] {
    return ['.py'];
  }

  /** Returns whether `filePath` ends in `.py` (case-insensitive). */
  isApplicable(filePath: string): boolean {
    return filePath.toLowerCase().endsWith('.py');
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbunting99/mcp-code-analysis-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.