index.ts•21.5 kB
#!/usr/bin/env node
/**
* Python Code Review MCP Server
* Comprehensive code quality and security analysis for backend developers
*
* Features:
* - Detailed security vulnerability scanning
* - Code quality analysis with best practices
* - Consistent, comprehensive reporting
* - Focus on Python backend development patterns
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
Tool
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
import { PythonAnalyzer, AnalysisResult } from './python-analyzer.js';
import { ReportFormatter } from './report-formatter.js';
// Input validation schemas
const ReviewCodeSchema = z.object({
code: z.string().min(1, "Code cannot be empty"),
filename: z.string().optional().default("unknown.py"),
reportType: z.enum(["detailed", "summary", "security"]).optional().default("detailed")
});
const SecurityAuditSchema = z.object({
code: z.string().min(1, "Code cannot be empty"),
filename: z.string().optional().default("unknown.py")
});
const QualityAnalysisSchema = z.object({
code: z.string().min(1, "Code cannot be empty"),
filename: z.string().optional().default("unknown.py"),
includeStyle: z.boolean().optional().default(true),
includeMaintainability: z.boolean().optional().default(true)
});
const CompareCodeSchema = z.object({
originalCode: z.string().min(1, "Original code cannot be empty"),
revisedCode: z.string().min(1, "Revised code cannot be empty"),
filename: z.string().optional().default("unknown.py")
});
const GetSuggestionsSchema = z.object({
code: z.string().min(1, "Code cannot be empty"),
filename: z.string().optional().default("unknown.py"),
focusArea: z.enum(["security", "quality", "performance", "style", "all"]).optional().default("all")
});
/** A single analyzer finding; derived from `AnalysisResult` so it follows the analyzer's own type. */
type ReviewIssue = AnalysisResult['issues'][number];

/**
 * MCP server that exposes five Python code-review tools and renders their
 * results as Markdown text.
 *
 * The actual analysis is delegated to `PythonAnalyzer`; the detailed, summary
 * and security reports are delegated to `ReportFormatter`. This class validates
 * tool arguments, routes tool calls, and builds the additional reports
 * (security insights, quality report, version comparison, focused suggestions).
 *
 * NOTE(review): several icon literals in this file look mis-encoded (UTF-8 text
 * that was decoded with a single-byte code page). Only the ones that had split
 * a string literal across two lines are restored here (as "✅"); the rest are
 * kept exactly as found — confirm them against the original file.
 */
class PythonCodeReviewServer {
  /** Horizontal rule printed under report titles (51 "=" characters). */
  private static readonly DIVIDER = '='.repeat(51);

  private readonly server: Server;
  private readonly analyzer: PythonAnalyzer;
  private readonly formatter: ReportFormatter;

  /** Creates the analyzer, formatter and MCP server, then registers the tool handlers. */
  constructor() {
    this.analyzer = new PythonAnalyzer();
    this.formatter = new ReportFormatter();
    this.server = new Server({
      name: 'python-code-review-mcp',
      version: '1.0.0'
    }, {
      capabilities: {
        tools: {}
      }
    });
    this.setupToolHandlers();
  }

