Skip to main content
Glama

File Convert MCP Server

by convertguru
server.py9.93 kB
import os import io import re import magic import json from typing import Dict, Any from fastmcp import FastMCP from dotenv import load_dotenv import aiohttp from starlette.responses import JSONResponse import asyncio # Load environment variables load_dotenv() # Initialize FastMCP mcp = FastMCP("file-convert-mcp") base_url = 'https://convert.guru' api_key = os.getenv("CONVERT_GURU_API_KEY") if api_key is None: api_key = '' @mcp.custom_route("/health", methods=["GET"]) async def health_check(request): return JSONResponse({"status": "healthy", "service": "mcp-server"}) @mcp.custom_route("/tools", methods=["GET"]) async def list_tools(request): tools_info = [] for tool_name, tool_func in mcp.tools.items(): tools_info.append({ "name": tool_name, "description": tool_func.__doc__ or "No description available", "async": asyncio.iscoroutinefunction(tool_func) }) return JSONResponse({ "tools": tools_info, "total_tools": len(tools_info) }) @mcp.tool() async def detect_file_type(file_path: str) -> Dict[str, Any]: """Detect file type by sending the first 200 bytes of the file to Convert.Guru API.""" if not file_path: return {"error": "file_path parameter is required"} if not os.path.exists(file_path): return {"error": f"File not found (use the absolute file path): {file_path}"} if not os.path.isfile(file_path): return {"error": f"Not a file: {file_path}"} try: with open(file_path, "rb") as f: first_200_bytes = f.read(200) # Read only the first 200 bytes _, file_extension = os.path.splitext(file_path) file_extension = file_extension.lstrip('.') # Remove dot filename = os.path.basename(file_path) file_size_bytes = os.path.getsize(file_path) size_hex = hex(file_size_bytes) mime_type = magic.Magic(mime=True).from_file(file_path) data = [file_extension,size_hex,mime_type,filename] separator = '\xFE' joined_data = separator.join(data) headers = {'cache-control': 'no-cache', 'api-key': api_key} data_array = bytearray() for char in joined_data: data_array.append(ord(char) & 0xFF) data_array.append(0xFE) data_array.extend(first_200_bytes) async with aiohttp.ClientSession() as session: async with session.post(base_url + '/api/v1/detect_file_type', data=data_array, headers=headers) as response: if response.status == 200: content = await response.text() comment_pattern = re.compile(r'<!--.+-->') # Remove all occurrences of the pattern from the string content = comment_pattern.sub('', content) return {"result": {"file_type_description": content.strip()}} else: error_text = await response.text() return {"error": f"API request failed with status {response.status}: {error_text}"} except aiohttp.ClientError as e: return {"error": f"Error making API request: {e}"} except Exception as e: return {"error": f"Error processing file: {e}"} @mcp.tool() async def convert_file(file_path: str, ext_out: str) -> Dict[str, Any]: """Convert a file to other format via Convert.Guru API. Pass the file path and output extension.""" if not file_path: return {"error": "file_path parameter is required"} if not ext_out: return {"error": "ext_out parameter is required"} if not os.path.exists(file_path): return {"error": f"File not found (use the absolute file path): {file_path}"} if not os.path.isfile(file_path): return {"error": f"Not a file: {file_path}"} try: with open(file_path, "rb") as f: chunk = f.read(40*1024*1024) filename = os.path.basename(file_path) data = aiohttp.FormData() data.add_field('file', chunk, filename=filename) headers = {'cache-control': 'no-cache', 'api-key': api_key} async with aiohttp.ClientSession() as session: async with session.post(base_url + '/api/v1/upload', data=data, headers=headers) as response: if response.status == 200: response_text = await response.text() try: # Explicitly parse using json.loads json_result = json.loads(response_text) json_payload = { "filename": json_result['Filename'], "ext_out": ext_out.lower()} conv_response_text = await post_json(base_url + '/api/v1/convert', json_payload, headers) try: # Explicitly parse using json.loads conv_json_result = json.loads( conv_response_text) ## check for error in conv_json_result if conv_json_result.get('error'): conv_json_result['error'] = f"File conversion failed: {conv_json_result.get('error')}" return conv_json_result file_info = conv_json_result.get('file') if file_info and 'ext_out' in file_info and 'urls' in file_info and file_info['urls']: file_url_path = file_info['urls'][0] file_extension = file_info.get('ext_out') if file_url_path.startswith('/'): full_file_url = base_url + file_url_path else: full_file_url = file_url_path save_path = file_path + '.converted' + \ f".{file_extension}" download_result = await download_file(session, full_file_url, save_path) return download_result else: return {"error": f"File URL information not found in the API response.: {conv_response_text}"} except json.JSONDecodeError: # If the response is not JSON (convert) return {"error": f"API convert request failed - response text is not JSON: {conv_response_text}"} except json.JSONDecodeError: # If the response is not JSON (upload) return {"error": f"API upload request failed - response text is not JSON: {response_text}"} else: error_text = await response.text() return {"error": f"API request failed with status {response.status}: {error_text}"} except aiohttp.ClientError as e: return {"error": f"Error making API request: {e}"} except Exception as e: return {"error": f"Error processing file: {e}"} async def post_json(url: str, data: dict, headers): """ Posts JSON data to a specified URL using aiohttp. Args: url: The URL to post the JSON data to. data: A Python dictionary that will be converted to JSON. """ cloned_headers = headers.copy() cloned_headers['Content-Type'] = 'application/json' async with aiohttp.ClientSession() as session: try: async with session.post(url, json=data, headers=cloned_headers) as response: response_text = await response.text() status = response.status err = {} if status == 200 or status == 400: return response_text elif status == 524: err = {"error": f"Error HTTP 524 - operation takes too long to complete, timeout occured."} elif status == 500: err = {"error": f"Error HTTP 500 - operation takes too long to complete, internal server error."} else: err = {"error": f"Error HTTP {status})."} return json.dumps(err) except aiohttp.ClientError as e: err = {"error": f"An error occurred during the request (exception): {e}"} return json.dumps(err) async def download_file(session: aiohttp.ClientSession, url: str, save_path: str) -> Dict[str, Any]: """Downloads a file from a URL and saves it to the specified path.""" try: async with session.get(url) as response: if response.status == 200: with open(save_path, 'wb') as f: async for chunk in response.content.iter_chunked(8192): f.write(chunk) return {"result": {"converted_file_path": save_path}} else: response_text = await response.text() return {"error": f"File download from {url} failed with status {response.status}: {response_text}"} except aiohttp.ClientError as e: return {"error": f"An error occurred during file download from {url}: {e}"} if __name__ == "__main__": transport = os.getenv("TRANSPORT", "").upper() if transport == "HTTP": port = os.getenv("PORT", "8000") try: port_int = int(port) mcp.run(transport="http", host="0.0.0.0", port=port_int) except ValueError: print(f"Invalid PORT value: {port}. Using default stdio transport.") mcp.run() else: mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/convertguru/file-convert-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server