Skip to main content
Glama

Image Processor MCP Server

by sanliunanjue
index.ts (8.29 kB)
#!/usr/bin/env node
/**
 * Image-processing MCP server.
 *
 * Sends images to a Qwen vision-language model and exposes two tools:
 * - process_image_to_code: generate code from an image
 * - process_image_to_description: generate a description of an image
 */
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
import FormData from "form-data";
import fs from "fs";
import path from "path";
import os from "os";
import { promisify } from "util";
import { pipeline } from "stream";

// API key, read from the environment. A missing key only produces a warning
// so the server can still start and list its tools.
const API_KEY = process.env.QWEN_API_KEY;
if (!API_KEY) {
  console.error("警告: 未设置QWEN_API_KEY环境变量,API调用可能会失败");
}

// API endpoint (overridable through the environment).
const API_ENDPOINT =
  process.env.QWEN_API_ENDPOINT ||
  "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation";

// Model identifier sent with every request (overridable through the environment).
const MODEL_NAME = process.env.QWEN_MODEL || "qwen-vl-plus";

// Temporary directory that holds downloaded images until they are processed.
const TEMP_DIR = path.join(os.tmpdir(), "image-processor");
if (!fs.existsSync(TEMP_DIR)) {
  fs.mkdirSync(TEMP_DIR, { recursive: true });
}

// Promise-returning wrapper around stream.pipeline, used for streamed downloads.
const streamPipeline = promisify(pipeline);

/** Narrowing helper: true for any non-null object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/** Returns a printable message for any thrown value. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Downloads an image into a uniquely named file inside TEMP_DIR.
 *
 * @param imageUrl URL of the image to fetch
 * @returns path of the temporary file that now holds the image
 * @throws Error when the download or the write to disk fails; any partially
 *         written file is removed before the error is rethrown
 */