  /**
   * Registers the "list tools" and "call tool" request handlers on the server.
   *
   * Any error raised while handling a tool call (unknown tool name, argument
   * validation failure, analyzer failure) is caught and returned as a tool
   * result flagged with `isError: true`, so the caller can tell a failure from
   * a successful report.
   */
  private setupToolHandlers(): void {
    // List available tools
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: this.getToolDefinitions()
    }));

    // Handle tool calls
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;
      try {
        switch (name) {
          case 'review_python_code':
            return await this.handleReviewCode(args);
          case 'security_audit':
            return await this.handleSecurityAudit(args);
          case 'analyze_code_quality':
            return await this.handleQualityAnalysis(args);
          case 'compare_code_versions':
            return await this.handleCompareCode(args);
          case 'get_improvement_suggestions':
            return await this.handleGetSuggestions(args);
          default:
            throw new Error(`Unknown tool: ${name}`);
        }
      } catch (error) {
        return {
          ...this.textResponse(
            `โ **ERROR**: ${this.describeError(error)}\n\nPlease check your input parameters and try again.`
          ),
          isError: true
        };
      }
    });
  }

  /**
   * Returns the tool catalogue advertised to clients.
   *
   * The JSON schemas here mirror the zod schemas used to validate the same
   * arguments at call time; keep the two in sync when adding a field.
   */
  private getToolDefinitions(): Tool[] {
    return [
      {
        name: 'review_python_code',
        description: 'Comprehensive Python code analysis focusing on quality and security. Provides detailed reports with actionable recommendations.',
        inputSchema: {
          type: 'object',
          properties: {
            code: {
              type: 'string',
              description: 'Python code to analyze'
            },
            filename: {
              type: 'string',
              description: 'Name of the file (optional, defaults to unknown.py)',
              default: 'unknown.py'
            },
            reportType: {
              type: 'string',
              enum: ['detailed', 'summary', 'security'],
              description: 'Type of report to generate',
              default: 'detailed'
            }
          },
          required: ['code']
        }
      },
      {
        name: 'security_audit',
        description: 'Focused security vulnerability analysis for Python code. Identifies SQL injection, command injection, and other security risks.',
        inputSchema: {
          type: 'object',
          properties: {
            code: {
              type: 'string',
              description: 'Python code to audit for security vulnerabilities'
            },
            filename: {
              type: 'string',
              description: 'Name of the file (optional)',
              default: 'unknown.py'
            }
          },
          required: ['code']
        }
      },
      {
        name: 'analyze_code_quality',
        description: 'Deep code quality analysis including style, maintainability, performance, and best practices compliance.',
        inputSchema: {
          type: 'object',
          properties: {
            code: {
              type: 'string',
              description: 'Python code to analyze for quality issues'
            },
            filename: {
              type: 'string',
              description: 'Name of the file (optional)',
              default: 'unknown.py'
            },
            includeStyle: {
              type: 'boolean',
              description: 'Include style and naming convention checks',
              default: true
            },
            includeMaintainability: {
              type: 'boolean',
              description: 'Include maintainability and documentation checks',
              default: true
            }
          },
          required: ['code']
        }
      },
      {
        name: 'compare_code_versions',
        description: 'Compare two versions of Python code to identify improvements or regressions in quality and security.',
        inputSchema: {
          type: 'object',
          properties: {
            originalCode: {
              type: 'string',
              description: 'Original version of the Python code'
            },
            revisedCode: {
              type: 'string',
              description: 'Revised version of the Python code'
            },
            filename: {
              type: 'string',
              description: 'Name of the file (optional)',
              default: 'unknown.py'
            }
          },
          required: ['originalCode', 'revisedCode']
        }
      },
      {
        name: 'get_improvement_suggestions',
        description: 'Get specific, actionable suggestions for improving Python code quality, security, and maintainability.',
        inputSchema: {
          type: 'object',
          properties: {
            code: {
              type: 'string',
              description: 'Python code to get improvement suggestions for'
            },
            filename: {
              type: 'string',
              description: 'Name of the file (optional)',
              default: 'unknown.py'
            },
            focusArea: {
              type: 'string',
              enum: ['security', 'quality', 'performance', 'style', 'all'],
              description: 'Focus area for suggestions',
              default: 'all'
            }
          },
          required: ['code']
        }
      }
    ];
  }

  /** Wraps a report string in the single-text-item result shape every tool returns. */
  private textResponse(text: string) {
    return {
      content: [
        {
          type: 'text' as const,
          text
        }
      ]
    };
  }

  /**
   * Turns a caught value into a short, readable message.
   *
   * Errors that carry an `issues` array whose entries have a string `message`
   * (the shape zod validation errors use) are rendered as
   * `path: message; path: message`. Any other `Error` yields its `message`;
   * non-Error values are stringified.
   */
  private describeError(error: unknown): string {
    if (!(error instanceof Error)) {
      return String(error);
    }
    const issues: unknown = Reflect.get(error, 'issues');
    if (!Array.isArray(issues) || issues.length === 0) {
      return error.message;
    }
    const parts: string[] = [];
    for (const raw of issues) {
      const issue: unknown = raw;
      // Anything that does not look like a validation issue: fall back to the plain message.
      if (typeof issue !== 'object' || issue === null) {
        return error.message;
      }
      const message: unknown = Reflect.get(issue, 'message');
      if (typeof message !== 'string') {
        return error.message;
      }
      const path: unknown = Reflect.get(issue, 'path');
      const location = Array.isArray(path) && path.length > 0 ? `${path.join('.')}: ` : '';
      parts.push(`${location}${message}`);
    }
    return parts.join('; ');
  }

  /**
   * Tool `review_python_code`: analyzes the code and renders the requested report.
   *
   * @param args Raw tool arguments; validated against `ReviewCodeSchema`.
   * @throws If validation fails (propagated from `parse`).
   */
  private async handleReviewCode(args: unknown) {
    const { code, filename, reportType } = ReviewCodeSchema.parse(args);
    const result = this.analyzer.analyzePythonCode(code, filename);
    let report: string;
    switch (reportType) {
      case 'summary':
        report = this.formatter.generateSummaryReport(result);
        break;
      case 'security':
        report = this.formatter.generateSecurityReport(result);
        break;
      default:
        // 'detailed' is the schema default.
        report = this.formatter.generateDetailedReport(result);
    }
    return this.textResponse(report);
  }

  /**
   * Tool `security_audit`: the formatter's security report followed by the
   * security insights section built in this class.
   *
   * @param args Raw tool arguments; validated against `SecurityAuditSchema`.
   * @throws If validation fails (propagated from `parse`).
   */
  private async handleSecurityAudit(args: unknown) {
    const { code, filename } = SecurityAuditSchema.parse(args);
    const result = this.analyzer.analyzePythonCode(code, filename);
    const securityReport = this.formatter.generateSecurityReport(result);
    // Add additional security insights
    const securityInsights = this.generateSecurityInsights(result);
    return this.textResponse(`${securityReport}\n\n${securityInsights}`);
  }

  /**
   * Tool `analyze_code_quality`: quality report over the issues that remain
   * after the optional style / maintainability filters.
   *
   * Only `issues` and `totalIssues` are recomputed for the filtered view; the
   * scores, per-severity counts and recommendations are those of the
   * unfiltered analysis.
   *
   * @param args Raw tool arguments; validated against `QualityAnalysisSchema`.
   * @throws If validation fails (propagated from `parse`).
   */
  private async handleQualityAnalysis(args: unknown) {
    const { code, filename, includeStyle, includeMaintainability } = QualityAnalysisSchema.parse(args);
    const result = this.analyzer.analyzePythonCode(code, filename);
    // Filter issues based on options
    const filteredIssues = result.issues.filter(issue => {
      if (!includeStyle && (issue.type === 'style' || issue.rule.includes('naming-convention'))) {
        return false;
      }
      if (!includeMaintainability && issue.type === 'maintainability') {
        return false;
      }
      return true;
    });
    const filteredResult: AnalysisResult = {
      ...result,
      issues: filteredIssues,
      totalIssues: filteredIssues.length
    };
    return this.textResponse(this.generateQualityReport(filteredResult));
  }

  /**
   * Tool `compare_code_versions`: analyzes both versions and renders a
   * side-by-side comparison.
   *
   * @param args Raw tool arguments; validated against `CompareCodeSchema`.
   * @throws If validation fails (propagated from `parse`).
   */
  private async handleCompareCode(args: unknown) {
    const { originalCode, revisedCode, filename } = CompareCodeSchema.parse(args);
    const originalResult = this.analyzer.analyzePythonCode(originalCode, `${filename} (original)`);
    const revisedResult = this.analyzer.analyzePythonCode(revisedCode, `${filename} (revised)`);
    return this.textResponse(this.generateComparisonReport(originalResult, revisedResult));
  }

  /**
   * Tool `get_improvement_suggestions`: per-line suggestions restricted to one focus area.
   *
   * @param args Raw tool arguments; validated against `GetSuggestionsSchema`.
   * @throws If validation fails (propagated from `parse`).
   */
  private async handleGetSuggestions(args: unknown) {
    const { code, filename, focusArea } = GetSuggestionsSchema.parse(args);
    const result = this.analyzer.analyzePythonCode(code, filename);
    return this.textResponse(this.generateFocusedSuggestions(result, focusArea));
  }

  /**
   * Builds the "SECURITY INSIGHTS" section appended to a security audit.
   *
   * With no issues of type `security` it lists proactive measures; otherwise it
   * reports the number of distinct rules hit, the most frequent rule, and an
   * alert block when any security issue has severity `critical`.
   */
  private generateSecurityInsights(result: AnalysisResult): string {
    const securityIssues = result.issues.filter(i => i.type === 'security');
    const insights = [
      '## ๐ง **SECURITY INSIGHTS**',
      ''
    ];
    if (securityIssues.length === 0) {
      insights.push(
        '✅ **No immediate security vulnerabilities detected in this code.**',
        '',
        '**Proactive Security Measures:**',
        '- Implement input validation for all user inputs',
        '- Use environment variables for sensitive configuration',
        '- Enable logging for security-relevant events',
        '- Regularly update dependencies to patch known vulnerabilities'
      );
      return insights.join('\n');
    }
    const vulnTypes = new Set(securityIssues.map(i => i.rule));
    insights.push(`**Vulnerability Types Found:** ${vulnTypes.size}`);
    insights.push(`**Most Common:** ${this.getMostCommonVulnerability(securityIssues)}`);
    insights.push('');
    const criticalSecurity = securityIssues.filter(i => i.severity === 'critical');
    if (criticalSecurity.length > 0) {
      insights.push(
        '๐จ **CRITICAL SECURITY ALERT:**',
        `This code contains ${criticalSecurity.length} critical security vulnerabilities that could lead to:`,
        '- Data breaches and unauthorized access',
        '- Remote code execution',
        '- SQL injection attacks',
        '- Complete system compromise',
        '',
        '**IMMEDIATE ACTION REQUIRED BEFORE DEPLOYMENT**'
      );
    }
    return insights.join('\n');
  }

  /** Builds the quality report: header, score with rating, breakdown and recommendations. */
  private generateQualityReport(result: AnalysisResult): string {
    const sections = [
      '๐ **CODE QUALITY ANALYSIS REPORT**',
      PythonCodeReviewServer.DIVIDER,
      `**File:** ${result.fileName}`,
      `**Quality Score:** ${result.codeQualityScore}/100 ${this.getQualityRating(result.codeQualityScore)}`,
      '',
      this.generateQualityBreakdown(result),
      '',
      this.generateQualityRecommendations(result)
    ];
    return sections.join('\n');
  }

  /**
   * Builds the comparison report for two analyses of the same file.
   *
   * Score changes are `revised - original`. Issue-count changes are
   * `original - revised`, so a positive number in the "Change" column of the
   * issue rows means the revised code has FEWER issues.
   */
  private generateComparisonReport(original: AnalysisResult, revised: AnalysisResult): string {
    const improvementScore = revised.codeQualityScore - original.codeQualityScore;
    const securityImprovement = revised.securityScore - original.securityScore;
    const issueImprovement = original.totalIssues - revised.totalIssues;
    const criticalImprovement = original.criticalIssues - revised.criticalIssues;
    const sections = [
      '๐ **CODE COMPARISON REPORT**',
      PythonCodeReviewServer.DIVIDER,
      '',
      '## ๐ **IMPROVEMENT METRICS**',
      '',
      `| Metric | Original | Revised | Change |`,
      `|--------|----------|---------|--------|`,
      `| Quality Score | ${original.codeQualityScore}/100 | ${revised.codeQualityScore}/100 | ${this.formatChange(improvementScore)} |`,
      `| Security Score | ${original.securityScore}/100 | ${revised.securityScore}/100 | ${this.formatChange(securityImprovement)} |`,
      `| Total Issues | ${original.totalIssues} | ${revised.totalIssues} | ${this.formatChange(issueImprovement)} |`,
      `| Critical Issues | ${original.criticalIssues} | ${revised.criticalIssues} | ${this.formatChange(criticalImprovement)} |`,
      '',
      this.generateComparisonSummary(improvementScore, securityImprovement, issueImprovement),
      '',
      this.generateRemainingIssues(revised)
    ];
    return sections.join('\n');
  }

  /**
   * Builds the suggestions report for one focus area.
   *
   * `all` keeps every issue; `style` keeps issues of type `style` plus any
   * whose rule contains `naming-convention`; any other value keeps issues whose
   * type equals the focus area. Issues are listed grouped by line.
   */
  private generateFocusedSuggestions(result: AnalysisResult, focusArea: string): string {
    const heading = focusArea.toUpperCase();
    const sections = [
      `๐ก **${heading} IMPROVEMENT SUGGESTIONS**`,
      PythonCodeReviewServer.DIVIDER,
      `**File:** ${result.fileName}`,
      ''
    ];
    let relevantIssues: ReviewIssue[] = result.issues;
    if (focusArea === 'style') {
      relevantIssues = result.issues.filter(i =>
        i.type === 'style' || i.rule.includes('naming-convention')
      );
    } else if (focusArea !== 'all') {
      relevantIssues = result.issues.filter(i => i.type === focusArea);
    }
    if (relevantIssues.length === 0) {
      sections.push(`✅ No ${focusArea} issues found! Your code looks good in this area.`);
      sections.push('');
      sections.push('## ๐ **GENERAL BEST PRACTICES:**');
      sections.push(this.getGeneralBestPractices(focusArea));
      return sections.join('\n');
    }
    sections.push(`## ๐ฏ **${heading} ISSUES TO ADDRESS (${relevantIssues.length})**`);
    sections.push('');
    for (const [line, issues] of Object.entries(this.groupIssuesByLine(relevantIssues))) {
      sections.push(`### Line ${line}:`);
      for (const issue of issues) {
        sections.push(`- ${this.getSeverityIcon(issue.severity)} **${issue.message}**`);
        if (issue.suggestion) {
          sections.push(` ๐ก *${issue.suggestion}*`);
        }
      }
      sections.push('');
    }
    sections.push(this.getFocusAreaBestPractices(focusArea));
    return sections.join('\n');
  }

  // Helper methods

  /**
   * Returns the most frequent rule among `issues`, title-cased with hyphens
   * replaced by spaces (e.g. `sql-injection` -> `Sql Injection`), or `None`
   * for an empty list. On a tie the rule seen first wins.
   */
  private getMostCommonVulnerability(issues: readonly ReviewIssue[]): string {
    const counts = new Map<string, number>();
    for (const issue of issues) {
      counts.set(issue.rule, (counts.get(issue.rule) ?? 0) + 1);
    }
    let topRule: string | undefined;
    let topCount = 0;
    for (const [rule, count] of counts) {
      // Strict ">" keeps the earliest rule when counts are equal.
      if (count > topCount) {
        topRule = rule;
        topCount = count;
      }
    }
    if (topRule === undefined) {
      return 'None';
    }
    return topRule.replace(/-/g, ' ').replace(/\b\w/g, letter => letter.toUpperCase());
  }

  /** Maps a 0-100 quality score to a rating label (bands at 90, 80, 70 and 60). */
  private getQualityRating(score: number): string {
    if (score >= 90) return '๐ Excellent';
    if (score >= 80) return '✅ Good';
    if (score >= 70) return 'โ ๏ธ Fair';
    if (score >= 60) return 'โ Poor';
    return '๐จ Critical';
  }

  /**
   * Builds the "QUALITY BREAKDOWN" section: issue counts per type plus the
   * issue density per 100 lines. Density is reported as 0.00 when the analysis
   * has no lines, instead of dividing by zero.
   */
  private generateQualityBreakdown(result: AnalysisResult): string {
    const countOfType = (type: string): number =>
      result.issues.filter(i => i.type === type).length;
    const density = result.totalLines > 0
      ? (result.totalIssues / result.totalLines) * 100
      : 0;
    return [
      '## ๐ **QUALITY BREAKDOWN**',
      '',
      `- **Code Quality Issues:** ${countOfType('quality')}`,
      `- **Style Issues:** ${countOfType('style')}`,
      `- **Maintainability Issues:** ${countOfType('maintainability')}`,
      `- **Overall Density:** ${density.toFixed(2)} issues per 100 lines`
    ].join('\n');
  }

  /**
   * Builds the "QUALITY RECOMMENDATIONS" section: a headline chosen by score
   * band (>= 90, >= 80, otherwise) followed by the analyzer's recommendations.
   */
  private generateQualityRecommendations(result: AnalysisResult): string {
    const recommendations = ['## ๐ฏ **QUALITY RECOMMENDATIONS**', ''];
    if (result.codeQualityScore >= 90) {
      recommendations.push('๐ **Excellent code quality!** Continue maintaining these high standards.');
    } else if (result.codeQualityScore >= 80) {
      recommendations.push('✅ **Good code quality.** Focus on addressing the remaining medium-priority issues.');
    } else {
      recommendations.push('๐ **Improvement needed.** Prioritize fixing high-impact quality issues.');
    }
    recommendations.push('');
    recommendations.push('**Suggested Actions:**');
    for (const rec of result.recommendations) {
      recommendations.push(`- ${rec}`);
    }
    return recommendations.join('\n');
  }

  /** Formats a signed delta: explicit "+" for positive values, the bare number for negative, "0" otherwise. */
  private formatChange(change: number): string {
    if (change > 0) return `๐ +${change}`;
    if (change < 0) return `๐ ${change}`;
    return 'โก๏ธ 0';
  }

  /**
   * Builds the "SUMMARY" section of a comparison.
   *
   * All three deltas positive -> great improvement; any delta negative ->
   * regression; everything else (including "nothing changed") -> mixed results.
   */
  private generateComparisonSummary(quality: number, security: number, issues: number): string {
    const sections = ['## ๐ **SUMMARY**', ''];
    if (quality > 0 && security > 0 && issues > 0) {
      sections.push('๐ **GREAT IMPROVEMENT!** All metrics have improved.');
    } else if (quality < 0 || security < 0 || issues < 0) {
      sections.push('โ ๏ธ **REGRESSION DETECTED** - Some metrics have worsened.');
    } else {
      sections.push('โก๏ธ **MIXED RESULTS** - Some improvements, some areas need attention.');
    }
    return sections.join('\n');
  }

  /** Lists the per-severity issue counts of `result`, or a one-line all-clear when it has no issues. */
  private generateRemainingIssues(result: AnalysisResult): string {
    if (result.totalIssues === 0) {
      return '✅ **NO REMAINING ISSUES** - Code is ready for production!';
    }
    return [
      '## ๐ **REMAINING ISSUES TO ADDRESS**',
      '',
      `- Critical: ${result.criticalIssues}`,
      `- High: ${result.highIssues}`,
      `- Medium: ${result.mediumIssues}`,
      `- Low: ${result.lowIssues}`
    ].join('\n');
  }

  /**
   * Groups issues by their line, keyed by the line rendered as a string.
   *
   * A plain object is used on purpose: integer-like keys enumerate in ascending
   * numeric order, so callers iterating the result list issues in line order.
   */
  private groupIssuesByLine(issues: readonly ReviewIssue[]): Record<string, ReviewIssue[]> {
    const grouped: Record<string, ReviewIssue[]> = {};
    for (const issue of issues) {
      const key = String(issue.line);
      const bucket = grouped[key];
      if (bucket) {
        bucket.push(issue);
      } else {
        grouped[key] = [issue];
      }
    }
    return grouped;
  }

  /** Returns the icon for a severity (`critical`, `high`, `medium`, `low`), or a fallback icon for anything else. */
  private getSeverityIcon(severity: string): string {
    const icons: Record<string, string> = { critical: '๐จ', high: '๐ด', medium: '๐ก', low: '๐ต' };
    return icons[severity] || 'โ';
  }

  /** Returns the bullet list of best practices for a focus area, falling back to the generic list. */
  private getGeneralBestPractices(focusArea: string): string {
    const fallback = '- Follow Python best practices\n- Regular code reviews\n- Automated testing\n- Continuous integration';
    const practices: Record<string, string> = {
      security: '- Use parameterized queries\n- Store secrets in environment variables\n- Validate all user inputs\n- Enable HTTPS/SSL verification',
      quality: '- Write clear, descriptive variable names\n- Keep functions small and focused\n- Handle exceptions properly\n- Follow PEP 8 style guide',
      performance: '- Use appropriate data structures\n- Minimize database queries\n- Cache frequently accessed data\n- Profile before optimizing',
      style: '- Follow PEP 8 naming conventions\n- Use consistent indentation\n- Add meaningful comments\n- Organize imports properly',
      all: fallback
    };
    return practices[focusArea] || fallback;
  }

  /** Wraps the best-practices list for a focus area under its own section heading. */
  private getFocusAreaBestPractices(focusArea: string): string {
    return [
      `## ๐ **${focusArea.toUpperCase()} BEST PRACTICES**`,
      '',
      this.getGeneralBestPractices(focusArea)
    ].join('\n');
  }

  /**
   * Connects the server to a stdio transport.
   *
   * The startup message goes to stderr via `console.error`.
   * NOTE(review): presumably because stdout carries the protocol stream for a
   * stdio transport — confirm against the SDK documentation.
   */
  public async start(): Promise<void> {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error('Python Code Review MCP server started successfully');
  }
}
// Start the server.
// A failed start is reported on stderr with context and ends the process with
// a non-zero exit code, so whatever launched the server can detect the failure
// instead of seeing a silent, "successful" exit.
const server = new PythonCodeReviewServer();
server.start().catch((error: unknown) => {
  console.error('Failed to start Python Code Review MCP server:', error);
  process.exit(1);
});