#!/usr/bin/env python3
"""
Azure SQL Database MCP Server
This server provides tools to interact with Azure SQL databases, including:
- Query execution
- Schema inspection
- Table data retrieval
- Database information
"""
import os
import json
import logging
import urllib.parse
from typing import Optional, List, Dict, Any
from enum import Enum
from contextlib import asynccontextmanager
import pyodbc
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field, field_validator, ConfigDict
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration from environment variables
DB_SERVER = os.getenv("AZURE_SQL_SERVER", "")
DB_NAME = os.getenv("AZURE_SQL_DATABASE", "")
DB_USERNAME = os.getenv("AZURE_SQL_USERNAME", "")
DB_PASSWORD = os.getenv("AZURE_SQL_PASSWORD", "")
DB_DRIVER = os.getenv("AZURE_SQL_DRIVER", "ODBC Driver 18 for SQL Server")
# Connection pool (simple approach - single connection for now)
_db_connection = None
def get_connection_string() -> str:
    """Assemble the ODBC connection string for Azure SQL from module-level config.

    Values come from the AZURE_SQL_* environment variables read at import
    time. Uses the standard Azure SQL settings: TCP on port 1433, TLS
    required, 30-second connect timeout.
    """
    segments = [
        f"Driver={{{DB_DRIVER}}}",
        f"Server=tcp:{DB_SERVER},1433",
        f"Database={DB_NAME}",
        f"Uid={DB_USERNAME}",
        f"Pwd={DB_PASSWORD}",
        "Encrypt=yes",
        "TrustServerCertificate=no",
        "Connection Timeout=30",
    ]
    # Every segment, including the last, is terminated by a semicolon —
    # identical to the original concatenated literal.
    return ";".join(segments) + ";"
def get_db_connection():
    """Return the shared pyodbc connection, (re)connecting as needed.

    Maintains a single module-level connection. A stale connection (e.g.
    dropped by the server after idling) is detected with a cheap liveness
    probe and transparently replaced; the previous `not _db_connection`
    check could never fire because pyodbc connections are always truthy.

    Returns:
        pyodbc.Connection: An open database connection.

    Raises:
        Exception: Propagates any pyodbc failure when (re)connecting.
    """
    global _db_connection
    if _db_connection is not None:
        try:
            # Cheap liveness probe to detect a dead/stale link.
            probe = _db_connection.cursor()
            probe.execute("SELECT 1")
            probe.close()
        except Exception:
            logger.warning("Stale database connection detected; reconnecting")
            try:
                _db_connection.close()
            except Exception:
                pass  # best-effort close of an already-dead connection
            _db_connection = None
    if _db_connection is None:
        try:
            connection_string = get_connection_string()
            _db_connection = pyodbc.connect(connection_string)
            logger.info("Database connection established")
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise
    return _db_connection
