Skip to main content
Glama

MCP Everything

by s2005
mcp_server.py (15.8 kB)
# Demonstration MCP server ("python-mcp-everything"): registers sample tools,
# static resources, prompts, argument completions, resource subscriptions and
# periodic log notifications on a single FastMCP instance.

import asyncio
import base64
import json
import os
import random
import sys
from contextlib import asynccontextmanager
from enum import Enum
from typing import Any

from mcp.server.fast_mcp import FastMCP, Context
from mcp.messages import UserMessage, TextContent, ImageContent, AssistantMessage

from .constants import MCP_TINY_IMAGE


class PromptName(Enum):
    """Names of the prompts exposed by the prompt handlers."""

    SIMPLE = "simple_prompt"
    COMPLEX = "complex_prompt"


# Static resources served under test://static/resource/<n>.
# Each entry carries either a "text" payload or a base64-encoded "blob".
ALL_RESOURCES = [
    {
        "uri": "test://static/resource/1",
        "name": "Resource 1",
        "mime_type": "text/plain",
        "text": "Resource 1: This is a plaintext resource",
    },
    {
        "uri": "test://static/resource/2",
        "name": "Resource 2",
        "mime_type": "application/octet-stream",
        "blob": base64.b64encode(b"Resource 2: This is a base64 blob").decode('utf-8'),
    },
    {
        "uri": "test://static/resource/3",
        "name": "Resource 3 - Another Text",
        "mime_type": "text/plain",
        "text": "Resource 3: More plaintext content here.",
    },
    {
        "uri": "test://static/resource/4",
        "name": "Resource 4 - Image (Fake)",
        "mime_type": "image/png",
        "blob": base64.b64encode(b"Fake PNG data").decode('utf-8'),
    },
    {
        "uri": "test://static/resource/5",
        "name": "Resource 5 - JSON Data",
        "mime_type": "application/json",
        "text": json.dumps({"key": "value", "number": 123}),
    },
    {
        "uri": "test://static/resource/6",
        "name": "Resource 6 - Long Text",
        "mime_type": "text/plain",
        "text": "This is resource number six, and it has a slightly longer description to see how text wrapping or truncation might be handled by a client.",
    },
    {
        "uri": "test://static/resource/7",
        "name": "Resource 7 - Another Blob",
        "mime_type": "application/octet-stream",
        "blob": base64.b64encode(b"Yet another piece of binary data for Resource 7").decode('utf-8'),
    },
]

# Candidate values offered by the completion handler, keyed by argument name.
EXAMPLE_COMPLETIONS = {
    "style": ["casual", "formal", "technical", "friendly"],
    "temperature": ["0", "0.5", "0.7", "1.0"],
    "resourceId": ["1", "2", "3", "4", "5"],
}

# Log levels ordered from least to most severe; list position is the severity rank.
LOG_LEVEL_ORDER = ["debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"]

# Pool of sample messages the periodic logger picks from at random.
LOG_MESSAGES = [
    {"level": "debug", "data": "This is a debug message: Variable x = 10"},
    {"level": "info", "data": "Service started successfully on port 8080."},
    {"level": "notice", "data": "User 'admin' logged in from IP 192.168.1.100."},
    {"level": "warning", "data": "Disk space is nearing capacity (90% used)."},
    {"level": "error", "data": "Failed to connect to database: Connection timed out."},
    {"level": "critical", "data": "Critical system failure: Unable to allocate memory."},
    {"level": "alert", "data": "Security alert: Possible intrusion detected."},
    {"level": "emergency", "data": "System is shutting down due to an emergency."},
]


def is_message_ignored(current_level_str: str, message_level_str: str) -> bool:
    """Return True when a message should be suppressed at the current level.

    A message is ignored when its level ranks below the current level in
    LOG_LEVEL_ORDER. Comparison is case-insensitive. If either level is not
    in LOG_LEVEL_ORDER the message is ignored.
    """
    try:
        current_idx = LOG_LEVEL_ORDER.index(current_level_str.lower())
        message_idx = LOG_LEVEL_ORDER.index(message_level_str.lower())
        return message_idx < current_idx
    except ValueError:
        # Unknown level name on either side: suppress rather than guess.
        return True


