api_manager.py•28 kB
#!/usr/bin/env python3
"""
API Manager MCP Server
An enhanced MCP server for managing API keys that works with Claude Desktop and Claude Code
"""
import os
import json
import sys
import argparse
import logging
import re
import shutil
from typing import Any, Sequence, Dict, List, Optional
from pathlib import Path
from datetime import datetime
import asyncio
from dataclasses import dataclass
# MCP SDK imports
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
except ImportError:
print("Error: mcp package not found. Install with: pip install mcp", file=sys.stderr)
sys.exit(1)
# Version information
__version__ = "1.2.0"
__author__ = "API Manager Team"
__description__ = "Enhanced API Key Manager MCP Server for Claude Desktop and Claude Code"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('api_manager.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class APIKeyStats:
"""Statistics for API key usage"""
total_keys: int
categories: Dict[str, int]
last_backup: Optional[str] = None
last_modified: Optional[str] = None
class APIKeyManager:
"""Enhanced API Key Manager with improved security and features"""
def __init__(self, env_file: str = None):
"""Initialize the API Key Manager"""
if env_file:
self.env_file = Path(env_file)
else:
# Use a more secure default location
self.env_file = Path.home() / ".config" / "api_manager" / ".env"
# Create file and directory if they don't exist
if not self.env_file.exists():
self.env_file.parent.mkdir(parents=True, exist_ok=True)
self.env_file.touch(mode=0o600) # Secure file permissions
self._add_initial_header()
logger.info(f"API Manager initialized with env file: {self.env_file}")
def _add_initial_header(self):
"""Add initial header to the .env file"""
header = f"""# API Manager Environment File
# Created on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
# Managed by API Manager MCP Server v{__version__}
#
# Format: KEY_NAME=value
# Lines starting with # are comments
#
"""
with open(self.env_file, 'w') as f:
f.write(header)
def load_env(self) -> Dict[str, str]:
"""Load environment variables from .env file"""
env_vars = {}
if self.env_file.exists():
try:
with open(self.env_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if line and not line.startswith('#') and '=' in line:
try:
key, value = line.split('=', 1)
env_vars[key.strip()] = value.strip()
except ValueError:
logger.warning(f"Malformed line {line_num} in {self.env_file}: {line}")
logger.info(f"Loaded {len(env_vars)} API keys from {self.env_file}")
except Exception as e:
logger.error(f"Error loading env file: {e}")
return env_vars
def save_env(self, env_vars: Dict[str, str]):
"""Save environment variables to .env file with atomic write"""
try:
# Create a temporary file for atomic write
temp_file = self.env_file.with_suffix('.tmp')
lines = []
# Read original file to preserve comments and structure
if self.env_file.exists():
with open(self.env_file, 'r', encoding='utf-8') as f:
original_lines = f.readlines()
else:
original_lines = []
# Update existing values while preserving structure
updated_keys = set()
for line in original_lines:
stripped = line.strip()
if stripped and not stripped.startswith('#') and '=' in stripped:
key = stripped.split('=', 1)[0].strip()
if key in env_vars:
lines.append(f"{key}={env_vars[key]}\n")
updated_keys.add(key)
else:
lines.append(line)
else:
lines.append(line)
# Add new keys
for key, value in env_vars.items():
if key not in updated_keys:
lines.append(f"\n# Added on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
lines.append(f"{key}={value}\n")
# Write to temporary file first, then move
with open(temp_file, 'w', encoding='utf-8') as f:
f.writelines(lines)
# Set secure permissions
temp_file.chmod(0o600)
# Atomic move
temp_file.replace(self.env_file)
logger.info(f"Saved {len(env_vars)} API keys to {self.env_file}")
except Exception as e:
logger.error(f"Error saving env file: {e}")
raise
def get_category(self, key: str) -> str:
"""Determine category based on key name with expanded categorization"""
key_lower = key.lower()
categories = {
# AI/LLM Services
'anthropic': 'Anthropic (Claude)',
'claude': 'Anthropic (Claude)',
'openai': 'OpenAI',
'google': 'Google AI',
'gemini': 'Google AI',
'groq': 'Groq',
'mistral': 'Mistral AI',
'deepseek': 'DeepSeek',
'huggingface': 'Hugging Face',
'openrouter': 'OpenRouter',
'together': 'Together AI',
'cohere': 'Cohere',
'ai21': 'AI21 Labs',
# Automation & Workflow
'n8n': 'N8N Automation',
'zapier': 'Zapier',
'make': 'Make (Integromat)',
'ifttt': 'IFTTT',
# Database & Storage
'supabase': 'Supabase',
'neo4j': 'Neo4j',
'quadrant': 'Qdrant Vector DB',
'pinecone': 'Pinecone',
'weaviate': 'Weaviate',
'zep': 'Zep Memory',
'airtable': 'Airtable',
'notion': 'Notion',
'mongodb': 'MongoDB',
'redis': 'Redis',
# Communication
'discord': 'Discord',
'slack': 'Slack',
'twilio': 'Twilio',
'telegram': 'Telegram',
'whatsapp': 'WhatsApp',
# Development Tools
'github': 'GitHub',
'gitlab': 'GitLab',
'firebase': 'Firebase',
'vercel': 'Vercel',
'netlify': 'Netlify',
# Search & Data
'serper': 'Serper',
'serpapi': 'SerpAPI',
'tavily': 'Tavily Search',
'jina': 'Jina AI',
'perplexity': 'Perplexity',
# Cloud & Hosting
'railway': 'Railway',
'heroku': 'Heroku',
'aws': 'Amazon Web Services',
'azure': 'Microsoft Azure',
'gcp': 'Google Cloud Platform',
# Networking
'ngrok': 'Ngrok',
'cloudflare': 'Cloudflare',
# Audio & Media
'elevenlabs': 'ElevenLabs',
'gladia': 'Gladia',
'deepgram': 'Deepgram',
'assemblyai': 'AssemblyAI',
# Analytics & Monitoring
'langtrace': 'LangTrace',
'datadog': 'Datadog',
'newrelic': 'New Relic',
'sentry': 'Sentry',
# Payment & Commerce
'stripe': 'Stripe',
'paypal': 'PayPal',
'square': 'Square',
# Email & Marketing
'sendgrid': 'SendGrid',
'mailgun': 'Mailgun',
'resend': 'Resend',
}
for keyword, category in categories.items():
if keyword in key_lower:
return category
return 'Other'
def validate_key_name(self, key_name: str) -> bool:
"""Validate API key name format"""
if not key_name or len(key_name.strip()) == 0:
return False
# Check for invalid characters
if re.search(r'[\\/*?:"<>|]', key_name):
return False
# Check length (reasonable limits)
if len(key_name) > 100:
return False
return True
def validate_key_value(self, key_value: str) -> bool:
"""Validate API key value format"""
if not key_value or len(key_value.strip()) == 0:
return False
# Check length (API keys are usually between 20-200 characters)
if len(key_value) < 8 or len(key_value) > 500:
return False
# Check for obvious patterns that might indicate invalid keys
if key_value.lower() in ['test', 'example', 'placeholder', 'your_key_here']:
return False
return True
def mask_value(self, value: str) -> str:
"""Mask API key value for display"""
if len(value) <= 8:
return '*' * len(value)
elif len(value) <= 16:
return value[:2] + '*' * (len(value) - 4) + value[-2:]
else:
return value[:4] + '*' * (len(value) - 8) + value[-4:]
def get_stats(self) -> APIKeyStats:
"""Get statistics about API keys"""
env_vars = self.load_env()
categories = {}
for key in env_vars.keys():
cat = self.get_category(key)
categories[cat] = categories.get(cat, 0) + 1
# Check for last backup
backup_files = list(self.env_file.parent.glob(f"{self.env_file.name}.backup_*"))
last_backup = None
if backup_files:
latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime)
last_backup = latest_backup.name
# Get last modified time
last_modified = None
if self.env_file.exists():
last_modified = datetime.fromtimestamp(self.env_file.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')
return APIKeyStats(
total_keys=len(env_vars),
categories=categories,
last_backup=last_backup,
last_modified=last_modified
)
def create_backup(self) -> str:
"""Create a timestamped backup of the .env file"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f"{self.env_file}.backup_{timestamp}"
shutil.copy2(self.env_file, backup_file)
os.chmod(backup_file, 0o600) # Secure permissions
logger.info(f"Created backup: {backup_file}")
return backup_file
def export_keys(self, format: str = 'json', include_values: bool = False) -> str:
"""Export API keys in various formats"""
env_vars = self.load_env()
if format.lower() == 'json':
if include_values:
data = env_vars
else:
data = {key: self.mask_value(value) for key, value in env_vars.items()}
return json.dumps(data, indent=2)
elif format.lower() == 'env':
lines = []
for key, value in env_vars.items():
if include_values:
lines.append(f"{key}={value}")
else:
lines.append(f"{key}={self.mask_value(value)}")
return '\n'.join(lines)
else:
raise ValueError(f"Unsupported export format: {format}")
# Parse command line arguments
def parse_args():
parser = argparse.ArgumentParser(description=f"API Manager MCP Server v{__version__}")
parser.add_argument("--env-file", help="Path to the .env file", default=None)
parser.add_argument("--version", action="store_true", help="Show version information")
parser.add_argument("--log-level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
default='INFO', help="Set logging level")
return parser.parse_args()
# Initialize with command line arguments
args = parse_args()
if args.version:
print(f"API Manager MCP Server v{__version__}")
print(f"Author: {__author__}")
print(f"Description: {__description__}")
sys.exit(0)
# Set logging level
logging.getLogger().setLevel(getattr(logging, args.log_level))
# Initialize manager
manager = APIKeyManager(args.env_file)
# Create MCP server
app = Server("api-manager")
@app.list_tools()
async def list_tools() -> List[Tool]:
"""List available tools"""
return [
Tool(
name="list_api_keys",
description="List all API keys with their categories. Returns masked values by default for security.",
inputSchema={
"type": "object",
"properties": {
"show_values": {
"type": "boolean",
"description": "Show actual values instead of masked values",
"default": False
},
"category": {
"type": "string",
"description": "Filter by category (optional)"
}
}
}
),
Tool(
name="get_api_key",
description="Get a specific API key by name. Returns the actual unmasked value.",
inputSchema={
"type": "object",
"properties": {
"key_name": {
"type": "string",
"description": "The name of the API key to retrieve"
}
},
"required": ["key_name"]
}
),
Tool(
name="add_api_key",
description="Add a new API key or update an existing one",
inputSchema={
"type": "object",
"properties": {
"key_name": {
"type": "string",
"description": "The name of the API key"
},
"value": {
"type": "string",
"description": "The API key value"
}
},
"required": ["key_name", "value"]
}
),
Tool(
name="delete_api_key",
description="Delete an API key",
inputSchema={
"type": "object",
"properties": {
"key_name": {
"type": "string",
"description": "The name of the API key to delete"
}
},
"required": ["key_name"]
}
),
Tool(
name="search_api_keys",
description="Search for API keys by name or category",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (searches in key names and categories)"
},
"show_values": {
"type": "boolean",
"description": "Show actual values instead of masked values",
"default": False
}
},
"required": ["query"]
}
),
Tool(
name="list_categories",
description="List all categories of API keys with counts",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="backup_env_file",
description="Create a timestamped backup of the .env file",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="get_stats",
description="Get statistics about API keys and usage",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="export_keys",
description="Export API keys in various formats",
inputSchema={
"type": "object",
"properties": {
"format": {
"type": "string",
"enum": ["json", "env"],
"description": "Export format (json or env)",
"default": "json"
},
"include_values": {
"type": "boolean",
"description": "Include actual values (security risk!)",
"default": False
}
}
}
),
Tool(
name="help",
description="Show comprehensive help information for the API Manager",
inputSchema={
"type": "object",
"properties": {}
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls with enhanced error handling and logging"""
try:
logger.info(f"Tool called: {name} with arguments: {arguments}")
if name == "list_api_keys":
env_vars = manager.load_env()
show_values = arguments.get("show_values", False)
category_filter = arguments.get("category")
# Group by category
categorized = {}
for key, value in env_vars.items():
cat = manager.get_category(key)
if category_filter and category_filter.lower() not in cat.lower():
continue
if cat not in categorized:
categorized[cat] = []
# Mask value if needed
display_value = value if show_values else manager.mask_value(value)
categorized[cat].append(f"{key}: {display_value}")
# Format output
result = []
result.append(f"🔐 API Keys in {manager.env_file}\n")
result.append("=" * 80 + "\n\n")
total = 0
for cat in sorted(categorized.keys()):
result.append(f"📁 {cat}\n")
result.append("─" * 80 + "\n")
for item in sorted(categorized[cat]):
result.append(f" {item}\n")
total += 1
result.append("\n")
result.append(f"📊 Total API Keys: {total}")
if category_filter:
result.append(f" (filtered by: {category_filter})")
return [TextContent(type="text", text="".join(result))]
elif name == "get_api_key":
env_vars = manager.load_env()
key_name = arguments["key_name"]
if key_name not in env_vars:
return [TextContent(type="text", text=f"❌ Error: Key '{key_name}' not found")]
category = manager.get_category(key_name)
result = f"🔑 API Key Details\n\n"
result += f"📝 Name: {key_name}\n"
result += f"📁 Category: {category}\n"
result += f"🔐 Value: {env_vars[key_name]}\n"
result += f"⚠️ Security: Handle with care - this is the actual unmasked value!"
return [TextContent(type="text", text=result)]
elif name == "add_api_key":
env_vars = manager.load_env()
key_name = arguments["key_name"]
value = arguments["value"]
# Validate inputs
if not manager.validate_key_name(key_name):
return [TextContent(type="text", text="❌ Error: Invalid key name. Names cannot contain special characters like \\/*?:\"<>|")]
if not manager.validate_key_value(value):
return [TextContent(type="text", text="❌ Error: Invalid key value. Value seems too short, too long, or invalid.")]
exists = key_name in env_vars
env_vars[key_name] = value
manager.save_env(env_vars)
action = "updated" if exists else "added"
result = f"✅ Successfully {action} '{key_name}'\n"
result += f"📁 Category: {manager.get_category(key_name)}\n"
result += f"🔐 Masked Value: {manager.mask_value(value)}"
return [TextContent(type="text", text=result)]
elif name == "delete_api_key":
env_vars = manager.load_env()
key_name = arguments["key_name"]
if key_name not in env_vars:
return [TextContent(type="text", text=f"❌ Error: Key '{key_name}' not found")]
# Create backup before deletion
backup_file = manager.create_backup()
del env_vars[key_name]
manager.save_env(env_vars)
result = f"✅ Successfully deleted '{key_name}'\n"
result += f"💾 Backup created: {Path(backup_file).name}"
return [TextContent(type="text", text=result)]
elif name == "search_api_keys":
env_vars = manager.load_env()
query = arguments["query"].lower()
show_values = arguments.get("show_values", False)
results = []
for key, value in env_vars.items():
category = manager.get_category(key)
# Search in both key name and category
if query in key.lower() or query in category.lower():
display_value = value if show_values else manager.mask_value(value)
results.append(f"📁 {category} | {key}: {display_value}")
if not results:
return [TextContent(type="text", text=f"🔍 No keys found matching '{query}'")]
result = f"🔍 Search Results for '{query}'\n\n"
result += "\n".join(sorted(results))
result += f"\n\n📊 Found {len(results)} matching key(s)"
return [TextContent(type="text", text=result)]
elif name == "list_categories":
env_vars = manager.load_env()
cats = {}
for key in env_vars.keys():
cat = manager.get_category(key)
cats[cat] = cats.get(cat, 0) + 1
result = "📁 Available Categories\n\n"
for cat in sorted(cats.keys()):
result += f"📂 {cat} ({cats[cat]} keys)\n"
result += f"\n📊 Total Categories: {len(cats)}"
return [TextContent(type="text", text=result)]
elif name == "backup_env_file":
backup_file = manager.create_backup()
result = f"✅ Backup created successfully\n"
result += f"📁 Location: {backup_file}\n"
result += f"🕒 Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return [TextContent(type="text", text=result)]
elif name == "get_stats":
stats = manager.get_stats()
result = f"📊 API Manager Statistics\n\n"
result += f"🔐 Total API Keys: {stats.total_keys}\n"
result += f"📁 Categories: {len(stats.categories)}\n\n"
result += "📈 Category Breakdown:\n"
for cat, count in sorted(stats.categories.items()):
percentage = (count / stats.total_keys * 100) if stats.total_keys > 0 else 0
result += f" 📂 {cat}: {count} keys ({percentage:.1f}%)\n"
result += f"\n📝 Environment File: {manager.env_file}\n"
if stats.last_modified:
result += f"🕒 Last Modified: {stats.last_modified}\n"
if stats.last_backup:
result += f"💾 Last Backup: {stats.last_backup}\n"
return [TextContent(type="text", text=result)]
elif name == "export_keys":
format_type = arguments.get("format", "json")
include_values = arguments.get("include_values", False)
try:
exported_data = manager.export_keys(format_type, include_values)
result = f"📤 API Keys Export ({format_type.upper()})\n\n"
if include_values:
result += "⚠️ WARNING: This export contains actual API key values!\n\n"
else:
result += "🔐 Keys are masked for security\n\n"
result += "```\n"
result += exported_data
result += "\n```"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"❌ Export Error: {str(e)}")]
elif name == "help":
help_text = f"""
🔐 API Manager MCP Server v{__version__}
{__description__}
📋 Available Commands:
🔍 **Query Commands:**
• list_api_keys - List all API keys with categories
- show_values: Show actual values (default: false)
- category: Filter by specific category
• get_api_key - Get a specific API key by name
- key_name: Name of the key to retrieve
• search_api_keys - Search for API keys by name or category
- query: Search term
- show_values: Show actual values (default: false)
• list_categories - List all categories with counts
📝 **Management Commands:**
• add_api_key - Add or update an API key
- key_name: Name of the API key
- value: The API key value
• delete_api_key - Delete an API key
- key_name: Name of the key to delete
🛠️ **Utility Commands:**
• backup_env_file - Create timestamped backup
• get_stats - Show detailed statistics
• export_keys - Export keys in JSON or ENV format
- format: "json" or "env" (default: json)
- include_values: Include actual values (default: false)
• help - Show this help message
📁 **Supported Categories:**
AI/LLM: Anthropic, OpenAI, Google AI, Groq, Mistral, etc.
Database: Supabase, Neo4j, Qdrant, Pinecone, Airtable, etc.
Communication: Discord, Slack, Twilio, Telegram, etc.
Development: GitHub, Firebase, Vercel, Netlify, etc.
Cloud: AWS, Azure, GCP, Railway, Heroku, etc.
And many more...
🔒 **Security Features:**
• Secure file permissions (600)
• Atomic file operations
• Automatic backups on deletion
• Value masking for display
• Input validation
📂 **File Location:** {manager.env_file}
⚠️ **Security Note:** API keys are sensitive data. Always use show_values=false
for listing/searching unless absolutely necessary.
"""
return [TextContent(type="text", text=help_text)]
else:
return [TextContent(type="text", text=f"❌ Unknown tool: {name}")]
except Exception as e:
logger.error(f"Error in tool call '{name}': {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def main():
"""Run the MCP server with enhanced error handling"""
try:
logger.info(f"Starting API Manager MCP Server v{__version__}")
logger.info(f"Environment file: {manager.env_file}")
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.exception("Unexpected error in main")
print(f"Fatal Error: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())