async function downloadImage(imageUrl: string): Promise<string> {
  // Timestamp plus random suffix keeps concurrent downloads from colliding.
  const filename = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}.jpg`;
  const filePath = path.join(TEMP_DIR, filename);

  try {
    const response = await axios({
      method: 'GET',
      url: imageUrl,
      responseType: 'stream'
    });

    await streamPipeline(response.data, fs.createWriteStream(filePath));
    return filePath;
  } catch (error: unknown) {
    console.error("下载图像失败:", error);
    // A failed pipeline can leave a truncated file behind; do not leak it.
    if (fs.existsSync(filePath)) {
      try {
        fs.unlinkSync(filePath);
      } catch (e) {
        console.error("清理临时文件失败:", e);
      }
    }
    throw new Error(`下载图像失败: ${errorMessage(error)}`);
  }
}

/**
 * Pulls the generated text out of an API response body.
 *
 * Accepts the flat `output.text` form as well as the chat-style
 * `output.choices[0].message.content` form, where `content` is either a
 * string or an array of parts carrying a `text` field.
 *
 * @returns the non-empty text, or undefined when none is present
 */
function extractOutputText(data: unknown): string | undefined {
  if (!isRecord(data) || !isRecord(data.output)) {
    return undefined;
  }
  const output = data.output;

  if (typeof output.text === "string" && output.text) {
    return output.text;
  }

  if (!Array.isArray(output.choices)) {
    return undefined;
  }
  const firstChoice: unknown = output.choices[0];
  if (!isRecord(firstChoice) || !isRecord(firstChoice.message)) {
    return undefined;
  }

  const content = firstChoice.message.content;
  if (typeof content === "string") {
    return content || undefined;
  }
  if (Array.isArray(content)) {
    const joined = content
      .map((part: unknown) =>
        isRecord(part) && typeof part.text === "string" ? part.text : ""
      )
      .join("");
    return joined || undefined;
  }
  return undefined;
}

/**
 * Sends an image plus a prompt to the model API and returns the generated text.
 *
 * The image file is always deleted afterwards, whether the call succeeded
 * or not.
 *
 * @param imagePath path of the (temporary) image file
 * @param prompt instruction text sent alongside the image
 * @returns text produced by the model
 * @throws Error when reading the file or calling the API fails, or when the
 *         response carries no text
 */
async function processImageWithQwen(imagePath: string, prompt: string): Promise<string> {
  try {
    // Read the image and encode it for embedding in the JSON payload.
    const imageBuffer = await fs.promises.readFile(imagePath);
    const base64Image = imageBuffer.toString('base64');

    // NOTE(review): the payload shape (typed content parts, bare base64 image)
    // is kept exactly as originally written — confirm it against the
    // endpoint's current request schema.
    const requestData = {
      model: MODEL_NAME,
      input: {
        messages: [
          {
            role: "user",
            content: [
              { type: "text", text: prompt },
              { type: "image", image: base64Image }
            ]
          }
        ]
      },
      parameters: {}
    };

    const response = await axios.post(API_ENDPOINT, requestData, {
      headers: {
        'Authorization': `Bearer ${API_KEY}`,
        'Content-Type': 'application/json'
      }
    });

    const text = extractOutputText(response.data);
    if (text === undefined) {
      throw new Error("API响应格式不正确");
    }
    return text;
  } catch (error: unknown) {
    console.error("调用qwen2.5-vl模型API失败:", error);
    // HTTP errors carry the server's response body; log it for diagnosis.
    if (isRecord(error) && isRecord(error.response)) {
      console.error("API响应:", error.response.data);
    }
    throw new Error(`处理图像失败: ${errorMessage(error)}`);
  } finally {
    // Remove the temporary file regardless of the outcome.
    try {
      fs.unlinkSync(imagePath);
    } catch (e) {
      console.error("清理临时文件失败:", e);
    }
  }
}

/**
 * Validates and returns the mandatory `image_url` tool argument.
 *
 * A missing argument must be rejected here: coercing it with String() would
 * yield the truthy text "undefined" and slip past a plain emptiness check.
 *
 * @throws Error when the argument is absent, not a string, or blank
 */
function requireImageUrl(args: Record<string, unknown> | undefined): string {
  const raw = args?.image_url;
  if (typeof raw !== "string" || raw.trim() === "") {
    throw new Error("图像URL是必需的");
  }
  return raw.trim();
}

/** Builds the prompt for the process_image_to_code tool. */
function buildCodePrompt(args: Record<string, unknown> | undefined): string {
  const language = String(args?.language || "python");
  const instructions = String(args?.instructions || "");

  let prompt = `请根据这张图片生成${language}代码。`;
  if (instructions) {
    prompt += ` ${instructions}`;
  }
  return prompt;
}

/** Builds the prompt for the process_image_to_description tool. */
function buildDescriptionPrompt(args: Record<string, unknown> | undefined): string {
  const detailLevel = String(args?.detail_level || "standard");
  const focus = String(args?.focus || "");

  let prompt = "请描述这张图片";
  if (detailLevel === "brief") {
    prompt += ",给出简洁的描述";
  } else if (detailLevel === "detailed") {
    prompt += ",提供详细的描述,包括细节、背景和上下文";
  }
  if (focus) {
    prompt += `,重点关注${focus}`;
  }
  return prompt;
}

/**
 * The MCP server instance; it advertises tool support only.
 */
const server = new Server(
  {
    name: "image-processor",
    version: "0.1.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

/**
 * Lists the tools this server offers, together with their input schemas.
 */
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "process_image_to_code",
        description: "处理图像并生成代码",
        inputSchema: {
          type: "object",
          properties: {
            image_url: {
              type: "string",
              description: "图像URL"
            },
            language: {
              type: "string",
              description: "目标编程语言,例如:python, javascript, html, css等",
              default: "python"
            },
            instructions: {
              type: "string",
              description: "额外的指令或要求",
              default: ""
            }
          },
          required: ["image_url"]
        }
      },
      {
        name: "process_image_to_description",
        description: "处理图像并生成描述",
        inputSchema: {
          type: "object",
          properties: {
            image_url: {
              type: "string",
              description: "图像URL"
            },
            detail_level: {
              type: "string",
              description: "描述详细程度:简洁(brief)、标准(standard)或详细(detailed)",
              enum: ["brief", "standard", "detailed"],
              default: "standard"
            },
            focus: {
              type: "string",
              description: "描述重点,例如:物体、场景、人物、情感等",
              default: ""
            }
          },
          required: ["image_url"]
        }
      }
    ]
  };
});

/**
 * Handles tool calls.
 *
 * An unknown tool name or a missing image URL is thrown as a protocol-level
 * error; download and model failures are returned as a tool result flagged
 * with `isError`.
 */
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const args = request.params.arguments;

  let prompt: string;
  switch (request.params.name) {
    case "process_image_to_code":
      prompt = buildCodePrompt(args);
      break;
    case "process_image_to_description":
      prompt = buildDescriptionPrompt(args);
      break;
    default:
      throw new Error("未知工具");
  }

  const imageUrl = requireImageUrl(args);

  try {
    const imagePath = await downloadImage(imageUrl);
    const result = await processImageWithQwen(imagePath, prompt);

    return {
      content: [{
        type: "text",
        text: result
      }]
    };
  } catch (error: unknown) {
    return {
      content: [{
        type: "text",
        text: `处理图像失败: ${errorMessage(error)}`
      }],
      isError: true
    };
  }
});

/**
 * Starts the server on a stdio transport.
 */
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  // Logged to stderr; the stdio transport occupies stdin/stdout.
  console.error("图像处理MCP服务器已启动");
}

main().catch((error) => {
  console.error("服务器错误:", error);
  process.exit(1);
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sanliunanjue/image-processor-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.