Skip to main content
Glama
jaredlang
by jaredlang
server.py13.3 kB
""" MCP Server for Forecast Storage Exposes weather forecast storage operations as MCP tools. Provides upload, retrieval, cleanup, and statistics for forecasts stored in Cloud SQL. """ import asyncio import json import os import logging from typing import Any from mcp.server import Server from mcp.types import Tool, TextContent # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) from tools.forecast_operations import ( upload_forecast, get_cached_forecast, cleanup_expired_forecasts, get_storage_stats, list_forecasts ) from tools.connection import test_connection, close_connector # Initialize MCP server server = Server("forecast_storage") @server.list_tools() async def list_tools() -> list[Tool]: """ List available forecast storage tools. """ return [ Tool( name="upload_forecast", description=( "Upload a complete forecast (text + audio) to Cloud SQL. " "Stores text as binary with unicode support and audio as binary WAV data. " "Automatically detects optimal text encoding and sets TTL for cache expiration." ), inputSchema={ "type": "object", "properties": { "city": { "type": "string", "description": "City name (e.g., 'chicago', 'new york')" }, "forecast_text": { "type": "string", "description": "Generated forecast text content (supports all unicode languages)" }, "audio_data": { "type": "string", "description": "Base64-encoded audio WAV data" }, "forecast_at": { "type": "string", "description": "ISO 8601 timestamp (e.g., '2025-12-26T15:00:00Z')" }, "ttl_minutes": { "type": "integer", "description": "Time-to-live in minutes (default: 30)", "default": 30 }, "encoding": { "type": "string", "description": "Text encoding: 'utf-8', 'utf-16', 'utf-32' (auto-detect if not specified)", "enum": ["utf-8", "utf-16", "utf-32"] }, "language": { "type": "string", "description": "ISO 639-1 language code (e.g., 'en', 'es', 'ja', 'zh')" }, "locale": { "type": "string", "description": "Full locale (e.g., 'en-US', 'es-MX', 'ja-JP')" } }, "required": ["city", "forecast_text", "audio_data", "forecast_at"] } ), Tool( name="get_cached_forecast", description=( "Retrieve cached forecast from Cloud SQL if available and not expired. " "Returns forecast text and base64-encoded audio data if valid cache exists. " "Checks TTL and returns cached=False if forecast is expired." ), inputSchema={ "type": "object", "properties": { "city": { "type": "string", "description": "City name to query" }, "language": { "type": "string", "description": "Optional language filter (e.g., 'en', 'es', 'ja')" } }, "required": ["city"] } ), Tool( name="cleanup_expired_forecasts", description=( "Remove expired forecasts from Cloud SQL database. " "Deletes all forecasts where expires_at < NOW(). " "Returns count of deleted and remaining forecasts." ), inputSchema={ "type": "object", "properties": {} } ), Tool( name="get_storage_stats", description=( "Get database storage statistics including total forecasts, storage sizes, " "encodings used, languages used, and per-city breakdown. " "Useful for monitoring and capacity planning." ), inputSchema={ "type": "object", "properties": {} } ), Tool( name="list_forecasts", description=( "List forecast history with optional filtering by city. " "Returns metadata for forecasts including forecast times, sizes, encoding, and expiration status." ), inputSchema={ "type": "object", "properties": { "city": { "type": "string", "description": "City name to filter (optional, lists all if omitted)" }, "limit": { "type": "integer", "description": "Maximum number of results (default: 10)", "default": 10 } }, "required": [] } ), Tool( name="test_connection", description=( "Test the Cloud SQL database connection and verify setup. " "Returns connection status, PostgreSQL version, and table existence." ), inputSchema={ "type": "object", "properties": {} } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """ Handle tool execution requests. """ # Log incoming request with structured data logger.info("MCP tool call request", extra={ "tool_name": name, "arguments": arguments }) try: # Route to appropriate function if name == "upload_forecast": result = upload_forecast( city=arguments["city"], forecast_text=arguments["forecast_text"], audio_data=arguments["audio_data"], forecast_at=arguments["forecast_at"], ttl_minutes=arguments.get("ttl_minutes", 30), encoding=arguments.get("encoding"), language=arguments.get("language"), locale=arguments.get("locale") ) elif name == "get_cached_forecast": result = get_cached_forecast( city=arguments["city"], language=arguments.get("language") ) elif name == "cleanup_expired_forecasts": result = cleanup_expired_forecasts() elif name == "get_storage_stats": result = get_storage_stats() elif name == "list_forecasts": result = list_forecasts( city=arguments.get("city"), limit=arguments.get("limit", 10) ) elif name == "test_connection": result = test_connection() else: result = { "status": "error", "message": f"Unknown tool: {name}" } # Log response with structured data logger.info("MCP tool call response", extra={ "tool_name": name, "result_status": result.get("status"), "forecast_id": result.get("forecast_id"), "cached": result.get("cached"), "result_message": result.get("message") }) # Return result as JSON return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: # Log error with full stack trace logger.error("MCP tool call failed", extra={ "tool_name": name, "error": str(e), "arguments": arguments }, exc_info=True) # Handle unexpected errors error_result = { "status": "error", "message": f"Tool execution failed: {str(e)}" } return [TextContent( type="text", text=json.dumps(error_result, indent=2) )] async def stdio_main(): """ Run MCP server in stdio mode (for local development/testing). """ from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) async def http_main(): """ Run MCP server in HTTP/SSE mode (for Cloud Run deployment). """ from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.routing import Route from starlette.responses import Response import uvicorn # Create SSE transport logger.info("Creating SSE transport...") sse = SseServerTransport("/messages") logger.info("SSE transport created") async def handle_sse(request): """Handle SSE connection from client.""" async with sse.connect_sse( request.scope, request.receive, request._send, ) as streams: await server.run( streams[0], streams[1], server.create_initialization_options(), ) return Response(status_code=200) async def handle_messages(request): """Handle incoming messages via POST.""" # Get request body request_body = await request.body() # Log incoming request logger.debug("Received HTTP POST request", extra={ "body_preview": request_body[:200].decode('utf-8', errors='replace') }) async def receive(): return { "type": "http.request", "body": request_body, "more_body": False, } # Response data collector response_data = [] response_status = 200 async def send(message): nonlocal response_status msg_type = message.get("type") logger.debug("ASGI send() called", extra={"message_type": msg_type}) if msg_type == "http.response.start": response_status = message.get("status", 200) logger.debug("Response status set", extra={"status": response_status}) elif msg_type == "http.response.body": body_chunk = message.get("body", b"") logger.debug("Response body chunk", extra={"chunk_length": len(body_chunk)}) if body_chunk: response_data.append(body_chunk) # Call SSE transport handler await sse.handle_post_message(request.scope, receive, send) # Combine response content = b"".join(response_data) logger.debug("Final response prepared", extra={ "response_length": len(content), "response_preview": content[:200].decode('utf-8', errors='replace') }) if not content: content = b'{"error": "Empty response from MCP server"}' return Response( content=content, media_type="application/json", status_code=response_status ) # Create Starlette app logger.info("Creating Starlette app...") app = Starlette( routes=[ Route("/sse", endpoint=handle_sse), Route("/messages", endpoint=handle_messages, methods=["POST"]), ], ) logger.info("Starlette app created") # Run with uvicorn port = int(os.getenv("PORT", "8080")) logger.info(f"Configuring uvicorn server on host=0.0.0.0 port={port}") config = uvicorn.Config( app, host="0.0.0.0", port=port, log_level="info", ) logger.info("Creating uvicorn.Server instance...") server_instance = uvicorn.Server(config) logger.info("uvicorn.Server instance created") logger.info("Starting uvicorn server...") await server_instance.serve() logger.info("Server stopped") def cleanup(): """ Cleanup function called on server shutdown. """ close_connector() if __name__ == "__main__": import atexit atexit.register(cleanup) # Determine which mode to run based on environment variable transport_mode = os.getenv("MCP_TRANSPORT", "stdio").lower() if transport_mode == "http" or transport_mode == "sse": print(f"Starting MCP server in HTTP/SSE mode on port {os.getenv('PORT', '8080')}...") asyncio.run(http_main()) else: print("Starting MCP server in stdio mode...") asyncio.run(stdio_main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jaredlang/forecast_storage_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server