ElevenLabs MCP Server
- elevenlabs-mcp-server
- src
- elevenlabs_mcp
import aiosqlite
import json
import os
from datetime import datetime
from typing import List, Optional
from .models import AudioJob
def get_database_path() -> str:
"""Get the database path, ensuring it's in the output directory."""
# First try to get the output directory from the server instance
output_dir = os.getenv("ELEVENLABS_OUTPUT_DIR")
if not output_dir:
# Fall back to default output directory in project root
output_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "output")
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
return os.path.join(output_dir, "voiceover_history.db")
DATABASE_PATH = get_database_path()
CREATE_VOICES_TABLE = """
CREATE TABLE IF NOT EXISTS voices (
voice_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
category TEXT,
labels TEXT, -- JSON string
description TEXT,
preview_url TEXT,
high_quality_base_model_ids TEXT, -- JSON string
last_updated TEXT NOT NULL
)
"""
CREATE_JOBS_TABLE = """
CREATE TABLE IF NOT EXISTS audio_jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL,
script_parts TEXT NOT NULL, -- JSON string
output_file TEXT,
error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
total_parts INTEGER NOT NULL DEFAULT 1,
completed_parts INTEGER NOT NULL DEFAULT 0
)
"""
class Database:
CACHE_DURATION_SECONDS = 24 * 60 * 60 # 24 hours
def __init__(self, db_path: str = DATABASE_PATH):
self.db_path = db_path
# Ensure output directory exists
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
async def initialize(self):
"""Initialize database and create tables if they don't exist."""
async with aiosqlite.connect(self.db_path) as db:
# Create tables one at a time
await db.execute(CREATE_VOICES_TABLE)
await db.execute(CREATE_JOBS_TABLE)
await db.commit()
async def insert_job(self, job: AudioJob) -> None:
"""Insert a new audio job into the database."""
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT INTO audio_jobs
(id, status, script_parts, output_file, error, created_at, updated_at, total_parts, completed_parts)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
job.id,
job.status,
json.dumps(job.script_parts),
job.output_file,
job.error,
job.created_at.isoformat(),
job.updated_at.isoformat(),
job.total_parts,
job.completed_parts
)
)
await db.commit()
async def update_job(self, job: AudioJob) -> None:
"""Update an existing audio job in the database."""
job.updated_at = datetime.utcnow()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
UPDATE audio_jobs
SET status = ?, script_parts = ?, output_file = ?, error = ?,
updated_at = ?, total_parts = ?, completed_parts = ?
WHERE id = ?
""",
(
job.status,
json.dumps(job.script_parts),
job.output_file,
job.error,
job.updated_at.isoformat(),
job.total_parts,
job.completed_parts,
job.id
)
)
await db.commit()
async def get_job(self, job_id: str) -> Optional[AudioJob]:
"""Get a specific audio job by ID."""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM audio_jobs WHERE id = ?", (job_id,)
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
return AudioJob.from_dict({
"id": row["id"],
"status": row["status"],
"script_parts": json.loads(row["script_parts"]),
"output_file": row["output_file"],
"error": row["error"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"total_parts": row["total_parts"],
"completed_parts": row["completed_parts"]
})
async def get_all_jobs(self) -> List[AudioJob]:
"""Get all audio jobs."""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM audio_jobs ORDER BY created_at DESC") as cursor:
rows = await cursor.fetchall()
return [AudioJob.from_dict({
"id": row["id"],
"status": row["status"],
"script_parts": json.loads(row["script_parts"]),
"output_file": row["output_file"],
"error": row["error"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"total_parts": row["total_parts"],
"completed_parts": row["completed_parts"]
}) for row in rows]
async def delete_job(self, job_id: str) -> bool:
"""Delete an audio job by ID. Returns True if job was deleted."""
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("DELETE FROM audio_jobs WHERE id = ?", (job_id,))
deleted = cursor.rowcount > 0
await db.commit()
return deleted
async def cleanup(self) -> None:
"""Delete the database file. Useful for testing."""
if os.path.exists(self.db_path):
os.remove(self.db_path)
async def upsert_voices(self, voices: List[dict]) -> None:
"""Insert or update voice data in the database."""
async with aiosqlite.connect(self.db_path) as db:
now = datetime.utcnow().isoformat()
for voice in voices:
await db.execute(
"""
INSERT INTO voices
(voice_id, name, category, labels, description, preview_url,
high_quality_base_model_ids, last_updated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(voice_id) DO UPDATE SET
name = excluded.name,
category = excluded.category,
labels = excluded.labels,
description = excluded.description,
preview_url = excluded.preview_url,
high_quality_base_model_ids = excluded.high_quality_base_model_ids,
last_updated = excluded.last_updated
""",
(
voice["voice_id"],
voice["name"],
voice["category"],
json.dumps(voice["labels"]),
voice["description"],
voice["preview_url"],
json.dumps(voice["high_quality_base_model_ids"]),
now
)
)
await db.commit()
async def get_voices(self, max_age_seconds: Optional[int] = None) -> tuple[List[dict], bool]:
"""
Get all voices from the database.
Returns tuple of (voices, needs_refresh) where needs_refresh indicates if cache is stale.
"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM voices ORDER BY name") as cursor:
rows = await cursor.fetchall()
voices = []
needs_refresh = False
if not rows:
needs_refresh = True
else:
max_age = max_age_seconds or self.CACHE_DURATION_SECONDS
now = datetime.utcnow()
for row in rows:
last_updated = datetime.fromisoformat(row["last_updated"])
age = (now - last_updated).total_seconds()
if age > max_age:
needs_refresh = True
voices.append({
"voice_id": row["voice_id"],
"name": row["name"],
"category": row["category"],
"labels": json.loads(row["labels"]),
"description": row["description"],
"preview_url": row["preview_url"],
"high_quality_base_model_ids": json.loads(row["high_quality_base_model_ids"])
})
return voices, needs_refresh