async def main() -> None:
    """Build the FastMCP server, register every handler, and run it.

    All handlers are closures over the state declared at the top of this
    function (subscriptions, log level, background task handles).
    """
    # URIs that clients have subscribed to, plus the task that notifies them.
    subscriptions: set[str] = set()
    subs_update_interval_task: asyncio.Task | None = None

    # Current minimum log level and the task that emits sample log messages.
    log_level: str = "debug"  # Default log level
    logs_update_interval_task: asyncio.Task | None = None

    async def notify_resource_updates(server_ref: FastMCP):
        """Every 5 seconds, send an update notification for each subscribed URI."""
        while True:
            await asyncio.sleep(5)
            # Iterate over a snapshot: handlers may add/discard URIs while we await.
            for uri in list(subscriptions):
                try:
                    await server_ref.notify_resource_updated(uri)
                except Exception as e:
                    # Best effort: report the failure and keep the loop alive
                    # instead of silently discarding it.
                    print(f"Error sending resource update for {uri}: {e}", file=sys.stderr)

    async def notify_log_messages(server_ref: FastMCP):
        """Every 15 seconds, emit one random sample message if the level allows it."""
        nonlocal log_level  # To access the current log_level from main's scope
        while True:
            await asyncio.sleep(15)  # 15-second interval
            random_message = random.choice(LOG_MESSAGES)
            if not is_message_ignored(log_level, random_message["level"]):
                try:
                    await server_ref.notify_message(
                        level=random_message["level"],
                        data=random_message["data"],
                        logger="python-mcp-server"
                    )
                except Exception as e:
                    # Diagnostics go to stderr so they stay separate from normal output.
                    print(f"Error sending log message: {e}", file=sys.stderr)

    @asynccontextmanager
    async def app_lifespan(app: FastMCP):
        """Start the two background notifier tasks and cancel them on exit."""
        nonlocal subs_update_interval_task, logs_update_interval_task
        print("[SERVER_LOG][LIFESPAN] Startup: Entered app_lifespan.", flush=True)
        # Startup
        subs_update_interval_task = asyncio.create_task(notify_resource_updates(app))
        logs_update_interval_task = asyncio.create_task(notify_log_messages(app))
        print("[SERVER_LOG][LIFESPAN] Startup: Background tasks created.", flush=True)
        try:
            yield
            print("[SERVER_LOG][LIFESPAN] Normal execution, pre-shutdown.", flush=True)
        finally:
            print("[SERVER_LOG][LIFESPAN] Shutdown: Entered finally block.", flush=True)
            # Shutdown
            tasks_to_cancel = []
            if subs_update_interval_task:
                subs_update_interval_task.cancel()
                tasks_to_cancel.append(subs_update_interval_task)
            if logs_update_interval_task:
                logs_update_interval_task.cancel()
                tasks_to_cancel.append(logs_update_interval_task)
            if tasks_to_cancel:
                # return_exceptions=True so the CancelledError of each task is
                # collected here instead of propagating out of the cleanup.
                await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
                print("[SERVER_LOG][LIFESPAN] Shutdown: Background tasks gathered.", flush=True)
            print("[SERVER_LOG][LIFESPAN] Shutdown: app_lifespan cleanup complete.", flush=True)

    server = FastMCP(
        name="python-mcp-everything",
        version="0.1.0",
        capabilities={
            "prompts": {},
            "resources": {"subscribe": True},
            "logging": {}  # Indicates logging capability
        },
        lifespan=app_lifespan
    )

    # --- Logging Control Handler ---
    @server.set_logging_level_handler()
    async def set_logging_level_handler(level: str, ctx: Context) -> None:
        """Store the requested log level (lower-cased) and confirm it to the client.

        NOTE: the level is not validated here; a name outside LOG_LEVEL_ORDER
        makes is_message_ignored() suppress every periodic message.
        """
        nonlocal log_level
        log_level = level.lower()  # Normalize to lower case
        # Notify client about the change. Using "info" level for this notification itself.
        await ctx.notify_message(
            level="info",
            data=f"Logging level set to: {log_level}",
            logger="python-mcp-server"
        )

    # Echo tool
    @server.tool()
    def echo(message: str) -> str:
        """Return the message prefixed with "Echo: "."""
        return f"Echo: {message}"

    # Add tool
    @server.tool()
    def add(a: int, b: int) -> str:
        """Return a sentence stating the sum of a and b."""
        return f"The sum of {a} and {b} is {a + b}."

    # LongRunningOperation tool
    @server.tool()
    async def long_running_operation(duration: int = 10, steps: int = 5, ctx: Context = None) -> str:
        """Sleep for `duration` seconds split into `steps` slices, reporting progress.

        Progress is reported after each slice when a context is supplied.
        """
        for i in range(steps):
            await asyncio.sleep(duration / steps)
            if ctx:
                await ctx.report_progress(current_step=i + 1, total_steps=steps)
        return f"Long running operation completed. Duration: {duration} seconds, Steps: {steps}."

    # PrintEnv tool
    @server.tool()
    def print_env() -> str:
        """Return every environment variable of this process as indented JSON.

        NOTE(review): this exposes the full environment, including any
        secrets stored in it, to the caller.
        """
        return json.dumps(dict(os.environ), indent=2)

    # SampleLLM tool
    @server.tool()
    async def sample_llm(prompt: str, max_tokens: int = 100, ctx: Context = None) -> str:
        """Ask the client (via ctx.create_message) to sample a reply to `prompt`.

        Returns an error string when no context is available, and a fallback
        string when the response carries no `content.text`.
        """
        if not ctx:
            return "Error: Context not available for LLM sampling."

        response_message = await ctx.create_message(
            messages=[UserMessage(content=TextContent(text=prompt))],
            max_tokens=max_tokens
        )

        if response_message and hasattr(response_message, 'content') and hasattr(response_message.content, 'text'):
            result_text = response_message.content.text
            return f"LLM sampling result: {result_text}"
        else:
            return "LLM sampling result: (Could not extract text from response)"

    # GetTinyImage tool
    @server.tool()
    def get_tiny_image() -> list:
        """Return the tiny MCP image framed by two text parts."""
        return [
            TextContent(text="This is a tiny image:"),
            ImageContent(data=MCP_TINY_IMAGE, mime_type="image/png"),
            TextContent(text="The image above is the MCP tiny image.")
        ]

    # AnnotatedMessage tool
    @server.tool()
    def annotated_message(message_type: str, include_image: bool = False) -> list:
        """Return annotated content demonstrating priority/audience metadata.

        `message_type` selects "error", "success" or "debug"; any other value
        yields an "Unknown message type" part. With `include_image` an
        annotated image part is appended.
        """
        content_parts = []
        if message_type == "error":
            content_parts.append(TextContent(
                text="Error: Operation failed",
                annotations={"priority": 1.0, "audience": ["user", "assistant"]}
            ))
        elif message_type == "success":
            content_parts.append(TextContent(
                text="Operation completed successfully",
                annotations={"priority": 0.7, "audience": ["user"]}
            ))
        elif message_type == "debug":
            content_parts.append(TextContent(
                text="Debug: Cache hit ratio 0.95, latency 150ms",
                annotations={"priority": 0.3, "audience": ["assistant"]}
            ))
        else:
            content_parts.append(TextContent(
                text=f"Unknown message type: {message_type}",
                annotations={"priority": 0.5, "audience": ["user", "assistant"]}
            ))

        if include_image:
            content_parts.append(ImageContent(
                data=MCP_TINY_IMAGE,
                mime_type="image/png",
                annotations={"priority": 0.5, "audience": ["user"]}
            ))
        return content_parts

    # --- Resource Listing Logic ---
    PAGE_SIZE_RESOURCES = 10

    async def list_all_static_resources(cursor: str | None = None, page_size: int = PAGE_SIZE_RESOURCES, ctx: Context = None) -> tuple[list[dict], str | None]:
        """Return one page of ALL_RESOURCES and the cursor for the next page.

        The cursor is the base64 encoding of the decimal start offset. An
        undecodable or negative cursor is reported through ctx.error (when
        available) and listing restarts from offset 0. A non-positive
        `page_size` falls back to PAGE_SIZE_RESOURCES. The returned cursor is
        None once the last resource has been served.
        """
        start_index = 0
        if cursor:
            try:
                start_index = int(base64.b64decode(cursor).decode('utf-8'))
                if start_index < 0:
                    # A negative offset would make the slice below count from
                    # the end of the list and return the wrong page.
                    raise ValueError("cursor offset must not be negative")
            except Exception as e:
                if ctx and hasattr(ctx, 'error'):
                    await ctx.error(f"Invalid cursor format received: {cursor}. Error: {e}")
                start_index = 0

        actual_page_size = page_size if page_size > 0 else PAGE_SIZE_RESOURCES
        end_index = min(start_index + actual_page_size, len(ALL_RESOURCES))
        resources_page = ALL_RESOURCES[start_index:end_index]

        next_cursor_val = None
        if end_index < len(ALL_RESOURCES):
            next_cursor_val = base64.b64encode(str(end_index).encode('utf-8')).decode('utf-8')
        return resources_page, next_cursor_val

    server.set_list_resources_handler(list_all_static_resources, prefix="test://static/resource/")

    @server.resource("test://static/resource/{resource_id}")
    async def get_static_resource(resource_id: str, ctx: Context = None) -> dict:
        """Return the ALL_RESOURCES entry for `resource_id`.

        Raises:
            ValueError: if no entry has the matching URI.
        """
        uri_to_find = f"test://static/resource/{resource_id}"
        resource = next((r for r in ALL_RESOURCES if r["uri"] == uri_to_find), None)
        if resource:
            return resource
        else:
            raise ValueError(f"Resource with ID {resource_id} not found.")

    @server.list_resource_templates_handler()
    async def list_resource_templates_handler(ctx: Context = None) -> list:
        """Return the single URI template describing the static resources."""
        return [
            {
                "uri_template": "test://static/resource/{resource_id}",
                "name": "Static Resource",
                "description": "A static resource with a numeric ID.",
                "parameters": [{"name": "resource_id", "description": "The unique identifier for the static resource."}]
            }
        ]

    # --- Prompt Handling Logic ---
    @server.list_prompts_handler()
    async def list_prompts_handler(ctx: Context = None) -> list:
        """Return the descriptors of the simple and the complex prompt."""
        return [
            {
                "name": PromptName.SIMPLE.value,
                "description": "A prompt without arguments",
                "arguments": []
            },
            {
                "name": PromptName.COMPLEX.value,
                "description": "A prompt with arguments",
                "arguments": [
                    {"name": "temperature", "description": "Temperature setting", "required": True},
                    {"name": "style", "description": "Output style", "required": False},
                ],
            },
        ]

    @server.get_prompt_handler()
    async def get_prompt_handler(name: str, arguments: dict[str, Any] | None = None, ctx: Context = None) -> dict:
        """Return the messages for the prompt called `name`.

        For the complex prompt, missing arguments are rendered as None.

        Raises:
            ValueError: if `name` is not a known prompt.
        """
        if name == PromptName.SIMPLE.value:
            return {"messages": [UserMessage(content=TextContent(text="This is a simple prompt without arguments."))]}
        elif name == PromptName.COMPLEX.value:
            temp = arguments.get("temperature") if arguments else None
            style = arguments.get("style") if arguments else None
            return {
                "messages": [
                    UserMessage(content=TextContent(text=f"This is a complex prompt with arguments: temperature={temp}, style={style}")),
                    AssistantMessage(content=TextContent(text="I understand. You've provided a complex prompt with temperature and style arguments. How would you like me to proceed?")),
                    UserMessage(content=ImageContent(data=MCP_TINY_IMAGE, mime_type="image/png"))
                ]
            }
        else:
            raise ValueError(f"Unknown prompt: {name}")

    # --- Argument Completion Logic ---
    @server.completion_handler()
    async def completion_handler(ref: dict, argument: dict, ctx: Context = None) -> dict:
        """Return the EXAMPLE_COMPLETIONS values that start with the typed prefix.

        Matching is case-insensitive. Argument names without an entry in
        EXAMPLE_COMPLETIONS yield an empty list.
        """
        arg_name = argument.get("name")
        arg_value_str = str(argument.get("value", "")).lower()

        completions_to_return = []
        if arg_name in EXAMPLE_COMPLETIONS:
            source_list = EXAMPLE_COMPLETIONS[arg_name]
            completions_to_return = [item for item in source_list if item.lower().startswith(arg_value_str)]

        return {
            "completion": {
                "values": completions_to_return,
                "has_more": False,
                "total": len(completions_to_return),
            }
        }

    # Subscribe handler
    @server.subscribe_handler()
    async def subscribe_handler(uri: str, ctx: Context) -> None:
        """Record `uri` as subscribed and log the subscription via ctx.info."""
        subscriptions.add(uri)
        await ctx.info(f"Client subscribed to resource: {uri}")

    # Unsubscribe handler
    @server.unsubscribe_handler()
    async def unsubscribe_handler(uri: str, ctx: Context) -> None:
        """Drop `uri` from the subscriptions (no error if absent) and log it."""
        subscriptions.discard(uri)
        await ctx.info(f"Client unsubscribed from resource: {uri}")

    print("[SERVER_LOG] Server setup complete. Attempting to start server.run()...", flush=True)
    try:
        await server.run()
        print("[SERVER_LOG] server.run() completed.", flush=True)  # Might not be reached if it blocks
    except Exception as e:
        print(f"[SERVER_LOG] Exception from server.run(): {e}", flush=True)
    finally:
        print("[SERVER_LOG] main() function is exiting.", flush=True)


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/s2005/mcp-everything'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.