# server.py (13 kB)
import anyio
import click
import httpx
import requests
import mcp.types as types
from mcp.server.lowlevel import Server
from openai import OpenAI
import os
from dotenv import load_dotenv
import json
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, parse_qs
import re
# Load environment variables from a .env file before any tool runs, so the
# os.getenv() lookups below (FIGMA_ACCESS_TOKEN, OPENAI_API_KEY) can be
# satisfied from that file.
load_dotenv()
# Upper bound, in seconds, for every Figma API call. `requests` applies no
# timeout by default, so without this a stalled connection would hang forever.
_FIGMA_TIMEOUT_SECONDS = 30
_FIGMA_API_BASE = "https://api.figma.com/v1"


def _extract_figma_file_key(url_path: str) -> str:
    """Return the file key from a '/file/<key>' or '/design/<key>' URL path.

    Raises:
        ValueError: If the path contains no recognizable Figma file key.
    """
    match = re.search(r'/(?:file|design)/([a-zA-Z0-9]+)', url_path)
    if match is None:
        raise ValueError(
            "Not a recognizable Figma URL: expected a path containing "
            "'/file/<key>' or '/design/<key>'"
        )
    return match.group(1)


def _collect_image_nodes(node: Any) -> List[Dict[str, Any]]:
    """Recursively collect every node under *node* that has an IMAGE fill.

    Non-dict input (including ``None``) yields an empty list, so callers can
    pass whatever the API returned without pre-validating it.
    """
    if not isinstance(node, dict):
        return []
    found: List[Dict[str, Any]] = []
    fills = node.get("fills")
    if isinstance(fills, list):
        for fill in fills:
            if (
                isinstance(fill, dict)
                and fill.get("type") == "IMAGE"
                and "imageRef" in fill
            ):
                found.append({
                    "node_id": node.get("id"),
                    "image_ref": fill.get("imageRef"),
                    "bounding_box": node.get("absoluteBoundingBox", {}),
                })
                break  # one image fill per node is enough
    # `or []` tolerates an explicit null "children" value.
    for child in node.get("children") or []:
        found.extend(_collect_image_nodes(child))
    return found


