Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3 """ Automated Cloudflare resource setup for MCP Memory Service. This script creates the required Cloudflare resources using the HTTP API. """ import os import sys import asyncio import json import logging from typing import Dict, Any, Optional import httpx logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CloudflareSetup: def __init__(self, api_token: str, account_id: str): self.api_token = api_token self.account_id = account_id self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}" self.client = None async def _get_client(self) -> httpx.AsyncClient: if self.client is None: headers = { "Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json" } self.client = httpx.AsyncClient(headers=headers, timeout=30.0) return self.client async def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]: """Make authenticated request to Cloudflare API.""" client = await self._get_client() response = await client.request(method, url, **kwargs) if response.status_code not in [200, 201]: logger.error(f"API request failed: {response.status_code} {response.text}") response.raise_for_status() return response.json() async def create_vectorize_index(self, name: str = "mcp-memory-index") -> str: """Create Vectorize index and return its ID.""" logger.info(f"Creating Vectorize index: {name}") # Check if index already exists try: url = f"{self.base_url}/vectorize/indexes/{name}" result = await self._make_request("GET", url) if result.get("success"): logger.info(f"Vectorize index {name} already exists") return name except httpx.HTTPStatusError as e: if e.response.status_code != 404: raise # Create new index url = f"{self.base_url}/vectorize/indexes" payload = { "name": name, "config": { "dimensions": 768, "metric": "cosine" } } result = await self._make_request("POST", url, json=payload) if result.get("success"): logger.info(f"✅ Created Vectorize index: {name}") return name else: raise ValueError(f"Failed to create Vectorize index: {result}") async def create_d1_database(self, name: str = "mcp-memory-db") -> str: """Create D1 database and return its ID.""" logger.info(f"Creating D1 database: {name}") # List existing databases to check if it exists url = f"{self.base_url}/d1/database" result = await self._make_request("GET", url) if result.get("success"): for db in result.get("result", []): if db.get("name") == name: db_id = db.get("uuid") logger.info(f"D1 database {name} already exists with ID: {db_id}") return db_id # Create new database payload = {"name": name} result = await self._make_request("POST", url, json=payload) if result.get("success"): db_id = result["result"]["uuid"] logger.info(f"✅ Created D1 database: {name} (ID: {db_id})") return db_id else: raise ValueError(f"Failed to create D1 database: {result}") async def create_r2_bucket(self, name: str = "mcp-memory-content") -> str: """Create R2 bucket and return its name.""" logger.info(f"Creating R2 bucket: {name}") # Check if bucket already exists try: url = f"{self.base_url}/r2/buckets/{name}" result = await self._make_request("GET", url) if result.get("success"): logger.info(f"R2 bucket {name} already exists") return name except httpx.HTTPStatusError as e: if e.response.status_code != 404: raise # Create new bucket url = f"{self.base_url}/r2/buckets" payload = {"name": name} result = await self._make_request("POST", url, json=payload) if result.get("success"): logger.info(f"✅ Created R2 bucket: {name}") return name else: raise ValueError(f"Failed to create R2 bucket: {result}") async def verify_workers_ai_access(self) -> bool: """Verify Workers AI access and embedding model.""" logger.info("Verifying Workers AI access...") # Test embedding generation url = f"{self.base_url}/ai/run/@cf/baai/bge-base-en-v1.5" payload = {"text": ["test embedding"]} try: result = await self._make_request("POST", url, json=payload) if result.get("success"): logger.info("✅ Workers AI access verified") return True else: logger.warning(f"Workers AI test failed: {result}") return False except Exception as e: logger.warning(f"Workers AI verification failed: {e}") return False async def close(self): """Close HTTP client.""" if self.client: await self.client.aclose() async def main(): """Main setup routine.""" print("🚀 Cloudflare Backend Setup for MCP Memory Service") print("=" * 55) # Check for required environment variables api_token = os.getenv("CLOUDFLARE_API_TOKEN") account_id = os.getenv("CLOUDFLARE_ACCOUNT_ID") if not api_token: print("❌ CLOUDFLARE_API_TOKEN environment variable not set") print("Please create an API token at: https://dash.cloudflare.com/profile/api-tokens") print("Required permissions: Vectorize:Edit, D1:Edit, Workers AI:Edit, R2:Edit") return False if not account_id: print("❌ CLOUDFLARE_ACCOUNT_ID environment variable not set") print("You can find your account ID in the Cloudflare dashboard sidebar") return False setup = CloudflareSetup(api_token, account_id) try: # Create resources vectorize_index = await setup.create_vectorize_index() d1_database_id = await setup.create_d1_database() # R2 bucket is optional r2_bucket = None create_r2 = input("\n🪣 Create R2 bucket for large content storage? (y/N): ").lower().strip() if create_r2 in ['y', 'yes']: try: r2_bucket = await setup.create_r2_bucket() except Exception as e: logger.warning(f"Failed to create R2 bucket: {e}") logger.warning("Continuing without R2 storage...") # Verify Workers AI ai_available = await setup.verify_workers_ai_access() print("\n🎉 Setup Complete!") print("=" * 20) print(f"Vectorize Index: {vectorize_index}") print(f"D1 Database ID: {d1_database_id}") print(f"R2 Bucket: {r2_bucket or 'Not configured'}") print(f"Workers AI: {'Available' if ai_available else 'Limited access'}") print("\n📝 Environment Variables:") print("=" * 25) print(f"export CLOUDFLARE_API_TOKEN=\"{api_token[:10]}...\"") print(f"export CLOUDFLARE_ACCOUNT_ID=\"{account_id}\"") print(f"export CLOUDFLARE_VECTORIZE_INDEX=\"{vectorize_index}\"") print(f"export CLOUDFLARE_D1_DATABASE_ID=\"{d1_database_id}\"") if r2_bucket: print(f"export CLOUDFLARE_R2_BUCKET=\"{r2_bucket}\"") print("export MCP_MEMORY_STORAGE_BACKEND=\"cloudflare\"") print("\n🧪 Test the setup:") print("python test_cloudflare_backend.py") return True except Exception as e: logger.error(f"Setup failed: {e}") return False finally: await setup.close() if __name__ == "__main__": success = asyncio.run(main()) sys.exit(0 if success else 1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server