Skip to main content
Glama

WYGIWYH Expense Tracking MCP Server

by ReNewator
server.py (13 kB)
#!/usr/bin/env python3
"""
WYGIWYH API MCP Server
Universal MCP server for the WYGIWYH expense tracking API

Copyright (c) 2025 ReNewator.com
All rights reserved.
"""

import asyncio
import base64
import json
import os
import re
from typing import Any, Optional
from urllib.parse import quote

import httpx
import yaml
from mcp.server import Server
from mcp.types import Tool, TextContent
from pydantic import AnyUrl

API_BASE_URL = "https://your-WYGIWYH.com"

# Path-item keys that are treated as operations; every other key
# (e.g. "parameters", "summary") is skipped when walking the spec.
_HTTP_METHODS = ("GET", "POST", "PUT", "PATCH", "DELETE")

# Methods for which "body_*" arguments are sent as a JSON request body.
_BODY_METHODS = ("POST", "PUT", "PATCH")

# Schema keywords copied verbatim from the OpenAPI schema into the tool schema.
_PASSTHROUGH_KEYS = (
    "description", "title", "maxLength", "minLength", "maximum", "minimum",
    "pattern", "format", "nullable", "enum",
)

# Prefixes used to route flat tool arguments to their place in the request.
_PATH_PREFIX = "path_"
_QUERY_PREFIX = "query_"
_BODY_PREFIX = "body_"

_TITLE_RE = re.compile(r"<title>(.*?)</title>", re.IGNORECASE | re.DOTALL)
_PLACEHOLDER_RE = re.compile(r"\{([^{}]+)\}")


def get_auth_header() -> str:
    """Get the current Basic auth header from environment variables.

    Returns:
        The base64-encoded ``username:password`` pair built from the
        ``API_USERNAME`` and ``API_PASSWORD`` environment variables, or an
        empty string when either one is unset or empty.
    """
    api_username = os.getenv("API_USERNAME", "")
    api_password = os.getenv("API_PASSWORD", "")
    if api_username and api_password:
        return base64.b64encode(f"{api_username}:{api_password}".encode()).decode()
    return ""


# The OpenAPI document is loaded once at import time; every tool definition
# and every request made by call_tool is derived from it.
with open("attached_assets/WYGIWYH API (1)_1759581638933.yaml", "r", encoding="utf-8") as f:
    openapi_spec = yaml.safe_load(f)

app = Server("wygiwyh-api-server")


async def get_tools_list():
    """Get list of all tools (for HTTP transport).

    Returns:
        A list of plain dicts with the keys ``name``, ``description`` and
        ``inputSchema``, one per tool generated from the OpenAPI spec.
    """
    tools = generate_tools_from_openapi(openapi_spec)
    return [
        {
            "name": tool.name,
            "description": tool.description,
            "inputSchema": tool.inputSchema,
        }
        for tool in tools
    ]


async def call_tool_internal(name: str, arguments: dict):
    """Call a tool directly (for HTTP transport).

    Args:
        name: Tool name, as produced by ``generate_tools_from_openapi``.
        arguments: Flat tool arguments using the ``path_`` / ``query_`` /
            ``body_`` prefixes.

    Returns:
        ``{"text": ...}`` holding the text of the first ``TextContent`` item
        (or ``str(result)`` for any other non-empty result), or
        ``{"error": ...}`` when the tool produced nothing.
    """
    result = await call_tool(name, arguments)

    if result:
        if isinstance(result, list) and len(result) > 0:
            first_item = result[0]
            if isinstance(first_item, TextContent):
                return {"text": first_item.text}
        return {"text": str(result)}

    return {"error": "No response from tool"}


