Skip to main content
Glama
newserver.py19.3 kB
import asyncio import logging.config import os import random import time from collections import defaultdict from datetime import datetime from enum import Enum from typing import Annotated, Optional import anyio import httpx import markdownify import psutil import readabilipy import structlog from bs4 import BeautifulSoup from dotenv import load_dotenv from fastmcp import FastMCP from fastmcp.server.auth.providers.bearer import BearerAuthProvider, RSAKeyPair from mcp import ErrorData, McpError from mcp.server.auth.provider import AccessToken from mcp.types import INTERNAL_ERROR from pydantic import BaseModel, Field from pydantic_settings import BaseSettings from transformers import pipeline, set_seed import functools import inspect # ─────────────────────────────── # 0. Environment & Configuration # ─────────────────────────────── load_dotenv() class ServerSettings(BaseSettings): auth_token: str my_number: str host: str = "0.0.0.0" port: int = 8086 log_level: str = "INFO" model_cache_dir: str = "./model_cache" max_concurrent_requests: int = 10 request_timeout: int = 30 rate_limit: int = 30 # requests rate_window: int = 60 # seconds class Config: env_file = ".env" env_file_encoding = "utf-8" settings = ServerSettings() # Ensure model cache directory exists os.makedirs(settings.model_cache_dir, exist_ok=True) os.environ["TRANSFORMERS_CACHE"] = settings.model_cache_dir os.environ["HF_HOME"] = settings.model_cache_dir # ─────────────────────────────── # 1. Structured Logging # ─────────────────────────────── logging.config.dictConfig( { "version": 1, "disable_existing_loggers": False, "formatters": { "json": { "()": structlog.stdlib.ProcessorFormatter, "processor": structlog.processors.JSONRenderer(), } }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "json", "level": settings.log_level, } }, "loggers": {"": {"handlers": ["console"], "level": settings.log_level}}, } ) logger = structlog.get_logger() # ─────────────────────────────── # 2. Authentication Provider # ─────────────────────────────── class SimpleBearerAuthProvider(BearerAuthProvider): def __init__(self, token: str): k = RSAKeyPair.generate() super().__init__(public_key=k.public_key, jwks_uri=None, issuer=None, audience=None) self._token = token async def load_access_token(self, token: str) -> Optional[AccessToken]: if token == self._token: return AccessToken(token=token, client_id="puch-client", scopes=["*"], expires_at=None) return None # ─────────────────────────────── # 3. Utility Classes # ─────────────────────────────── class Fetch: USER_AGENT = "Puch/2.0 (Enhanced)" @classmethod async def fetch_url(cls, url: str, force_raw: bool = False) -> tuple[str, str]: async with httpx.AsyncClient(timeout=settings.request_timeout) as client: try: resp = await client.get(url, follow_redirects=True, headers={"User-Agent": cls.USER_AGENT}) except httpx.HTTPError as e: raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}")) if resp.status_code >= 400: raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {resp.status_code}")) content_type = resp.headers.get("content-type", "") is_html = "text/html" in content_type if is_html and not force_raw: return cls._extract_markdown(resp.text), "" return resp.text, f"Raw content (type: {content_type}) follows:\n" @staticmethod def _extract_markdown(html: str) -> str: ret = readabilipy.simple_json.simple_json_from_html_string(html, use_readability=True) if not ret or not ret.get("content"): return "Could not simplify HTML" return markdownify.markdownify(ret["content"], heading_style=markdownify.ATX) # ─────────────────────────────── # 4. Rate Limiter # ─────────────────────────────── class RateLimiter: def __init__(self, max_requests: int, window: int): self.max_requests = max_requests self.window = window self._requests: defaultdict[str, list[float]] = defaultdict(list) def is_allowed(self, client_id: str) -> bool: now = time.time() self._requests[client_id] = [ t for t in self._requests[client_id] if now - t < self.window ] if len(self._requests[client_id]) >= self.max_requests: return False self._requests[client_id].append(now) return True rate_limiter = RateLimiter(settings.rate_limit, settings.rate_window) # ─────────────────────────────── # 5. Error Handling # ─────────────────────────────── class ErrorCategory(str, Enum): MODEL = "model_error" NETWORK = "network_error" VALIDATION = "validation_error" SYSTEM = "system_error" class ErrorHandler: async def handle(self, err: Exception, context: str) -> str: error_id = f"err_{int(time.time())}" logger.error( "UnhandledError", error_id=error_id, context=context, err=str(err), traceback=structlog.processors.format_exc_info(err), ) return f"❌ **Error {error_id}**: Something went wrong while processing your request." error_handler = ErrorHandler() # ─────────────────────────────── # 6. Retry Decorator # ─────────────────────────────── def with_retry(max_attempts: int = 3, delay: float = 1.0): def decorator(fn): sig = inspect.signature(fn) @functools.wraps(fn) async def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return await fn(*args, **kwargs) except Exception as e: if attempt == max_attempts - 1: raise await asyncio.sleep(delay * (2 ** attempt)) # overwrite wrapper’s signature to match fn’s signature wrapper.__signature__ = sig return wrapper return decorator # ─────────────────────────────── # 7. Model Manager (Caching) # ─────────────────────────────── class ModelManager: def __init__(self, cache_dir: str): self._cache_dir = cache_dir self._models: dict[str, pipeline] = {} set_seed(42) # deterministic output async def get(self, model_name: str) -> pipeline: if model_name in self._models: return self._models[model_name] def _load(): logger.info("LoadingModel", model=model_name, cache_dir=self._cache_dir) return pipeline( "text-generation", model=model_name, tokenizer=model_name, cache_dir=self._cache_dir, device_map="auto", torch_dtype="auto", pad_token_id=50256, ) model = await anyio.to_thread.run_sync(_load) self._models[model_name] = model return model model_manager = ModelManager(settings.model_cache_dir) # ─────────────────────────────── # 8. FastMCP Server Setup # ─────────────────────────────── mcp = FastMCP( "Shah Rukh Khan Pickup Lines & Date Locations MCP Server (Enhanced)", auth=SimpleBearerAuthProvider(settings.auth_token), ) start_time = time.time() # ─────────────────────────────── # 9. Helper Functions # ─────────────────────────────── def _clean_output(text: str, prompt: str) -> str: return ( text.replace(prompt, "") .replace("Pickup line:", "") .replace("SRK says:", "") .strip() ) def _srk_fallback(user_info: str) -> str: lines = [ f"Kuch kuch hota hai, {user_info}, every time I see your smile.", f"If love had a face, {user_info}, it would look just like you.", f"Main hoon na, {user_info}—to hold you when the world lets go.", ] return random.choice(lines) # ─────────────────────────────── # 10. Tools # ─────────────────────────────── class RichToolDescription(BaseModel): description: str use_when: str side_effects: Optional[str] = None @mcp.tool async def validate() -> str: return settings.my_number PickupLineDescription = RichToolDescription( description="Generate romantic pickup lines in Shah Rukh Khan style.", use_when="User wants a charming SRK pickup line.", side_effects="Returns stylised romantic text.", ) # @with_retry() @mcp.tool(description=PickupLineDescription.model_dump_json()) async def generate_srk_pickup_line( user_info: Annotated[str, Field(description="Information about the user (name, interests)")] = "friend", target_info: Annotated[str | None, Field(description="Person to impress")] = None, ) -> str: """ Generate an SRK-style pickup line using cached local models, falling back to curated lines when necessary. """ try: context = f"User: {user_info}" if target_info: context += f" | Target: {target_info}" prompt = ( "Generate a romantic, witty pickup line in Shah Rukh Khan's voice.\n" f"Context: {context}\n" "- Use Bollywood flair and Hindi phrases.\n" "Pickup line:" ) model = await model_manager.get("microsoft/DialoGPT-small") generated = await anyio.to_thread.run_sync( lambda: model(prompt, max_length=len(prompt.split()) + 40, temperature=0.8, top_p=0.9)[0][ "generated_text" ] ) logger.info("GeneratedPickupLine", prompt=prompt, generated=generated) line = _clean_output(generated, prompt) if len(line) < 8: raise ValueError("Empty generation") return f"💕 **SRK Pickup Line**\n\n*\"{line}\"*" except Exception as e: logger.warning("FallbackPickupLine", err=str(e)) return f"💕 **SRK Pickup Line**\n\n*\"{_srk_fallback(user_info)}\"*" # generate_srk_pickup_line = with_retry()(generate_srk_pickup_line) DateLocationDescription = RichToolDescription( description="Find romantic date venues in a given city.", use_when="User needs date locations.", side_effects="Performs web searches.", ) @mcp.tool(description=DateLocationDescription.model_dump_json()) async def find_date_locations( city: Annotated[str, Field(description="City to search")] = "Mumbai", date_type: Annotated[str, Field(description="Type of date")] = "romantic dinner", budget: Annotated[str, Field(description="Budget level")] = "moderate", ) -> str: """Search DuckDuckGo for date spots and return formatted list.""" try: query = f"best {date_type} {city} {budget}" url = f"https://html.duckduckgo.com/html/?q={query.replace(' ', '+')}" async with httpx.AsyncClient(timeout=15) as client: resp = await client.get(url, headers={"User-Agent": Fetch.USER_AGENT}) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") results = [ ( r.find("a", class_="result__a").get_text(strip=True), r.find("a", class_="result__snippet").get_text(strip=True), r.find("a", class_="result__a")["href"], ) for r in soup.find_all("div", class_="result__body")[:8] if r.find("a", class_="result__a") and r.find("a", class_="result__snippet") ] if not results: return "❌ No specific locations found—try adjusting your query." formatted = f"💝 **Date Spots in {city.title()}**\n\n" for i, (name, desc, link) in enumerate(results, 1): formatted += f"**{i}. {name}**\n{desc}\n🔗 {link}\n\n" return formatted except Exception as e: return await error_handler.handle(e, "find_date_locations") # find_date_locations = with_retry()(find_date_locations) FlirtyReplyDescription = RichToolDescription( description="Generate SRK-style flirty reply to a message.", use_when="User wants SRK reply.", side_effects="Returns stylised reply.", ) @mcp.tool(description=FlirtyReplyDescription.model_dump_json()) async def generate_srk_flirty_reply( message: Annotated[str, Field(description="Message to reply to")], your_name: Annotated[str | None, Field(description="Your name")] = None, ) -> str: """Generate an SRK-style flirty response to `message`.""" try: prompt = ( "You are Shah Rukh Khan. Craft a flirty, witty reply in Hindi-English.\n" f"Incoming message: \"{message}\"\n" "Reply:" ) model = await model_manager.get("microsoft/DialoGPT-small") generated = await anyio.to_thread.run_sync( lambda: model(prompt, max_length=len(prompt.split()) + 30, temperature=0.8, top_p=0.9)[0][ "generated_text" ] ) reply = _clean_output(generated, prompt) if len(reply) < 5: raise ValueError("Empty generation") return f"💕 **SRK Reply**\n\n*\"{reply}\"*" except Exception as e: logger.warning("FallbackFlirtyReply", err=str(e)) fallback = f"Kuch kuch hota hai jab tum baat karti ho—couldn't resist replying, {your_name or 'jaan'}!" return f"💕 **SRK Reply**\n\n*\"{fallback}\"*" # generate_srk_flirty_reply = with_retry()(generate_srk_flirty_reply) class OutfitInspoDescription(BaseModel): description: str = "Suggest SRK-inspired outfit ideas for dates and special occasions." use_when: str = "User wants wardrobe inspiration in a Bollywood style." side_effects: Optional[str] = None OutfitInspoMetadata = OutfitInspoDescription() @mcp.tool(description=OutfitInspoMetadata.model_dump_json()) async def outfit_inspiration( occasion: Annotated[str, Field(description="Type of occasion (e.g., romantic dinner, casual day out)")] = "romantic dinner", budget: Annotated[str, Field(description="Budget level (low, moderate, high)")] = "moderate", color_palette: Annotated[Optional[str], Field(description="Preferred colors (e.g., pastels, jewel tones)")] = None, ) -> str: """ Provide outfit inspiration inspired by Shah Rukh Khan's style. """ try: # Example static suggestions; replace with API or database lookup as needed. inspo = { "romantic dinner": { "low": [ "White linen shirt + dark jeans + brown loafers", "Soft pink kurta + denim jacket + minimal accessories" ], "moderate": [ "Fitted black blazer + charcoal trousers + white tee", "Burgundy silk shirt + black chinos + loafers" ], "high": [ "Tailored velvet tuxedo + silk lapel + patent leather shoes", "Custom Nehru jacket + silk trousers + statement watch" ] }, "casual day out": { "low": ["Graphic tee + ripped jeans + sneakers", "Striped polo + khaki shorts + canvas shoes"], "moderate": ["Denim jacket + white shirt + chinos", "Linen shirt + slim-fit joggers + slip-ons"], "high": ["Designer bomber jacket + tapered jeans + leather sneakers", "Cashmere sweater + tailored joggers + loafers"] } } options = inspo.get(occasion.lower(), inspo["romantic dinner"]) choices = options.get(budget.lower(), options["moderate"]) if color_palette: # Append color cue choices = [f"{c}, in {color_palette} hues" for c in choices] formatted = f"👔 **Outfit Inspiration** for a {occasion.title()} (Budget: {budget.title()})\n\n" for i, suggestion in enumerate(choices, 1): formatted += f"**{i}.** {suggestion}\n\n" return formatted except Exception as e: return await error_handler.handle(e, "outfit_inspiration") # outfit_inspiration = with_retry()(outfit_inspiration) # ─────────────────────────────── # 11. Health Check Tool # ─────────────────────────────── @mcp.tool async def health_check() -> str: uptime = time.time() - start_time mem_mb = psutil.Process().memory_info().rss / 1024 / 1024 cpu_pct = psutil.cpu_percent() cached_models = len(model_manager._models) return ( f"🟢 **Server Healthy**\n\n" f"Uptime: {uptime:.1f}s\nMemory: {mem_mb:.1f} MB\nCPU: {cpu_pct}%\n" f"Cached models: {cached_models}" ) # ─────────────────────────────── # 12. Global Request Interceptor # ─────────────────────────────── # @mcp.middleware # async def apply_rate_limit(request, call_next): # client_id = request.headers.get("X-Client-ID", "anonymous") # if not rate_limiter.is_allowed(client_id): # return "❌ Rate limit exceeded. Please try again later." # return await call_next(request) # ─────────────────────────────── # 13. Run Server # ─────────────────────────────── async def main(): logger.info("ServerStarting", host=settings.host, port=settings.port) await mcp.run_async("streamable-http", host=settings.host, port=settings.port) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DevReev/mew__mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server