Skip to main content
Glama
ymeng98

ddddocr CAPTCHA Recognition MCP Server

by ymeng98
index.ts7.29 kB
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { z } from "zod"; import { spawn } from "child_process"; import { promises as fs } from "fs"; import path from "path"; import { fileURLToPath } from "url"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); // Configuration schema export const configSchema = z.object({ debug: z.boolean().default(false).describe("Enable debug logging"), }); // Simple file-based communication with Python backend async function callPythonScript(tool: string, args: any): Promise<string> { const pythonScript = ` import sys import json import base64 import asyncio sys.path.append('${__dirname.replace(/\\/g, '/')}') try: from ddddocr_mcp_server import DDDDOCRServer async def main(): server = DDDDOCRServer() if '${tool}' == 'ocr_recognize': result = await server._ocr_recognize(${JSON.stringify(args)}) elif '${tool}' == 'detect_objects': result = await server._detect_objects(${JSON.stringify(args)}) elif '${tool}' == 'match_slider': result = await server._match_slider(${JSON.stringify(args)}) elif '${tool}' == 'health_check': result = await server._health_check() else: result = [{"type": "text", "text": "Unknown tool"}] print(json.dumps([r.text if hasattr(r, 'text') else str(r) for r in result])) asyncio.run(main()) except Exception as e: print(json.dumps({"error": str(e)})) `; return new Promise((resolve, reject) => { const pythonProcess = spawn("python3", ["-c", pythonScript], { stdio: ["ignore", "pipe", "pipe"], cwd: __dirname, }); let stdout = ""; let stderr = ""; pythonProcess.stdout.on("data", (data) => { stdout += data.toString(); }); pythonProcess.stderr.on("data", (data) => { stderr += data.toString(); }); pythonProcess.on("close", (code) => { if (code === 0) { resolve(stdout.trim()); } else { reject(new Error(`Python process failed: ${stderr || stdout}`)); } }); pythonProcess.on("error", (error) => { reject(error); }); }); } export default function createStatelessServer({ config, }: { config: z.infer<typeof configSchema>; }) { const server = new McpServer({ name: "ddddocr CAPTCHA Recognition Server", version: "1.0.0", }); // OCR Recognition Tool server.tool( "ocr_recognize", "Recognize text content from CAPTCHA images", { image_base64: z.string().optional().describe("Base64 encoded image data"), image_path: z.string().optional().describe("Path to image file"), }, async ({ image_base64, image_path }) => { try { if (!image_base64 && !image_path) { throw new Error("Either image_base64 or image_path must be provided"); } const result = await callPythonScript("ocr_recognize", { image_base64, image_path, }); const parsedResult = JSON.parse(result); if (parsedResult.error) { throw new Error(parsedResult.error); } return { content: [ { type: "text", text: `OCR Recognition Result: ${parsedResult.join(", ")}`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `OCR Recognition Error: ${error instanceof Error ? error.message : "Unknown error"}`, }, ], }; } } ); // Object Detection Tool server.tool( "detect_objects", "Detect target objects in CAPTCHA images", { image_base64: z.string().optional().describe("Base64 encoded image data"), image_path: z.string().optional().describe("Path to image file"), }, async ({ image_base64, image_path }) => { try { if (!image_base64 && !image_path) { throw new Error("Either image_base64 or image_path must be provided"); } const result = await callPythonScript("detect_objects", { image_base64, image_path, }); const parsedResult = JSON.parse(result); if (parsedResult.error) { throw new Error(parsedResult.error); } return { content: [ { type: "text", text: `Object Detection Result: ${parsedResult.join(", ")}`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Object Detection Error: ${error instanceof Error ? error.message : "Unknown error"}`, }, ], }; } } ); // Slider Matching Tool server.tool( "match_slider", "Match slider CAPTCHA position", { target_base64: z.string().optional().describe("Target image base64 encoded"), background_base64: z.string().optional().describe("Background image base64 encoded"), target_path: z.string().optional().describe("Target image file path"), background_path: z.string().optional().describe("Background image file path"), }, async ({ target_base64, background_base64, target_path, background_path }) => { try { const hasBase64 = target_base64 && background_base64; const hasPaths = target_path && background_path; if (!hasBase64 && !hasPaths) { throw new Error("Either base64 images or file paths must be provided"); } const result = await callPythonScript("match_slider", { target_base64, background_base64, target_path, background_path, }); const parsedResult = JSON.parse(result); if (parsedResult.error) { throw new Error(parsedResult.error); } return { content: [ { type: "text", text: `Slider Matching Result: ${parsedResult.join(", ")}`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Slider Matching Error: ${error instanceof Error ? error.message : "Unknown error"}`, }, ], }; } } ); // Health Check Tool server.tool( "health_check", "Check ddddocr service health status", {}, async () => { try { const result = await callPythonScript("health_check", {}); const parsedResult = JSON.parse(result); if (parsedResult.error) { throw new Error(parsedResult.error); } return { content: [ { type: "text", text: `Health Check Result: ${parsedResult.join(", ")}`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Health Check Error: ${error instanceof Error ? error.message : "Unknown error"}`, }, ], }; } } ); return server.server; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ymeng98/ddddocr-captcha-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server