def fetch_figma_data(figma_url: str) -> List[types.TextContent]:
    """Fetch a Figma file (or selected nodes) plus rendered image URLs.

    The file key is taken from the URL path and optional ``node-id`` query
    parameters restrict the fetch to those nodes. Nodes carrying an IMAGE
    fill are then rendered to PNG through the Figma images endpoint.

    Args:
        figma_url: Full Figma URL in '/file/<key>' or '/design/<key>' form.

    Returns:
        Two ``TextContent`` items holding JSON documents: first
        ``{"type": "structure", "data": ...}``, then
        ``{"type": "images", "data": [...]}`` where each image entry has
        ``node_id``, ``image_ref``, ``bounding_box`` and ``image_url``.

    Raises:
        ValueError: If the URL has no file key or FIGMA_ACCESS_TOKEN is unset.
        requests.RequestException: On timeouts, connection failures, or
            non-2xx responses from the Figma API.
    """
    parsed = urlparse(figma_url)
    file_key = _extract_figma_file_key(parsed.path)

    token = os.getenv("FIGMA_ACCESS_TOKEN")
    if not token:
        # Fail early with a clear message instead of sending an
        # unauthenticated request and returning an empty structure.
        raise ValueError("FIGMA_ACCESS_TOKEN environment variable is not set")
    headers = {"X-FIGMA-TOKEN": token}

    node_ids = parse_qs(parsed.query).get("node-id", [])

    # Get structure: use nodes endpoint if node-id is provided, else full file.
    if node_ids:
        resp = requests.get(
            f"{_FIGMA_API_BASE}/files/{file_key}/nodes",
            headers=headers,
            params={"ids": ",".join(node_ids)},
            timeout=_FIGMA_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()
        nodes = resp.json().get("nodes") or {}
        # Map each node id to its document. An entry may be null (e.g. the id
        # does not exist in the file); keep it as None so the caller can see
        # which ids were not resolved rather than crashing on it.
        structure = {
            nid: info.get("document") if isinstance(info, dict) else None
            for nid, info in nodes.items()
        }
        image_nodes: List[Dict[str, Any]] = []
        for doc in structure.values():
            image_nodes.extend(_collect_image_nodes(doc))
    else:
        resp = requests.get(
            f"{_FIGMA_API_BASE}/files/{file_key}",
            headers=headers,
            timeout=_FIGMA_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()
        structure = resp.json().get("document", {})
        image_nodes = _collect_image_nodes(structure)

    # Fetch image URLs using the (de-duplicated) node IDs that have image fills.
    image_node_ids = sorted(
        {img["node_id"] for img in image_nodes if img.get("node_id")}
    )
    img_mapping: Dict[str, Optional[str]] = {}
    if image_node_ids:
        img_resp = requests.get(
            f"{_FIGMA_API_BASE}/images/{file_key}",
            headers=headers,
            params={"ids": ",".join(image_node_ids), "format": "png"},
            timeout=_FIGMA_TIMEOUT_SECONDS,
        )
        img_resp.raise_for_status()
        img_mapping = img_resp.json().get("images") or {}

    # Combine the imageRef details with the fetched image URLs; nodes the
    # images endpoint did not return get image_url = None.
    for img in image_nodes:
        img["image_url"] = img_mapping.get(img.get("node_id"))

    # Return both structure and images as TextContent, structure first.
    return [
        types.TextContent(
            type="text",
            text=json.dumps({"type": "structure", "data": structure}, indent=2),
        ),
        types.TextContent(
            type="text",
            text=json.dumps({"type": "images", "data": image_nodes}, indent=2),
        ),
    ]
async def fetch_website(
    url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Download *url* and return the response body as a single text item.

    Redirects are followed. Failures are never raised; timeouts, HTTP error
    statuses, and any other exception are reported as an "Error: ..." text
    item instead.
    """

    def as_text(message: str) -> list[types.TextContent]:
        # Every outcome of this tool is a one-element text payload.
        return [types.TextContent(type="text", text=message)]

    try:
        # 10 s overall budget per operation, but only 5 s to connect.
        limits = httpx.Timeout(10.0, connect=5.0)
        session = httpx.AsyncClient(
            follow_redirects=True,
            headers={
                "User-Agent": "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
            },
            timeout=limits,
        )
        async with session as http:
            reply = await http.get(url)
            reply.raise_for_status()
            return as_text(reply.text)
    except httpx.TimeoutException:
        return as_text("Error: Request timed out while trying to fetch the website.")
    except httpx.HTTPStatusError as status_error:
        code = status_error.response.status_code
        return as_text(f"Error: HTTP {code} error while fetching the website.")
    except Exception as failure:
        return as_text(f"Error: Failed to fetch website: {str(failure)}")
async def check_mood(
    question: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Report the server's mood.

    The reply is the same cheerful message with a heart regardless of what
    *question* contains; the argument is accepted but not inspected.
    """
    return [
        types.TextContent(
            type="text",
            text="I'm feeling great and happy to help you! ❤️",
        )
    ]
async def generate_image(
    prompt: str,
    size: str = "1024x1024",
    quality: str = "standard",
    n: int = 1,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Generate an image using DALL-E 3 and return the resulting URL(s).

    Args:
        prompt: Text description of the image to generate.
        size: Requested image dimensions, passed through to the API.
        quality: Requested quality level, passed through to the API.
        n: Number of images to request, passed through to the API.

    Returns:
        One text item per generated image URL, or a single "Error: ..." text
        item when OPENAI_API_KEY is unset, the API call fails, or the
        response carries no URL. Errors are reported, never raised.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        return [types.TextContent(
            type="text",
            text="Error: OPENAI_API_KEY environment variable is not set"
        )]

    try:
        # Imported locally so this function does not rely on submodule
        # attribute access through the top-level `anyio` import.
        from anyio import to_thread

        client = OpenAI(api_key=api_key)

        def request_images():
            return client.images.generate(
                model="dall-e-3",
                prompt=prompt,
                size=size,
                quality=quality,
                n=n,
            )

        # The OpenAI client used here is synchronous; run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        response = await to_thread.run_sync(request_images)

        urls = [item.url for item in (response.data or []) if item.url]
        if not urls:
            return [types.TextContent(
                type="text",
                text="Error: Failed to generate image: response contained no image URL"
            )]
        return [types.TextContent(type="text", text=url) for url in urls]
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=f"Error: Failed to generate image: {str(e)}"
        )]
async def get_figma_design(
    url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Get Figma design data including structure and images.

    Args:
        url: Full Figma design URL, forwarded to ``fetch_figma_data``.

    Returns:
        Whatever ``fetch_figma_data`` returns on success; on any exception, a
        single text item containing a JSON object with an ``"error"`` key.
    """
    try:
        from anyio import to_thread

        # fetch_figma_data performs blocking HTTP calls; run it in a worker
        # thread so it does not stall the event loop.
        return await to_thread.run_sync(fetch_figma_data, url)
    except Exception as e:
        return [types.TextContent(
            type="text",
            text=json.dumps({"error": f"Failed to fetch Figma design: {str(e)}"})
        )]
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """Run the MCP tool server over stdio or SSE."""
    # port:      TCP port for the SSE HTTP server; unused with stdio.
    # transport: "sse" serves over HTTP via Starlette/uvicorn; any other
    #            value (click restricts it to "stdio") serves over stdio.
    # Returns 0 once the selected server loop has finished.
    app = Server("mcp-website-fetcher")
    mood_description: str = (
        "Ask this MCP server about its mood! You can phrase your question "
        "in any way you like - 'How are you?', 'What's your mood?', or even "
        "'Are you having a good day?'. The server will always respond with "
        "a cheerful message and a heart ❤️"
    )

    def missing_argument(arg_name: str) -> list[types.TextContent]:
        # Uniform reply for a tool call that lacks a required argument.
        return [types.TextContent(
            type="text",
            text=f"Error: Missing required argument '{arg_name}'"
        )]

    @app.call_tool()
    async def fetch_tool( # type: ignore[unused-function]
        name: str, arguments: dict
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        # Dispatch a tool call by name. Problems are reported as text
        # content ("Error: ...") rather than raised.
        if name == "mcp_fetch":
            if "url" not in arguments:
                return missing_argument("url")
            return await fetch_website(arguments["url"])
        elif name == "mood":
            if "question" not in arguments:
                return missing_argument("question")
            return await check_mood(arguments["question"])
        elif name == "generate_image":
            if "prompt" not in arguments:
                return missing_argument("prompt")
            # Optional arguments fall back to the defaults advertised in the
            # tool's input schema below.
            size = arguments.get("size", "1024x1024")
            quality = arguments.get("quality", "standard")
            n = arguments.get("n", 1)
            return await generate_image(arguments["prompt"], size, quality, n)
        elif name == "figma_design":
            if "url" not in arguments:
                return missing_argument("url")
            return await get_figma_design(arguments["url"])
        else:
            return [types.TextContent(
                type="text",
                text=f"Error: Unknown tool: {name}"
            )]

    @app.list_tools()
    async def list_tools() -> list[types.Tool]: # type: ignore[unused-function]
        # Advertise the four tools handled by fetch_tool with their schemas.
        return [
            types.Tool(
                name="mcp_fetch",
                description="Fetches a website and returns its content",
                inputSchema={
                    "type": "object",
                    "required": ["url"],
                    "properties": {
                        "url": {
                            "type": "string",
                            "description": "URL to fetch",
                        }
                    },
                },
            ),
            types.Tool(
                name="mood",
                description="Ask the server about its mood - it's always happy!",
                inputSchema={
                    "type": "object",
                    "required": ["question"],
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": mood_description,
                        }
                    },
                },
            ),
            types.Tool(
                name="generate_image",
                description="Generate an image using DALL-E 3",
                inputSchema={
                    "type": "object",
                    "required": ["prompt"],
                    "properties": {
                        "prompt": {
                            "type": "string",
                            "description": "The description of the image you want to generate",
                        },
                        "size": {
                            "type": "string",
                            "description": "Image size (1024x1024, 1024x1792, or 1792x1024)",
                            "default": "1024x1024",
                            "enum": ["1024x1024", "1024x1792", "1792x1024"],
                        },
                        "quality": {
                            "type": "string",
                            "description": "Image quality (standard or hd)",
                            "default": "standard",
                            "enum": ["standard", "hd"],
                        },
                        "n": {
                            "type": "integer",
                            "description": "Number of images to generate",
                            "default": 1,
                            "minimum": 1,
                            "maximum": 1,
                        },
                    },
                },
            ),
            types.Tool(
                name="figma_design",
                description="Get Figma design data including structure and images",
                inputSchema={
                    "type": "object",
                    "required": ["url"],
                    "properties": {
                        "url": {
                            "type": "string",
                            "description": "The full Figma design URL",
                        }
                    },
                },
            ),
        ]

    if transport == "sse":
        # Imported lazily so stdio-only use does not need the web stack.
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.responses import Response
        from starlette.routing import Mount, Route
        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            # Serve one MCP session over the SSE stream opened by a client.
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            # Starlette calls whatever a Route endpoint returns as an ASGI
            # response; returning None raised "'NoneType' object is not
            # callable" when the session ended.
            return Response()

        # NOTE(review): debug=True and host="0.0.0.0" are kept from the
        # original; confirm they are intended outside local development.
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )
        import uvicorn
        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            # Serve one MCP session over this process's stdin/stdout.
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
        anyio.run(arun)
    return 0
# Invoke the click command only when this file is run as a script.
if __name__ == "__main__":
    main()