PeakMojo Server
Official
by peakmojo
- src
- mcp_server_peakmojo
import argparse
import json
import logging
import os
from typing import Any, Dict, Optional
import requests
import yaml
from mcp.server import Server
import mcp.types as types
import mcp.server.stdio
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions
from pydantic import AnyUrl
# Configure logging: set the root logger threshold to INFO so informational
# messages from this module are emitted.
logging.basicConfig(level=logging.INFO)
# Module-wide logger shared by the API client and the server handlers below.
logger = logging.getLogger("peakmojo_server")
def parse_arguments() -> argparse.Namespace:
    """Parse the server's command-line switches.

    Every option falls back to an environment variable when the switch is
    not given on the command line:

    * ``--api-key``  -> ``PEAKMOJO_API_KEY`` (no further default)
    * ``--base-url`` -> ``PEAKMOJO_BASE_URL``, then the staging API URL

    Returns:
        The parsed namespace, exposing ``api_key`` and ``base_url``.
    """
    env = os.environ
    cli = argparse.ArgumentParser(description='PeakMojo Server')
    cli.add_argument(
        '--api-key',
        default=env.get('PEAKMOJO_API_KEY'),
        help='PeakMojo API key',
    )
    cli.add_argument(
        '--base-url',
        default=env.get('PEAKMOJO_BASE_URL', 'https://api.staging.readymojo.com'),
        help='PeakMojo API base URL',
    )
    namespace = cli.parse_args()
    return namespace
