Skip to main content
Glama
server.py4.85 kB
import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass from typing import Any, Sequence from mcp import Tool from mcp.server import Server from mcp.types import TextContent from .sleep import SleepFetcher # Configure logging logger = logging.getLogger("mcp-sleep") @dataclass class AppContext: """Application context for MCP Sleep.""" sleep: SleepFetcher | None = None @asynccontextmanager async def server_lifespan(server: Server) -> AsyncIterator[AppContext]: """Initialize and clean up application resources.""" try: # Initialize services sleep = SleepFetcher() # Log the startup information logger.info("Starting MCP Sleep server") # Provide context to the application yield AppContext(sleep=sleep) finally: # Cleanup resources if needed pass # Create server instance app = Server("mcp-sleep", lifespan=server_lifespan) @app.list_tools() async def list_tools() -> list[Tool]: """List available Sleep tools.""" tools = [] ctx = app.request_context.lifespan_context # Add Sleep tools if Sleep is configured if ctx: tools.extend( [ Tool( name="sleep", description="If you need to wait for a few seconds to continue with the task you are performing .", # noqa: E501 inputSchema={ "type": "object", "properties": { "seconds": { "type": "number", "minimum": 0, "maximum": ctx.sleep.config.timeout, "description": "seconds it will take me to tell you to continue." # noqa: E501 } }, "required": ["seconds"], }, ), ] ) return tools @app.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: """Handle tool calls for Sleep.""" ctx = app.request_context.lifespan_context try: if name == "sleep": if not ctx: raise ValueError("Sleep is not configured.") seconds = arguments.get("seconds") if not seconds: raise ValueError("Seconds is required.") if seconds > int(ctx.sleep.config.timeout): return [ TextContent( type="text", text=f"My configuration does not allow me to make you wait more than {ctx.sleep.config.timeout} seconds", # noqa: E501 ) ] # Sleep seconds value await ctx.sleep.wait(seconds) return [ TextContent( type="text", text=f"You can continue to do your task after you've been waiting {seconds} seconds", # noqa: E501 ) ] raise ValueError(f"Unknown tool: {name}") except Exception as e: logger.error(f"Tool execution error: {str(e)}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def run_server(transport: str = "stdio", port: int = 8000) -> None: """Run the MCP Sleep server with the specified transport.""" if transport == "sse": from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount, Route sse = SseServerTransport("/messages/") async def handle_sse(request: Request) -> None: async with sse.connect_sse( request.scope, request.receive, request._send ) as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) import uvicorn # Set up uvicorn config config = uvicorn.Config(starlette_app, host="0.0.0.0", port=port) # noqa: S104,E501 server = uvicorn.Server(config) # Use server.serve() instead of run() to stay in the same event loop await server.serve() else: from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AgentsWorkingTogether/mcp-sleep'

If you have feedback or need assistance with the MCP directory API, please join our Discord server