import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import { spawn } from "child_process";
import { promises as fs } from "fs";
import path from "path";
import { fileURLToPath } from "url";
// ES modules do not provide __filename/__dirname, so both are rebuilt from
// this module's own URL. __dirname is used below as the Python working
// directory and import path.
const __filename = fileURLToPath(import.meta.url),
  __dirname = path.dirname(__filename);
// Schema for the single `debug` option: a boolean that falls back to `false`
// when the caller omits it.
const debugOption = z.boolean().default(false).describe("Enable debug logging");

/**
 * Configuration schema for this server.
 *
 * The inferred type of this schema is the `config` argument accepted by the
 * default-exported server factory.
 */
export const configSchema = z.object({ debug: debugOption });
/**
 * Python program run via `python3 -c` for every tool call.
 *
 * The script is a constant: nothing from the caller is interpolated into the
 * Python source. The request (module directory, tool name, arguments) arrives
 * as one JSON document on stdin, so quotes in values cannot alter the program
 * and JSON `null`/`true`/`false` are decoded into proper Python values.
 *
 * Output contract (stdout, one JSON document):
 *   - on success: a JSON array of strings, one per result item;
 *   - on any Python exception: `{"error": "<message>"}` (exit code stays 0).
 */
const PYTHON_BRIDGE_SCRIPT = `
import sys
import json
import asyncio

try:
    # Read raw bytes so decoding does not depend on the locale's stdin encoding.
    request = json.loads(sys.stdin.buffer.read())
    sys.path.append(request["module_dir"])
    from ddddocr_mcp_server import DDDDOCRServer

    async def main():
        server = DDDDOCRServer()
        tool = request["tool"]
        args = request.get("args") or {}
        if tool == "ocr_recognize":
            result = await server._ocr_recognize(args)
        elif tool == "detect_objects":
            result = await server._detect_objects(args)
        elif tool == "match_slider":
            result = await server._match_slider(args)
        elif tool == "health_check":
            result = await server._health_check()
        else:
            result = [{"type": "text", "text": "Unknown tool"}]
        print(json.dumps([r.text if hasattr(r, "text") else str(r) for r in result]))

    asyncio.run(main())
except Exception as e:
    print(json.dumps({"error": str(e)}))
`;

/**
 * Runs one backend tool in a fresh `python3` subprocess and returns its output.
 *
 * The request is sent over the child's stdin rather than the command line, so
 * large base64 payloads are not subject to the OS argument-size limit.
 *
 * @param tool - Backend tool name (`ocr_recognize`, `detect_objects`,
 *   `match_slider` or `health_check`); any other name yields an
 *   "Unknown tool" result from the script.
 * @param args - JSON-serialisable arguments handed to the backend method.
 *   Properties whose value is `undefined` are omitted by `JSON.stringify`.
 * @returns The subprocess's stdout, trimmed. Callers are expected to
 *   `JSON.parse` it (see the output contract on the bridge script).
 * @throws Rejects with `Error("Python process failed: ...")` when the process
 *   exits with a non-zero code, or with the spawn error when `python3` cannot
 *   be started.
 */
async function callPythonScript(tool: string, args: unknown): Promise<string> {
  const request = JSON.stringify({
    module_dir: __dirname,
    tool,
    args: args ?? {},
  });

  return new Promise<string>((resolve, reject) => {
    const pythonProcess = spawn("python3", ["-c", PYTHON_BRIDGE_SCRIPT], {
      stdio: ["pipe", "pipe", "pipe"],
      cwd: __dirname,
    });

    let stdout = "";
    let stderr = "";

    // Decode at the stream level so a multi-byte character split across two
    // chunks is not corrupted.
    pythonProcess.stdout.setEncoding("utf8");
    pythonProcess.stderr.setEncoding("utf8");
    pythonProcess.stdout.on("data", (chunk: string) => {
      stdout += chunk;
    });
    pythonProcess.stderr.on("data", (chunk: string) => {
      stderr += chunk;
    });

    pythonProcess.on("close", (code) => {
      if (code === 0) {
        resolve(stdout.trim());
      } else {
        reject(new Error(`Python process failed: ${stderr || stdout}`));
      }
    });
    pythonProcess.on("error", (error) => {
      reject(error);
    });

    // If the child dies before consuming stdin the write fails (e.g. EPIPE).
    // The "close"/"error" handlers above already report that failure, so the
    // stream error only needs to be kept from becoming an unhandled event.
    pythonProcess.stdin.on("error", () => {});
    pythonProcess.stdin.end(request);
  });
}
/** Wraps plain text in the single-item text result returned by every tool. */
function textResult(text: string) {
  return { content: [{ type: "text" as const, text }] };
}

