from .utils import catch_error_and_return
from sqlalchemy import create_engine, text, inspect
from dotenv import load_dotenv
from fastmcp import FastMCP
import os
load_dotenv()
postgres_mcp = FastMCP(name="PostgreSQL server")
def create_postgres_connection():
user = os.getenv("POSTGRES_USER")
password = os.getenv("POSTGRES_PASSWORD")
host_port = os.getenv("POSTGRES_HOST")
database = os.getenv("POSTGRES_DB")
DATABASE_URL = f"postgresql+psycopg2://{user}:{password}@{host_port}/{database}"
engine = create_engine(DATABASE_URL)
return engine
@postgres_mcp.tool
@catch_error_and_return
def postgres_execute_query(sql_query: str) -> str:
"""
Execute SQL query
:param sql_query: SQL query
:return: result of execute
"""
engine = create_postgres_connection()
with engine.connect() as connection:
result = connection.execute(text(sql_query))
output = ""
for row in result.fetchall():
output += str(row)
return str(result)
@postgres_mcp.tool
@catch_error_and_return
def postgres_get_schema() -> str:
"""
Get schema DB
:return: Schema DB
"""
engine = create_postgres_connection()
inspector = inspect(engine)
output_text = ""
for table_name in inspector.get_table_names():
output_text += f"\nTable: {table_name}\n"
columns = inspector.get_columns(table_name)
for col in columns:
output_text += f" Column: {col['name']}, Тип: {col['type']}\n"
indexes = inspector.get_indexes(table_name)
for idx in indexes:
output_text += f" Index: {idx['name']}, Колонки: {idx['column_names']}\n"
fks = inspector.get_foreign_keys(table_name)
for fk in fks:
output_text += f" FK: {fk['constrained_columns']} -> {fk['referred_table']}.{fk['referred_columns']}\n"
return output_text
@postgres_mcp.tool
@catch_error_and_return
def postgres_create_table(sql_create_table_query: str) -> str:
"""
Create table on DB
:param sql_create_table_query: SQL query for create table
:return:
"""
engine = create_postgres_connection()
with engine.connect() as connection:
connection.execute(text(sql_create_table_query))
connection.commit()
result = "The table creating success!"
return result
@postgres_mcp.tool
@catch_error_and_return
def postgres_insert_data(sql_insert_data_query: str) -> str:
"""
Insert data to DB
:param sql_insert_data_query: SQL query for insert table
:return:
"""
engine = create_postgres_connection()
with engine.connect() as connection:
connection.execute(text(sql_insert_data_query))
connection.commit()
result = "The inserting data success!"
return result
@postgres_mcp.tool
def health_check():
"""
health check for server
:return: server status
"""
engine = create_postgres_connection()
try:
with engine.connect() as connection:
result = connection.execute(text("SELECT 1")).scalar()
if result == 1:
return {"status": "ok"}
else:
return {"status": "Error"}
except Exception as e:
return {"status": f"Error {e}"}