Skip to main content
Glama

API Manager

api_manager.py28 kB
#!/usr/bin/env python3
"""
API Manager MCP Server
An enhanced MCP server for managing API keys that works with Claude Desktop and Claude Code
"""

import os
import json
import sys
import argparse
import logging
import re
import shutil
from typing import Any, Sequence, Dict, List, Optional
from pathlib import Path
from datetime import datetime
import asyncio
from dataclasses import dataclass

# MCP SDK imports
try:
    from mcp.server import Server
    from mcp.server.stdio import stdio_server
    from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
except ImportError:
    print("Error: mcp package not found. Install with: pip install mcp", file=sys.stderr)
    sys.exit(1)

# Version information
__version__ = "1.2.0"
__author__ = "API Manager Team"
__description__ = "Enhanced API Key Manager MCP Server for Claude Desktop and Claude Code"


def _build_log_handlers() -> List[logging.Handler]:
    """Return the log handlers, skipping the file handler if it cannot be opened.

    The log file is opened relative to the current working directory. If that
    fails (e.g. the directory is not writable), the server keeps running with
    the stream handler only instead of crashing at import time.
    """
    # StreamHandler() without arguments writes to stderr, so stdout stays free
    # for the stdio transport used in main().
    handlers: List[logging.Handler] = [logging.StreamHandler()]
    try:
        handlers.insert(0, logging.FileHandler('api_manager.log'))
    except OSError as exc:
        print(f"Warning: cannot open api_manager.log ({exc}); logging to stderr only",
              file=sys.stderr)
    return handlers


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_build_log_handlers(),
)
logger = logging.getLogger(__name__)

# Tool argument names whose values must never be written to the log.
_SENSITIVE_ARGUMENTS = frozenset({"value"})


def _redact_arguments(arguments: Any) -> Any:
    """Return a copy of the tool arguments that is safe to log.

    Values of the argument names in ``_SENSITIVE_ARGUMENTS`` are replaced by a
    placeholder; anything that is not a dict is returned unchanged.
    """
    if not isinstance(arguments, dict):
        return arguments
    return {
        name: ("***REDACTED***" if name in _SENSITIVE_ARGUMENTS else value)
        for name, value in arguments.items()
    }


@dataclass
class APIKeyStats:
    """Statistics for API key usage"""
    total_keys: int
    categories: Dict[str, int]
    last_backup: Optional[str] = None
    last_modified: Optional[str] = None


