MCP Server Semgrep
by Szowesgad
Verified
#!/usr/bin/env node
import { spawn } from 'child_process';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Create a test directory with a simple file for scanning
const testDir = path.join(__dirname, 'test-scan');
if (!fs.existsSync(testDir)) {
fs.mkdirSync(testDir);
}
// Create a test file with a potential security issue
const testFile = path.join(testDir, 'test.js');
fs.writeFileSync(testFile, `
// Test file with potential security issues
const password = "hardcoded_password";
const sql = "SELECT * FROM users WHERE username = '" + username + "'"; // Potential SQL injection
eval(userInput); // Dangerous eval
`);
// Path to the MCP server executable
const serverPath = path.join(__dirname, 'build', 'index.js');
// Spawn the server process with Semgrep token
const server = spawn('node', [serverPath], {
env: {
...process.env,
SEMGREP_APP_TOKEN: '778e6a44531475d91dfa1b38d07f7e233ebad1d876cffc37a5b72b9460a25848'
},
stdio: ['pipe', 'pipe', 'pipe']
});
// Log server output for debugging
server.stderr.on('data', (data) => {
console.log(`[Server debug]: ${data}`);
});
// Function to send a message to the server
function sendMessage(message) {
const serialized = JSON.stringify(message) + '\n';
console.log(`\nSending: ${serialized}`);
server.stdin.write(serialized);
}
// Process server responses
let responseBuffer = '';
server.stdout.on('data', (data) => {
responseBuffer += data.toString();
// Process complete messages (messages are newline delimited)
const messages = responseBuffer.split('\n');
responseBuffer = messages.pop(); // Keep any partial message for next time
messages.forEach(message => {
if (message.trim()) {
console.log('\nReceived response:');
try {
const parsed = JSON.parse(message);
if (parsed.result && parsed.result.content && parsed.result.content[0].text) {
// For list_rules and scan responses, we'll check if we have Pro-specific content
const responseText = parsed.result.content[0].text;
// Check for indicators of Pro features
let hasPro = false;
if (responseText.includes('--auth-token') ||
responseText.includes('pro rules') ||
responseText.includes('team rules') ||
responseText.includes('r/') || // Pro rule prefix
responseText.includes('supply-chain') || // Pro rule category
responseText.includes('SEMGREP_APP_TOKEN')) {
hasPro = true;
console.log('\n✅ PRO FEATURES DETECTED: Response contains Pro-specific content');
}
// Print a shorter version of the response
console.log('\nResponse text (excerpt):');
console.log(responseText.substring(0, 500) + '...');
if (!hasPro) {
console.log('\n❌ NO PRO FEATURES DETECTED: Response does not contain Pro-specific content');
}
} else {
console.log(JSON.stringify(parsed, null, 2));
}
} catch (e) {
console.log('Error parsing response:', e);
console.log('Raw response:', message);
}
}
});
});
// Send test messages in sequence
setTimeout(() => {
console.log('\nTesting initialize...');
sendMessage({
jsonrpc: '2.0',
id: 1,
method: 'initialize',
params: {
protocolVersion: '0.1.0',
clientInfo: {
name: 'test-client',
version: '1.0.0'
},
capabilities: {}
}
});
}, 1000);
setTimeout(() => {
console.log('\nTesting list_rules (should include Pro rules)...');
sendMessage({
jsonrpc: '2.0',
id: 2,
method: 'tools/call',
params: {
name: 'list_rules',
arguments: {}
}
});
}, 3000);
setTimeout(() => {
console.log('\nTesting scan_directory (should use Pro rules)...');
sendMessage({
jsonrpc: '2.0',
id: 3,
method: 'tools/call',
params: {
name: 'scan_directory',
arguments: {
path: testDir,
config: 'p/security' // Using a standard ruleset
}
}
});
}, 6000);
// Give the server time to respond and then exit
setTimeout(() => {
console.log('\nTest completed, cleaning up...');
server.kill();
// Clean up test files
try {
fs.unlinkSync(testFile);
fs.rmdirSync(testDir);
console.log('Test files cleaned up');
} catch (err) {
console.error('Error cleaning up test files:', err);
}
process.exit(0);
}, 10000);