MCP Server Semgrep
by Szowesgad
Verified
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from '@modelcontextprotocol/sdk/types.js';
import { exec, execFile } from 'child_process';
import { readFile, writeFile } from 'fs/promises';
import path from 'path';
import { fileURLToPath } from 'url';
import { promisify } from 'util';
// Promise-returning form of child_process.exec, so commands can be awaited.
const execAsync = promisify(exec);

// ES modules have no built-in __filename/__dirname; rebuild both from this module's URL.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

// Directory that client-supplied paths are confined to: two levels above this module's directory.
const BASE_ALLOWED_PATH = path.resolve(__dirname, '..', '..');
/**
 * MCP server that exposes Semgrep scanning and result post-processing as tools
 * over a stdio transport.
 *
 * Registered tools: scan_directory, list_rules, analyze_results, create_rule,
 * filter_results, export_results, compare_results.
 *
 * Security model: every file-system path received from a client must be
 * absolute and must lie inside BASE_ALLOWED_PATH (see validateAbsolutePath).
 * Client-supplied values are never interpolated into a shell command line:
 * Semgrep is started with an argument vector, and result/rule files are read
 * and written through the fs API.
 */
class SemgrepServer {
  /** Largest Semgrep stdout (in bytes) accepted from a single scan. */
  private static readonly MAX_SCAN_OUTPUT_BYTES = 64 * 1024 * 1024;

  /** Promise form of execFile: starts a binary with an argv array, without a shell. */
  private static readonly execFileAsync = promisify(execFile);

  private server: Server;

  /** Set once Semgrep has been found (or installed) so later tool calls skip the probe. */
  private semgrepVerified = false;

