movies_database_zh.py•7.55 kB
"""
MySQL-MCP 示例:电影数据库
本示例展示如何将 MySQL-MCP 与电影数据库一起使用。
包含创建示例数据库的设置说明。
设置步骤:
1. 确保 MySQL 正在运行
2. 执行 setup_movies_db.sql 中的 SQL 脚本创建示例数据库
3. 使用您的 MySQL 凭据更新 .env 文件
4. 运行此示例:
fastmcp dev examples/movies_database_zh.py
"""
from fastmcp import FastMCP, Context
import mysql.connector
from mysql.connector import Error
import os
from dotenv import load_dotenv
from typing import Dict, Any, List, Optional
# 加载环境变量
load_dotenv()
# 初始化 FastMCP 应用
mcp = FastMCP(
"电影数据库",
description="探索和查询电影数据库",
dependencies=["mysql-connector-python", "python-dotenv"]
)
# 数据库连接配置
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", "3306")),
"user": os.getenv("MYSQL_USER", "root"),
"password": os.getenv("MYSQL_PASSWORD", ""),
"database": "movies" # 本示例使用固定数据库
}
def get_connection():
"""创建 MySQL 连接"""
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except Error as e:
raise Exception(f"数据库连接错误: {e}")
@mcp.tool()
def search_movies(title_keyword: str = "", genre: Optional[str] = None, min_rating: Optional[float] = None) -> Dict[str, Any]:
"""按标题关键词、类型和/或最低评分搜索电影"""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
# 使用参数构建查询
query = "SELECT m.id, m.title, m.release_year, m.rating, GROUP_CONCAT(g.name) as genres "
query += "FROM movies m "
query += "LEFT JOIN movie_genres mg ON m.id = mg.movie_id "
query += "LEFT JOIN genres g ON mg.genre_id = g.id "
query += "WHERE 1=1 "
params = []
if title_keyword:
query += "AND m.title LIKE %s "
params.append(f"%{title_keyword}%")
if genre:
query += "AND g.name = %s "
params.append(genre)
if min_rating is not None:
query += "AND m.rating >= %s "
params.append(min_rating)
query += "GROUP BY m.id "
query += "ORDER BY m.rating DESC "
query += "LIMIT 20"
try:
cursor.execute(query, params)
results = cursor.fetchall()
return {
"movies": results,
"count": len(results)
}
except Error as e:
raise Exception(f"搜索错误: {e}")
finally:
cursor.close()
conn.close()
@mcp.tool()
def get_movie_details(movie_id: int) -> Dict[str, Any]:
"""通过ID获取电影的详细信息"""
conn = get_connection()
movie_cursor = conn.cursor(dictionary=True)
cast_cursor = conn.cursor(dictionary=True)
try:
# 获取电影详情
movie_cursor.execute("""
SELECT m.*, GROUP_CONCAT(DISTINCT g.name) as genres
FROM movies m
LEFT JOIN movie_genres mg ON m.id = mg.movie_id
LEFT JOIN genres g ON mg.genre_id = g.id
WHERE m.id = %s
GROUP BY m.id
""", (movie_id,))
movie = movie_cursor.fetchone()
if not movie:
raise ValueError(f"未找到ID为{movie_id}的电影")
# 获取演员信息
cast_cursor.execute("""
SELECT a.name, c.character_name, c.role
FROM movie_cast c
JOIN actors a ON c.actor_id = a.id
WHERE c.movie_id = %s
ORDER BY c.role
""", (movie_id,))
cast = cast_cursor.fetchall()
return {
"movie": movie,
"cast": cast
}
except Error as e:
raise Exception(f"查询错误: {e}")
finally:
movie_cursor.close()
cast_cursor.close()
conn.close()
@mcp.tool()
def list_genres() -> Dict[str, Any]:
"""列出数据库中所有电影类型"""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
try:
cursor.execute("""
SELECT g.name, COUNT(mg.movie_id) as movie_count
FROM genres g
LEFT JOIN movie_genres mg ON g.id = mg.genre_id
GROUP BY g.id
ORDER BY movie_count DESC
""")
genres = cursor.fetchall()
return {
"genres": genres
}
except Error as e:
raise Exception(f"查询错误: {e}")
finally:
cursor.close()
conn.close()
@mcp.tool()
def top_rated_movies(limit: int = 10, genre: Optional[str] = None) -> Dict[str, Any]:
"""获取评分最高的电影,可按类型筛选"""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
# 安全限制
safe_limit = min(limit, 50)
try:
query = """
SELECT m.id, m.title, m.release_year, m.rating, GROUP_CONCAT(g.name) as genres
FROM movies m
LEFT JOIN movie_genres mg ON m.id = mg.movie_id
LEFT JOIN genres g ON mg.genre_id = g.id
"""
params = []
if genre:
query += "WHERE g.name = %s "
params.append(genre)
query += """
GROUP BY m.id
HAVING m.rating IS NOT NULL
ORDER BY m.rating DESC
LIMIT %s
"""
params.append(safe_limit)
cursor.execute(query, params)
movies = cursor.fetchall()
return {
"movies": movies,
"count": len(movies)
}
except Error as e:
raise Exception(f"查询错误: {e}")
finally:
cursor.close()
conn.close()
@mcp.resource("schema://movies")
def get_movies_schema() -> str:
"""获取电影数据库的数据库架构"""
conn = get_connection()
cursor = conn.cursor()
schema = []
try:
# 获取所有表
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
# 获取每个表的CREATE TABLE语句
for table in tables:
cursor.execute(f"SHOW CREATE TABLE {table}")
create_stmt = cursor.fetchone()[1]
schema.append(create_stmt)
return "\n\n".join(schema)
except Error as e:
raise Exception(f"获取架构错误: {e}")
finally:
cursor.close()
conn.close()
@mcp.prompt()
def movie_recommendation() -> str:
"""基于用户偏好推荐电影"""
return """我想让您推荐一些我可能喜欢的电影。
在提供推荐之前:
1. 使用list_genres工具查看有哪些电影类型可用
2. 使用schema://movies资源了解数据库结构
3. 使用search_movies工具查找符合我条件的电影
4. 使用get_movie_details工具获取特定电影的更多信息
然后提供个性化推荐,简要解释您选择每部电影的原因。
"""
@mcp.prompt()
def analyze_movie_trends() -> str:
"""帮助Claude分析电影数据库中的趋势"""
return """我想让您分析电影数据库中的趋势。
一些值得探索的有趣方面:
1. 各类型电影评分如何变化
2. 某些演员是否更频繁地出现在高评分电影中
3. 电影发行是否随时间有变化趋势
4. 不同时期哪些类型最受欢迎
使用可用工具查询数据库,提供由数据支持的有趣见解。
"""
if __name__ == "__main__":
# 直接运行服务器
mcp.run()