// Copyright 2025 Chris Bunting
// Brief: Python analyzer for Static Analysis MCP Server
// Scope: Provides Pylint, Flake8, Black, and mypy integration
import { execSync, exec } from 'child_process';
import { readFileSync, existsSync, writeFileSync } from 'fs';
import { join, dirname, extname } from 'path';
import {
AnalysisResult,
AnalysisIssue,
AnalysisMetrics,
AnalysisOptions,
SeverityLevel,
IssueCategory,
Language,
FixSuggestion,
AnalyzerInterface
} from '@mcp-code-analysis/shared-types';
import { Logger } from '../utils/Logger.js';
import { ConfigParser } from '../services/ConfigParser.js';
import { ErrorHandler, AnalysisError, ErrorCode } from '../utils/ErrorHandler.js';
export interface PythonRule {
ruleId: string;
description: string;
category: IssueCategory;
severity: SeverityLevel;
tool: 'pylint' | 'flake8' | 'mypy' | 'black';
fixable: boolean;
}
export class PythonAnalyzer implements AnalyzerInterface {
name = 'PythonAnalyzer';
language = Language.PYTHON;
version = '1.0.0';
private logger: Logger;
private configParser: ConfigParser;
private supportedRules: Map<string, PythonRule> = new Map();
private _projectConfig: any = null;
constructor(logger: Logger, configParser: ConfigParser) {
this.logger = logger;
this.configParser = configParser;
this.initializeRules();
}
private initializeRules(): void {
// Initialize common Python rules with metadata
const rules: PythonRule[] = [
{
ruleId: 'unused-variable',
description: 'Unused variable',
category: IssueCategory.MAINTAINABILITY,
severity: SeverityLevel.WARNING,
tool: 'pylint',
fixable: true
},
{
ruleId: 'undefined-variable',
description: 'Undefined variable',
category: IssueCategory.SYNTAX,
severity: SeverityLevel.ERROR,
tool: 'pylint',
fixable: false
},
{
ruleId: 'missing-docstring',
description: 'Missing docstring',
category: IssueCategory.MAINTAINABILITY,
severity: SeverityLevel.WARNING,
tool: 'pylint',
fixable: true
},
{
ruleId: 'invalid-name',
description: 'Invalid name',
category: IssueCategory.STYLE,
severity: SeverityLevel.WARNING,
tool: 'pylint',
fixable: true
},
{
ruleId: 'line-too-long',
description: 'Line too long',
category: IssueCategory.STYLE,
severity: SeverityLevel.WARNING,
tool: 'flake8',
fixable: true
},
{
ruleId: 'trailing-whitespace',
description: 'Trailing whitespace',
category: IssueCategory.STYLE,
severity: SeverityLevel.WARNING,
tool: 'flake8',
fixable: true
},
{
ruleId: 'import-error',
description: 'Import error',
category: IssueCategory.SYNTAX,
severity: SeverityLevel.ERROR,
tool: 'flake8',
fixable: false
},
{
ruleId: 'type-var',
description: 'Type variable',
category: IssueCategory.MAINTAINABILITY,
severity: SeverityLevel.WARNING,
tool: 'mypy',
fixable: false
},
{
ruleId: 'syntax-error',
description: 'Syntax error',
category: IssueCategory.SYNTAX,
severity: SeverityLevel.ERROR,
tool: 'black',
fixable: true
}
];
rules.forEach(rule => {
this.supportedRules.set(rule.ruleId, rule);
});
}
async initializeProject(projectPath: string): Promise<void> {
try {
const configFiles = await this.configParser.findConfigFiles(projectPath, Language.PYTHON);
this._projectConfig = this.configParser.mergeConfigs(configFiles);
this.logger.info('Python project configuration loaded successfully');
} catch (error) {
this._projectConfig = {};
this.logger.info('Using default Python configuration');
}
}
async analyzeFile(filePath: string, options: AnalysisOptions = {}): Promise<AnalysisResult> {
try {
if (!existsSync(filePath)) {
throw ErrorHandler.createFileNotFoundError(filePath);
}
const content = readFileSync(filePath, 'utf-8');
const issues: AnalysisIssue[] = [];
const metrics = this.calculateMetrics(content);
// Run Pylint analysis
if (!options.exclude?.includes('pylint')) {
const pylintIssues = await this.runPylintAnalysis(filePath, options);
issues.push(...pylintIssues);
}
// Run Flake8 analysis
if (!options.exclude?.includes('flake8')) {
const flake8Issues = await this.runFlake8Analysis(filePath, options);
issues.push(...flake8Issues);
}
// Run mypy analysis
if (!options.exclude?.includes('mypy') && (options.rules?.includes('mypy') || filePath.endsWith('.pyi'))) {
const mypyIssues = await this.runMypyAnalysis(filePath, options);
issues.push(...mypyIssues);
}
// Run Black formatting check
if (!options.exclude?.includes('black')) {
const blackIssues = await this.runBlackAnalysis(filePath, options);
issues.push(...blackIssues);
}
return this.createAnalysisResult(filePath, Language.PYTHON, issues, metrics);
} catch (error) {
if (error instanceof AnalysisError) {
throw error;
}
throw ErrorHandler.createAnalysisFailedError(filePath, Language.PYTHON, error as Error);
}
}
async analyzeProject(
projectPath: string,
patterns?: string[],
excludePatterns?: string[],
options: AnalysisOptions = {}
): Promise<AnalysisResult[]> {
try {
await this.initializeProject(projectPath);
const defaultPatterns = [
'**/*.py',
'**/*.pyi',
'!**/venv/**',
'!**/env/**',
'!**/__pycache__/**',
'!**/.pytest_cache/**',
'!**/node_modules/**'
];
const filePatterns = patterns || defaultPatterns;
const exclude = excludePatterns || [];
// Find Python files
const { glob } = await import('fast-glob');
const files = await glob(filePatterns, {
cwd: projectPath,
ignore: exclude,
absolute: true,
onlyFiles: true
});
const results: AnalysisResult[] = [];
for (const file of files) {
try {
const result = await this.analyzeFile(file, options);
results.push(result);
} catch (error) {
this.logger.warn(`Failed to analyze file ${file}:`, error);
}
}
return results;
} catch (error) {
throw ErrorHandler.createAnalysisFailedError(projectPath, Language.PYTHON, error as Error);
}
}
async getRules(_configFile?: string): Promise<any> {
try {
const allRules: any[] = [];
// Add our predefined rules
Array.from(this.supportedRules.values()).forEach(rule => {
allRules.push({
...rule,
enabled: true // All rules are enabled by default
});
});
return {
rules: allRules,
total: allRules.length,
enabled: allRules.filter(r => r.enabled).length,
tools: ['pylint', 'flake8', 'mypy', 'black']
};
} catch (error) {
throw ErrorHandler.createExecutionError('Python tools', 'getting rules', error as Error);
}
}
async configure(config: any): Promise<void> {
try {
if (config.projectPath) {
await this.initializeProject(config.projectPath);
}
this.logger.info('Python analyzer configured successfully');
} catch (error) {
throw ErrorHandler.createInvalidConfigError('python config', error as Error);
}
}
private async runPylintAnalysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
try {
const command = `pylint --output-format=json ${filePath}`;
const result = execSync(command, { encoding: 'utf-8', timeout: 30000 });
const issues: AnalysisIssue[] = [];
try {
const pylintOutput = JSON.parse(result);
for (const message of pylintOutput) {
const issue = this.convertPylintMessageToIssue(message, filePath);
if (this.shouldIncludeIssue(issue, options)) {
issues.push(issue);
}
}
} catch (parseError) {
// If JSON parsing fails, try to parse text output
this.logger.warn('Pylint JSON output parsing failed, attempting text parsing');
return this.parsePylintTextOutput(result, filePath, options);
}
return issues;
} catch (error) {
this.logger.warn(`Pylint analysis failed for ${filePath}:`, error);
return [];
}
}
private async runFlake8Analysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
try {
const command = `flake8 --format=json ${filePath}`;
const result = execSync(command, { encoding: 'utf-8', timeout: 30000 });
const issues: AnalysisIssue[] = [];
try {
const flake8Output = JSON.parse(result);
for (const message of flake8Output) {
const issue = this.convertFlake8MessageToIssue(message, filePath);
if (this.shouldIncludeIssue(issue, options)) {
issues.push(issue);
}
}
} catch (parseError) {
// If JSON parsing fails, try to parse text output
return this.parseFlake8TextOutput(result, filePath, options);
}
return issues;
} catch (error) {
this.logger.warn(`Flake8 analysis failed for ${filePath}:`, error);
return [];
}
}
private async runMypyAnalysis(filePath: string, options: AnalysisOptions): Promise<AnalysisIssue[]> {
try {
const command = `mypy --show-column-numbers --no-error-summary ${filePath}`;
const result = execSync(command, { encoding: 'utf-8', timeout: 30000 });
const issues: AnalysisIssue[] = [];
const lines = result.split('\n');
for (const line of lines) {
if (line.trim() && !line.startsWith('Success:')) {
const issue = this.parseMypyLine(line, filePath);
if (issue && this.shouldIncludeIssue(issue, options)) {
issues.push(issue);
}
}
}
return issues;
} catch (error) {
this.logger.warn(`MyPy analysis failed for ${filePath}:`, error);
return [];
}
}
private async runBlackAnalysis(filePath: string, _options: AnalysisOptions): Promise<AnalysisIssue[]> {
try {
const command = `black --check --diff ${filePath}`;
const result = execSync(command, { encoding: 'utf-8', timeout: 30000 });
const issues: AnalysisIssue[] = [];
if (result.includes('would reformat')) {
// Black found formatting issues
const issue: AnalysisIssue = {
id: `black_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: 'formatting',
message: 'File would be reformatted by Black',
severity: SeverityLevel.WARNING,
line: 1,
column: 1,
category: IssueCategory.STYLE,
fix: {
description: 'Format file with Black',
replacement: 'Run: black ' + filePath,
range: {
start: { line: 1, column: 1 },
end: { line: 1, column: 1 }
}
},
tags: ['black', 'formatting']
};
issues.push(issue);
}
return issues;
} catch (error) {
this.logger.warn(`Black analysis failed for ${filePath}:`, error);
return [];
}
}
private parsePylintTextOutput(output: string, _filePath: string, _options: AnalysisOptions): AnalysisIssue[] {
const issues: AnalysisIssue[] = [];
const lines = output.split('\n');
for (const line of lines) {
const match = line.match(/^(.+):(\d+):(\d+):\s(.+):\s(.+)$/);
if (match) {
const [, , lineNum, colNum, code, message] = match;
const issue: AnalysisIssue = {
id: `pylint_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: code,
message: message.trim(),
severity: this.mapPylintSeverity(code),
line: parseInt(lineNum),
column: parseInt(colNum),
category: this.categorizePylintRule(code),
tags: ['pylint']
};
if (this.shouldIncludeIssue(issue, _options)) {
issues.push(issue);
}
}
}
return issues;
}
private parseFlake8TextOutput(output: string, _filePath: string, _options: AnalysisOptions): AnalysisIssue[] {
const issues: AnalysisIssue[] = [];
const lines = output.split('\n');
for (const line of lines) {
const match = line.match(/^(.+):(\d+):(\d+):\s(.+)$/);
if (match) {
const [, , lineNum, colNum, codeMessage] = match;
const [code, ...messageParts] = codeMessage.split(' ');
const message = messageParts.join(' ');
const issue: AnalysisIssue = {
id: `flake8_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: code,
message: message.trim(),
severity: this.mapFlake8Severity(code),
line: parseInt(lineNum),
column: parseInt(colNum),
category: this.categorizeFlake8Rule(code),
tags: ['flake8']
};
if (this.shouldIncludeIssue(issue, _options)) {
issues.push(issue);
}
}
}
return issues;
}
private parseMypyLine(line: string, _filePath: string): AnalysisIssue | null {
const match = line.match(/^(.+):(\d+):(\d+):\s(error|warning|note):\s(.+)$/);
if (match) {
const [, , lineNum, colNum, severity, message] = match;
return {
id: `mypy_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: 'mypy',
message: message.trim(),
severity: this.mapMypySeverity(severity),
line: parseInt(lineNum),
column: parseInt(colNum),
category: IssueCategory.MAINTAINABILITY,
tags: ['mypy']
};
}
return null;
}
private convertPylintMessageToIssue(message: any, _filePath: string): AnalysisIssue {
return {
id: `pylint_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: message['message-id'] || 'pylint',
message: message.message,
severity: this.mapPylintSeverity(message.type),
line: message.line,
column: message.column,
category: this.categorizePylintRule(message['message-id']),
tags: ['pylint']
};
}
private convertFlake8MessageToIssue(message: any, _filePath: string): AnalysisIssue {
return {
id: `flake8_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: message['error-code'] || 'flake8',
message: message.text,
severity: this.mapFlake8Severity(message['error-code']),
line: message['line-number'],
column: message['column-number'],
category: this.categorizeFlake8Rule(message['error-code']),
tags: ['flake8']
};
}
private mapPylintSeverity(type: string): SeverityLevel {
switch (type.toLowerCase()) {
case 'error':
case 'fatal':
return SeverityLevel.ERROR;
case 'warning':
case 'convention':
case 'refactor':
return SeverityLevel.WARNING;
default:
return SeverityLevel.INFO;
}
}
private mapFlake8Severity(code: string): SeverityLevel {
if (code.startsWith('E')) {
return SeverityLevel.ERROR;
} else if (code.startsWith('W')) {
return SeverityLevel.WARNING;
}
return SeverityLevel.INFO;
}
private mapMypySeverity(severity: string): SeverityLevel {
switch (severity.toLowerCase()) {
case 'error':
return SeverityLevel.ERROR;
case 'warning':
return SeverityLevel.WARNING;
case 'note':
return SeverityLevel.INFO;
default:
return SeverityLevel.INFO;
}
}
private categorizePylintRule(ruleId: string): IssueCategory {
if (ruleId.startsWith('E')) {
return IssueCategory.SYNTAX;
} else if (ruleId.startsWith('W')) {
return IssueCategory.STYLE;
} else if (ruleId.startsWith('R')) {
return IssueCategory.MAINTAINABILITY;
} else if (ruleId.startsWith('C')) {
return IssueCategory.MAINTAINABILITY;
}
return IssueCategory.STYLE;
}
private categorizeFlake8Rule(code: string): IssueCategory {
if (code.startsWith('E9') || code.startsWith('F')) {
return IssueCategory.SYNTAX;
} else if (code.startsWith('E') || code.startsWith('W')) {
return IssueCategory.STYLE;
}
return IssueCategory.STYLE;
}
private shouldIncludeIssue(issue: AnalysisIssue, options: AnalysisOptions): boolean {
// Filter by rules
if (options.rules && options.rules.length > 0) {
if (!options.rules.includes(issue.ruleId)) {
return false;
}
}
// Filter by excluded rules
if (options.exclude && options.exclude.length > 0) {
if (options.exclude.includes(issue.ruleId)) {
return false;
}
}
// Filter by fixable
if (options.fixable && !issue.fix) {
return false;
}
return true;
}
private calculateMetrics(content: string): AnalysisMetrics {
const lines = content.split('\n');
const linesOfCode = lines.length;
const commentLines = lines.filter(line =>
line.trim().startsWith('#') ||
line.trim().startsWith('"""') ||
line.trim().startsWith("'''")
).length;
return {
linesOfCode,
commentLines,
cyclomaticComplexity: 0, // Would need more sophisticated analysis
cognitiveComplexity: 0, // Would need more sophisticated analysis
maintainabilityIndex: 0 // Would need more sophisticated analysis
};
}
private createAnalysisResult(
filePath: string,
language: Language,
issues: AnalysisIssue[],
metrics: AnalysisMetrics
): AnalysisResult {
return {
id: `analysis_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
timestamp: new Date(),
filePath,
language,
issues,
metrics,
severity: this.calculateOverallSeverity(issues)
};
}
private calculateOverallSeverity(issues: AnalysisIssue[]): SeverityLevel {
if (issues.length === 0) {
return SeverityLevel.INFO;
}
const hasErrors = issues.some(issue => issue.severity === SeverityLevel.ERROR);
const hasWarnings = issues.some(issue => issue.severity === SeverityLevel.WARNING);
if (hasErrors) {
return SeverityLevel.ERROR;
} else if (hasWarnings) {
return SeverityLevel.WARNING;
} else {
return SeverityLevel.INFO;
}
}
async analyze(filePath: string, _content: string, _options?: AnalysisOptions): Promise<AnalysisResult> {
// Implementation will be added
return {
id: `analysis-${Date.now()}`,
timestamp: new Date(),
filePath,
language: this.language,
issues: [],
metrics: {
cyclomaticComplexity: 0,
cognitiveComplexity: 0,
linesOfCode: 0,
commentLines: 0,
maintainabilityIndex: 0
},
severity: SeverityLevel.INFO
};
}
getSupportedExtensions(): string[] {
return ['.py'];
}
isApplicable(filePath: string): boolean {
return filePath.toLowerCase().endsWith('.py');
}
}