def execute_query(query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
    """
    Execute a SQL query and return results as a list of dictionaries.

    Args:
        query: SQL query to execute.
        params: Optional parameters for parameterized execution.

    Returns:
        For result-producing statements: one dict per row, keyed by column
        name. For INSERT/UPDATE/DELETE: a single-item list of the form
        [{"rows_affected": <int>}] after committing.

    Raises:
        Exception: Re-raises the original database error after attempting
            a rollback.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        # cursor.description is populated only for statements that return
        # a result set (SELECT and friends).
        if cursor.description:
            columns = [column[0] for column in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        # Modification statement: commit and report the affected row count.
        conn.commit()
        return [{"rows_affected": cursor.rowcount}]
    except Exception:
        try:
            conn.rollback()
        except Exception:
            # Rollback itself can fail if the connection died; log it but
            # don't let it mask the original error.
            logger.warning("Rollback failed after query error", exc_info=True)
        raise  # bare raise preserves the original traceback
    finally:
        cursor.close()
def _handle_db_error(e: Exception) -> str:
    """Map a database exception to a consistent, user-facing error string."""
    # Checked in the same order as the original isinstance chain.
    labeled_types = (
        (pyodbc.ProgrammingError, "SQL syntax error"),
        (pyodbc.IntegrityError, "Data integrity violation"),
        (pyodbc.OperationalError, "Database connection issue"),
        (pyodbc.DataError, "Invalid data"),
    )
    for exc_type, label in labeled_types:
        if isinstance(e, exc_type):
            return f"Error: {label} - {str(e)}"
    return f"Error: Unexpected database error - {type(e).__name__}: {str(e)}"
# Initialize MCP server with lifespan management
@asynccontextmanager
async def app_lifespan(server: FastMCP):
    """Manage database connection lifecycle.

    Runs once around the server's lifetime: verifies connectivity on
    startup (raising aborts server startup) and closes the shared
    connection on shutdown.

    Args:
        server: The FastMCP server instance (required by the lifespan
            protocol; unused here).

    Yields:
        dict: Empty lifespan state shared with request handlers.
    """
    logger.info("Initializing Azure SQL MCP server...")
    # Test connection on startup — fail fast if the database is unreachable.
    try:
        get_db_connection()
        logger.info("Database connection verified")
    except Exception as e:
        logger.error(f"Failed to initialize database connection: {e}")
        raise
    yield {}
    # Cleanup on shutdown — close the module-level shared connection.
    global _db_connection
    if _db_connection:
        _db_connection.close()
        logger.info("Database connection closed")
# Bind on all interfaces, port 8000; the transport is chosen in __main__.
mcp = FastMCP("azure_sql_mcp", host="0.0.0.0", port=8000, lifespan=app_lifespan)
# ============================================================================
# Pydantic Models for Input Validation
# ============================================================================
class ResponseFormat(str, Enum):
    """Output format for tool responses.

    Subclasses str so members compare equal to their plain-string values
    ("markdown" / "json") and serialize cleanly.
    """
    MARKDOWN = "markdown"
    JSON = "json"
class QueryInput(BaseModel):
    """Input model for executing SQL queries.

    Validated by the MCP layer before the tool runs; unknown fields and
    multi-statement batches are rejected.
    """
    # Shared validation policy: strip surrounding whitespace on strings,
    # re-validate on assignment, reject unexpected extra fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    query: str = Field(
        ...,
        description="SQL query to execute (e.g., 'SELECT * FROM customers WHERE city = ?')",
        min_length=1
    )
    params: Optional[List[Any]] = Field(
        default=None,
        description="Optional query parameters for parameterized queries (e.g., ['New York'])"
    )
    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
    )

    @field_validator('query')
    @classmethod
    def validate_query(cls, v: str) -> str:
        """Reject empty queries and (crudely) multi-statement batches."""
        if not v.strip():
            raise ValueError("Query cannot be empty")
        # Basic security check - prevent multiple statements.
        # NOTE(review): this also rejects legitimate semicolons inside
        # string literals — confirm that trade-off is intended.
        if ';' in v.strip()[:-1]:  # Allow trailing semicolon
            raise ValueError("Multiple statements not allowed for security reasons")
        return v.strip()
class TableInfoInput(BaseModel):
    """Input model for table information queries.

    When table_name is omitted the tool lists every base table instead of
    describing a single one.
    """
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: Optional[str] = Field(
        default=None,
        description="Specific table name to get info about (e.g., 'customers'). Leave empty to list all tables."
    )
    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
    )
class TableDataInput(BaseModel):
    """Input model for retrieving table data with OFFSET/FETCH pagination."""
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: str = Field(
        ...,
        description="Name of the table to retrieve data from (e.g., 'customers')",
        min_length=1
    )
    # Bounded to keep result payloads manageable (1-1000 rows per page).
    limit: int = Field(
        default=100,
        description="Maximum number of rows to return",
        ge=1,
        le=1000
    )
    offset: int = Field(
        default=0,
        description="Number of rows to skip for pagination",
        ge=0
    )
    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
    )
class ColumnInfoInput(BaseModel):
    """Input model for getting column (schema) information for one table."""
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: str = Field(
        ...,
        description="Name of the table to get columns for (e.g., 'customers')",
        min_length=1
    )
    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
    )
class VisualizeInput(BaseModel):
    """Input model for data visualization with charts.

    The query's result set must contain the label_column and value_column
    named below; chart rendering is delegated to the QuickChart service.
    """
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    query: str = Field(
        ...,
        description="SQL query to get data for chart (e.g., 'SELECT region, SUM(sales) as total FROM orders GROUP BY region')",
        min_length=1
    )
    chart_type: str = Field(
        default="bar",
        description="Chart type: bar, pie, line, doughnut, radar, polarArea"
    )
    title: str = Field(
        ...,
        description="Chart title to display (e.g., 'Sales by Region')",
        min_length=1
    )
    label_column: str = Field(
        ...,
        description="Column name for chart labels/categories (X-axis or pie slices)",
        min_length=1
    )
    value_column: str = Field(
        ...,
        description="Column name for chart values (Y-axis or pie values)",
        min_length=1
    )
    # Pixel bounds keep QuickChart URLs and rendered images reasonable.
    width: int = Field(
        default=800,
        description="Chart width in pixels",
        ge=400,
        le=1200
    )
    height: int = Field(
        default=500,
        description="Chart height in pixels",
        ge=300,
        le=800
    )

    @field_validator('chart_type')
    @classmethod
    def validate_chart_type(cls, v: str) -> str:
        """Normalize to lowercase and restrict to the supported chart types."""
        allowed_types = ['bar', 'pie', 'line', 'doughnut', 'radar', 'polarArea']
        if v.lower() not in allowed_types:
            raise ValueError(f"Chart type must be one of: {', '.join(allowed_types)}")
        return v.lower()

    @field_validator('query')
    @classmethod
    def validate_query(cls, v: str) -> str:
        """Reject empty queries and (crudely) multi-statement batches."""
        if not v.strip():
            raise ValueError("Query cannot be empty")
        # Basic security check - prevent multiple statements.
        # NOTE(review): also rejects semicolons inside string literals.
        if ';' in v.strip()[:-1]:  # Allow trailing semicolon
            raise ValueError("Multiple statements not allowed for security reasons")
        return v.strip()
class RecordInput(BaseModel):
    """Input model for CRUD operations on records.

    Shared by create/update/delete tools: `data` carries the column values
    to write, `where` the equality conditions (required for UPDATE/DELETE
    by the tools themselves, not by this model).
    """
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: str = Field(
        ...,
        description="Name of the table to operate on",
        min_length=1
    )
    data: Dict[str, Any] = Field(
        ...,
        description="Column-value pairs for the record (e.g., {'name': 'John', 'age': 30})"
    )
    where: Optional[Dict[str, Any]] = Field(
        default=None,
        description="WHERE clause conditions for UPDATE/DELETE (e.g., {'id': 123})"
    )
class SearchInput(BaseModel):
    """Input model for searching across table columns with LIKE patterns."""
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: str = Field(
        ...,
        description="Name of the table to search in",
        min_length=1
    )
    search_term: str = Field(
        ...,
        description="Term to search for",
        min_length=1
    )
    # When None, the tool discovers all text-typed columns itself.
    columns: Optional[List[str]] = Field(
        default=None,
        description="Specific columns to search in (if None, searches all text columns)"
    )
    limit: int = Field(
        default=50,
        description="Maximum number of results",
        ge=1,
        le=1000
    )
class TableSchemaInput(BaseModel):
    """Input model for table creation/modification.

    Each entry in `columns` is a dict with at least 'name' and 'type';
    optional keys observed by the create_table tool: 'primary_key',
    'identity', 'nullable', 'default'.
    """
    # Shared validation policy: strip strings, re-validate on assignment,
    # reject unknown fields.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        extra='forbid'
    )
    table_name: str = Field(
        ...,
        description="Name of the table",
        min_length=1
    )
    columns: List[Dict[str, Any]] = Field(
        ...,
        description="Column definitions: [{'name': 'id', 'type': 'INT', 'primary_key': True}, ...]"
    )
    if_not_exists: bool = Field(
        default=True,
        description="Only create if table doesn't exist (prevents errors)"
    )
# ============================================================================
# Tool Implementations
# ============================================================================
@mcp.tool(
    name="azure_sql_execute_query",
    annotations={
        "title": "Execute SQL Query",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": True
    }
)
async def execute_sql_query(params: QueryInput) -> str:
    """
    Execute a SQL query on the Azure SQL database.

    This tool allows you to run SELECT, INSERT, UPDATE, DELETE, and other SQL statements.
    For SELECT queries, results are returned. For modification queries, the number of
    affected rows is returned.

    Args:
        params (QueryInput): Query parameters containing:
            - query (str): SQL query to execute
            - params (Optional[List]): Query parameters for parameterized queries
            - response_format (str): Output format ('markdown' or 'json')

    Returns:
        str: Query results in the specified format

    Examples:
        - SELECT query: {"query": "SELECT * FROM customers WHERE city = ?", "params": ["Seattle"]}
        - INSERT query: {"query": "INSERT INTO customers (name, city) VALUES (?, ?)", "params": ["John Doe", "Seattle"]}
        - COUNT query: {"query": "SELECT COUNT(*) as total FROM orders"}
    """
    try:
        # Convert params list to tuple for pyodbc
        query_params = tuple(params.params) if params.params else None
        results = execute_query(params.query, query_params)
        if params.response_format == ResponseFormat.JSON:
            return json.dumps(results, indent=2, default=str)
        # Markdown format
        if not results:
            return "**Query executed successfully**\n\nNo results returned."
        # Check if it's a modification query
        if "rows_affected" in results[0]:
            return f"**Query executed successfully**\n\nRows affected: {results[0]['rows_affected']}"
        # From here `results` is a non-empty list of row dicts (the empty
        # case was fully handled above; the old `len(results) == 0` branch
        # was unreachable and has been removed).
        output = [f"**Query Results** ({len(results)} rows)\n"]
        # Create markdown table
        headers = list(results[0].keys())
        output.append("| " + " | ".join(headers) + " |")
        output.append("| " + " | ".join(["---"] * len(headers)) + " |")
        for row in results[:50]:  # Limit to first 50 rows in markdown
            values = [str(row.get(h, "")) for h in headers]
            output.append("| " + " | ".join(values) + " |")
        if len(results) > 50:
            output.append(f"\n*Showing first 50 of {len(results)} rows*")
        return "\n".join(output)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_list_tables",
    annotations={
        "title": "List Database Tables",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def list_tables(params: TableInfoInput) -> str:
    """
    List all tables in the database or get detailed information about a specific table.

    This tool retrieves table information including table names, schemas, and row counts.
    If a specific table name is provided, it returns detailed information about that table.

    Args:
        params (TableInfoInput): Input parameters containing:
            - table_name (Optional[str]): Specific table to get info about
            - response_format (str): Output format ('markdown' or 'json')

    Returns:
        str: Table information in the specified format

    Schema (JSON format):
        {
            "tables": [
                {
                    "table_schema": "dbo",
                    "table_name": "customers",
                    "row_count": 1234
                }
            ]
        }
    """
    try:
        if params.table_name:
            # Get detailed info about specific table.
            # Row counts come from sys.partitions; index_id 0/1 selects the
            # heap or clustered index so rows aren't double-counted across
            # secondary indexes.
            # NOTE(review): unlike the all-tables branch below, this query
            # has no SUM/GROUP BY, so a table split across multiple
            # partitions would return one row per partition — confirm.
            query = """
            SELECT
                t.TABLE_SCHEMA,
                t.TABLE_NAME,
                t.TABLE_TYPE,
                p.rows as ROW_COUNT
            FROM INFORMATION_SCHEMA.TABLES t
            LEFT JOIN sys.partitions p ON OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME) = p.object_id
            WHERE t.TABLE_NAME = ? AND (p.index_id IN (0,1) OR p.index_id IS NULL)
            """
            results = execute_query(query, (params.table_name,))
        else:
            # List all base tables with aggregated (per-partition) row counts.
            query = """
            SELECT
                t.TABLE_SCHEMA,
                t.TABLE_NAME,
                t.TABLE_TYPE,
                SUM(p.rows) as ROW_COUNT
            FROM INFORMATION_SCHEMA.TABLES t
            LEFT JOIN sys.partitions p ON OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME) = p.object_id
            WHERE t.TABLE_TYPE = 'BASE TABLE' AND (p.index_id IN (0,1) OR p.index_id IS NULL)
            GROUP BY t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE
            ORDER BY t.TABLE_NAME
            """
            results = execute_query(query)
        if params.response_format == ResponseFormat.JSON:
            return json.dumps({"tables": results}, indent=2, default=str)
        # Markdown format
        if not results:
            if params.table_name:
                return f"**Table not found:** {params.table_name}"
            return "**No tables found in database**"
        output = [f"**Database Tables** ({len(results)} tables)\n"]
        for table in results:
            # ROW_COUNT may be NULL from the LEFT JOIN; coalesce to 0.
            schema = table.get('TABLE_SCHEMA', 'dbo')
            name = table.get('TABLE_NAME', '')
            row_count = table.get('ROW_COUNT', 0) or 0
            table_type = table.get('TABLE_TYPE', 'TABLE')
            output.append(f"### {schema}.{name}")
            output.append(f"- Type: {table_type}")
            output.append(f"- Rows: {row_count:,}")
            output.append("")
        return "\n".join(output)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_get_table_schema",
    annotations={
        "title": "Get Table Schema",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_table_schema(params: ColumnInfoInput) -> str:
    """
    Get detailed schema information for a specific table including columns, data types, and constraints.

    This tool retrieves complete schema information including column names, data types,
    nullable status, default values, and primary key information.

    Args:
        params (ColumnInfoInput): Input parameters containing:
            - table_name (str): Name of the table to get schema for
            - response_format (str): Output format ('markdown' or 'json')

    Returns:
        str: Table schema information in the specified format

    Schema (JSON format):
        {
            "table_name": "customers",
            "columns": [
                {
                    "column_name": "id",
                    "data_type": "int",
                    "is_nullable": "NO",
                    "column_default": null,
                    "is_primary_key": true
                }
            ]
        }
    """
    try:
        # Get column information. The pk subquery collects the columns of
        # every PRIMARY KEY constraint (filtering happens inside the join
        # condition) and is LEFT JOINed back so each column gets a 0/1
        # IS_PRIMARY_KEY flag.
        query = """
        SELECT
            c.COLUMN_NAME,
            c.DATA_TYPE,
            c.CHARACTER_MAXIMUM_LENGTH,
            c.NUMERIC_PRECISION,
            c.NUMERIC_SCALE,
            c.IS_NULLABLE,
            c.COLUMN_DEFAULT,
            CASE
                WHEN pk.COLUMN_NAME IS NOT NULL THEN 1
                ELSE 0
            END as IS_PRIMARY_KEY
        FROM INFORMATION_SCHEMA.COLUMNS c
        LEFT JOIN (
            SELECT ku.TABLE_SCHEMA, ku.TABLE_NAME, ku.COLUMN_NAME
            FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
            INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS ku
                ON tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
                AND tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
        ) pk
            ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA
            AND c.TABLE_NAME = pk.TABLE_NAME
            AND c.COLUMN_NAME = pk.COLUMN_NAME
        WHERE c.TABLE_NAME = ?
        ORDER BY c.ORDINAL_POSITION
        """
        results = execute_query(query, (params.table_name,))
        if not results:
            return f"**Error:** Table '{params.table_name}' not found"
        if params.response_format == ResponseFormat.JSON:
            return json.dumps({
                "table_name": params.table_name,
                "columns": results
            }, indent=2, default=str)
        # Markdown format
        output = [f"# Schema: {params.table_name}\n"]
        output.append(f"**Columns:** {len(results)}\n")
        # Create markdown table
        output.append("| Column | Type | Nullable | Default | Primary Key |")
        output.append("|--------|------|----------|---------|-------------|")
        for col in results:
            col_name = col['COLUMN_NAME']
            data_type = col['DATA_TYPE']
            # Add length/precision info.
            # NOTE(review): INFORMATION_SCHEMA also reports NUMERIC_PRECISION
            # for plain integer types, so e.g. int renders as int(10) —
            # confirm that display is intended.
            if col['CHARACTER_MAXIMUM_LENGTH']:
                data_type += f"({col['CHARACTER_MAXIMUM_LENGTH']})"
            elif col['NUMERIC_PRECISION']:
                if col['NUMERIC_SCALE']:
                    data_type += f"({col['NUMERIC_PRECISION']},{col['NUMERIC_SCALE']})"
                else:
                    data_type += f"({col['NUMERIC_PRECISION']})"
            is_nullable = col['IS_NULLABLE']
            default = col['COLUMN_DEFAULT'] or "-"
            is_pk = "✓" if col['IS_PRIMARY_KEY'] else ""
            output.append(f"| {col_name} | {data_type} | {is_nullable} | {default} | {is_pk} |")
        return "\n".join(output)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_get_table_data",
    annotations={
        "title": "Get Table Data",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_table_data(params: TableDataInput) -> str:
    """
    Retrieve data from a specific table with pagination support.

    This tool fetches rows from a table with support for limiting results and pagination.
    Use this for browsing table contents or getting sample data.

    Args:
        params (TableDataInput): Input parameters containing:
            - table_name (str): Name of the table to retrieve data from
            - limit (int): Maximum number of rows to return (1-1000, default: 100)
            - offset (int): Number of rows to skip for pagination (default: 0)
            - response_format (str): Output format ('markdown' or 'json')

    Returns:
        str: Table data in the specified format

    Schema (JSON format):
        {
            "table_name": "customers",
            "rows": [...],
            "count": 100,
            "offset": 0,
            "has_more": true
        }
    """
    try:
        # Identifiers can't be bound as ? parameters, so the table name is
        # interpolated bracket-quoted. T-SQL escapes ']' inside brackets by
        # doubling it; without this a crafted name could break out of the
        # brackets (SQL injection).
        safe_table = params.table_name.replace("]", "]]")
        # Get total count
        count_query = f"SELECT COUNT(*) as total FROM [{safe_table}]"
        count_result = execute_query(count_query)
        total_rows = count_result[0]['total'] if count_result else 0
        # Get paginated data. ORDER BY (SELECT NULL) satisfies the ORDER BY
        # requirement of OFFSET/FETCH without imposing a real sort order.
        query = f"""
        SELECT *
        FROM [{safe_table}]
        ORDER BY (SELECT NULL)
        OFFSET ? ROWS
        FETCH NEXT ? ROWS ONLY
        """
        results = execute_query(query, (params.offset, params.limit))
        has_more = (params.offset + len(results)) < total_rows
        if params.response_format == ResponseFormat.JSON:
            return json.dumps({
                "table_name": params.table_name,
                "rows": results,
                "count": len(results),
                "total": total_rows,
                "offset": params.offset,
                "has_more": has_more,
                "next_offset": params.offset + len(results) if has_more else None
            }, indent=2, default=str)
        # Markdown format
        output = [f"# Data from {params.table_name}\n"]
        output.append(f"**Showing:** {len(results)} rows (offset: {params.offset}, total: {total_rows:,})")
        if has_more:
            output.append(f"**More data available** - use offset {params.offset + len(results)} to continue")
        output.append("")
        if not results:
            output.append("*No data found*")
            return "\n".join(output)
        # Create markdown table
        headers = list(results[0].keys())
        output.append("| " + " | ".join(headers) + " |")
        output.append("| " + " | ".join(["---"] * len(headers)) + " |")
        for row in results:
            values = [str(row.get(h, "")) for h in headers]
            output.append("| " + " | ".join(values) + " |")
        return "\n".join(output)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_get_database_info",
    annotations={
        "title": "Get Database Information",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_database_info(response_format: ResponseFormat = ResponseFormat.MARKDOWN) -> str:
    """
    Get general information about the Azure SQL database including version, size, and statistics.

    This tool retrieves database-level information such as SQL Server version,
    database name, creation date, and other metadata.

    Args:
        response_format (ResponseFormat): Output format ('markdown' or 'json')

    Returns:
        str: Database information in the specified format

    Schema (JSON format):
        {
            "database_name": "mydb",
            "server_version": "Microsoft SQL Server 2019",
            "create_date": "2024-01-01",
            "tables_count": 15,
            "views_count": 5
        }
    """
    try:
        # Single round trip: scalar subqueries gather all the metadata at once.
        info_query = """
        SELECT
            DB_NAME() as database_name,
            @@VERSION as server_version,
            (SELECT create_date FROM sys.databases WHERE name = DB_NAME()) as create_date,
            (SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') as tables_count,
            (SELECT COUNT(*) FROM INFORMATION_SCHEMA.VIEWS) as views_count,
            (SELECT COUNT(*) FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_TYPE = 'PROCEDURE') as procedures_count
        """
        rows = execute_query(info_query)
        if not rows:
            return "**Error:** Could not retrieve database information"
        record = rows[0]
        if response_format == ResponseFormat.JSON:
            return json.dumps(record, indent=2, default=str)
        # Markdown format — @@VERSION is multi-line; keep only its first line.
        version = record.get('server_version', '').split('\n')[0]
        lines = [
            "# Azure SQL Database Information\n",
            f"**Database Name:** {record.get('database_name', 'N/A')}",
            f"**Created:** {record.get('create_date', 'N/A')}",
            "\n## Server",
            f"**Version:** {version}",
            "\n## Objects",
            f"- **Tables:** {record.get('tables_count', 0)}",
            f"- **Views:** {record.get('views_count', 0)}",
            f"- **Stored Procedures:** {record.get('procedures_count', 0)}",
        ]
        return "\n".join(lines)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_visualize_data",
    annotations={
        "title": "Visualize Data as Chart",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True
    }
)
async def visualize_data(params: VisualizeInput) -> str:
    """
    Execute SQL query and generate a visual chart for Copilot Studio display.

    This tool runs a SQL query, extracts the data, and generates a beautiful chart
    using QuickChart API. The chart is returned as an Adaptive Card that Copilot Studio
    can display directly in the chat interface.

    Args:
        params (VisualizeInput): Visualization parameters containing:
            - query (str): SQL query to get chart data
            - chart_type (str): Type of chart (bar, pie, line, doughnut, radar, polarArea)
            - title (str): Chart title
            - label_column (str): Column name for labels (X-axis/categories)
            - value_column (str): Column name for values (Y-axis/data)
            - width (int): Chart width in pixels (default: 800)
            - height (int): Chart height in pixels (default: 500)

    Returns:
        str: Adaptive Card JSON with embedded chart image

    Examples:
        Bar Chart:
        {
            "query": "SELECT region, SUM(sales) as total FROM orders GROUP BY region",
            "chart_type": "bar",
            "title": "Sales by Region",
            "label_column": "region",
            "value_column": "total"
        }
        Pie Chart:
        {
            "query": "SELECT category, COUNT(*) as count FROM products GROUP BY category",
            "chart_type": "pie",
            "title": "Products by Category",
            "label_column": "category",
            "value_column": "count"
        }
    """
    try:
        # Execute SQL query
        results = execute_query(params.query)
        if not results:
            return json.dumps({
                "type": "AdaptiveCard",
                "version": "1.5",
                "body": [
                    {
                        "type": "TextBlock",
                        "text": "⚠️ No Data Found",
                        "size": "Large",
                        "weight": "Bolder",
                        "color": "Warning"
                    },
                    {
                        "type": "TextBlock",
                        "text": "The query returned no results. Please check your query.",
                        "wrap": True
                    }
                ]
            }, indent=2)
        # Extract data from results
        try:
            labels = [str(row[params.label_column]) for row in results]
            values = [float(row[params.value_column]) for row in results]
        except KeyError as e:
            return json.dumps({
                "type": "AdaptiveCard",
                "version": "1.5",
                "body": [
                    {
                        "type": "TextBlock",
                        "text": "❌ Column Not Found",
                        "size": "Large",
                        "weight": "Bolder",
                        "color": "Attention"
                    },
                    {
                        "type": "TextBlock",
                        "text": f"Column '{str(e)}' not found in query results. Available columns: {', '.join(results[0].keys())}",
                        "wrap": True
                    }
                ]
            }, indent=2)
        except (TypeError, ValueError):
            # NULLs or non-numeric values in the value column previously fell
            # through to the generic "database error" card, which was misleading.
            return json.dumps({
                "type": "AdaptiveCard",
                "version": "1.5",
                "body": [
                    {
                        "type": "TextBlock",
                        "text": "❌ Non-numeric Data",
                        "size": "Large",
                        "weight": "Bolder",
                        "color": "Attention"
                    },
                    {
                        "type": "TextBlock",
                        "text": f"Column '{params.value_column}' contains NULL or non-numeric values. Choose a numeric value column or filter out NULL rows.",
                        "wrap": True
                    }
                ]
            }, indent=2)
        # Define color schemes for different chart types
        color_schemes = {
            'bar': [
                "rgba(102, 126, 234, 0.8)",
                "rgba(237, 100, 166, 0.8)",
                "rgba(255, 159, 64, 0.8)",
                "rgba(75, 192, 192, 0.8)",
                "rgba(153, 102, 255, 0.8)",
                "rgba(255, 205, 86, 0.8)"
            ],
            'pie': [
                "rgba(255, 99, 132, 0.8)",
                "rgba(54, 162, 235, 0.8)",
                "rgba(255, 206, 86, 0.8)",
                "rgba(75, 192, 192, 0.8)",
                "rgba(153, 102, 255, 0.8)",
                "rgba(255, 159, 64, 0.8)"
            ],
            'line': [
                "rgba(75, 192, 192, 1)"
            ],
            'doughnut': [
                "rgba(255, 99, 132, 0.8)",
                "rgba(54, 162, 235, 0.8)",
                "rgba(255, 206, 86, 0.8)",
                "rgba(75, 192, 192, 0.8)",
                "rgba(153, 102, 255, 0.8)"
            ]
        }
        colors = color_schemes.get(params.chart_type, color_schemes['bar'])
        # Cycle through the palette so every data point gets a color even when
        # there are more points than palette entries (the old colors[:len(values)]
        # slice left the extra points uncolored).
        point_colors = [colors[i % len(colors)] for i in range(len(values))]
        # Build QuickChart configuration
        chart_config = {
            "type": params.chart_type,
            "data": {
                "labels": labels,
                "datasets": [{
                    "label": params.title,
                    "data": values,
                    "backgroundColor": point_colors,
                    "borderColor": point_colors,
                    "borderWidth": 2
                }]
            },
            "options": {
                "responsive": True,
                "plugins": {
                    "title": {
                        "display": True,
                        "text": params.title,
                        "font": {
                            "size": 18,
                            "weight": "bold"
                        }
                    },
                    "legend": {
                        "display": True,
                        "position": "bottom"
                    }
                }
            }
        }
        # Add specific options for line charts
        if params.chart_type == 'line':
            chart_config['data']['datasets'][0]['fill'] = False
            chart_config['data']['datasets'][0]['tension'] = 0.1
        # Create QuickChart URL (config is URL-encoded JSON)
        config_json = json.dumps(chart_config)
        encoded_config = urllib.parse.quote(config_json)
        chart_url = (
            f"https://quickchart.io/chart?c={encoded_config}"
            f"&width={params.width}&height={params.height}"
            f"&backgroundColor=white&devicePixelRatio=2.0"
        )
        # Calculate summary statistics
        total = sum(values)
        avg = total / len(values) if values else 0
        max_val = max(values) if values else 0
        min_val = min(values) if values else 0
        max_label = labels[values.index(max_val)] if values else ""
        min_label = labels[values.index(min_val)] if values else ""
        # Build Adaptive Card with chart and statistics
        adaptive_card = {
            "type": "AdaptiveCard",
            "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
            "version": "1.5",
            "body": [
                {
                    "type": "TextBlock",
                    "text": f"📊 {params.title}",
                    "size": "Large",
                    "weight": "Bolder",
                    "color": "Accent"
                },
                {
                    "type": "Image",
                    "url": chart_url,
                    "size": "Stretch",
                    "spacing": "Medium"
                },
                {
                    "type": "FactSet",
                    "spacing": "Medium",
                    "facts": [
                        {
                            "title": "📈 Total",
                            "value": f"{total:,.2f}"
                        },
                        {
                            "title": "📊 Average",
                            "value": f"{avg:,.2f}"
                        },
                        {
                            "title": "⬆️ Highest",
                            "value": f"{max_label}: {max_val:,.2f}"
                        },
                        {
                            "title": "⬇️ Lowest",
                            "value": f"{min_label}: {min_val:,.2f}"
                        },
                        {
                            "title": "🔢 Data Points",
                            "value": str(len(values))
                        }
                    ]
                },
                {
                    "type": "TextBlock",
                    "text": f"Chart Type: {params.chart_type.capitalize()} | Data from Azure SQL",
                    "size": "Small",
                    "color": "Accent",
                    "isSubtle": True,
                    "spacing": "Small"
                }
            ]
        }
        return json.dumps(adaptive_card, indent=2)
    except Exception as e:
        error_card = {
            "type": "AdaptiveCard",
            "version": "1.5",
            "body": [
                {
                    "type": "TextBlock",
                    "text": "❌ Error Creating Chart",
                    "size": "Large",
                    "weight": "Bolder",
                    "color": "Attention"
                },
                {
                    "type": "TextBlock",
                    "text": _handle_db_error(e),
                    "wrap": True
                }
            ]
        }
        return json.dumps(error_card, indent=2)
@mcp.tool(
    name="azure_sql_create_record",
    annotations={
        "title": "Create Record (INSERT)",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def create_record(params: RecordInput) -> str:
    """
    Insert a new record into a table.

    Values are bound as ? parameters; only identifiers (table/column names)
    are interpolated, bracket-quoted and escaped.

    Args:
        params (RecordInput): Contains table_name and data dict

    Returns:
        str: Success message with inserted record details

    Example:
        {
            "table_name": "customers",
            "data": {
                "name": "John Doe",
                "email": "john@example.com",
                "age": 30
            }
        }
    """
    try:
        # Identifiers can't be bound as ? parameters. T-SQL escapes ']' in
        # bracket-quoted identifiers by doubling it; without this a crafted
        # table/column name could break out of the brackets (SQL injection).
        safe_table = params.table_name.replace("]", "]]")
        columns = list(params.data.keys())
        placeholders = ', '.join(['?' for _ in columns])
        column_list = ', '.join([f"[{col.replace(']', ']]')}]" for col in columns])
        query = f"INSERT INTO [{safe_table}] ({column_list}) VALUES ({placeholders})"
        values = tuple(params.data.values())
        # Execute
        execute_query(query, values)
        return f"✅ Record created successfully in table '{params.table_name}'\n\nInserted data:\n{json.dumps(params.data, indent=2)}"
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_update_record",
    annotations={
        "title": "Update Record",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def update_record(params: RecordInput) -> str:
    """
    Update existing record(s) in a table.

    Requires a non-empty `where` dict to prevent accidental mass updates.
    Values are bound as ? parameters; identifiers are bracket-quoted and escaped.

    Args:
        params (RecordInput): Contains table_name, data dict, and where dict

    Returns:
        str: Success message with number of rows updated

    Example:
        {
            "table_name": "customers",
            "data": {"email": "newemail@example.com"},
            "where": {"id": 123}
        }
    """
    try:
        if not params.where:
            return "❌ Error: 'where' clause is required for UPDATE to prevent accidental mass updates"
        # Escape ']' in bracket-quoted identifiers (T-SQL doubles it) so a
        # crafted table/column name cannot break out of the brackets.
        safe_table = params.table_name.replace("]", "]]")
        # Build UPDATE query
        set_clause = ', '.join([f"[{col.replace(']', ']]')}] = ?" for col in params.data.keys()])
        where_clause = ' AND '.join([f"[{col.replace(']', ']]')}] = ?" for col in params.where.keys()])
        query = f"UPDATE [{safe_table}] SET {set_clause} WHERE {where_clause}"
        values = tuple(list(params.data.values()) + list(params.where.values()))
        # execute_query returns [{"rows_affected": n}] for DML; the old code
        # interpolated that whole list into the message instead of the count.
        result = execute_query(query, values)
        rows_affected = result[0].get("rows_affected", 0) if result else 0
        return f"✅ Updated {rows_affected} row(s) in table '{params.table_name}'\n\nSet:\n{json.dumps(params.data, indent=2)}\n\nWhere:\n{json.dumps(params.where, indent=2)}"
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_delete_record",
    annotations={
        "title": "Delete Record",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def delete_record(params: RecordInput) -> str:
    """
    Delete record(s) from a table.

    Requires a non-empty `where` dict to prevent accidental mass deletion.
    Values are bound as ? parameters; identifiers are bracket-quoted and escaped.

    Args:
        params (RecordInput): Contains table_name and where dict

    Returns:
        str: Success message with number of rows deleted

    Example:
        {
            "table_name": "customers",
            "where": {"id": 123}
        }
    """
    try:
        if not params.where:
            return "❌ Error: 'where' clause is required for DELETE to prevent accidental mass deletion"
        # Escape ']' in bracket-quoted identifiers (T-SQL doubles it) so a
        # crafted table/column name cannot break out of the brackets.
        safe_table = params.table_name.replace("]", "]]")
        # Build DELETE query
        where_clause = ' AND '.join([f"[{col.replace(']', ']]')}] = ?" for col in params.where.keys()])
        query = f"DELETE FROM [{safe_table}] WHERE {where_clause}"
        values = tuple(params.where.values())
        # execute_query returns [{"rows_affected": n}] for DML; the old code
        # interpolated that whole list into the message instead of the count.
        result = execute_query(query, values)
        rows_affected = result[0].get("rows_affected", 0) if result else 0
        return f"✅ Deleted {rows_affected} row(s) from table '{params.table_name}'\n\nWhere:\n{json.dumps(params.where, indent=2)}"
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_search",
    annotations={
        "title": "Search Table",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True
    }
)
async def search_table(params: SearchInput) -> str:
    """
    Search for a term across table columns using LIKE '%term%' matching.

    Args:
        params (SearchInput): Contains table_name, search_term, optional columns, and limit

    Returns:
        str: Search results in JSON format

    Example:
        {
            "table_name": "customers",
            "search_term": "john",
            "columns": ["name", "email"],
            "limit": 50
        }
    """
    try:
        # Get table columns if not specified
        if not params.columns:
            # Parameterized lookup — the old version interpolated table_name
            # directly into a quoted literal, a SQL injection vector.
            schema_query = """
            SELECT COLUMN_NAME, DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = ?
            AND DATA_TYPE IN ('varchar', 'nvarchar', 'char', 'nchar', 'text', 'ntext')
            """
            columns_result = execute_query(schema_query, (params.table_name,))
            params.columns = [row['COLUMN_NAME'] for row in columns_result]
        if not params.columns:
            return f"❌ No searchable text columns found in table '{params.table_name}'"
        # Identifiers can't be bound as ? parameters: bracket-quote them and
        # escape ']' by doubling (T-SQL rule). params.limit is already
        # constrained to an int in 1-1000 by the Pydantic model, so TOP is safe.
        safe_table = params.table_name.replace("]", "]]")
        search_conditions = ' OR '.join([f"[{col.replace(']', ']]')}] LIKE ?" for col in params.columns])
        query = f"SELECT TOP {params.limit} * FROM [{safe_table}] WHERE {search_conditions}"
        search_pattern = f"%{params.search_term}%"
        values = tuple([search_pattern for _ in params.columns])
        # Execute
        results = execute_query(query, values)
        if not results:
            return f"No results found for '{params.search_term}' in table '{params.table_name}'"
        return json.dumps({
            "search_term": params.search_term,
            "table": params.table_name,
            "columns_searched": params.columns,
            "results_count": len(results),
            "results": results
        }, indent=2, default=str)
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_create_table",
    annotations={
        "title": "Create Table",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def create_table(params: TableSchemaInput) -> str:
    """
    Create a new table with specified columns.

    Table and column names are bracket-quoted with ']' escaped. NOTE: each
    column's 'type' and 'default' are interpolated verbatim into the DDL —
    they cannot be parameterized, so this tool inherently trusts its caller
    for those fields.

    Args:
        params (TableSchemaInput): Contains table_name, columns list, and if_not_exists flag

    Returns:
        str: Success message

    Example:
        {
            "table_name": "employees",
            "columns": [
                {"name": "id", "type": "INT", "primary_key": True, "identity": True},
                {"name": "name", "type": "NVARCHAR(100)", "nullable": False},
                {"name": "email", "type": "NVARCHAR(255)"},
                {"name": "hire_date", "type": "DATE"},
                {"name": "salary", "type": "DECIMAL(10,2)"}
            ],
            "if_not_exists": True
        }
    """
    try:
        # Escape ']' in bracket-quoted identifiers (T-SQL doubles it) so a
        # crafted table/column name cannot break out of the brackets.
        safe_table = params.table_name.replace("]", "]]")
        # Build column definitions
        column_defs = []
        primary_keys = []
        for col in params.columns:
            safe_name = col['name'].replace(']', ']]')
            col_def = f"[{safe_name}] {col['type']}"
            if col.get('identity'):
                col_def += " IDENTITY(1,1)"
            if col.get('primary_key'):
                primary_keys.append(safe_name)
            if not col.get('nullable', True):  # columns are nullable by default
                col_def += " NOT NULL"
            if 'default' in col:
                col_def += f" DEFAULT {col['default']}"
            column_defs.append(col_def)
        # Add primary key constraint
        if primary_keys:
            pk_cols = ', '.join([f"[{pk}]" for pk in primary_keys])
            column_defs.append(f"PRIMARY KEY ({pk_cols})")
        # Build CREATE TABLE query
        columns_sql = ',\n    '.join(column_defs)
        if params.if_not_exists:
            # Inside the N'...' literal passed to OBJECT_ID, single quotes
            # must also be doubled to keep the literal intact.
            literal_table = safe_table.replace("'", "''")
            query = f"""
            IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[{literal_table}]') AND type in (N'U'))
            BEGIN
                CREATE TABLE [{safe_table}] (
                    {columns_sql}
                )
            END
            """
        else:
            query = f"CREATE TABLE [{safe_table}] (\n    {columns_sql}\n)"
        # Execute
        execute_query(query)
        return f"✅ Table '{params.table_name}' created successfully\n\nColumns:\n{json.dumps(params.columns, indent=2)}"
    except Exception as e:
        return _handle_db_error(e)
@mcp.tool(
    name="azure_sql_drop_table",
    annotations={
        "title": "Drop Table",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def drop_table(table_name: str) -> str:
    """
    Drop (delete) a table from the database.

    Uses DROP TABLE IF EXISTS, so dropping a missing table still succeeds.

    Args:
        table_name (str): Name of the table to drop

    Returns:
        str: Success message

    Example:
        "old_backup_table"
    """
    try:
        # Escape ']' in the bracket-quoted identifier (T-SQL doubles it) so a
        # crafted name cannot break out of the brackets (SQL injection).
        safe_table = table_name.replace("]", "]]")
        query = f"DROP TABLE IF EXISTS [{safe_table}]"
        execute_query(query)
        return f"✅ Table '{table_name}' dropped successfully"
    except Exception as e:
        return _handle_db_error(e)
# ============================================================================
# Server Entry Point
# ============================================================================
# Run the MCP server over the streamable HTTP transport (host/port are
# configured on the FastMCP instance above).
if __name__ == "__main__":
    mcp.run(transport="streamable-http")