  constructor() {
    this.server = new Server(
      {
        name: 'mcp-server-semgrep',
        version: '1.0.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );
    this.setupToolHandlers();
    this.server.onerror = (error) => console.error('[MCP Error]', error);
    // Close the MCP connection before exiting on Ctrl-C.
    process.on('SIGINT', async () => {
      await this.server.close();
      process.exit(0);
    });
  }

  // ---------------------------------------------------------------------------
  // Pure helpers
  // ---------------------------------------------------------------------------

  /** Returns a printable message for any thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** Replaces every occurrence of `secret` in `text`; a missing or empty secret leaves `text` untouched. */
  private static redact(text: string, secret: string | undefined): string {
    return secret ? text.split(secret).join('[REDACTED]') : text;
  }

  /** Wraps plain text in the tool-result shape returned by every handler. */
  private static textResult(text: string) {
    return { content: [{ type: 'text' as const, text }] };
  }

  /**
   * Builds an `isError` tool result of the form "<prefix>: <error message>".
   *
   * @param secret - optional value to strip from the message before it is sent to the client
   */
  private static errorResult(prefix: string, error: unknown, secret?: string) {
    const message = SemgrepServer.redact(SemgrepServer.describeError(error), secret);
    return {
      content: [{ type: 'text' as const, text: `${prefix}: ${message}` }],
      isError: true,
    };
  }

  /**
   * Reports whether `candidate` is `baseDir` itself or located beneath it.
   *
   * A plain string-prefix test would also accept siblings such as
   * "<baseDir>-other", so the decision is made on the relative path instead.
   */
  private static isWithinDirectory(baseDir: string, candidate: string): boolean {
    const relative = path.relative(baseDir, candidate);
    const climbsOut = relative === '..' || relative.startsWith(`..${path.sep}`);
    // An absolute result means the two paths share no root (e.g. different Windows drives).
    return !climbsOut && !path.isAbsolute(relative);
  }

  /** Returns the `results` array of a parsed report, or an empty array when it is absent. */
  private static findingsOf(report: any): any[] {
    return Array.isArray(report?.results) ? report.results : [];
  }

  /** Identity of a finding for comparisons: rule id, file, start line and start column. */
  private static findingKey(finding: any): string {
    return `${finding?.check_id}:${finding?.path}:${finding?.start?.line}:${finding?.start?.col}`;
  }

  /** Counts findings per key produced by `keyOf`. */
  private static countBy(findings: any[], keyOf: (finding: any) => string): Record<string, number> {
    const counts = new Map<string, number>();
    for (const finding of findings) {
      const key = keyOf(finding);
      counts.set(key, (counts.get(key) ?? 0) + 1);
    }
    return Object.fromEntries(counts);
  }

  /**
   * Renders a single-pattern rule file.
   *
   * Every scalar is emitted as a JSON string, which is also a valid
   * double-quoted YAML scalar, so quotes, colons, '#' and newlines in the
   * pattern or message cannot corrupt the document. `language` may be a
   * comma-separated list.
   */
  private static buildRuleYaml(rule: {
    id: unknown;
    pattern: unknown;
    message: unknown;
    language: unknown;
    severity: unknown;
  }): string {
    const quote = (value: unknown): string => JSON.stringify(String(value));
    const languages = String(rule.language)
      .split(',')
      .map((entry) => entry.trim())
      .filter((entry) => entry.length > 0)
      .map(quote)
      .join(', ');
    return [
      'rules:',
      `  - id: ${quote(rule.id)}`,
      `    pattern: ${quote(rule.pattern)}`,
      `    message: ${quote(rule.message)}`,
      `    languages: [${languages}]`,
      `    severity: ${quote(rule.severity)}`,
      '',
    ].join('\n');
  }

  /** Maps a finding severity to a SARIF level; anything other than ERROR or INFO becomes "warning". */
  private static sarifLevel(severity: unknown): 'error' | 'warning' | 'note' {
    if (severity === 'ERROR') {
      return 'error';
    }
    return severity === 'INFO' ? 'note' : 'warning';
  }

  /** Converts findings to a SARIF 2.1.0 log with one rule entry per distinct check_id. */
  private static toSarif(findings: any[]) {
    const rulesById = new Map<unknown, object>();
    for (const finding of findings) {
      if (rulesById.has(finding?.check_id)) {
        continue; // the first finding of a rule supplies its description
      }
      rulesById.set(finding?.check_id, {
        id: finding?.check_id,
        name: finding?.check_id,
        shortDescription: {
          text: finding?.extra?.message,
        },
        defaultConfiguration: {
          level: SemgrepServer.sarifLevel(finding?.extra?.severity),
        },
      });
    }
    return {
      $schema: 'https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json',
      version: '2.1.0',
      runs: [
        {
          tool: {
            driver: {
              name: 'semgrep',
              rules: [...rulesById.values()],
            },
          },
          results: findings.map((finding) => ({
            ruleId: finding?.check_id,
            message: {
              text: finding?.extra?.message,
            },
            locations: [
              {
                physicalLocation: {
                  artifactLocation: {
                    uri: finding?.path,
                  },
                  region: {
                    startLine: finding?.start?.line,
                    startColumn: finding?.start?.col,
                    endLine: finding?.end?.line,
                    endColumn: finding?.end?.col,
                  },
                },
              },
            ],
          })),
        },
      ],
    };
  }

  /** Renders findings as the human-readable text export. */
  private static toText(findings: any[]): string {
    return findings
      .map(
        (finding) =>
          `[${finding?.extra?.severity}] ${finding?.check_id}\n` +
          `File: ${finding?.path}\n` +
          `Lines: ${finding?.start?.line}-${finding?.end?.line}\n` +
          `Message: ${finding?.extra?.message}\n` +
          '-------------------'
      )
      .join('\n');
  }

  /** Reads and parses a JSON file. Rejects when the file is unreadable or is not valid JSON. */
  private static async readJsonFile(file: string): Promise<any> {
    return JSON.parse(await readFile(file, 'utf8'));
  }

  // ---------------------------------------------------------------------------
  // Semgrep availability
  // ---------------------------------------------------------------------------

  /** Resolves to true when `semgrep --version` runs successfully, false otherwise. */
  private async checkSemgrepInstallation(): Promise<boolean> {
    try {
      await execAsync('semgrep --version');
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Installs Semgrep with `pip3 install semgrep`.
   *
   * @throws Error when pip3 is not available or the installation fails.
   */
  private async installSemgrep(): Promise<void> {
    console.error('Installing Semgrep...');
    try {
      // Check if pip is installed
      await execAsync('pip3 --version');
    } catch {
      throw new Error('Python/pip3 is not installed. Please install Python and pip3.');
    }
    try {
      // Install Semgrep via pip
      await execAsync('pip3 install semgrep');
      console.error('Semgrep was successfully installed');
    } catch (error: unknown) {
      throw new Error(`Error installing Semgrep: ${SemgrepServer.describeError(error)}`);
    }
  }

  /**
   * Makes sure Semgrep can be invoked, installing it when the probe fails.
   * A successful outcome is remembered so that later calls return immediately
   * instead of spawning `semgrep --version` again.
   */
  private async ensureSemgrepAvailable(): Promise<void> {
    if (this.semgrepVerified) {
      return;
    }
    if (!(await this.checkSemgrepInstallation())) {
      await this.installSemgrep();
    }
    this.semgrepVerified = true;
  }

  // ---------------------------------------------------------------------------
  // Input validation
  // ---------------------------------------------------------------------------

  /**
   * Validates a client-supplied path and returns its normalized form.
   *
   * For the `config` parameter the values "auto", "p/..." and "r/..." are
   * returned unchanged instead of being treated as file paths.
   *
   * NOTE: symbolic links are not resolved, so a link inside the allowed
   * directory can still point outside of it.
   *
   * @param pathToValidate - value received from the client
   * @param paramName - parameter name used in error messages
   * @returns the normalized absolute path (or the untouched special config value)
   * @throws McpError (InvalidParams) when the value is not a string, is not
   *         absolute, or lies outside BASE_ALLOWED_PATH.
   */
  private validateAbsolutePath(pathToValidate: string, paramName: string): string {
    // Tool arguments arrive untyped, so the declared type is not guaranteed at runtime.
    if (typeof pathToValidate !== 'string') {
      throw new McpError(ErrorCode.InvalidParams, `${paramName} must be a string`);
    }
    // Skip validation for special configuration values like "p/security"
    const isSpecialConfig =
      paramName === 'config' &&
      (pathToValidate.startsWith('p/') || pathToValidate.startsWith('r/') || pathToValidate === 'auto');
    if (isSpecialConfig) {
      return pathToValidate;
    }
    if (!path.isAbsolute(pathToValidate)) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `${paramName} must be an absolute path. Received: ${pathToValidate}`
      );
    }
    // Normalizing collapses "." and ".." segments before the containment check.
    const normalizedPath = path.normalize(pathToValidate);
    if (!SemgrepServer.isWithinDirectory(BASE_ALLOWED_PATH, normalizedPath)) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `${paramName} must be within the MCP directory (${BASE_ALLOWED_PATH})`
      );
    }
    return normalizedPath;
  }

