import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")
def get_db_config():
"""환경 변수에서 데이터베이스 설정을 가져옵니다."""
config = {
"host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", "3306")),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE")
}
if not all([config["user"], config["password"], config["database"]]):
logger.error("필수 데이터베이스 설정이 누락되었습니다. 환경 변수를 확인해주세요:")
logger.error("MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE가 필요합니다")
raise ValueError("필수 데이터베이스 설정이 누락되었습니다")
return config
# 서버 초기화
app = Server("mysql_mcp_server")
@app.list_resources()
async def list_resources() -> list[Resource]:
"""MySQL 테이블을 리소스로 나열합니다."""
config = get_db_config()
try:
with connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
logger.info(f"발견된 테이블: {tables}")
resources = []
for table in tables:
resources.append(
Resource(
uri=f"mysql://{table[0]}/data",
name=f"테이블: {table[0]}",
mimeType="text/plain",
description=f"테이블 데이터: {table[0]}"
)
)
return resources
except Error as e:
logger.error(f"리소스 목록 조회 실패: {str(e)}")
return []
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
"""테이블 내용을 읽습니다."""
config = get_db_config()
uri_str = str(uri)
logger.info(f"리소스 읽기: {uri_str}")
if not uri_str.startswith("mysql://"):
raise ValueError(f"잘못된 URI 형식: {uri_str}")
parts = uri_str[8:].split('/')
table = parts[0]
try:
with connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table} LIMIT 100")
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
result = [",".join(map(str, row)) for row in rows]
return "\n".join([",".join(columns)] + result)
except Error as e:
logger.error(f"리소스 {uri} 읽기 중 데이터베이스 오류: {str(e)}")
raise RuntimeError(f"데이터베이스 오류: {str(e)}")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""사용 가능한 MySQL 도구를 나열합니다."""
logger.info("도구 목록 조회 중...")
return [
Tool(
name="execute_sql",
description="MySQL 서버에서 SQL 쿼리를 실행합니다",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "실행할 SQL 쿼리"
}
},
"required": ["query"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""SQL 명령을 실행합니다."""
config = get_db_config()
logger.info(f"도구 호출: {name}, 인자: {arguments}")
if name != "execute_sql":
raise ValueError(f"알 수 없는 도구: {name}")
query = arguments.get("query")
if not query:
raise ValueError("쿼리가 필요합니다")
try:
with connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
# SHOW TABLES 명령어 특별 처리
if query.strip().upper().startswith("SHOW TABLES"):
tables = cursor.fetchall()
result = ["Tables_in_" + config["database"]] # 헤더
result.extend([table[0] for table in tables])
return [TextContent(type="text", text="\n".join(result))]
# 결과 집합을 반환하는 다른 쿼리 처리 (SELECT, SHOW, DESCRIBE 등)
elif cursor.description is not None:
columns = [desc[0] for desc in cursor.description]
try:
rows = cursor.fetchall()
result = [",".join(map(str, row)) for row in rows]
return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
except Error as e:
logger.warning(f"결과 가져오기 오류: {str(e)}")
return [TextContent(type="text", text=f"쿼리 실행됨, 결과 가져오기 오류: {str(e)}")]
# SELECT가 아닌 쿼리
else:
conn.commit()
return [TextContent(type="text", text=f"쿼리 실행 성공. 영향받은 행: {cursor.rowcount}")]
except Error as e:
logger.error(f"SQL '{query}' 실행 오류: {e}")
return [TextContent(type="text", text=f"쿼리 실행 오류: {str(e)}")]
async def main():
"""MCP 서버 실행을 위한 메인 진입점."""
from mcp.server.stdio import stdio_server
# 추가 디버그 출력
print("MySQL MCP 서버 시작, 설정:", file=sys.stderr)
config = get_db_config()
print(f"호스트: {config['host']}", file=sys.stderr)
print(f"포트: {config['port']}", file=sys.stderr)
print(f"사용자: {config['user']}", file=sys.stderr)
print(f"데이터베이스: {config['database']}", file=sys.stderr)
logger.info("MySQL MCP 서버 시작 중...")
logger.info(f"데이터베이스 설정: {config['host']}/{config['database']} 사용자: {config['user']}")
async with stdio_server() as (read_stream, write_stream):
try:
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
except Exception as e:
logger.error(f"서버 오류: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
asyncio.run(main())