def convert_openapi_to_json_schema(schema: dict, components: dict) -> dict:
    """Convert OpenAPI schema to JSON Schema format for MCP tools.

    Args:
        schema: An OpenAPI schema object (may contain ``$ref``, ``allOf`` or
            ``oneOf``).
        components: The ``components`` section of the spec, used to resolve
            ``#/components/schemas/<name>`` references.

    Returns:
        A new dict describing the schema. An empty or missing schema becomes
        an empty object schema. ``$ref`` is replaced by the converted target,
        ``allOf`` members are merged, ``oneOf`` is reduced to its first
        alternative, and ``readOnly`` properties are dropped.
    """
    if not schema:
        return {"type": "object", "properties": {}}

    result = {}

    if "$ref" in schema:
        ref_path = schema["$ref"].split("/")
        # Only local schema references are resolved; the length check keeps a
        # short or malformed reference from raising IndexError.
        if len(ref_path) >= 4 and ref_path[:3] == ["#", "components", "schemas"]:
            schema_name = ref_path[3]
            if schema_name in components.get("schemas", {}):
                return convert_openapi_to_json_schema(components["schemas"][schema_name], components)

    if "allOf" in schema:
        merged_properties = {}
        merged_required = []
        merged_type = None

        for sub_schema in schema["allOf"]:
            converted = convert_openapi_to_json_schema(sub_schema, components)
            if "properties" in converted:
                merged_properties.update(converted["properties"])
            if "required" in converted:
                merged_required.extend(converted["required"])
            if "type" in converted and not merged_type:
                merged_type = converted["type"]

        result["type"] = merged_type or "object"
        if merged_properties:
            result["properties"] = merged_properties
        if merged_required:
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the generated schema is stable between runs.
            result["required"] = list(dict.fromkeys(merged_required))
        return result

    if "oneOf" in schema:
        alternatives = schema["oneOf"]
        # Only the first alternative is exposed; it is converted so that any
        # $ref / readOnly handling applies to it as well.
        first_alternative = alternatives[0] if alternatives else {}
        return convert_openapi_to_json_schema(first_alternative, components)

    if "type" in schema:
        result["type"] = schema["type"]

    if "properties" in schema:
        result["properties"] = {}
        for prop_name, prop_schema in schema["properties"].items():
            if not prop_schema.get("readOnly", False):
                result["properties"][prop_name] = convert_openapi_to_json_schema(prop_schema, components)

    if "required" in schema:
        # Required names that were dropped (readOnly) must not be listed.
        result["required"] = [r for r in schema["required"] if r in result.get("properties", {})]

    for key in _PASSTHROUGH_KEYS:
        if key in schema:
            result[key] = schema[key]

    if "items" in schema:
        items = schema["items"]
        # Array item schemas can themselves contain $ref; convert non-empty
        # dict schemas and copy anything else unchanged.
        if isinstance(items, dict) and items:
            result["items"] = convert_openapi_to_json_schema(items, components)
        else:
            result["items"] = items

    return result


def _operation_name(method: str, path: str, operation: dict) -> str:
    """Return the tool name for an operation (operationId or a method/path fallback)."""
    return operation.get("operationId", f"{method}_{path}")


def generate_tools_from_openapi(spec: dict) -> list[Tool]:
    """Generate MCP tools from OpenAPI specification.

    Each operation becomes one tool. Its input schema is flat: path
    parameters are exposed as ``path_<name>``, query parameters as
    ``query_<name>``, and top-level request-body properties as
    ``body_<name>``.

    Args:
        spec: Parsed OpenAPI document.

    Returns:
        One ``Tool`` per GET/POST/PUT/PATCH/DELETE operation in ``paths``.
    """
    tools = []

    paths = spec.get("paths", {})
    components = spec.get("components", {})

    for path, path_item in paths.items():
        for method, operation in path_item.items():
            if method.upper() not in _HTTP_METHODS:
                continue

            operation_id = _operation_name(method, path, operation)
            description = operation.get("description", operation.get("summary", f"{method.upper()} {path}"))

            input_schema = {
                "type": "object",
                "properties": {},
                "required": [],
            }

            for param in operation.get("parameters", []):
                param_name = param["name"]
                param_schema = convert_openapi_to_json_schema(param.get("schema", {"type": "string"}), components)
                param_schema["description"] = param.get("description", "")

                if param["in"] == "path":
                    param_name = f"{_PATH_PREFIX}{param_name}"
                elif param["in"] == "query":
                    param_name = f"{_QUERY_PREFIX}{param_name}"

                input_schema["properties"][param_name] = param_schema

                if param.get("required", False):
                    input_schema["required"].append(param_name)

            if "requestBody" in operation:
                content = operation["requestBody"].get("content", {})

                body_schema = None
                # The first content type present, in this preference order,
                # supplies the body schema.
                for content_type in ["application/json", "application/x-www-form-urlencoded",
                                     "multipart/form-data", "text/plain"]:
                    if content_type in content:
                        body_schema = content[content_type].get("schema", {})
                        break

                if body_schema:
                    converted_schema = convert_openapi_to_json_schema(body_schema, components)

                    for prop_name, prop_schema in converted_schema.get("properties", {}).items():
                        input_schema["properties"][f"{_BODY_PREFIX}{prop_name}"] = prop_schema

                    for req_field in converted_schema.get("required", []):
                        input_schema["required"].append(f"{_BODY_PREFIX}{req_field}")

            tools.append(Tool(
                name=operation_id,
                description=description[:1024],
                inputSchema=input_schema,
            ))

    return tools


@app.list_tools()
async def list_tools() -> list[Tool]:
    """List all available API tools."""
    return generate_tools_from_openapi(openapi_spec)


