# FastMCP

"""Enhanced Smart client for MCP Servers with example model discovery.""" import asyncio import json import sys from pydantic import BaseModel, Field, create_model from typing import Any, Dict, Type, get_type_hints, List, Optional import traceback from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client def generate_example_input(model: Type[BaseModel]) -> str: """Generate example input data based on a Pydantic model's structure.""" def generate_field_example(field_type: Any, field_name: str) -> Any: if field_type == str: return f"<string: {field_name}>" elif field_type == int: return f"<integer: {field_name}>" elif field_type == float: return f"<float: {field_name}>" elif field_type == bool: return f"<boolean: {field_name}>" elif hasattr(field_type, "__origin__"): origin = field_type.__origin__ if origin == list: inner_type = field_type.__args__[0] return [generate_field_example(inner_type, f"{field_name} item")] elif origin == dict: key_type, value_type = field_type.__args__ return { generate_field_example(key_type, f"{field_name} key"): generate_field_example(value_type, f"{field_name} value") } return f"<any: {field_name}>" example_data = {} for field_name, field_info in model.model_fields.items(): field_type = get_type_hints(model).get(field_name, field_info.annotation) example_data[field_name] = generate_field_example(field_type, field_name) return json.dumps(example_data, indent=4) def print_pydantic_model(model: Type[BaseModel]) -> None: """Prints a Pydantic model representation.""" model_name = model.__name__ print(f"class {model_name}(BaseModel):") for field_name, field_info in model.model_fields.items(): field_type = get_type_hints(model).get(field_name, field_info.annotation) field_description = field_info.description or "No description provided" print(f" {field_name}: {field_type} = Field(..., description={field_description!r})") def extract_type_from_description(description: str) -> Optional[Any]: """Extract type 
information from field description.""" import re match = re.search(r'\((.*?)\)$', description) if not match: return None type_str = match.group(1) basic_types = { 'str': str, 'int': int, 'float': float, 'bool': bool, 'EmailStr': str } if type_str in basic_types: return basic_types[type_str] def parse_nested_type(type_str: str) -> Any: list_match = re.match(r'List\[(.*)\]', type_str) if list_match: inner = list_match.group(1) return List[basic_types.get(inner, Any)] dict_match = re.match(r'Dict\[(.*),\s*(.*)\]', type_str) if dict_match: key_type = dict_match.group(1).strip() value_type = dict_match.group(2).strip() key = basic_types.get(key_type, str) return Dict[key, basic_types.get(value_type, Any)] return Any return parse_nested_type(type_str) def create_pydantic_model_from_schema(schema: Dict[str, Any], model_name: str = "PromptInputModel") -> Type[BaseModel]: """Generate a Pydantic model from a JSON schema.""" fields = {} for argument in schema.get("arguments", []): field_name = argument["name"] description = argument.get("description", "") field_type = extract_type_from_description(description) if field_type is None: field_type = { "string": str, "integer": int, "boolean": bool, "number": float, "null": type(None), "array": List[Any], "object": Dict[str, Any], }.get(argument["type"], Any) required = argument.get("required", False) fields[field_name] = (field_type, Field(..., description=description) if required else Field(None, description=description)) return create_model(model_name, **fields) async def discover_capabilities(session: ClientSession): """Discover and print server capabilities.""" try: prompts_response = await session.list_prompts() print("Server Prompts:", [p.name for p in prompts_response.prompts]) for p in prompts_response.prompts: print(f"#" * 80) print(f"\nPrompt: {p.name}") print(f"Description: {p.description}") json_schema = p.model_dump_json() DynamicModel = create_pydantic_model_from_schema(json.loads(json_schema)) print("\nModel 
Class:") print_pydantic_model(DynamicModel) print("\nExample Input Structure:") print(generate_example_input(DynamicModel)) except Exception as e: print(f"Error: {str(e)}") print(traceback.format_exc()) async def run_with_server(script_path: str): """Run operations within server connection context.""" params = StdioServerParameters(command="/Users/deepnpisgah/.local/bin/uv", args=["run", "--with", "mcp", "mcp", "run", script_path]) async with stdio_client(params) as streams: async with ClientSession(streams[0], streams[1]) as session: await session.initialize() await discover_capabilities(session) async def main(): if len(sys.argv) < 2: print("Usage: python prompt_client.py <server.py>") sys.exit(1) await run_with_server(sys.argv[1]) if __name__ == "__main__": asyncio.run(main())