class APIKeyManager:
    """Enhanced API Key Manager with improved security and features"""

    # Keyword -> category. get_category() returns the category of the first
    # keyword (in this insertion order) that occurs in the lower-cased key name.
    _CATEGORY_KEYWORDS: Dict[str, str] = {
        # AI/LLM Services
        'anthropic': 'Anthropic (Claude)',
        'claude': 'Anthropic (Claude)',
        'openai': 'OpenAI',
        'google': 'Google AI',
        'gemini': 'Google AI',
        'groq': 'Groq',
        'mistral': 'Mistral AI',
        'deepseek': 'DeepSeek',
        'huggingface': 'Hugging Face',
        'openrouter': 'OpenRouter',
        'together': 'Together AI',
        'cohere': 'Cohere',
        'ai21': 'AI21 Labs',
        # Automation & Workflow
        'n8n': 'N8N Automation',
        'zapier': 'Zapier',
        'make': 'Make (Integromat)',
        'ifttt': 'IFTTT',
        # Database & Storage
        'supabase': 'Supabase',
        'neo4j': 'Neo4j',
        'quadrant': 'Qdrant Vector DB',  # historical misspelling, kept for compatibility
        'qdrant': 'Qdrant Vector DB',
        'pinecone': 'Pinecone',
        'weaviate': 'Weaviate',
        'zep': 'Zep Memory',
        'airtable': 'Airtable',
        'notion': 'Notion',
        'mongodb': 'MongoDB',
        'redis': 'Redis',
        # Communication
        'discord': 'Discord',
        'slack': 'Slack',
        'twilio': 'Twilio',
        'telegram': 'Telegram',
        'whatsapp': 'WhatsApp',
        # Development Tools
        'github': 'GitHub',
        'gitlab': 'GitLab',
        'firebase': 'Firebase',
        'vercel': 'Vercel',
        'netlify': 'Netlify',
        # Search & Data
        'serper': 'Serper',
        'serpapi': 'SerpAPI',
        'tavily': 'Tavily Search',
        'jina': 'Jina AI',
        'perplexity': 'Perplexity',
        # Cloud & Hosting
        'railway': 'Railway',
        'heroku': 'Heroku',
        'aws': 'Amazon Web Services',
        'azure': 'Microsoft Azure',
        'gcp': 'Google Cloud Platform',
        # Networking
        'ngrok': 'Ngrok',
        'cloudflare': 'Cloudflare',
        # Audio & Media
        'elevenlabs': 'ElevenLabs',
        'gladia': 'Gladia',
        'deepgram': 'Deepgram',
        'assemblyai': 'AssemblyAI',
        # Analytics & Monitoring
        'langtrace': 'LangTrace',
        'datadog': 'Datadog',
        'newrelic': 'New Relic',
        'sentry': 'Sentry',
        # Payment & Commerce
        'stripe': 'Stripe',
        'paypal': 'PayPal',
        'square': 'Square',
        # Email & Marketing
        'sendgrid': 'SendGrid',
        'mailgun': 'Mailgun',
        'resend': 'Resend',
    }

    def __init__(self, env_file: Optional[str] = None):
        """Initialize the API Key Manager.

        Args:
            env_file: Path of the .env file to manage. When omitted,
                ``~/.config/api_manager/.env`` is used.
        """
        if env_file:
            self.env_file = Path(env_file)
        else:
            # Use a more secure default location
            self.env_file = Path.home() / ".config" / "api_manager" / ".env"

        # Create file and directory if they don't exist
        if not self.env_file.exists():
            self.env_file.parent.mkdir(parents=True, exist_ok=True)
            self.env_file.touch(mode=0o600)  # Secure file permissions
            self._add_initial_header()

        logger.info(f"API Manager initialized with env file: {self.env_file}")

    def _add_initial_header(self):
        """Add initial header to the .env file"""
        header = f"""# API Manager Environment File
# Created on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
# Managed by API Manager MCP Server v{__version__}
#
# Format: KEY_NAME=value
# Lines starting with # are comments
#

"""
        # Same encoding as load_env()/save_env() so the file round-trips.
        with open(self.env_file, 'w', encoding='utf-8') as f:
            f.write(header)

    def load_env(self) -> Dict[str, str]:
        """Load environment variables from .env file.

        Blank lines, comment lines and lines without ``=`` are ignored. Read
        errors are logged and whatever was parsed so far is returned.

        Returns:
            Mapping of key name to value, both stripped of surrounding whitespace.
        """
        env_vars: Dict[str, str] = {}
        if not self.env_file.exists():
            return env_vars

        try:
            with open(self.env_file, 'r', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    # Split on the first '=' only: values may contain '='.
                    key, _, value = line.partition('=')
                    env_vars[key.strip()] = value.strip()
            logger.info(f"Loaded {len(env_vars)} API keys from {self.env_file}")
        except Exception as e:
            logger.error(f"Error loading env file: {e}")
        return env_vars

    def save_env(self, env_vars: Dict[str, str]):
        """Save environment variables to .env file via a temp file and rename.

        ``env_vars`` is treated as the complete set of keys: existing entries
        are updated in place, entries whose key is absent from ``env_vars`` are
        removed, and new keys are appended. Comments and blank lines are kept.

        Args:
            env_vars: Mapping of key name to value to persist.

        Raises:
            Exception: Any error while reading or writing is logged and re-raised.
        """
        # Create a temporary file for atomic write
        temp_file = self.env_file.with_suffix('.tmp')
        try:
            # Read original file to preserve comments and structure
            if self.env_file.exists():
                with open(self.env_file, 'r', encoding='utf-8') as f:
                    original_lines = f.readlines()
            else:
                original_lines = []

            lines: List[str] = []
            updated_keys = set()
            for line in original_lines:
                stripped = line.strip()
                is_entry = bool(stripped) and not stripped.startswith('#') and '=' in stripped
                if not is_entry:
                    lines.append(line)
                    continue
                key = stripped.split('=', 1)[0].strip()
                if key in env_vars:
                    lines.append(f"{key}={env_vars[key]}\n")
                    updated_keys.add(key)
                # else: the key was deleted by the caller, so its line is
                # dropped. Keeping it here made deletions silently ineffective.

            # Add new keys
            for key, value in env_vars.items():
                if key not in updated_keys:
                    lines.append(f"\n# Added on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                    lines.append(f"{key}={value}\n")

            # Create the temp file with owner-only permissions from the start,
            # so the secrets are never readable through a default-mode file.
            fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.writelines(lines)

            # The mode passed to os.open only applies when the file is created;
            # chmod also covers a stale temp file left with other permissions.
            temp_file.chmod(0o600)

            # Atomic move
            temp_file.replace(self.env_file)

            logger.info(f"Saved {len(env_vars)} API keys to {self.env_file}")
        except Exception as e:
            logger.error(f"Error saving env file: {e}")
            # Best-effort cleanup so a failed save leaves no copy of the secrets.
            try:
                temp_file.unlink()
            except OSError:
                pass
            raise

    def get_category(self, key: str) -> str:
        """Determine category based on key name with expanded categorization.

        Args:
            key: API key name; matching is case-insensitive and by substring.

        Returns:
            The category of the first matching keyword, or ``'Other'``.
        """
        key_lower = key.lower()
        for keyword, category in self._CATEGORY_KEYWORDS.items():
            if keyword in key_lower:
                return category
        return 'Other'

    def validate_key_name(self, key_name: str) -> bool:
        """Validate API key name format.

        Rejects empty names, names longer than 100 characters, names with
        characters from ``\\/*?:"<>|``, and names that would not survive a
        round trip through the .env file ('=', line breaks, leading '#',
        surrounding whitespace).
        """
        if not key_name or len(key_name.strip()) == 0:
            return False

        # Check for invalid characters; '=', CR and LF would corrupt the
        # KEY=value line format or inject additional entries.
        if re.search(r'[\\/*?:"<>|=\r\n]', key_name):
            return False

        # load_env() strips names and skips lines starting with '#', so such
        # a name could never be read back under the same key.
        if key_name != key_name.strip() or key_name.startswith('#'):
            return False

        # Check length (reasonable limits)
        if len(key_name) > 100:
            return False

        return True

    def validate_key_value(self, key_value: str) -> bool:
        """Validate API key value format.

        Rejects empty values, values shorter than 8 or longer than 500
        characters, values containing line breaks, and a few well-known
        placeholder strings.
        """
        if not key_value or len(key_value.strip()) == 0:
            return False

        # Check length (API keys are usually between 20-200 characters)
        if len(key_value) < 8 or len(key_value) > 500:
            return False

        # A line break would split the value across lines of the .env file
        # and could inject additional entries.
        if '\n' in key_value or '\r' in key_value:
            return False

        # Check for obvious patterns that might indicate invalid keys
        if key_value.lower() in ['test', 'example', 'placeholder', 'your_key_here']:
            return False

        return True

    def mask_value(self, value: str) -> str:
        """Mask API key value for display.

        Values up to 8 characters are fully masked; up to 16 characters keep
        the first and last 2; longer values keep the first and last 4.
        """
        if len(value) <= 8:
            return '*' * len(value)
        elif len(value) <= 16:
            return value[:2] + '*' * (len(value) - 4) + value[-2:]
        else:
            return value[:4] + '*' * (len(value) - 8) + value[-4:]

    def get_stats(self) -> APIKeyStats:
        """Get statistics about API keys.

        Returns:
            Key count, per-category counts, the name of the most recently
            modified backup file (if any) and the env file's modification time.
        """
        env_vars = self.load_env()
        categories: Dict[str, int] = {}

        for key in env_vars:
            cat = self.get_category(key)
            categories[cat] = categories.get(cat, 0) + 1

        # Check for last backup
        backup_files = list(self.env_file.parent.glob(f"{self.env_file.name}.backup_*"))
        last_backup = None
        if backup_files:
            latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime)
            last_backup = latest_backup.name

        # Get last modified time
        last_modified = None
        if self.env_file.exists():
            last_modified = datetime.fromtimestamp(
                self.env_file.stat().st_mtime
            ).strftime('%Y-%m-%d %H:%M:%S')

        return APIKeyStats(
            total_keys=len(env_vars),
            categories=categories,
            last_backup=last_backup,
            last_modified=last_modified,
        )

    def create_backup(self) -> str:
        """Create a timestamped backup of the .env file.

        Returns:
            Path of the backup file (``<env_file>.backup_<YYYYmmdd_HHMMSS>``).
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{self.env_file}.backup_{timestamp}"
        shutil.copy2(self.env_file, backup_file)
        os.chmod(backup_file, 0o600)  # Secure permissions
        logger.info(f"Created backup: {backup_file}")
        return backup_file

    def export_keys(self, format: str = 'json', include_values: bool = False) -> str:
        """Export API keys in various formats.

        Args:
            format: ``'json'`` or ``'env'`` (case-insensitive).
            include_values: When False, values are masked with mask_value().

        Returns:
            The exported keys as a single string.

        Raises:
            ValueError: If ``format`` is not supported.
        """
        env_vars = self.load_env()
        if include_values:
            data = env_vars
        else:
            data = {key: self.mask_value(value) for key, value in env_vars.items()}

        fmt = format.lower()
        if fmt == 'json':
            return json.dumps(data, indent=2)
        elif fmt == 'env':
            return '\n'.join(f"{key}={value}" for key, value in data.items())
        else:
            raise ValueError(f"Unsupported export format: {format}")


# Parse command line arguments
def parse_args():
    """Parse the server's command line options (--env-file, --version, --log-level)."""
    parser = argparse.ArgumentParser(description=f"API Manager MCP Server v{__version__}")
    parser.add_argument("--env-file", help="Path to the .env file", default=None)
    parser.add_argument("--version", action="store_true", help="Show version information")
    parser.add_argument("--log-level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO', help="Set logging level")
    return parser.parse_args()


# Initialize with command line arguments
args = parse_args()

if args.version:
    print(f"API Manager MCP Server v{__version__}")
    print(f"Author: {__author__}")
    print(f"Description: {__description__}")
    sys.exit(0)

# Set logging level
logging.getLogger().setLevel(getattr(logging, args.log_level))

# Initialize manager
manager = APIKeyManager(args.env_file)

# Create MCP server
app = Server("api-manager")


@app.list_tools()
async def list_tools() -> List[Tool]:
    """List available tools"""
    return [
        Tool(
            name="list_api_keys",
            description="List all API keys with their categories. Returns masked values by default for security.",
            inputSchema={
                "type": "object",
                "properties": {
                    "show_values": {
                        "type": "boolean",
                        "description": "Show actual values instead of masked values",
                        "default": False
                    },
                    "category": {
                        "type": "string",
                        "description": "Filter by category (optional)"
                    }
                }
            }
        ),
        Tool(
            name="get_api_key",
            description="Get a specific API key by name. Returns the actual unmasked value.",
            inputSchema={
                "type": "object",
                "properties": {
                    "key_name": {
                        "type": "string",
                        "description": "The name of the API key to retrieve"
                    }
                },
                "required": ["key_name"]
            }
        ),
        Tool(
            name="add_api_key",
            description="Add a new API key or update an existing one",
            inputSchema={
                "type": "object",
                "properties": {
                    "key_name": {
                        "type": "string",
                        "description": "The name of the API key"
                    },
                    "value": {
                        "type": "string",
                        "description": "The API key value"
                    }
                },
                "required": ["key_name", "value"]
            }
        ),
        Tool(
            name="delete_api_key",
            description="Delete an API key",
            inputSchema={
                "type": "object",
                "properties": {
                    "key_name": {
                        "type": "string",
                        "description": "The name of the API key to delete"
                    }
                },
                "required": ["key_name"]
            }
        ),
        Tool(
            name="search_api_keys",
            description="Search for API keys by name or category",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query (searches in key names and categories)"
                    },
                    "show_values": {
                        "type": "boolean",
                        "description": "Show actual values instead of masked values",
                        "default": False
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="list_categories",
            description="List all categories of API keys with counts",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="backup_env_file",
            description="Create a timestamped backup of the .env file",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="get_stats",
            description="Get statistics about API keys and usage",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="export_keys",
            description="Export API keys in various formats",
            inputSchema={
                "type": "object",
                "properties": {
                    "format": {
                        "type": "string",
                        "enum": ["json", "env"],
                        "description": "Export format (json or env)",
                        "default": "json"
                    },
                    "include_values": {
                        "type": "boolean",
                        "description": "Include actual values (security risk!)",
                        "default": False
                    }
                }
            }
        ),
        Tool(
            name="help",
            description="Show comprehensive help information for the API Manager",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool calls with enhanced error handling and logging.

    Args:
        name: Name of the tool to run (one of the names from list_tools()).
        arguments: Tool arguments; ``None`` is treated as an empty dict.

    Returns:
        A single-element list with the textual result. Any exception raised
        while handling the call is logged and reported as an error text
        instead of propagating.
    """
    try:
        # Tools without parameters may be called without an arguments object.
        arguments = arguments or {}

        # Never log raw arguments: add_api_key carries the secret in "value".
        logger.info(f"Tool called: {name} with arguments: {_redact_arguments(arguments)}")

        if name == "list_api_keys":
            env_vars = manager.load_env()
            show_values = arguments.get("show_values", False)
            category_filter = arguments.get("category")

            # Group by category
            categorized: Dict[str, List[str]] = {}
            for key, value in env_vars.items():
                cat = manager.get_category(key)

                if category_filter and category_filter.lower() not in cat.lower():
                    continue

                # Mask value if needed
                display_value = value if show_values else manager.mask_value(value)
                categorized.setdefault(cat, []).append(f"{key}: {display_value}")

            # Format output
            result = []
            result.append(f"🔐 API Keys in {manager.env_file}\n")
            result.append("=" * 80 + "\n\n")

            total = 0
            for cat in sorted(categorized.keys()):
                result.append(f"📁 {cat}\n")
                result.append("─" * 80 + "\n")
                for item in sorted(categorized[cat]):
                    result.append(f"  {item}\n")
                    total += 1
                result.append("\n")

            result.append(f"📊 Total API Keys: {total}")
            if category_filter:
                result.append(f" (filtered by: {category_filter})")

            return [TextContent(type="text", text="".join(result))]

        elif name == "get_api_key":
            env_vars = manager.load_env()
            key_name = arguments["key_name"]

            if key_name not in env_vars:
                return [TextContent(type="text", text=f"❌ Error: Key '{key_name}' not found")]

            category = manager.get_category(key_name)
            result = f"🔑 API Key Details\n\n"
            result += f"📝 Name: {key_name}\n"
            result += f"📁 Category: {category}\n"
            result += f"🔐 Value: {env_vars[key_name]}\n"
            result += f"⚠️ Security: Handle with care - this is the actual unmasked value!"

            return [TextContent(type="text", text=result)]

        elif name == "add_api_key":
            env_vars = manager.load_env()
            key_name = arguments["key_name"]
            value = arguments["value"]

            # Validate inputs
            if not manager.validate_key_name(key_name):
                return [TextContent(type="text", text="❌ Error: Invalid key name. Names cannot contain special characters like \\/*?:\"<>|= or line breaks")]

            if not manager.validate_key_value(value):
                return [TextContent(type="text", text="❌ Error: Invalid key value. Value seems too short, too long, or invalid.")]

            exists = key_name in env_vars
            env_vars[key_name] = value
            manager.save_env(env_vars)

            action = "updated" if exists else "added"
            result = f"✅ Successfully {action} '{key_name}'\n"
            result += f"📁 Category: {manager.get_category(key_name)}\n"
            result += f"🔐 Masked Value: {manager.mask_value(value)}"

            return [TextContent(type="text", text=result)]

        elif name == "delete_api_key":
            env_vars = manager.load_env()
            key_name = arguments["key_name"]

            if key_name not in env_vars:
                return [TextContent(type="text", text=f"❌ Error: Key '{key_name}' not found")]

            # Create backup before deletion
            backup_file = manager.create_backup()

            del env_vars[key_name]
            manager.save_env(env_vars)

            result = f"✅ Successfully deleted '{key_name}'\n"
            result += f"💾 Backup created: {Path(backup_file).name}"

            return [TextContent(type="text", text=result)]

        elif name == "search_api_keys":
            env_vars = manager.load_env()
            query = arguments["query"].lower()
            show_values = arguments.get("show_values", False)

            results = []
            for key, value in env_vars.items():
                category = manager.get_category(key)

                # Search in both key name and category
                if query in key.lower() or query in category.lower():
                    display_value = value if show_values else manager.mask_value(value)
                    results.append(f"📁 {category} | {key}: {display_value}")

            if not results:
                return [TextContent(type="text", text=f"🔍 No keys found matching '{query}'")]

            result = f"🔍 Search Results for '{query}'\n\n"
            result += "\n".join(sorted(results))
            result += f"\n\n📊 Found {len(results)} matching key(s)"

            return [TextContent(type="text", text=result)]

        elif name == "list_categories":
            env_vars = manager.load_env()
            cats: Dict[str, int] = {}

            for key in env_vars:
                cat = manager.get_category(key)
                cats[cat] = cats.get(cat, 0) + 1

            result = "📁 Available Categories\n\n"
            for cat in sorted(cats.keys()):
                result += f"📂 {cat} ({cats[cat]} keys)\n"

            result += f"\n📊 Total Categories: {len(cats)}"

            return [TextContent(type="text", text=result)]

        elif name == "backup_env_file":
            backup_file = manager.create_backup()
            result = f"✅ Backup created successfully\n"
            result += f"📁 Location: {backup_file}\n"
            result += f"🕒 Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

            return [TextContent(type="text", text=result)]

        elif name == "get_stats":
            stats = manager.get_stats()

            result = f"📊 API Manager Statistics\n\n"
            result += f"🔐 Total API Keys: {stats.total_keys}\n"
            result += f"📁 Categories: {len(stats.categories)}\n\n"

            result += "📈 Category Breakdown:\n"
            for cat, count in sorted(stats.categories.items()):
                percentage = (count / stats.total_keys * 100) if stats.total_keys > 0 else 0
                result += f"  📂 {cat}: {count} keys ({percentage:.1f}%)\n"

            result += f"\n📝 Environment File: {manager.env_file}\n"
            if stats.last_modified:
                result += f"🕒 Last Modified: {stats.last_modified}\n"
            if stats.last_backup:
                result += f"💾 Last Backup: {stats.last_backup}\n"

            return [TextContent(type="text", text=result)]

        elif name == "export_keys":
            format_type = arguments.get("format", "json")
            include_values = arguments.get("include_values", False)

            try:
                exported_data = manager.export_keys(format_type, include_values)

                result = f"📤 API Keys Export ({format_type.upper()})\n\n"
                if include_values:
                    result += "⚠️ WARNING: This export contains actual API key values!\n\n"
                else:
                    result += "🔐 Keys are masked for security\n\n"

                result += "```\n"
                result += exported_data
                result += "\n```"

                return [TextContent(type="text", text=result)]
            except ValueError as e:
                return [TextContent(type="text", text=f"❌ Export Error: {str(e)}")]

        elif name == "help":
            help_text = f"""
🔐 API Manager MCP Server v{__version__}
{__description__}

📋 Available Commands:

🔍 **Query Commands:**
• list_api_keys - List all API keys with categories
  - show_values: Show actual values (default: false)
  - category: Filter by specific category

• get_api_key - Get a specific API key by name
  - key_name: Name of the key to retrieve

• search_api_keys - Search for API keys by name or category
  - query: Search term
  - show_values: Show actual values (default: false)

• list_categories - List all categories with counts

📝 **Management Commands:**
• add_api_key - Add or update an API key
  - key_name: Name of the API key
  - value: The API key value

• delete_api_key - Delete an API key
  - key_name: Name of the key to delete

🛠️ **Utility Commands:**
• backup_env_file - Create timestamped backup
• get_stats - Show detailed statistics
• export_keys - Export keys in JSON or ENV format
  - format: "json" or "env" (default: json)
  - include_values: Include actual values (default: false)

• help - Show this help message

📁 **Supported Categories:**
AI/LLM: Anthropic, OpenAI, Google AI, Groq, Mistral, etc.
Database: Supabase, Neo4j, Qdrant, Pinecone, Airtable, etc.
Communication: Discord, Slack, Twilio, Telegram, etc.
Development: GitHub, Firebase, Vercel, Netlify, etc.
Cloud: AWS, Azure, GCP, Railway, Heroku, etc.
And many more...

🔒 **Security Features:**
• Secure file permissions (600)
• Atomic file operations
• Automatic backups on deletion
• Value masking for display
• Input validation

📂 **File Location:**
{manager.env_file}

⚠️ **Security Note:**
API keys are sensitive data. Always use show_values=false for listing/searching unless absolutely necessary.
"""
            return [TextContent(type="text", text=help_text)]

        else:
            return [TextContent(type="text", text=f"❌ Unknown tool: {name}")]

    except Exception as e:
        logger.error(f"Error in tool call '{name}': {str(e)}", exc_info=True)
        return [TextContent(type="text", text=f"❌ Error: {str(e)}")]


async def main():
    """Run the MCP server with enhanced error handling"""
    try:
        logger.info(f"Starting API Manager MCP Server v{__version__}")
        logger.info(f"Environment file: {manager.env_file}")

        async with stdio_server() as (read_stream, write_stream):
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options()
            )
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.exception("Unexpected error in main")
        print(f"Fatal Error: {str(e)}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BlackWolf-Media/API_Manager_MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.