Semgrep MCP Server
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
} from '@modelcontextprotocol/sdk/types.js';
import { exec } from 'child_process';
import { promisify } from 'util';
import path from 'path';
import { fileURLToPath } from 'url';
const execAsync = promisify(exec);
// Dynamisch das MCP-Verzeichnis bestimmen
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const BASE_ALLOWED_PATH = path.resolve(__dirname, '../..');
class SemgrepServer {
private server: Server;
constructor() {
this.server = new Server(
{
name: 'semgrep-server',
version: '0.1.0',
},
{
capabilities: {
tools: {},
},
}
);
this.setupToolHandlers();
this.server.onerror = (error) => console.error('[MCP Error]', error);
process.on('SIGINT', async () => {
await this.server.close();
process.exit(0);
});
}
private async checkSemgrepInstallation(): Promise<boolean> {
try {
await execAsync('semgrep --version');
return true;
} catch (error) {
return false;
}
}
private async installSemgrep(): Promise<void> {
console.error('Semgrep wird installiert...');
try {
// Prüfen ob pip installiert ist
await execAsync('pip3 --version');
} catch (error) {
throw new Error('Python/pip3 ist nicht installiert. Bitte installieren Sie Python und pip3.');
}
try {
// Semgrep über pip installieren
await execAsync('pip3 install semgrep');
console.error('Semgrep wurde erfolgreich installiert');
} catch (error: any) {
throw new Error(`Fehler bei der Installation von Semgrep: ${error.message}`);
}
}
private async ensureSemgrepAvailable(): Promise<void> {
const isInstalled = await this.checkSemgrepInstallation();
if (!isInstalled) {
await this.installSemgrep();
}
}
private validateAbsolutePath(pathToValidate: string, paramName: string): string {
if (!path.isAbsolute(pathToValidate)) {
throw new McpError(
ErrorCode.InvalidParams,
`${paramName} muss ein absoluter Pfad sein. Erhalten: ${pathToValidate}`
);
}
// Normalisiere den Pfad und stelle sicher, dass keine Path Traversal möglich ist
const normalizedPath = path.normalize(pathToValidate);
// Überprüfe, ob der normalisierte Pfad immer noch absolut ist
if (!path.isAbsolute(normalizedPath)) {
throw new McpError(
ErrorCode.InvalidParams,
`${paramName} enthält ungültige Path Traversal Sequenzen`
);
}
// Überprüfe, ob der Pfad innerhalb des erlaubten Basis-Verzeichnisses liegt
if (!normalizedPath.startsWith(BASE_ALLOWED_PATH)) {
throw new McpError(
ErrorCode.InvalidParams,
`${paramName} muss innerhalb des MCP-Verzeichnisses (${BASE_ALLOWED_PATH}) liegen`
);
}
return normalizedPath;
}
private setupToolHandlers() {
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'scan_directory',
description: 'Führt einen Semgrep-Scan in einem Verzeichnis aus',
inputSchema: {
type: 'object',
properties: {
path: {
type: 'string',
description: `Absoluter Pfad zum zu scannenden Verzeichnis (muss innerhalb des MCP-Verzeichnisses liegen)`
},
config: {
type: 'string',
description: 'Semgrep-Konfiguration (z.B. "auto" oder absoluter Pfad zur Regel-Datei)',
default: 'auto'
}
},
required: ['path']
}
},
{
name: 'list_rules',
description: 'Listet verfügbare Semgrep-Regeln auf',
inputSchema: {
type: 'object',
properties: {
language: {
type: 'string',
description: 'Programmiersprache für die Regeln (optional)'
}
}
}
},
{
name: 'analyze_results',
description: 'Analysiert die Scan-Ergebnisse',
inputSchema: {
type: 'object',
properties: {
results_file: {
type: 'string',
description: `Absoluter Pfad zur JSON-Ergebnisdatei (muss innerhalb des MCP-Verzeichnisses liegen)`
}
},
required: ['results_file']
}
},
{
name: 'create_rule',
description: 'Erstellt eine neue Semgrep-Regel',
inputSchema: {
type: 'object',
properties: {
output_path: {
type: 'string',
description: 'Absoluter Pfad zur Ausgabedatei für die neue Regel'
},
pattern: {
type: 'string',
description: 'Das Suchmuster für die Regel'
},
language: {
type: 'string',
description: 'Die Zielsprache der Regel'
},
message: {
type: 'string',
description: 'Die Nachricht, die angezeigt wird, wenn die Regel zutrifft'
},
severity: {
type: 'string',
description: 'Schweregrad der Regel (ERROR, WARNING, INFO)',
default: 'WARNING'
}
},
required: ['output_path', 'pattern', 'language', 'message']
}
},
{
name: 'filter_results',
description: 'Filtert Scan-Ergebnisse nach verschiedenen Kriterien',
inputSchema: {
type: 'object',
properties: {
results_file: {
type: 'string',
description: 'Absoluter Pfad zur JSON-Ergebnisdatei'
},
severity: {
type: 'string',
description: 'Nach Schweregrad filtern (ERROR, WARNING, INFO)'
},
rule_id: {
type: 'string',
description: 'Nach Regel-ID filtern'
},
path_pattern: {
type: 'string',
description: 'Nach Dateipfad-Muster filtern (regex)'
}
},
required: ['results_file']
}
},
{
name: 'export_results',
description: 'Exportiert Scan-Ergebnisse in verschiedene Formate',
inputSchema: {
type: 'object',
properties: {
results_file: {
type: 'string',
description: 'Absoluter Pfad zur JSON-Ergebnisdatei'
},
output_file: {
type: 'string',
description: 'Absoluter Pfad zur Ausgabedatei'
},
format: {
type: 'string',
description: 'Ausgabeformat (json, sarif, text)',
default: 'text'
}
},
required: ['results_file', 'output_file']
}
},
{
name: 'compare_results',
description: 'Vergleicht zwei Scan-Ergebnisse',
inputSchema: {
type: 'object',
properties: {
old_results: {
type: 'string',
description: 'Absoluter Pfad zur älteren JSON-Ergebnisdatei'
},
new_results: {
type: 'string',
description: 'Absoluter Pfad zur neueren JSON-Ergebnisdatei'
}
},
required: ['old_results', 'new_results']
}
}
]
}));
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
// Vor jeder Tool-Ausführung sicherstellen, dass Semgrep verfügbar ist
await this.ensureSemgrepAvailable();
switch (request.params.name) {
case 'scan_directory':
return await this.handleScanDirectory(request.params.arguments);
case 'list_rules':
return await this.handleListRules(request.params.arguments);
case 'analyze_results':
return await this.handleAnalyzeResults(request.params.arguments);
case 'create_rule':
return await this.handleCreateRule(request.params.arguments);
case 'filter_results':
return await this.handleFilterResults(request.params.arguments);
case 'export_results':
return await this.handleExportResults(request.params.arguments);
case 'compare_results':
return await this.handleCompareResults(request.params.arguments);
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unbekanntes Tool: ${request.params.name}`
);
}
});
}
private async handleScanDirectory(args: any) {
if (!args.path) {
throw new McpError(ErrorCode.InvalidParams, 'Pfad ist erforderlich');
}
const scanPath = this.validateAbsolutePath(args.path, 'path');
const config = args.config || 'auto';
// Wenn config ein Pfad ist (nicht 'auto'), validieren dass es ein absoluter Pfad ist
const configParam = config !== 'auto'
? this.validateAbsolutePath(config, 'config')
: config;
try {
const { stdout, stderr } = await execAsync(
`semgrep scan --json --config ${configParam} ${scanPath}`
);
return {
content: [
{
type: 'text',
text: stdout
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Scannen: ${error.message}`
}
],
isError: true
};
}
}
private async handleListRules(args: any) {
const languageFilter = args.language ? `--lang ${args.language}` : '';
try {
// Hole die Registry-Regeln
const { stdout } = await execAsync('semgrep login --help');
// Formatiere die Ausgabe
const formattedOutput = `Verfügbare Semgrep Registry-Regeln:
Standardmäßig verfügbare Regelsammlungen:
- p/ci: Grundlegende CI-Regeln
- p/security: Sicherheitsregeln
- p/performance: Performance-Regeln
- p/best-practices: Best-Practice-Regeln
Verwenden Sie diese Regelsammlungen mit --config, z.B.:
semgrep scan --config=p/ci`;
return {
content: [
{
type: 'text',
text: formattedOutput
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Abrufen der Regeln: ${error.message}`
}
],
isError: true
};
}
}
private async handleAnalyzeResults(args: any) {
if (!args.results_file) {
throw new McpError(ErrorCode.InvalidParams, 'Ergebnisdatei ist erforderlich');
}
const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
try {
const { stdout } = await execAsync(`cat ${resultsFile}`);
const results = JSON.parse(stdout);
// Einfache Analyse der Ergebnisse
const summary = {
total_findings: results.results?.length || 0,
by_severity: {} as Record<string, number>,
by_rule: {} as Record<string, number>
};
for (const finding of results.results || []) {
const severity = finding.extra.severity || 'unknown';
const rule = finding.check_id || 'unknown';
summary.by_severity[severity] = (summary.by_severity[severity] || 0) + 1;
summary.by_rule[rule] = (summary.by_rule[rule] || 0) + 1;
}
return {
content: [
{
type: 'text',
text: JSON.stringify(summary, null, 2)
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler bei der Analyse: ${error.message}`
}
],
isError: true
};
}
}
private async handleCreateRule(args: any) {
if (!args.output_path || !args.pattern || !args.language || !args.message) {
throw new McpError(
ErrorCode.InvalidParams,
'output_path, pattern, language und message sind erforderlich'
);
}
const outputPath = this.validateAbsolutePath(args.output_path, 'output_path');
const severity = args.severity || 'WARNING';
// YAML-Regel erstellen
const ruleYaml = `
rules:
- id: custom_rule
pattern: ${args.pattern}
message: ${args.message}
languages: [${args.language}]
severity: ${severity}
`;
try {
await execAsync(`echo '${ruleYaml}' > ${outputPath}`);
return {
content: [
{
type: 'text',
text: `Regel wurde erfolgreich in ${outputPath} erstellt`
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Erstellen der Regel: ${error.message}`
}
],
isError: true
};
}
}
private async handleFilterResults(args: any) {
if (!args.results_file) {
throw new McpError(ErrorCode.InvalidParams, 'results_file ist erforderlich');
}
const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
try {
const { stdout } = await execAsync(`cat ${resultsFile}`);
const results = JSON.parse(stdout);
let filteredResults = results.results || [];
// Nach Schweregrad filtern
if (args.severity) {
filteredResults = filteredResults.filter(
(finding: any) => finding.extra.severity === args.severity
);
}
// Nach Regel-ID filtern
if (args.rule_id) {
filteredResults = filteredResults.filter(
(finding: any) => finding.check_id === args.rule_id
);
}
// Nach Dateipfad-Muster filtern
if (args.path_pattern) {
const pathRegex = new RegExp(args.path_pattern);
filteredResults = filteredResults.filter(
(finding: any) => pathRegex.test(finding.path)
);
}
return {
content: [
{
type: 'text',
text: JSON.stringify({ results: filteredResults }, null, 2)
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Filtern der Ergebnisse: ${error.message}`
}
],
isError: true
};
}
}
private async handleExportResults(args: any) {
if (!args.results_file || !args.output_file) {
throw new McpError(
ErrorCode.InvalidParams,
'results_file und output_file sind erforderlich'
);
}
const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
const outputFile = this.validateAbsolutePath(args.output_file, 'output_file');
const format = args.format || 'text';
try {
const { stdout } = await execAsync(`cat ${resultsFile}`);
const results = JSON.parse(stdout);
let output = '';
switch (format) {
case 'json':
output = JSON.stringify(results, null, 2);
break;
case 'sarif':
// SARIF-Format erstellen
const sarifOutput = {
$schema: "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
version: "2.1.0",
runs: [{
tool: {
driver: {
name: "semgrep",
rules: results.results.map((r: any) => ({
id: r.check_id,
name: r.check_id,
shortDescription: {
text: r.extra.message
},
defaultConfiguration: {
level: r.extra.severity === 'ERROR' ? 'error' : 'warning'
}
}))
}
},
results: results.results.map((r: any) => ({
ruleId: r.check_id,
message: {
text: r.extra.message
},
locations: [{
physicalLocation: {
artifactLocation: {
uri: r.path
},
region: {
startLine: r.start.line,
startColumn: r.start.col,
endLine: r.end.line,
endColumn: r.end.col
}
}
}]
}))
}]
};
output = JSON.stringify(sarifOutput, null, 2);
break;
case 'text':
default:
// Menschenlesbares Format
output = results.results.map((r: any) =>
`[${r.extra.severity}] ${r.check_id}\n` +
`File: ${r.path}\n` +
`Lines: ${r.start.line}-${r.end.line}\n` +
`Message: ${r.extra.message}\n` +
'-------------------'
).join('\n');
break;
}
await execAsync(`echo '${output}' > ${outputFile}`);
return {
content: [
{
type: 'text',
text: `Ergebnisse wurden erfolgreich nach ${outputFile} exportiert`
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Exportieren der Ergebnisse: ${error.message}`
}
],
isError: true
};
}
}
private async handleCompareResults(args: any) {
if (!args.old_results || !args.new_results) {
throw new McpError(
ErrorCode.InvalidParams,
'old_results und new_results sind erforderlich'
);
}
const oldResultsFile = this.validateAbsolutePath(args.old_results, 'old_results');
const newResultsFile = this.validateAbsolutePath(args.new_results, 'new_results');
try {
const { stdout: oldContent } = await execAsync(`cat ${oldResultsFile}`);
const { stdout: newContent } = await execAsync(`cat ${newResultsFile}`);
const oldResults = JSON.parse(oldContent).results || [];
const newResults = JSON.parse(newContent).results || [];
// Findings vergleichen
const oldFindings = new Set(oldResults.map((r: any) =>
`${r.check_id}:${r.path}:${r.start.line}:${r.start.col}`
));
const comparison = {
total_old: oldResults.length,
total_new: newResults.length,
added: [] as any[],
removed: [] as any[],
unchanged: [] as any[]
};
// Neue und unveränderte Findings identifizieren
newResults.forEach((finding: any) => {
const key = `${finding.check_id}:${finding.path}:${finding.start.line}:${finding.start.col}`;
if (oldFindings.has(key)) {
comparison.unchanged.push(finding);
} else {
comparison.added.push(finding);
}
});
// Entfernte Findings identifizieren
oldResults.forEach((finding: any) => {
const key = `${finding.check_id}:${finding.path}:${finding.start.line}:${finding.start.col}`;
const exists = newResults.some((newFinding: any) =>
`${newFinding.check_id}:${newFinding.path}:${newFinding.start.line}:${newFinding.start.col}` === key
);
if (!exists) {
comparison.removed.push(finding);
}
});
return {
content: [
{
type: 'text',
text: JSON.stringify({
summary: {
old_findings: comparison.total_old,
new_findings: comparison.total_new,
added: comparison.added.length,
removed: comparison.removed.length,
unchanged: comparison.unchanged.length
},
details: comparison
}, null, 2)
}
]
};
} catch (error: any) {
return {
content: [
{
type: 'text',
text: `Fehler beim Vergleichen der Ergebnisse: ${error.message}`
}
],
isError: true
};
}
}
async run() {
// Beim Start des Servers prüfen und ggf. installieren
try {
await this.ensureSemgrepAvailable();
} catch (error: any) {
console.error(`Fehler beim Setup von Semgrep: ${error.message}`);
process.exit(1);
}
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error('Semgrep MCP Server läuft auf stdio');
}
}
const server = new SemgrepServer();
server.run().catch(console.error);