index.ts•8.29 kB
#!/usr/bin/env node
/**
* 图像处理MCP服务器
* 使用qwen2.5-vl模型处理图像,提供以下功能:
* - 处理图像并生成代码
* - 处理图像并生成描述
*/
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
import FormData from "form-data";
import fs from "fs";
import path from "path";
import os from "os";
import { promisify } from "util";
import { pipeline } from "stream";
// API密钥,从环境变量中获取
const API_KEY = process.env.QWEN_API_KEY;
if (!API_KEY) {
console.error("警告: 未设置QWEN_API_KEY环境变量,API调用可能会失败");
}
// API端点
const API_ENDPOINT = process.env.QWEN_API_ENDPOINT || "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation";
// 创建临时目录用于存储下载的图像
const TEMP_DIR = path.join(os.tmpdir(), "image-processor");
if (!fs.existsSync(TEMP_DIR)) {
fs.mkdirSync(TEMP_DIR, { recursive: true });
}
// 流式下载辅助函数
const streamPipeline = promisify(pipeline);
/**
* 下载图像到临时文件
* @param imageUrl 图像URL
* @returns 临时文件路径
*/
async function downloadImage(imageUrl: string): Promise<string> {
try {
// 为图像生成唯一文件名
const filename = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}.jpg`;
const filePath = path.join(TEMP_DIR, filename);
// 下载图像
const response = await axios({
method: 'GET',
url: imageUrl,
responseType: 'stream'
});
await streamPipeline(response.data, fs.createWriteStream(filePath));
return filePath;
} catch (error: any) {
console.error("下载图像失败:", error);
throw new Error(`下载图像失败: ${error.message}`);
}
}
/**
* 调用qwen2.5-vl模型API处理图像
* @param imagePath 图像路径
* @param prompt 提示词
* @returns 模型响应
*/
async function processImageWithQwen(imagePath: string, prompt: string): Promise<string> {
try {
// 读取图像文件
const imageBuffer = fs.readFileSync(imagePath);
const base64Image = imageBuffer.toString('base64');
// 准备请求数据
const requestData = {
model: "qwen-vl-plus",
input: {
messages: [
{
role: "user",
content: [
{
type: "text",
text: prompt
},
{
type: "image",
image: base64Image
}
]
}
]
},
parameters: {}
};
// 发送请求到API
const response = await axios.post(API_ENDPOINT, requestData, {
headers: {
'Authorization': `Bearer ${API_KEY}`,
'Content-Type': 'application/json'
}
});
// 处理响应
if (response.data && response.data.output && response.data.output.text) {
return response.data.output.text;
} else {
throw new Error("API响应格式不正确");
}
} catch (error: any) {
console.error("调用qwen2.5-vl模型API失败:", error);
if (error.response) {
console.error("API响应:", error.response.data);
}
throw new Error(`处理图像失败: ${error.message}`);
} finally {
// 清理临时文件
try {
fs.unlinkSync(imagePath);
} catch (e) {
console.error("清理临时文件失败:", e);
}
}
}
/**
* 创建MCP服务器
*/
const server = new Server(
{
name: "image-processor",
version: "0.1.0",
},
{
capabilities: {
tools: {},
},
}
);
/**
* 列出可用工具
*/
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "process_image_to_code",
description: "处理图像并生成代码",
inputSchema: {
type: "object",
properties: {
image_url: {
type: "string",
description: "图像URL"
},
language: {
type: "string",
description: "目标编程语言,例如:python, javascript, html, css等",
default: "python"
},
instructions: {
type: "string",
description: "额外的指令或要求",
default: ""
}
},
required: ["image_url"]
}
},
{
name: "process_image_to_description",
description: "处理图像并生成描述",
inputSchema: {
type: "object",
properties: {
image_url: {
type: "string",
description: "图像URL"
},
detail_level: {
type: "string",
description: "描述详细程度:简洁(brief)、标准(standard)或详细(detailed)",
enum: ["brief", "standard", "detailed"],
default: "standard"
},
focus: {
type: "string",
description: "描述重点,例如:物体、场景、人物、情感等",
default: ""
}
},
required: ["image_url"]
}
}
]
};
});
/**
* 处理工具调用
*/
server.setRequestHandler(CallToolRequestSchema, async (request) => {
switch (request.params.name) {
case "process_image_to_code": {
const imageUrl = String(request.params.arguments?.image_url);
const language = String(request.params.arguments?.language || "python");
const instructions = String(request.params.arguments?.instructions || "");
if (!imageUrl) {
throw new Error("图像URL是必需的");
}
try {
// 下载图像
const imagePath = await downloadImage(imageUrl);
// 构建提示词
let prompt = `请根据这张图片生成${language}代码。`;
if (instructions) {
prompt += ` ${instructions}`;
}
// 处理图像
const result = await processImageWithQwen(imagePath, prompt);
return {
content: [{
type: "text",
text: result
}]
};
} catch (error: any) {
return {
content: [{
type: "text",
text: `处理图像失败: ${error.message}`
}],
isError: true
};
}
}
case "process_image_to_description": {
const imageUrl = String(request.params.arguments?.image_url);
const detailLevel = String(request.params.arguments?.detail_level || "standard");
const focus = String(request.params.arguments?.focus || "");
if (!imageUrl) {
throw new Error("图像URL是必需的");
}
try {
// 下载图像
const imagePath = await downloadImage(imageUrl);
// 构建提示词
let prompt = "请描述这张图片";
if (detailLevel === "brief") {
prompt += ",给出简洁的描述";
} else if (detailLevel === "detailed") {
prompt += ",提供详细的描述,包括细节、背景和上下文";
}
if (focus) {
prompt += `,重点关注${focus}`;
}
// 处理图像
const result = await processImageWithQwen(imagePath, prompt);
return {
content: [{
type: "text",
text: result
}]
};
} catch (error: any) {
return {
content: [{
type: "text",
text: `处理图像失败: ${error.message}`
}],
isError: true
};
}
}
default:
throw new Error("未知工具");
}
});
/**
* 启动服务器
*/
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("图像处理MCP服务器已启动");
}
main().catch((error) => {
console.error("服务器错误:", error);
process.exit(1);
});