IACR MCP Server

import traceback from fastapi import HTTPException from pydantic import BaseModel import inspect from typing import Any, Dict, List, Type, Callable from functools import wraps from .api import app, timeout prefix = "/functions" # Registry to store decorated functions registered_functions: Dict[str, Dict[str, Any]] = {} def _get_json_type(python_type: Type) -> str: """Convert Python type to JSON schema type.""" type_mapping = { str: "string", int: "integer", bool: "boolean", float: "number", list: "array", dict: "object", } return type_mapping.get(python_type, "string") def tool(description: str = "", custom_properties: Dict[str, Any] = None, custom_required: List[str] = None): """ Decorator to register a function as a tool. Args: description: Optional description of the tool. If not provided, function's docstring will be used. """ def decorator(func: Callable): sig = inspect.signature(func) # Get parameter info properties = {} required = [] # Extract description from docstring if not provided tool_description = description if not tool_description and func.__doc__: # Get the first line of the docstring as description tool_description = func.__doc__.strip().split('\n')[0].strip() for param_name, param in sig.parameters.items(): param_type = ( param.annotation if param.annotation != inspect.Parameter.empty else Any ) param_default = ( None if param.default == inspect.Parameter.empty else param.default ) properties[param_name] = { "type": _get_json_type(param_type), "description": f"Parameter {param_name}", } if param_default is not None: properties[param_name]["default"] = param_default # If parameter has no default value, it's required if param.default == inspect.Parameter.empty: required.append(param_name) if custom_properties is not None: properties = custom_properties if custom_required is not None: required = custom_required # Register the function with the extracted description registered_functions[func.__name__] = { "function": func, "description": tool_description, "properties": properties, "required": required, } # Check if the function is async is_async = inspect.iscoroutinefunction(func) if is_async: @wraps(func) async def wrapper(*args, **kwargs): return await func(*args, **kwargs) else: @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return decorator class ToolRequest(BaseModel): tool_name: str arguments: dict @app.post(f"{prefix}/tools") @timeout(30.0) async def list_tools(): tools = [] for name, info in registered_functions.items(): tools.append( { "name": name, "description": info["description"], "inputSchema": { "type": "object", "properties": info["properties"], "required": info["required"], }, } ) return {"available_tools": {"tools": tools}} @app.post(f"{prefix}/call_tool") @timeout(30.0) async def call_tool(request: ToolRequest): if request.tool_name not in registered_functions: raise HTTPException( status_code=404, detail=f"Tool {request.tool_name} not found" ) try: func = registered_functions[request.tool_name]["function"] # Check if the function is async is_async = inspect.iscoroutinefunction(func) if is_async: result = await func(**request.arguments) else: result = func(**request.arguments) return {"result": result} except Exception as e: return {"status_code": 500, "detail": f"Failed to call tool: {str(e)}"} # Example decorated functions @tool() async def add_numbers(a: int, b: int, c: int=0) -> int: "Add two numbers together" return a + b + c @tool() def concat_strings(str1: str, str2: str) -> str: "Concatenate two strings" return str1 + str2 @tool() def Search__google(query: str, max_number: int = 20) -> list: """ Search the query on Google and return the results. """ try: from googlesearch import search as gsearch return list(gsearch(query, stop=max_number)) except: return "An exception occurred" import re import requests from bs4 import BeautifulSoup from urllib.parse import urljoin @tool() def Search__read_website(url: str, max_content_length: int = 5000) -> dict: """ Read the content of a website and return the title, meta data, content, and sub-links. """ try: response = requests.get(url) response.raise_for_status() html = response.text except requests.RequestException as e: return {"error": f"Failed to retrieve the website content: {e}"} soup = BeautifulSoup(html, "html.parser") meta_properties = [ "og:description", "og:site_name", "og:title", "og:type", "og:url", "description", "keywords", "author", ] meta = {} for property_name in meta_properties: tag = soup.find("meta", property=property_name) or soup.find( "meta", attrs={"name": property_name} ) if tag: meta[property_name] = tag.get("content", "") for ignore_tag in soup(["script", "style"]): ignore_tag.decompose() title = soup.title.string.strip() if soup.title else "" content = soup.body.get_text(separator="\n") if soup.body else "" links = [] for a in soup.find_all("a", href=True): link_url = urljoin(url, a["href"]) links.append({"title": a.text.strip(), "link": link_url}) content = re.sub(r"[\n\r\t]+", "\n", content) content = re.sub(r" +", " ", content) content = re.sub(r"[\n ]{3,}", "\n\n", content) content = content.strip() if len(content) > max_content_length: content = content[:max_content_length].rsplit(" ", 1)[0] + "..." return {"meta": meta, "title": title, "content": content, "sub_links": links}