/**
 * Builds a failed tool result reading `"<label> Error: <message>"`.
 * `isError` marks the call as failed so clients need not parse the text.
 */
function errorResult(label: string, error: unknown) {
  const message = error instanceof Error ? error.message : "Unknown error";
  return {
    content: [{ type: "text" as const, text: `${label} Error: ${message}` }],
    isError: true,
  };
}

/**
 * Converts the Python backend's stdout into the text shown to the client.
 *
 * @param raw - Trimmed stdout: expected to be a JSON array on success or a
 *   JSON object with an `error` field on failure.
 * @returns The array items joined with ", ".
 * @throws Error when the output is empty, is not valid JSON, carries an
 *   `error` field, or is not an array.
 */
function formatBackendOutput(raw: string): string {
  if (raw === "") {
    throw new Error("Python backend produced no output");
  }
  const parsed: unknown = JSON.parse(raw);
  if (Array.isArray(parsed)) {
    return parsed.join(", ");
  }
  if (typeof parsed === "object" && parsed !== null) {
    const { error } = parsed as { error?: unknown };
    if (error) {
      throw new Error(String(error));
    }
  }
  throw new Error(`Unexpected output from Python backend: ${raw}`);
}

/**
 * Runs one backend tool and maps the outcome to a tool result.
 * Never rejects: every failure becomes an error result.
 */
async function runBackendTool(
  tool: string,
  label: string,
  args: Record<string, unknown>
) {
  try {
    const raw = await callPythonScript(tool, args);
    return textResult(`${label} Result: ${formatBackendOutput(raw)}`);
  } catch (error) {
    return errorResult(label, error);
  }
}

/**
 * Creates the ddddocr MCP server and registers its four tools:
 * `ocr_recognize`, `detect_objects`, `match_slider` and `health_check`.
 *
 * Each tool validates its arguments, delegates to the Python backend through
 * `callPythonScript`, and returns a single text item. Failures are returned
 * as error results rather than thrown.
 *
 * @param config - Parsed server configuration. It is accepted for interface
 *   compatibility; this function does not currently read it.
 * @returns The underlying low-level server of the created `McpServer`.
 */
export default function createStatelessServer({
  config,
}: {
  config: z.infer<typeof configSchema>;
}) {
  const server = new McpServer({
    name: "ddddocr CAPTCHA Recognition Server",
    version: "1.0.0",
  });

  // OCR Recognition Tool
  server.tool(
    "ocr_recognize",
    "Recognize text content from CAPTCHA images",
    {
      image_base64: z.string().optional().describe("Base64 encoded image data"),
      image_path: z.string().optional().describe("Path to image file"),
    },
    async ({ image_base64, image_path }) => {
      if (!image_base64 && !image_path) {
        return errorResult(
          "OCR Recognition",
          new Error("Either image_base64 or image_path must be provided")
        );
      }
      return runBackendTool("ocr_recognize", "OCR Recognition", {
        image_base64,
        image_path,
      });
    }
  );

  // Object Detection Tool
  server.tool(
    "detect_objects",
    "Detect target objects in CAPTCHA images",
    {
      image_base64: z.string().optional().describe("Base64 encoded image data"),
      image_path: z.string().optional().describe("Path to image file"),
    },
    async ({ image_base64, image_path }) => {
      if (!image_base64 && !image_path) {
        return errorResult(
          "Object Detection",
          new Error("Either image_base64 or image_path must be provided")
        );
      }
      return runBackendTool("detect_objects", "Object Detection", {
        image_base64,
        image_path,
      });
    }
  );

  // Slider Matching Tool
  server.tool(
    "match_slider",
    "Match slider CAPTCHA position",
    {
      target_base64: z.string().optional().describe("Target image base64 encoded"),
      background_base64: z.string().optional().describe("Background image base64 encoded"),
      target_path: z.string().optional().describe("Target image file path"),
      background_path: z.string().optional().describe("Background image file path"),
    },
    async ({ target_base64, background_base64, target_path, background_path }) => {
      // A usable request supplies BOTH images in the same form: either both
      // as base64 or both as file paths.
      const hasBase64 = Boolean(target_base64 && background_base64);
      const hasPaths = Boolean(target_path && background_path);
      if (!hasBase64 && !hasPaths) {
        return errorResult(
          "Slider Matching",
          new Error("Either base64 images or file paths must be provided")
        );
      }
      return runBackendTool("match_slider", "Slider Matching", {
        target_base64,
        background_base64,
        target_path,
        background_path,
      });
    }
  );

  // Health Check Tool
  server.tool(
    "health_check",
    "Check ddddocr service health status",
    {},
    async () => runBackendTool("health_check", "Health Check", {})
  );

  return server.server;
}