Skip to main content
Glama

ElevenLabs MCP Server

database.py9.47 kB
import aiosqlite import json import os from datetime import datetime from typing import List, Optional from .models import AudioJob def get_database_path() -> str: """Get the database path, ensuring it's in the output directory.""" # First try to get the output directory from the server instance output_dir = os.getenv("ELEVENLABS_OUTPUT_DIR") if not output_dir: # Fall back to default output directory in project root output_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "output") # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) return os.path.join(output_dir, "voiceover_history.db") DATABASE_PATH = get_database_path() CREATE_VOICES_TABLE = """ CREATE TABLE IF NOT EXISTS voices ( voice_id TEXT PRIMARY KEY, name TEXT NOT NULL, category TEXT, labels TEXT, -- JSON string description TEXT, preview_url TEXT, high_quality_base_model_ids TEXT, -- JSON string last_updated TEXT NOT NULL ) """ CREATE_JOBS_TABLE = """ CREATE TABLE IF NOT EXISTS audio_jobs ( id TEXT PRIMARY KEY, status TEXT NOT NULL, script_parts TEXT NOT NULL, -- JSON string output_file TEXT, error TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, total_parts INTEGER NOT NULL DEFAULT 1, completed_parts INTEGER NOT NULL DEFAULT 0 ) """ class Database: CACHE_DURATION_SECONDS = 24 * 60 * 60 # 24 hours def __init__(self, db_path: str = DATABASE_PATH): self.db_path = db_path # Ensure output directory exists os.makedirs(os.path.dirname(self.db_path), exist_ok=True) async def initialize(self): """Initialize database and create tables if they don't exist.""" async with aiosqlite.connect(self.db_path) as db: # Create tables one at a time await db.execute(CREATE_VOICES_TABLE) await db.execute(CREATE_JOBS_TABLE) await db.commit() async def insert_job(self, job: AudioJob) -> None: """Insert a new audio job into the database.""" async with aiosqlite.connect(self.db_path) as db: await db.execute( """ INSERT INTO audio_jobs (id, status, script_parts, output_file, error, created_at, updated_at, total_parts, completed_parts) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job.id, job.status, json.dumps(job.script_parts), job.output_file, job.error, job.created_at.isoformat(), job.updated_at.isoformat(), job.total_parts, job.completed_parts ) ) await db.commit() async def update_job(self, job: AudioJob) -> None: """Update an existing audio job in the database.""" job.updated_at = datetime.utcnow() async with aiosqlite.connect(self.db_path) as db: await db.execute( """ UPDATE audio_jobs SET status = ?, script_parts = ?, output_file = ?, error = ?, updated_at = ?, total_parts = ?, completed_parts = ? WHERE id = ? """, ( job.status, json.dumps(job.script_parts), job.output_file, job.error, job.updated_at.isoformat(), job.total_parts, job.completed_parts, job.id ) ) await db.commit() async def get_job(self, job_id: str) -> Optional[AudioJob]: """Get a specific audio job by ID.""" async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM audio_jobs WHERE id = ?", (job_id,) ) as cursor: row = await cursor.fetchone() if row is None: return None return AudioJob.from_dict({ "id": row["id"], "status": row["status"], "script_parts": json.loads(row["script_parts"]), "output_file": row["output_file"], "error": row["error"], "created_at": row["created_at"], "updated_at": row["updated_at"], "total_parts": row["total_parts"], "completed_parts": row["completed_parts"] }) async def get_all_jobs(self) -> List[AudioJob]: """Get all audio jobs.""" async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row async with db.execute("SELECT * FROM audio_jobs ORDER BY created_at DESC") as cursor: rows = await cursor.fetchall() return [AudioJob.from_dict({ "id": row["id"], "status": row["status"], "script_parts": json.loads(row["script_parts"]), "output_file": row["output_file"], "error": row["error"], "created_at": row["created_at"], "updated_at": row["updated_at"], "total_parts": row["total_parts"], "completed_parts": row["completed_parts"] }) for row in rows] async def delete_job(self, job_id: str) -> bool: """Delete an audio job by ID. Returns True if job was deleted.""" async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute("DELETE FROM audio_jobs WHERE id = ?", (job_id,)) deleted = cursor.rowcount > 0 await db.commit() return deleted async def cleanup(self) -> None: """Delete the database file. Useful for testing.""" if os.path.exists(self.db_path): os.remove(self.db_path) async def upsert_voices(self, voices: List[dict]) -> None: """Insert or update voice data in the database.""" async with aiosqlite.connect(self.db_path) as db: now = datetime.utcnow().isoformat() for voice in voices: await db.execute( """ INSERT INTO voices (voice_id, name, category, labels, description, preview_url, high_quality_base_model_ids, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(voice_id) DO UPDATE SET name = excluded.name, category = excluded.category, labels = excluded.labels, description = excluded.description, preview_url = excluded.preview_url, high_quality_base_model_ids = excluded.high_quality_base_model_ids, last_updated = excluded.last_updated """, ( voice["voice_id"], voice["name"], voice["category"], json.dumps(voice["labels"]), voice["description"], voice["preview_url"], json.dumps(voice["high_quality_base_model_ids"]), now ) ) await db.commit() async def get_voices(self, max_age_seconds: Optional[int] = None) -> tuple[List[dict], bool]: """ Get all voices from the database. Returns tuple of (voices, needs_refresh) where needs_refresh indicates if cache is stale. """ async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row async with db.execute("SELECT * FROM voices ORDER BY name") as cursor: rows = await cursor.fetchall() voices = [] needs_refresh = False if not rows: needs_refresh = True else: max_age = max_age_seconds or self.CACHE_DURATION_SECONDS now = datetime.utcnow() for row in rows: last_updated = datetime.fromisoformat(row["last_updated"]) age = (now - last_updated).total_seconds() if age > max_age: needs_refresh = True voices.append({ "voice_id": row["voice_id"], "name": row["name"], "category": row["category"], "labels": json.loads(row["labels"]), "description": row["description"], "preview_url": row["preview_url"], "high_quality_base_model_ids": json.loads(row["high_quality_base_model_ids"]) }) return voices, needs_refresh

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mamertofabian/elevenlabs-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server