def _find_operation(name: str) -> Optional[tuple]:
    """Return ``(path, METHOD)`` for the tool called ``name``, or None if absent."""
    for api_path, path_item in openapi_spec.get("paths", {}).items():
        for http_method, op in path_item.items():
            if http_method.upper() not in _HTTP_METHODS:
                continue
            # Use the same naming rule as generate_tools_from_openapi so that
            # operations without an operationId remain callable.
            if _operation_name(http_method, api_path, op) == name:
                return api_path, http_method.upper()
    return None


def _split_arguments(arguments: dict) -> tuple:
    """Split flat tool arguments into (path_params, query_params, body_data) by prefix."""
    path_params = {}
    query_params = {}
    body_data = {}

    # Arguments without a recognised prefix are ignored.
    for key, value in arguments.items():
        if key.startswith(_PATH_PREFIX):
            path_params[key[len(_PATH_PREFIX):]] = value
        elif key.startswith(_QUERY_PREFIX):
            query_params[key[len(_QUERY_PREFIX):]] = value
        elif key.startswith(_BODY_PREFIX):
            body_data[key[len(_BODY_PREFIX):]] = value

    return path_params, query_params, body_data


def _describe_http_error(response: httpx.Response) -> str:
    """Build a readable detail string from an error response (JSON, HTML title, or text)."""
    try:
        return json.dumps(response.json(), indent=2)
    except ValueError:
        # Body is not valid JSON; fall through to the text-based summaries.
        pass

    content_type = response.headers.get("content-type", "")
    if "text/html" not in content_type:
        return response.text[:500]

    error_detail = f"HTML Error Page (status {response.status_code})"
    if response.text:
        title_match = _TITLE_RE.search(response.text)
        if title_match:
            error_detail += f"\nTitle: {title_match.group(1).strip()}"
    return error_detail


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute API calls based on tool name and arguments.

    Args:
        name: Tool name (the operationId, or the ``<method>_<path>``
            fallback used when the operation has none).
        arguments: Flat arguments; ``path_*`` values fill URL placeholders,
            ``query_*`` values become query parameters and ``body_*`` values
            form the JSON body of POST/PUT/PATCH requests.

    Returns:
        A single-item list with the JSON-formatted response, or with an
        ``Error: ...`` / ``HTTP Error ...`` message. Request failures are
        reported as text rather than raised.
    """
    auth_header = get_auth_header()

    if not auth_header:
        return [TextContent(
            type="text",
            text="Error: API_USERNAME and API_PASSWORD environment variables must be set to make API calls"
        )]

    found = _find_operation(name)
    if not found:
        return [TextContent(
            type="text",
            text=f"Error: Tool '{name}' not found in API specification"
        )]
    path, method = found

    path_params, query_params, body_data = _split_arguments(arguments or {})

    url = path
    for param_name, param_value in path_params.items():
        # Percent-encode so a value cannot inject extra path segments or a
        # query string into the request URL.
        url = url.replace(f"{{{param_name}}}", quote(str(param_value), safe=""))

    missing = _PLACEHOLDER_RE.findall(url)
    if missing:
        missing_names = ", ".join(missing)
        return [TextContent(
            type="text",
            text=f"Error: Missing required path parameter(s): {missing_names}"
        )]

    full_url = f"{API_BASE_URL}{url}"

    headers = {
        "Authorization": f"Basic {auth_header}",
        "Accept": "application/json",
    }

    request_kwargs = {"headers": headers, "params": query_params}
    if method in _BODY_METHODS and body_data:
        headers["Content-Type"] = "application/json"
        request_kwargs["json"] = body_data

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.request(method, full_url, **request_kwargs)
            response.raise_for_status()

            if response.status_code == 204:
                result = {"status": "success", "message": "Resource deleted or no content returned"}
            else:
                try:
                    result = response.json()
                except ValueError:
                    # Successful response whose body is not JSON.
                    content_type = response.headers.get("content-type", "")
                    if "text/html" in content_type:
                        result = {
                            "status": "success",
                            "content_type": content_type,
                            "message": "Response received (HTML content)",
                            "text_preview": response.text[:500],
                        }
                    else:
                        result = {"response": response.text}

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, ensure_ascii=False)
            )]

    except httpx.HTTPStatusError as e:
        return [TextContent(
            type="text",
            text=f"HTTP Error {e.response.status_code}:\n{_describe_http_error(e.response)}"
        )]
    except Exception as e:
        # Tool boundary: report any other failure (network, timeout, ...) as
        # text instead of propagating it to the MCP runtime.
        return [TextContent(
            type="text",
            text=f"Error: {type(e).__name__}: {str(e)}"
        )]


async def main():
    """Run the MCP server over stdio until the streams close."""
    from mcp.server.stdio import stdio_server

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ReNewator/MCP-WYGIWYH'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.