index.ts•4.44 kB
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js";
import { Reasoner } from './reasoner.js';
import { ReasoningStrategy } from './types.js';
// Identity this MCP server announces to connecting clients.
const serverInfo = {
  name: "pentestthinkingMCP",
  version: "1.0.0",
};

// Only the "tools" capability is advertised; it needs no extra settings.
const serverOptions = {
  capabilities: {
    tools: {},
  },
};

// The MCP server instance that request handlers are registered on below.
const server = new Server(serverInfo, serverOptions);

// Single Reasoner instance shared by every tool invocation in this process.
const reasoner = new Reasoner();
/**
 * Coerce raw tool-call arguments into a typed attack-step payload and
 * validate it.
 *
 * Coercion stays lenient (numeric strings are accepted for the counters), but
 * values that cannot represent a valid step are rejected instead of slipping
 * through as NaN, "[object Object]", or a truthy "false" string.
 *
 * @param input - The `arguments` value of a tool call. May be missing or a
 *   non-object, in which case every field is treated as absent.
 * @returns The normalized step. `strategyType` is passed through unchecked;
 *   it is not validated against the strategy enum here.
 * @throws Error when `attackStep` is missing or not string-like, or when
 *   `attackStepNumber` / `totalAttackSteps` is not an integer >= 1.
 */
function processInput(input: unknown) {
  // Tool calls may arrive without any arguments; read fields from an empty
  // record in that case so the caller gets a validation error, not a TypeError.
  const raw: Record<string, unknown> =
    typeof input === "object" && input !== null
      ? (input as Record<string, unknown>)
      : {};

  // Converts a step counter, rejecting NaN/Infinity/fractions. `NaN < 1` is
  // false, so a plain range check alone would let non-numeric input through.
  const toStepCount = (value: unknown, field: string): number => {
    const count = Number(value || 0);
    if (!Number.isInteger(count)) {
      throw new Error(`${field} must be an integer`);
    }
    if (count < 1) {
      throw new Error(`${field} must be >= 1`);
    }
    return count;
  };

  // Boolean("false") is true, so string-encoded flags are interpreted by
  // content; every other type keeps ordinary truthiness.
  const toFlag = (value: unknown): boolean =>
    typeof value === "string"
      ? !["", "false", "0"].includes(value.trim().toLowerCase())
      : Boolean(value);

  // String() on an object would yield "[object Object]", which is never a
  // meaningful attack step.
  if (typeof raw.attackStep === "object" && raw.attackStep !== null) {
    throw new Error("attackStep must be a string");
  }
  const attackStep = String(raw.attackStep || "");
  if (!attackStep) {
    throw new Error("attackStep must be provided");
  }

  // Validation order matches the field order: step text, step number, total.
  const attackStepNumber = toStepCount(raw.attackStepNumber, "attackStepNumber");
  const totalAttackSteps = toStepCount(raw.totalAttackSteps, "totalAttackSteps");

  return {
    attackStep,
    attackStepNumber,
    totalAttackSteps,
    nextAttackStepNeeded: toFlag(raw.nextAttackStepNeeded),
    strategyType: raw.strategyType as ReasoningStrategy | undefined
  };
}
// Advertise the single tool this server exposes, together with the JSON
// schema clients must follow when calling it.
server.setRequestHandler(ListToolsRequestSchema, async () => {
  // One schema entry per accepted argument.
  const properties = {
    attackStep: {
      type: "string",
      description: "Current attack step or action in the penetration test"
    },
    attackStepNumber: {
      type: "integer",
      description: "Current step number in the attack chain",
      minimum: 1
    },
    totalAttackSteps: {
      type: "integer",
      description: "Total expected steps in the attack chain",
      minimum: 1
    },
    nextAttackStepNeeded: {
      type: "boolean",
      description: "Whether another attack step is needed"
    },
    strategyType: {
      type: "string",
      // Allowed values are taken from the ReasoningStrategy enum.
      enum: Object.values(ReasoningStrategy),
      description: "Attack strategy to use (beam_search or mcts)"
    }
  };

  // strategyType is the only argument not listed as required.
  const inputSchema = {
    type: "object" as const,
    properties,
    required: ["attackStep", "attackStepNumber", "totalAttackSteps", "nextAttackStepNeeded"]
  };

  const tool = {
    name: "pentestthinkingMCP",
    description: "Advanced reasoning tool with multiple strategies including Beam Search and Monte Carlo Tree Search",
    inputSchema
  };

  return { tools: [tool] };
});
// Handle tool invocations: validate the arguments, run the reasoner, and
// reply with the step result plus the current attack-chain statistics.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  // Builds the error reply used for both unknown tools and runtime failures.
  const failure = (message: string) => ({
    content: [{
      type: "text" as const,
      text: JSON.stringify({ error: message, success: false })
    }],
    isError: true
  });

  // This server exposes exactly one tool; reject any other name.
  if (request.params.name !== "pentestthinkingMCP") {
    return failure("Unknown tool");
  }

  try {
    // Normalize and validate the caller-supplied arguments.
    const {
      attackStep,
      attackStepNumber,
      totalAttackSteps,
      nextAttackStepNeeded,
      strategyType
    } = processInput(request.params.arguments);

    // Hand the step to the reasoner along with the requested strategy.
    const outcome = await reasoner.processAttackStep({
      attackStep,
      attackStepNumber,
      totalAttackSteps,
      nextAttackStepNeeded,
      strategyType
    });

    // Statistics are fetched after the step has been processed.
    const chainStats = await reasoner.getStats();

    // Echo the step back, enriched with the reasoner's result and the stats.
    const payload = {
      attackStepNumber,
      totalAttackSteps,
      nextAttackStepNeeded,
      attackStep,
      nodeId: outcome.nodeId,
      score: outcome.score,
      strategyUsed: outcome.strategyUsed,
      stats: {
        totalNodes: chainStats.totalNodes,
        averageScore: chainStats.averageScore,
        maxDepth: chainStats.maxDepth,
        branchingFactor: chainStats.branchingFactor,
        strategyMetrics: chainStats.strategyMetrics
      }
    };

    return {
      content: [{
        type: "text" as const,
        text: JSON.stringify(payload)
      }]
    };
  } catch (error) {
    // Validation and reasoner failures are reported in the tool result
    // rather than being rethrown from the handler.
    return failure(error instanceof Error ? error.message : String(error));
  }
});
// Start server: connect over a stdio transport.
// If the connection attempt rejects, report the error on stderr and exit
// with status 1.
function reportStartupFailure(error: unknown): never {
  process.stderr.write(`Error starting server: ${error}\n`);
  process.exit(1);
}

const transport = new StdioServerTransport();
server.connect(transport).catch(reportStartupFailure);