  // ---------------------------------------------------------------------------
  // Tool registration and dispatch
  // ---------------------------------------------------------------------------

  /** Registers the tool catalogue and the dispatcher that routes tool calls to handlers. */
  private setupToolHandlers() {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'scan_directory',
          description: 'Performs a Semgrep scan on a directory',
          inputSchema: {
            type: 'object',
            properties: {
              path: {
                type: 'string',
                description: 'Absolute path to the directory to scan (must be within MCP directory)'
              },
              config: {
                type: 'string',
                description: 'Semgrep configuration (e.g. "auto" or absolute path to rule file)',
                default: 'auto'
              }
            },
            required: ['path']
          }
        },
        {
          name: 'list_rules',
          description: 'Lists available Semgrep rules',
          inputSchema: {
            type: 'object',
            properties: {
              language: {
                type: 'string',
                description: 'Programming language for rules (optional)'
              }
            }
          }
        },
        {
          name: 'analyze_results',
          description: 'Analyzes scan results',
          inputSchema: {
            type: 'object',
            properties: {
              results_file: {
                type: 'string',
                description: 'Absolute path to JSON results file (must be within MCP directory)'
              }
            },
            required: ['results_file']
          }
        },
        {
          name: 'create_rule',
          description: 'Creates a new Semgrep rule',
          inputSchema: {
            type: 'object',
            properties: {
              output_path: {
                type: 'string',
                description: 'Absolute path for output rule file'
              },
              pattern: {
                type: 'string',
                description: 'Search pattern for the rule'
              },
              language: {
                type: 'string',
                description: 'Target language for the rule'
              },
              message: {
                type: 'string',
                description: 'Message to display when rule matches'
              },
              severity: {
                type: 'string',
                description: 'Rule severity (ERROR, WARNING, INFO)',
                default: 'WARNING'
              },
              id: {
                type: 'string',
                description: 'Rule identifier',
                default: 'custom_rule'
              }
            },
            required: ['output_path', 'pattern', 'language', 'message']
          }
        },
        {
          name: 'filter_results',
          description: 'Filters scan results by various criteria',
          inputSchema: {
            type: 'object',
            properties: {
              results_file: {
                type: 'string',
                description: 'Absolute path to JSON results file'
              },
              severity: {
                type: 'string',
                description: 'Filter by severity (ERROR, WARNING, INFO)'
              },
              rule_id: {
                type: 'string',
                description: 'Filter by rule ID'
              },
              path_pattern: {
                type: 'string',
                description: 'Filter by file path pattern (regex)'
              },
              language: {
                type: 'string',
                description: 'Filter by programming language'
              },
              message_pattern: {
                type: 'string',
                description: 'Filter by message content (regex)'
              }
            },
            required: ['results_file']
          }
        },
        {
          name: 'export_results',
          description: 'Exports scan results in various formats',
          inputSchema: {
            type: 'object',
            properties: {
              results_file: {
                type: 'string',
                description: 'Absolute path to JSON results file'
              },
              output_file: {
                type: 'string',
                description: 'Absolute path to output file'
              },
              format: {
                type: 'string',
                description: 'Output format (json, sarif, text)',
                default: 'text'
              }
            },
            required: ['results_file', 'output_file']
          }
        },
        {
          name: 'compare_results',
          description: 'Compares two scan results',
          inputSchema: {
            type: 'object',
            properties: {
              old_results: {
                type: 'string',
                description: 'Absolute path to older JSON results file'
              },
              new_results: {
                type: 'string',
                description: 'Absolute path to newer JSON results file'
              }
            },
            required: ['old_results', 'new_results']
          }
        }
      ]
    }));

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      // Ensure Semgrep is available before executing any tool
      await this.ensureSemgrepAvailable();
      // `arguments` is optional in a tool call; handlers always receive an object.
      const args = request.params.arguments ?? {};
      switch (request.params.name) {
        case 'scan_directory':
          return await this.handleScanDirectory(args);
        case 'list_rules':
          return await this.handleListRules(args);
        case 'analyze_results':
          return await this.handleAnalyzeResults(args);
        case 'create_rule':
          return await this.handleCreateRule(args);
        case 'filter_results':
          return await this.handleFilterResults(args);
        case 'export_results':
          return await this.handleExportResults(args);
        case 'compare_results':
          return await this.handleCompareResults(args);
        default:
          throw new McpError(
            ErrorCode.MethodNotFound,
            `Unknown tool: ${request.params.name}`
          );
      }
    });
  }

  // ---------------------------------------------------------------------------
  // Tool handlers
  // ---------------------------------------------------------------------------

  /**
   * scan_directory: runs `semgrep scan --json` on a directory and returns its stdout.
   *
   * @param args - `{ path, config? }`; `config` defaults to "auto"
   * @throws McpError (InvalidParams) when `path` is missing or a path fails validation.
   *         Failures of the scan itself are returned as an `isError` result.
   */
  private async handleScanDirectory(args: any) {
    if (!args?.path) {
      throw new McpError(ErrorCode.InvalidParams, 'Path is required');
    }
    const scanPath = this.validateAbsolutePath(args.path, 'path');
    const configParam = this.validateAbsolutePath(args.config || 'auto', 'config');
    const token = process.env.SEMGREP_APP_TOKEN;

    // Arguments go to Semgrep as an argv array, never through a shell, so
    // spaces or shell metacharacters in client input cannot alter the command.
    const argv = ['scan', '--json'];
    if (token && configParam.startsWith('r/')) {
      // NOTE(review): flag name kept from the previous implementation; confirm
      // that the installed Semgrep accepts it. The child process inherits this
      // process's environment, so SEMGREP_APP_TOKEN reaches Semgrep that way too.
      argv.push('--oauth-token', token);
    }
    argv.push('--config', configParam, scanPath);

    console.error(`Executing: ${SemgrepServer.redact(['semgrep', ...argv].join(' '), token)}`);
    try {
      const { stdout } = await SemgrepServer.execFileAsync('semgrep', argv, {
        maxBuffer: SemgrepServer.MAX_SCAN_OUTPUT_BYTES,
      });
      return SemgrepServer.textResult(stdout);
    } catch (error: unknown) {
      // The failure message embeds the full command line, token included.
      return SemgrepServer.errorResult('Error scanning', error, token);
    }
  }

  /**
   * list_rules: returns a static overview of Semgrep registry rule collections.
   * The Pro section is included only when SEMGREP_APP_TOKEN is set.
   *
   * @param _args - accepted for interface compatibility; the optional
   *                `language` argument does not influence the listing.
   */
  private async handleListRules(_args: any) {
    const lines = [
      'Available Semgrep Registry Rules:',
      'Standard rule collections:',
      '- p/ci: Basic CI rules',
      '- p/security: Security rules',
      '- p/performance: Performance rules',
      '- p/best-practices: Best practice rules',
      '',
    ];
    // Add Pro rules information if token is available
    if (process.env.SEMGREP_APP_TOKEN) {
      lines.push(
        'Pro Rule Collections (available with your SEMGREP_APP_TOKEN):',
        '- r/java.lang.security.audit.crypto.ssl.weak-protocol',
        '- r/javascript.express.security.audit.cookie-session-no-secure',
        '- r/go.lang.security.audit.crypto.bad_imports',
        '- And many more...',
        ''
      );
    }
    lines.push(
      'Use these rule collections with --config, e.g.:',
      'semgrep scan --config=p/ci'
    );
    return SemgrepServer.textResult(lines.join('\n'));
  }

  /**
   * analyze_results: summarizes a results file as total findings plus counts
   * per severity and per rule id ("unknown" when a field is absent).
   *
   * @param args - `{ results_file }`
   * @throws McpError (InvalidParams) when `results_file` is missing or invalid.
   */
  private async handleAnalyzeResults(args: any) {
    if (!args?.results_file) {
      throw new McpError(ErrorCode.InvalidParams, 'Results file is required');
    }
    const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
    try {
      const findings = SemgrepServer.findingsOf(await SemgrepServer.readJsonFile(resultsFile));
      const summary = {
        total_findings: findings.length,
        by_severity: SemgrepServer.countBy(findings, (f) => f?.extra?.severity || 'unknown'),
        by_rule: SemgrepServer.countBy(findings, (f) => f?.check_id || 'unknown'),
      };
      return SemgrepServer.textResult(JSON.stringify(summary, null, 2));
    } catch (error: unknown) {
      return SemgrepServer.errorResult('Error analyzing results', error);
    }
  }

  /**
   * create_rule: writes a single-pattern Semgrep rule file.
   *
   * @param args - `{ output_path, pattern, language, message, severity?, id? }`;
   *               `severity` defaults to "WARNING" and `id` to "custom_rule"
   * @throws McpError (InvalidParams) when a required argument is missing or
   *         `output_path` fails validation.
   */
  private async handleCreateRule(args: any) {
    if (!args?.output_path || !args.pattern || !args.language || !args.message) {
      throw new McpError(
        ErrorCode.InvalidParams,
        'output_path, pattern, language and message are required'
      );
    }
    const outputPath = this.validateAbsolutePath(args.output_path, 'output_path');
    const ruleYaml = SemgrepServer.buildRuleYaml({
      id: args.id || 'custom_rule',
      pattern: args.pattern,
      message: args.message,
      language: args.language,
      severity: args.severity || 'WARNING',
    });
    try {
      await writeFile(outputPath, ruleYaml, 'utf8');
      return SemgrepServer.textResult(`Rule successfully created at ${outputPath}`);
    } catch (error: unknown) {
      return SemgrepServer.errorResult('Error creating rule', error);
    }
  }

  /**
   * filter_results: returns the findings of a results file that satisfy every
   * supplied criterion (severity, rule_id, path_pattern, language, message_pattern).
   *
   * `path_pattern` and `message_pattern` are compiled as regular expressions;
   * an invalid pattern is reported as an `isError` result.
   *
   * @param args - `{ results_file, severity?, rule_id?, path_pattern?, language?, message_pattern? }`
   * @throws McpError (InvalidParams) when `results_file` is missing or invalid.
   */
  private async handleFilterResults(args: any) {
    if (!args?.results_file) {
      throw new McpError(ErrorCode.InvalidParams, 'results_file is required');
    }
    const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
    try {
      const findings = SemgrepServer.findingsOf(await SemgrepServer.readJsonFile(resultsFile));

      // One predicate per supplied criterion; a finding must pass all of them.
      const predicates: Array<(finding: any) => boolean> = [];
      if (args.severity) {
        predicates.push((finding) => finding?.extra?.severity === args.severity);
      }
      if (args.rule_id) {
        predicates.push((finding) => finding?.check_id === args.rule_id);
      }
      if (args.path_pattern) {
        const pathRegex = new RegExp(args.path_pattern);
        predicates.push((finding) => pathRegex.test(finding?.path));
      }
      if (args.language) {
        predicates.push((finding) => finding?.extra?.metadata?.language === args.language);
      }
      if (args.message_pattern) {
        const messageRegex = new RegExp(args.message_pattern);
        predicates.push((finding) => messageRegex.test(finding?.extra?.message));
      }

      const filteredResults = findings.filter((finding) =>
        predicates.every((matches) => matches(finding))
      );
      return SemgrepServer.textResult(JSON.stringify({ results: filteredResults }, null, 2));
    } catch (error: unknown) {
      return SemgrepServer.errorResult('Error filtering results', error);
    }
  }

  /**
   * export_results: converts a results file to JSON, SARIF 2.1.0 or plain text
   * and writes it to `output_file`. Any format other than "json" or "sarif"
   * produces the text form.
   *
   * @param args - `{ results_file, output_file, format? }`; `format` defaults to "text"
   * @throws McpError (InvalidParams) when a required argument is missing or a path is invalid.
   */
  private async handleExportResults(args: any) {
    if (!args?.results_file || !args.output_file) {
      throw new McpError(
        ErrorCode.InvalidParams,
        'results_file and output_file are required'
      );
    }
    const resultsFile = this.validateAbsolutePath(args.results_file, 'results_file');
    const outputFile = this.validateAbsolutePath(args.output_file, 'output_file');
    const format = args.format || 'text';
    try {
      const report = await SemgrepServer.readJsonFile(resultsFile);
      const findings = SemgrepServer.findingsOf(report);
      let output: string;
      switch (format) {
        case 'json':
          output = JSON.stringify(report, null, 2);
          break;
        case 'sarif':
          output = JSON.stringify(SemgrepServer.toSarif(findings), null, 2);
          break;
        case 'text':
        default:
          output = SemgrepServer.toText(findings);
          break;
      }
      // The file ends with a newline, as it did when it was written with `echo`.
      await writeFile(outputFile, `${output}\n`, 'utf8');
      return SemgrepServer.textResult(`Results successfully exported to ${outputFile}`);
    } catch (error: unknown) {
      return SemgrepServer.errorResult('Error exporting results', error);
    }
  }

  /**
   * compare_results: diffs two results files. Findings are matched on rule id,
   * file path, start line and start column, and reported as added, removed or
   * unchanged together with summary counts.
   *
   * @param args - `{ old_results, new_results }`
   * @throws McpError (InvalidParams) when an argument is missing or a path is invalid.
   */
  private async handleCompareResults(args: any) {
    if (!args?.old_results || !args.new_results) {
      throw new McpError(
        ErrorCode.InvalidParams,
        'old_results and new_results are required'
      );
    }
    const oldResultsFile = this.validateAbsolutePath(args.old_results, 'old_results');
    const newResultsFile = this.validateAbsolutePath(args.new_results, 'new_results');
    try {
      const [oldReport, newReport] = await Promise.all([
        SemgrepServer.readJsonFile(oldResultsFile),
        SemgrepServer.readJsonFile(newResultsFile),
      ]);
      const oldResults = SemgrepServer.findingsOf(oldReport);
      const newResults = SemgrepServer.findingsOf(newReport);

      // Key sets make each membership test O(1) in both directions.
      const oldKeys = new Set(oldResults.map(SemgrepServer.findingKey));
      const newKeys = new Set(newResults.map(SemgrepServer.findingKey));

      const comparison = {
        total_old: oldResults.length,
        total_new: newResults.length,
        added: newResults.filter((finding) => !oldKeys.has(SemgrepServer.findingKey(finding))),
        removed: oldResults.filter((finding) => !newKeys.has(SemgrepServer.findingKey(finding))),
        unchanged: newResults.filter((finding) => oldKeys.has(SemgrepServer.findingKey(finding))),
      };

      return SemgrepServer.textResult(
        JSON.stringify(
          {
            summary: {
              old_findings: comparison.total_old,
              new_findings: comparison.total_new,
              added: comparison.added.length,
              removed: comparison.removed.length,
              unchanged: comparison.unchanged.length,
            },
            details: comparison,
          },
          null,
          2
        )
      );
    } catch (error: unknown) {
      return SemgrepServer.errorResult('Error comparing results', error);
    }
  }

  /**
   * Starts the server: verifies (or installs) Semgrep, then connects the MCP
   * server to a stdio transport. Exits the process with code 1 when Semgrep
   * cannot be made available.
   */
  async run() {
    // Check and potentially install Semgrep on server start
    try {
      await this.ensureSemgrepAvailable();
    } catch (error: unknown) {
      console.error(`Error setting up Semgrep: ${SemgrepServer.describeError(error)}`);
      process.exit(1);
    }
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    // stdout carries the MCP protocol, so status messages go to stderr.
    console.error('MCP Server Semgrep running on stdio');
  }
}
// Entry point: build the server and start it; a rejected startup promise is reported on stderr.
const server = new SemgrepServer();
server.run().catch((reason) => console.error(reason));