class PeakMojoQuerier:
    """Small HTTP client for the PeakMojo API.

    The API key and base URL come from ``parse_arguments()``. Every query
    result (success or failure) is rendered as YAML text and wrapped in a
    one-element list of ``types.TextContent``.
    """

    def __init__(self):
        """Initialize PeakMojo API client from the parsed arguments."""
        args = parse_arguments()
        self.api_key = args.api_key
        self.base_url = args.base_url
        if not self.api_key:
            logger.warning("PeakMojo API key not found in environment variables")

    def get_headers(self) -> dict:
        """Get request headers with Bearer token"""
        return {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

    @staticmethod
    def _as_yaml_content(payload: Any) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Render *payload* as YAML and wrap it in a single TextContent item."""
        rendered = yaml.dump(payload, sort_keys=False, allow_unicode=True)
        return [types.TextContent(type="text", text=rendered)]

    def execute_query(
        self,
        endpoint: str,
        method: str = 'GET',
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30.0,
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Execute a query against the PeakMojo API and return the response as YAML.

        Args:
            endpoint: Path appended verbatim to ``self.base_url``.
            method: HTTP method passed to ``requests.request``.
            data: JSON request body; omitted when empty or ``None``.
            params: Query-string parameters; omitted when empty or ``None``.
            timeout: Seconds to wait for the server before giving up. Pass
                ``None`` to wait indefinitely (the previous behaviour).

        Returns:
            A one-element list holding a ``TextContent`` whose text is the
            YAML rendering of the JSON response. On a request failure the
            YAML is ``{"error": <message>}``. When a successful response has
            no JSON body, the YAML holds its ``status_code`` and raw ``body``.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=self.get_headers(),
                json=data if data else None,
                params=params if params else None,
                # Without a timeout a stalled connection would block forever.
                timeout=timeout,
            )
            # Raise an exception for bad status codes
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error("Request error: %s", e)
            return self._as_yaml_content({"error": str(e)})

        try:
            payload = response.json()
        except ValueError:
            # The request succeeded but the body is empty or not JSON (for
            # example a 204 reply); report the status and raw text rather
            # than presenting a successful call as an error.
            payload = {"status_code": response.status_code, "body": response.text}
        return self._as_yaml_content(payload)
async def main():
    """Run the PeakMojo Server over the stdio transport.

    Creates the API client and an MCP ``Server`` named "peakmojo", registers
    the resource and tool handlers defined below, then serves requests on the
    stdio streams until ``server.run`` returns.
    """
    # Imported locally: in the visible module ``asyncio`` is only imported
    # inside the ``__main__`` guard, so the global name is not guaranteed to
    # exist when ``main`` is called from an importing module.
    import asyncio

    logger.info("PeakMojo Server starting")
    peakmojo = PeakMojoQuerier()
    server = Server("peakmojo")

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        """Advertise the single generic ``peakmojo://api`` resource."""
        # NOTE(review): execute_query renders YAML text, while this resource
        # advertises application/json - confirm which is intended.
        return [
            types.Resource(
                uri=AnyUrl("peakmojo://api"),
                name="PeakMojo API",
                description="Access any PeakMojo API endpoint",
                mimeType="application/json",
            ),
        ]

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        """Return the API root ("/") response as YAML text.

        Raises:
            ValueError: If the URI scheme is not ``peakmojo`` or the path is
                anything other than ``api``.
        """
        if uri.scheme != "peakmojo":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
        path = str(uri).replace("peakmojo://", "")
        if path != "api":
            raise ValueError(f"Unknown resource path: {path}")
        # execute_query does blocking HTTP I/O; run it in a worker thread so
        # the event loop stays responsive while the request is in flight.
        contents = await asyncio.to_thread(peakmojo.execute_query, "/")
        # execute_query returns a list of TextContent items, but this handler
        # is declared to return a string, so hand back the YAML text itself.
        return "".join(
            item.text for item in contents if isinstance(item, types.TextContent)
        )

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="peakmojo_make_api_request",
                description="Make a request to any PeakMojo API endpoint. Use the API documentation at /api/docs/categories to discover available endpoints.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "endpoint": {
                            "type": "string",
                            "description": "The API endpoint to call (e.g. '/v1/users' or '/api/docs/categories')"
                        },
                        "method": {
                            "type": "string",
                            "description": "HTTP method (GET, POST, PUT, DELETE, etc)",
                            "default": "GET",
                            "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"]
                        },
                        "params": {
                            "type": "object",
                            "description": "Query parameters to include in the request",
                            "additionalProperties": True
                        },
                        "data": {
                            "type": "object",
                            "description": "Request body for POST/PUT/PATCH requests",
                            "additionalProperties": True
                        }
                    },
                    "required": ["endpoint"]
                },
            ),
        ]

    @server.call_tool()
    async def handle_invoke_tool(name: str, inputs: Dict[str, Any]) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Handle tool invocations.

        Any failure (unknown tool, missing endpoint, unexpected exception) is
        logged and returned to the caller as a YAML ``{"error": ...}`` text
        item instead of being raised.
        """
        try:
            if name != "peakmojo_make_api_request":
                raise ValueError(f"Unknown tool: {name}")

            endpoint = inputs.get("endpoint")
            if not endpoint:
                # Explicit message instead of the bare KeyError text "'endpoint'".
                raise ValueError("Missing required argument: endpoint")
            method = inputs.get("method", "GET")
            params = inputs.get("params", {})
            data = inputs.get("data", {})

            # Ensure endpoint starts with /
            if not endpoint.startswith("/"):
                endpoint = f"/{endpoint}"

            # Blocking HTTP call: offload to a worker thread so the event
            # loop is not stalled for the duration of the request.
            return await asyncio.to_thread(
                peakmojo.execute_query,
                endpoint=endpoint,
                method=method,
                params=params,
                data=data,
            )
        except Exception as e:
            # Boundary handler: report the failure to the client as content.
            logger.error("Error invoking tool %s: %s", name, e)
            return [types.TextContent(type="text", text=yaml.dump({"error": str(e)}, sort_keys=False, allow_unicode=True))]

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="peakmojo",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
# Script entry point: when executed directly (not imported), run the server
# coroutine to completion on a fresh asyncio event loop.
if __name__ == "__main__":
    import asyncio

    server_coroutine = main()
    asyncio.run(server_coroutine)