#!/usr/bin/env python3
"""ANSES Ciqual MCP Server with SQL query interface
Provides a Model Context Protocol (MCP) server for querying the ANSES Ciqual
French food composition database using SQL.
"""
from fastmcp import FastMCP
import sqlite3
import os
from pathlib import Path
import sys
import logging
import fcntl
import time
# Configure logging
# Root logging is set to INFO with a "timestamp - logger name - level - message"
# layout; this runs once at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# FastMCP server instance; tools are registered on it with the @mcp.tool()
# decorator and it is started from main() via mcp.run().
mcp = FastMCP("ANSES Ciqual")
# Location of the SQLite database file: ~/.ciqual/ciqual.db
DB_PATH = Path.home() / ".ciqual" / "ciqual.db"
@mcp.tool()
async def query(sql: str) -> list[dict]:
    """Execute SQL query on ANSES Ciqual French food composition database.

    ⚠️ EFFICIENCY: Follow this 2-step workflow to minimize queries!

    STEP 1 - SEARCH (one query):
        SELECT alim_code, alim_nom_fr FROM foods_fts WHERE foods_fts MATCH 'steak OR boeuf';
    Note: FTS uses OR between words. For "steak sauce poivre", search "steak" first.

    STEP 2 - GET ALL NUTRIENTS (one query with JOIN):
        SELECT f.alim_nom_fr, n.const_nom_fr, c.teneur, n.unit
        FROM foods f
        JOIN composition c ON f.alim_code = c.alim_code
        JOIN nutrients n ON c.const_code = n.const_code
        WHERE f.alim_code = <code_from_step1>;

    🛑 STOP after finding a matching food! Don't keep searching with different terms.

    COMPOUND DISHES (steak + sauce):
    - CIQUAL has individual ingredients, not full recipes
    - Search each component: "steak" then "sauce poivre"
    - Sum the calories (typical portions: meat 150g, sauce 30g)

    QUICK CALORIE LOOKUP (const_code 328 = kcal/100g):
        SELECT f.alim_nom_fr, c.teneur as kcal_100g
        FROM foods f JOIN composition c ON f.alim_code = c.alim_code
        WHERE f.alim_code = <code> AND c.const_code = 328;

    KEY NUTRIENT CODES:
    Energy: 328 (kcal), 327 (kJ)
    Macros: 25000 (protein), 31000 (carbs), 40000 (fat), 34100 (fiber), 32000 (sugars)
    Minerals: 10110 (sodium), 10200 (calcium), 10260 (iron), 10190 (potassium), 10120 (magnesium)
    Vitamins: 55100 (vit C), 52100 (vit D), 56600 (vit B12), 53100 (vit E), 56700 (folates)

    SCHEMA:
    - foods: alim_code (PK), alim_nom_fr, alim_nom_eng, alim_grp_code
    - nutrients: const_code (PK), const_nom_fr, const_nom_eng, unit
    - composition: alim_code, const_code, teneur (value per 100g), code_confiance
    - food_groups: grp_code, grp_nom_fr, grp_nom_eng
    - foods_fts: FTS5 virtual table for full-text search (alim_code, alim_nom_fr, alim_nom_eng)
    """
    # NOTE(review): the docstring above is presumably published by FastMCP as
    # the tool description shown to clients, so its wording is kept as-is and
    # the implementation contract is documented here instead.
    #
    # Parameters:
    #   sql: a single SQL statement; it must start with SELECT or WITH
    #        (case-insensitive, surrounding whitespace ignored).
    # Returns:
    #   On success, one dict per result row keyed by column name.
    #   On any failure, a one-element list ``[{"error": "<message>"}]``;
    #   this function never raises to its caller.

    # Ensure database exists
    if not DB_PATH.exists():
        logger.warning("Database not found at %s", DB_PATH)
        return [{"error": "Database not initialized. Please run the server first to download data."}]

    # Validate SQL query (basic safety check). The read-only connection below
    # is the real guard; this check only gives a clearer message up front.
    sql_lower = sql.strip().lower()
    if not sql_lower.startswith(('select', 'with')):
        return [{"error": "Only SELECT queries are allowed for safety."}]

    # Explicit sentinel so the ``finally`` clause knows whether a connection
    # was actually opened (connect() itself may raise).
    conn = None
    try:
        logger.debug("Executing query: %s", sql[:100] + '...' if len(sql) > 100 else sql)

        # Connect in read-only mode. as_uri() percent-encodes characters such
        # as '?', '#' and '%' that would otherwise be interpreted as URI
        # syntax if the raw path were interpolated into the "file:" URI.
        conn = sqlite3.connect(f"{DB_PATH.resolve().as_uri()}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row

        # Defence in depth: refuse writes at the connection level too, and
        # keep temporary structures in memory rather than on disk.
        conn.execute("PRAGMA query_only = ON")
        conn.execute("PRAGMA temp_store = MEMORY")

        cursor = conn.execute(sql)
        results = [dict(row) for row in cursor.fetchall()]
        logger.debug("Query returned %d rows", len(results))
        return results
    except sqlite3.OperationalError as e:
        logger.error("SQL operational error: %s", e)
        message = str(e)
        if "no such table" in message:
            return [{"error": "Table not found. Available tables: foods, nutrients, composition, foods_fts, food_groups"}]
        if "read-only" in message or "readonly" in message:
            return [{"error": "Database is read-only. Only SELECT queries are allowed."}]
        return [{"error": f"SQL error: {message}"}]
    except sqlite3.Error as e:
        logger.error("Database error: %s", e)
        return [{"error": f"Database error: {str(e)}"}]
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Unexpected error: %s", e)
        return [{"error": f"Unexpected error: {str(e)}"}]
    finally:
        if conn is not None:
            conn.close()
def _probe_fts5() -> None:
    """Run a trivial read against ``foods_fts``, always closing the connection.

    Raises:
        sqlite3.OperationalError: propagated unchanged when the read fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("SELECT * FROM foods_fts LIMIT 1").fetchone()
    finally:
        conn.close()


def _rebuild_fts5() -> None:
    """Empty ``foods_fts`` and repopulate it from ``foods``, then commit."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("DELETE FROM foods_fts")
        conn.execute("INSERT INTO foods_fts SELECT alim_code, alim_nom_fr, alim_nom_eng FROM foods")
        conn.commit()
    finally:
        conn.close()


def _repair_fts5_if_corrupted() -> bool:
    """Repair FTS5 index if corrupted (e.g., from concurrent writes)

    Uses file locking to prevent multiple instances from repairing simultaneously.

    Returns:
        True if the index was rebuilt by this call; False if no corruption was
        detected, another instance had already repaired it, or the repair
        attempt failed (the failure is logged, not raised).

    Raises:
        sqlite3.OperationalError: if the initial probe fails with a message
            that mentions neither "fts5" nor "missing row".
    """
    try:
        _probe_fts5()
        return False  # No corruption
    except sqlite3.OperationalError as e:
        reason = str(e).lower()
        if "fts5" not in reason and "missing row" not in reason:
            raise  # Re-raise if not FTS5-related

    logger.warning("FTS5 corruption detected, acquiring lock for repair...")
    print("FTS5 corruption detected, acquiring lock for repair...", file=sys.stderr)

    # The lock file is deliberately left on disk afterwards. Unlinking it
    # after release would let a process still waiting on the old inode and a
    # newcomer that creates a fresh file both hold "the" lock at once.
    lock_file = DB_PATH.parent / ".ciqual.lock"
    try:
        with open(lock_file, 'w') as lock_fd:
            # Acquire exclusive lock (blocks until available)
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
            try:
                logger.info("Lock acquired, rebuilding FTS5 index...")
                print("Lock acquired, rebuilding FTS5 index...", file=sys.stderr)

                # Double-check corruption still exists (another instance
                # might have fixed it while we waited for the lock)
                try:
                    _probe_fts5()
                except sqlite3.OperationalError:
                    pass  # Still corrupted, proceed with repair
                else:
                    logger.info("FTS5 already repaired by another instance")
                    print("FTS5 already repaired by another instance", file=sys.stderr)
                    return False

                _rebuild_fts5()
                logger.info("FTS5 index rebuilt successfully")
                print("FTS5 index rebuilt successfully", file=sys.stderr)
                return True
            finally:
                # Release lock explicitly before the file is closed
                fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
    except Exception as repair_error:
        # Best effort: a failed repair must not prevent the server starting.
        logger.error("Failed to repair FTS5 index: %s", repair_error)
        print(f"Failed to repair FTS5 index: {repair_error}", file=sys.stderr)
        return False


def main():
    """Main entry point for the MCP server

    Initializes the database if needed and starts the MCP server.

    On first run (no database file) the database is created via
    ``data_loader.initialize_database``; otherwise the FTS5 index is checked
    and repaired if needed. The process exits with status 1 if the database
    cannot be initialized or the server fails, and with status 0 on Ctrl-C.
    """
    # Imported here rather than at module level, as in the original code.
    from data_loader import initialize_database

    try:
        if not DB_PATH.exists():
            logger.info("First run: Downloading Ciqual database...")
            print("First run: Downloading Ciqual database...", file=sys.stderr)
            initialize_database()
            logger.info("Database initialized successfully!")
            print("Database initialized successfully!", file=sys.stderr)
        else:
            # Disabled auto-update to prevent unnecessary rebuilds
            # Database is static (ANSES data from 2020, no new updates expected)
            # Auto-repair FTS5 index if corrupted (e.g., from concurrent writes)
            _repair_fts5_if_corrupted()
    except Exception as e:
        if not DB_PATH.exists():
            # Nothing to serve from: give up.
            logger.error("Failed to initialize database: %s", e)
            print(f"Failed to initialize database: {e}", file=sys.stderr)
            sys.exit(1)
        else:
            # A database file exists, so keep going with what we have.
            logger.warning("Failed to update database, using existing version: %s", e)
            print(f"Failed to update database, using existing version: {e}", file=sys.stderr)

    logger.info("Starting Ciqual MCP server")
    # Status output goes to stderr like every other message here. mcp.run()
    # is called without a transport argument; with FastMCP's default stdio
    # transport, stdout carries protocol messages and must not receive
    # free-form text.
    print("Ciqual MCP server running", file=sys.stderr, flush=True)
    try:
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Shutting down Ciqual MCP server")
        print("\nShutting down Ciqual MCP server", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        logger.error("Server error: %s", e)
        print(f"Server error: {e}", file=sys.stderr)
        sys.exit(1)
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()