Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
embed_signatures.pyโ€ข10.8 kB
#!/usr/bin/env python3 """ Embed Signature Metadata Script This script takes the output from sign_all_schemas.py and embeds the signature metadata directly into the @signed_tool decorators in the source code. This allows committing signatures to Git for public verification. Usage: python scripts/embed_signatures.py --input signing_results.json python scripts/embed_signatures.py --domain mockloop.com --public-key-url https://mockloop.com/.well-known/schemapin/public-key.pem """ import argparse import ast import json import logging import os import sys from pathlib import Path from typing import Any # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class SignatureEmbedder: """Embeds signature metadata into @signed_tool decorators.""" def __init__(self, domain: str, public_key_url: str | None = None): """Initialize signature embedder.""" self.domain = domain self.public_key_url = public_key_url or f"https://{domain}/.well-known/schemapin/public-key.pem" self.updated_files: set[str] = set() def embed_from_results(self, results_file: str) -> None: """Embed signatures from signing results file.""" logger.info(f"Loading signing results from {results_file}") with open(results_file, encoding='utf-8') as f: results = json.load(f) if results.get("status") != "completed": raise ValueError(f"Invalid results file: {results_file}") logger.info(f"Processing {len(results['results'])} signed tools") for tool_result in results["results"]: if tool_result["status"] == "signed": self._embed_tool_signature(tool_result) logger.info(f"Updated {len(self.updated_files)} files") def embed_from_environment(self) -> None: """Generate signatures and embed them using environment variables.""" # Import here to avoid circular imports sys.path.insert(0, str(Path(__file__).parent.parent / "src")) sys.path.insert(0, str(Path(__file__).parent)) # Import the classes directly from the sign_all_schemas module import importlib.util spec = importlib.util.spec_from_file_location( "sign_all_schemas", Path(__file__).parent / "sign_all_schemas.py" ) if not spec or not spec.loader: raise ImportError("Could not load sign_all_schemas module") sign_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(sign_module) ToolDiscovery = sign_module.ToolDiscovery BatchSigner = sign_module.BatchSigner # Get environment variables domain = os.getenv("MOCKLOOP_SIGNING_DOMAIN") private_key_path = os.getenv("MOCKLOOP_PRIVATE_KEY_PATH") if not domain or not private_key_path: raise ValueError("MOCKLOOP_SIGNING_DOMAIN and MOCKLOOP_PRIVATE_KEY_PATH environment variables required") logger.info(f"Generating signatures for domain: {domain}") # Discover and sign tools discovery = ToolDiscovery([Path("src")]) tools = discovery.discover_tools() signer = BatchSigner( domain=domain, private_key_path=private_key_path ) results = signer.sign_tools(tools, force_resign=True) # Embed signatures for tool_result in results["results"]: if tool_result["status"] == "signed": self._embed_tool_signature(tool_result) logger.info(f"Updated {len(self.updated_files)} files") def _embed_tool_signature(self, tool_result: dict[str, Any]) -> None: """Embed signature for a single tool.""" file_path = tool_result["file_path"] tool_name = tool_result["tool_name"] logger.info(f"Embedding signature for {tool_name} in {file_path}") try: # Read the file with open(file_path, encoding='utf-8') as f: content = f.read() # Parse AST tree = ast.parse(content) # Find the function and its decorator updated_content = self._update_decorator_in_content( content, tool_name, tool_result ) if updated_content != content: # Write back the updated content with open(file_path, 'w', encoding='utf-8') as f: f.write(updated_content) self.updated_files.add(file_path) logger.info(f"Updated {tool_name} in {file_path}") else: logger.warning(f"No changes made to {tool_name} in {file_path}") except Exception as e: logger.exception(f"Failed to embed signature for {tool_name}") raise def _update_decorator_in_content(self, content: str, tool_name: str, tool_result: dict[str, Any]) -> str: """Update the @signed_tool decorator in the content.""" lines = content.split('\n') updated_lines = [] i = 0 while i < len(lines): line = lines[i] # Look for @signed_tool decorator if '@signed_tool' in line and not line.strip().startswith('#'): # Find the function this decorator belongs to func_line_idx = self._find_function_after_decorator(lines, i) if func_line_idx and tool_name in lines[func_line_idx]: # Replace the decorator new_decorator = self._generate_embedded_decorator(tool_result) # Handle multi-line decorators decorator_end = self._find_decorator_end(lines, i) # Replace the decorator lines updated_lines.extend(lines[:i]) updated_lines.extend(new_decorator.split('\n')) i = decorator_end + 1 continue updated_lines.append(line) i += 1 return '\n'.join(updated_lines) def _find_function_after_decorator(self, lines: list[str], decorator_idx: int) -> int | None: """Find the function definition after a decorator.""" for i in range(decorator_idx + 1, min(decorator_idx + 10, len(lines))): line = lines[i].strip() if line.startswith('def ') or line.startswith('async def '): return i return None def _find_decorator_end(self, lines: list[str], start_idx: int) -> int: """Find the end of a decorator (handles multi-line decorators).""" i = start_idx paren_count = 0 in_decorator = False while i < len(lines): line = lines[i] if '@signed_tool' in line: in_decorator = True if in_decorator: paren_count += line.count('(') - line.count(')') # If we've closed all parentheses, decorator is complete if paren_count == 0 and '(' in lines[start_idx]: return i # If no parentheses, single line decorator if '(' not in lines[start_idx]: return i i += 1 return start_idx def _generate_embedded_decorator(self, tool_result: dict[str, Any]) -> str: """Generate the new @signed_tool decorator with embedded metadata.""" signature = tool_result["signature"] schema_hash = tool_result["schema_hash"] domain = tool_result["domain"] # Generate timestamp if not present signed_at = tool_result.get("signed_at") if not signed_at: from datetime import datetime, timezone signed_at = datetime.now(timezone.utc).isoformat() decorator = f'''@signed_tool( domain="{domain}", signature="{signature}", schema_hash="{schema_hash}", public_key_url="{self.public_key_url}", signed_at="{signed_at}" )''' return decorator def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="Embed signature metadata into @signed_tool decorators", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s --input signing_results.json %(prog)s --domain mockloop.com --public-key-url https://mockloop.com/.well-known/schemapin/public-key.pem """ ) parser.add_argument( "--input", help="JSON file with signing results from sign_all_schemas.py" ) parser.add_argument( "--domain", help="Domain for signatures (used with environment variables)" ) parser.add_argument( "--public-key-url", help="URL to public key for verification" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) try: if args.input: # Use results file if not Path(args.input).exists(): logger.error(f"Results file not found: {args.input}") sys.exit(1) domain = args.domain if not domain: # Try to extract domain from results file with open(args.input, encoding='utf-8') as f: results = json.load(f) domain = results.get("domain") if not domain: logger.error("Domain must be specified via --domain or present in results file") sys.exit(1) embedder = SignatureEmbedder(domain, args.public_key_url) embedder.embed_from_results(args.input) elif args.domain: # Use environment variables embedder = SignatureEmbedder(args.domain, args.public_key_url) embedder.embed_from_environment() else: logger.error("Either --input or --domain must be specified") sys.exit(1) logger.info("Signature embedding completed successfully") except Exception: logger.exception("Signature embedding failed") sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server