Skip to main content
Glama
server.py (27.2 kB)
from collections import defaultdict
import os
import logging
from dotenv import load_dotenv
import pyodbc
from typing import Any, Dict, List, Optional
import json
from mcp.server.fastmcp import FastMCP

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Retrieve database connection details from environment variables
DB_UID = os.getenv("ODBC_USER")
DB_PWD = os.getenv("ODBC_PASSWORD")
DB_DSN = os.getenv("ODBC_DSN")
# Maximum number of characters kept per column value before truncation.
MAX_LONG_DATA = int(os.getenv("MAX_LONG_DATA", 4096))
# Default API key for the AI-agent tools when the caller supplies none.
API_KEY = os.getenv("API_KEY", "none")


### Database ###

def get_connection(readonly=True, uid: str | None = None, pwd: str | None = None,
                   dsn: str | None = None) -> pyodbc.Connection:
    """Open a pyodbc connection using explicit credentials or env-var defaults.

    Args:
        readonly: Open the connection in read-only mode (default True).
        uid: Optional username; falls back to ODBC_USER when empty/None.
        pwd: Optional password; falls back to ODBC_PASSWORD when empty/None.
        dsn: Optional DSN name; falls back to ODBC_DSN when empty/None.

    Returns:
        An autocommit pyodbc.Connection.

    Raises:
        ValueError: If any of DSN/UID/PWD is unresolvable.
    """
    # Empty string is treated the same as None: fall back to the env default.
    dsn = DB_DSN if not dsn else dsn
    uid = DB_UID if not uid else uid
    pwd = DB_PWD if not pwd else pwd

    if dsn is None:
        raise ValueError("ODBC_DSN environment variable is not set.")
    if uid is None:
        raise ValueError("ODBC_USER environment variable is not set.")
    if pwd is None:
        raise ValueError("ODBC_PASSWORD environment variable is not set.")

    dsn_string = f"DSN={dsn};UID={uid};PWD={pwd}"
    # Never log the password.
    logging.info(f"DSN:{dsn} UID:{uid}")
    return pyodbc.connect(dsn_string, autocommit=True, readonly=readonly)


def supports_catalogs(conn) -> bool:
    """Return True if the connected database exposes catalog-qualified tables.

    Probes the ODBC metadata API; any driver error is treated as "no catalogs".
    """
    try:
        with conn.cursor() as cursor:
            cursor.tables(catalog="%", schema="", table="", tableType=None)
            row = cursor.fetchone()
            return row is not None and row[0] is not None
    except pyodbc.Error:
        return False


def _rows_to_dicts(rs, max_rows: int | None = None) -> List[Dict[str, Any]]:
    """Convert a pyodbc result set into a list of dicts with truncated values.

    Every non-None value is stringified and truncated to MAX_LONG_DATA
    characters; None is preserved.  Stops after max_rows rows when given.
    """
    columns = [column[0] for column in rs.description]
    results: List[Dict[str, Any]] = []
    for row in rs:
        record = dict(zip(columns, row))
        results.append({
            key: (str(value)[:MAX_LONG_DATA] if value is not None else None)
            for key, value in record.items()
        })
        if max_rows is not None and len(results) >= max_rows:
            break
    return results


### Constants ###

### MCP ###
mcp = FastMCP('mcp-pyodbc-server', transport=["stdio", "sse"])


@mcp.tool(
    name="podbc_get_schemas",
    description="Retrieve and return a list of all schema names from the connected database."
)
def podbc_get_schemas(user: str = "", password: str = "", dsn: str = "") -> str:
    """Retrieve and return a list of all schema names from the connected database.

    Args:
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: A JSON list of schema (or catalog) names.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            has_cats = supports_catalogs(conn)
            with conn.cursor() as cursor:
                if has_cats:
                    # Column 0 of the tables() result is TABLE_CAT.
                    rs = cursor.tables(table="", catalog="%", schema="", tableType=None)
                    names = {row[0] for row in rs.fetchall()}
                else:
                    # Column 1 is TABLE_SCHEM.
                    rs = cursor.tables(table="", catalog=None, schema="%", tableType=None)
                    names = {row[1] for row in rs.fetchall()}
                return json.dumps(list(names))
    except pyodbc.Error as e:
        logging.error(f"Error retrieving schemas: {e}")
        raise


@mcp.tool(
    name="podbc_get_tables",
    description="Retrieve and return a list containing information about tables in "
                "specified schema, if empty uses connection default"
)
def podbc_get_tables(Schema: str = "", user: str = "", password: str = "", dsn: str = "") -> str:
    """Retrieve and return a list containing information about tables.

    If `Schema` is empty, returns tables for all schemas; otherwise only
    tables belonging to the given schema/catalog.

    Args:
        Schema (Optional[str]): Schema to list tables for; empty means all.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: A JSON list with TABLE_CAT/TABLE_SCHEM/TABLE_NAME entries.
    """
    cat = "%" if not Schema else Schema
    try:
        with get_connection(True, user, password, dsn) as conn:
            has_cats = supports_catalogs(conn)
            with conn.cursor() as cursor:
                results = []
                if has_cats:
                    rs = cursor.tables(table='%', catalog=cat, schema="%", tableType="TABLE")
                    for row in rs:
                        # Driver may ignore the catalog filter; re-check it here.
                        if row[0] == cat:
                            results.append({"TABLE_CAT": row[0], "TABLE_SCHEM": row[1],
                                            "TABLE_NAME": row[2]})
                else:
                    rs = cursor.tables(table='%', catalog=None, schema=cat, tableType="TABLE")
                    for row in rs:
                        if cat == "%" or row[1] == cat:
                            results.append({"TABLE_CAT": row[0], "TABLE_SCHEM": row[1],
                                            "TABLE_NAME": row[2]})
                return json.dumps(results, indent=2)
    except pyodbc.Error as e:
        logging.error(f"Error retrieving tables: {e}")
        raise


@mcp.tool(
    name="podbc_describe_table",
    description="Retrieve and return a dictionary containing the definition of a table, "
                "including column names, data types, nullable,"
                " autoincrement, primary key, and foreign keys."
)
def podbc_describe_table(Schema: str = "", table: str = "", user: str = "",
                         password: str = "", dsn: str = "") -> str:
    """Return a JSON dictionary describing a table's definition.

    Includes column names, data types, nullability, defaults, primary key,
    and foreign keys.  If `Schema` is empty the table is searched for in all
    schemas.

    Args:
        Schema (str): Schema to look in; empty means all schemas.
        table (str): Name of the table to describe.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: A JSON object with the table definition (empty if not found).
    """
    cat = "%" if not Schema else Schema
    table_definition: Dict[str, Any] = {}
    try:
        with get_connection(True, user, password, dsn) as conn:
            found, tbl = _has_table(conn, cat=cat, table=table)
            if found:
                table_definition = _get_table_info(conn, cat=tbl.get("cat"),
                                                   sch=tbl.get("sch"),
                                                   table=tbl.get("name"))
            return json.dumps(table_definition, indent=2)
    except pyodbc.Error as e:
        logging.error(f"Error retrieving table definition: {e}")
        raise


def _has_table(conn, cat: str, table: str):
    """Locate `table` via ODBC metadata.

    Returns (True, {"cat", "sch", "name"}) for the first match, else (False, {}).
    `cat` is matched against the catalog when supported, otherwise the schema.
    """
    has_cats = supports_catalogs(conn)
    with conn.cursor() as cursor:
        if has_cats:
            row = cursor.tables(table=table, catalog=cat, schema=None, tableType=None).fetchone()
        else:
            row = cursor.tables(table=table, catalog=None, schema=cat, tableType=None).fetchone()
        if row:
            return True, {"cat": row[0] if has_cats else None, "sch": row[1], "name": row[2]}
        return False, {}


def _get_columns(conn, cat: str | None, sch: str, table: str):
    """Return column metadata dicts for a table via SQLColumns.

    Column indexes follow the ODBC SQLColumns result layout:
    3=COLUMN_NAME, 5=TYPE_NAME, 6=COLUMN_SIZE, 10=NULLABLE, 12=COLUMN_DEF.
    """
    with conn.cursor() as cursor:
        ret = []
        for row in cursor.columns(table=table, catalog=cat, schema=sch):
            ret.append({
                "name": row[3],
                "type": row[5],
                "column_size": row[6],
                # "decimal_digits": row[8], "num_prec_radix": row[9],
                "nullable": False if row[10] == 0 else True,
                "default": row[12],
            })
        return ret


def _get_pk_constraint(conn, cat: str | None, sch: str, table: str):
    """Return {"constrained_columns": [...], "name": pk_name} or None if no PK."""
    with conn.cursor() as cursor:
        ret = None
        rs = cursor.primaryKeys(table=table, catalog=cat, schema=sch).fetchall()
        if rs is not None and len(rs) > 0:
            ret = {
                "constrained_columns": [row[3] for row in rs],  # COLUMN_NAME
                "name": rs[0][5],                               # PK_NAME
            }
        return ret


def _get_foreign_keys(conn, cat: str | None, sch: str, table: str):
    """Return a list of foreign-key dicts for `table`, grouped by FK name."""
    def fkey_rec():
        return {
            "name": None,
            "constrained_columns": [],
            "referred_cat": None,
            "referred_schem": None,
            "referred_table": None,
            "referred_columns": [],
            "options": {},
        }

    fkeys = defaultdict(fkey_rec)
    with conn.cursor() as cursor:
        rs = cursor.foreignKeys(foreignTable=table, foreignCatalog=cat, foreignSchema=sch)
        for row in rs:
            # Group multi-column keys by FK_NAME (column 11).
            rec = fkeys[row[11]]
            rec["name"] = row[11]                     # FK_NAME
            rec["constrained_columns"].append(row[7])  # FKCOLUMN_NAME
            rec["referred_columns"].append(row[3])     # PKCOLUMN_NAME
            if not rec["referred_table"]:
                rec["referred_table"] = row[2]         # PKTABLE_NAME
                rec["referred_schem"] = row[1]         # PKTABLE_OWNER
                rec["referred_cat"] = row[0]           # PKTABLE_CAT
    return list(fkeys.values())


def _get_table_info(conn, cat: str, sch: str, table: str) -> Dict[str, Any]:
    """Assemble full table metadata: columns, primary keys, foreign keys."""
    try:
        columns = _get_columns(conn, cat=cat, sch=sch, table=table)
        pkeys = _get_pk_constraint(conn, cat=cat, sch=sch, table=table)
        primary_keys = pkeys.get("constrained_columns") if pkeys is not None else []
        foreign_keys = _get_foreign_keys(conn, cat=cat, sch=sch, table=table)
        table_info = {
            "TABLE_CAT": cat,
            "TABLE_SCHEM": sch,
            "TABLE_NAME": table,
            "columns": columns,
            "primary_keys": primary_keys,
            "foreign_keys": foreign_keys,
        }
        # Flag PK membership on each column entry for convenience.
        for column in columns:
            column["primary_key"] = column['name'] in primary_keys
        return table_info
    except pyodbc.Error as e:
        logging.error(f"Error retrieving table info: {e}")
        raise


@mcp.tool(
    name="podbc_filter_table_names",
    description="Retrieve and return a list containing information about tables whose "
                "names contain the substring 'q' ."
)
def podbc_filter_table_names(q: str, Schema: str = "", user: str = "",
                             password: str = "", dsn: str = "") -> str:
    """Return tables whose names contain the substring `q`.

    Args:
        q (str): The substring to filter table names by.
        Schema (Optional[str]): Schema to search; empty means all.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: A JSON list with TABLE_CAT/TABLE_SCHEM/TABLE_NAME entries.
    """
    cat = "%" if not Schema else Schema
    try:
        with get_connection(True, user, password, dsn) as conn:
            has_cats = supports_catalogs(conn)
            with conn.cursor() as cursor:
                if has_cats:
                    rs = cursor.tables(table='%', catalog=cat, schema='%', tableType="TABLE")
                else:
                    rs = cursor.tables(table='%', catalog=None, schema=cat, tableType="TABLE")
                results = []
                for row in rs:
                    if q in row[2]:
                        results.append({"TABLE_CAT": row[0], "TABLE_SCHEM": row[1],
                                        "TABLE_NAME": row[2]})
                return json.dumps(results, indent=2)
    except pyodbc.Error as e:
        logging.error(f"Error filtering table names: {e}")
        raise


@mcp.tool(
    name="podbc_execute_query",
    description="Execute a SQL query and return results in JSONL format."
)
def podbc_execute_query(query: str, max_rows: int = 100, params: list[Any] | None = None,
                        user: str = "", password: str = "", dsn: str = "") -> str:
    """Execute a SQL query and return results in JSONL format.

    Args:
        query (str): The SQL query to execute.
        max_rows (int): Maximum number of rows to return. Default is 100.
        params (Optional[list[Any]]): Optional positional query parameters.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in JSONL format.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                rs = cursor.execute(query) if params is None else cursor.execute(query, params)
                results = _rows_to_dicts(rs, max_rows)
                return json.dumps(results, indent=2)
    except pyodbc.Error as e:
        logging.error(f"Error executing query: {e}")
        raise


@mcp.tool(
    name="podbc_execute_query_md",
    description="Execute a SQL query and return results in Markdown table format."
)
def podbc_execute_query_md(query: str, max_rows: int = 100, params: list[Any] | None = None,
                           user: str = "", password: str = "", dsn: str = "") -> str:
    """Execute a SQL query and return results in Markdown table format.

    Args:
        query (str): The SQL query to execute.
        max_rows (int): Maximum number of rows to return. Default is 100.
        params (Optional[list[Any]]): Optional positional query parameters.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in Markdown table format.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                rs = cursor.execute(query) if params is None else cursor.execute(query, params)
                columns = [column[0] for column in rs.description]
                results = _rows_to_dicts(rs, max_rows)

                # Create the Markdown table header
                md_table = "| " + " | ".join(columns) + " |\n"
                md_table += "| " + " | ".join(["---"] * len(columns)) + " |\n"

                # Add rows to the Markdown table
                for row in results:
                    md_table += "| " + " | ".join(str(row[col]) for col in columns) + " |\n"

                return md_table
    except pyodbc.Error as e:
        logging.error(f"Error executing query: {e}")
        raise


@mcp.tool(
    name="podbc_query_database",
    description="Execute a SQL query and return results in JSONL format."
)
def podbc_query_database(query: str, params: list[Any] | None = None, user: str = "",
                         password: str = "", dsn: str = "") -> str:
    """Execute a SQL query and return all results in JSONL format (no row cap).

    Args:
        query (str): The SQL query to execute.
        params (Optional[list[Any]]): Optional positional query parameters.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in JSONL format.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                rs = cursor.execute(query) if params is None else cursor.execute(query, params)
                results = _rows_to_dicts(rs)
                return json.dumps(results, indent=2)
    except pyodbc.Error as e:
        logging.error(f"Error executing query: {e}")
        raise


@mcp.tool(
    name="podbc_spasql_query",
    description="Execute a SPASQL query and return results."
)
def podbc_spasql_query(query: str, max_rows: int = 20, timeout: int = 300000,
                       user: str = "", password: str = "", dsn: str = "") -> str:
    """Execute a SPASQL query and return results in JSONL format.

    Args:
        query (str): The SPASQL query to execute.
        max_rows (int): Maximum number of rows to return. Default is 20.
        timeout (int): Query timeout in ms. Default is 300000.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in requested format as string.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                # Virtuoso-specific stored procedure; recodes the query to UTF-8.
                cmd = "select Demo.demo.execute_spasql_query(charset_recode(?, '_WIDE_', 'UTF-8'), ?, ?) as result"
                rs = cursor.execute(cmd, (query, max_rows, timeout,)).fetchone()
                return rs[0]
    except pyodbc.Error as e:
        logging.error(f"Error executing query: {e}")
        raise


@mcp.tool(
    name="podbc_virtuoso_support_ai",
    description="Interact with Virtuoso Support AI Agent"
)
def podbc_virtuoso_support_ai(prompt: str, api_key: str = "", user: str = "",
                              password: str = "", dsn: str = "") -> str:
    """Tool for interacting with the Virtuoso Support AI Agent.

    Args:
        prompt (str): AI prompt text (required).
        api_key (str): API key for AI service (optional; falls back to API_KEY env).
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results data in JSON.
    """
    try:
        # BUGFIX: original condition was inverted (`if not api_key`), which
        # discarded a caller-supplied key and passed "" when none was given.
        _api_key = api_key if api_key else API_KEY
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                cmd = "select DEMO.DBA.OAI_VIRTUOSO_SUPPORT_AI(?, ?) as result"
                rs = cursor.execute(cmd, (prompt, _api_key,)).fetchone()
                return rs[0]
    except pyodbc.Error as e:
        logging.error(f"Error executing request: {e}")
        raise


@mcp.tool(
    name="podbc_sparql_func",
    description="Use the SPARQL AI Agent function"
)
def podbc_sparql_func(prompt: str, api_key: str = "", user: str = "",
                      password: str = "", dsn: str = "") -> str:
    """Call the SPARQL AI Agent function.

    Args:
        prompt (str): The prompt.
        api_key (str): Optional API key; falls back to API_KEY env default.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results data in JSON.
    """
    try:
        # BUGFIX: same inverted condition as podbc_virtuoso_support_ai.
        _api_key = api_key if api_key else API_KEY
        with get_connection(True, user, password, dsn) as conn:
            with conn.cursor() as cursor:
                cmd = "select DEMO.DBA.OAI_SPARQL_FUNC(?, ?) as result"
                rs = cursor.execute(cmd, (prompt, _api_key,)).fetchone()
                return rs[0]
    except pyodbc.Error as e:
        logging.error(f"Error executing request: {e}")
        raise


@mcp.tool(
    name="podbc_sparql_list_entity_types",
    description="This query retrieves all entity types in the RDF graph, along with their labels and comments if available. "
                "It filters out blank nodes and ensures that only IRI types are returned. "
                "The LIMIT clause is set to 100 to restrict the number of entity types returned. "
)
def podbc_sparql_list_entity_types(graph_iri: str = "", user: str = "",
                                   password: str = "", dsn: str = "") -> str:
    """Execute a SPARQL query listing entity types and return results.

    Args:
        graph_iri (Optional[str]): Optional graph IRI.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in requested format as string.
    """
    # Bind the graph via a parameter marker when given; otherwise match any graph.
    graph_clause = f'GRAPH `iri(??)`' if graph_iri else 'GRAPH ?g'
    params = [graph_iri] if graph_iri else None
    query = f"""
            SELECT DISTINCT * FROM (SPARQL
            PREFIX owl: <http://www.w3.org/2002/07/owl#>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT ?o
            WHERE {{
                {graph_clause} {{
                    ?s a ?o .
                    OPTIONAL {{
                        ?s rdfs:label ?label .
                        FILTER (LANG(?label) = "en" || LANG(?label) = "")
                    }}
                    OPTIONAL {{
                        ?s rdfs:comment ?comment .
                        FILTER (LANG(?comment) = "en" || LANG(?comment) = "")
                    }}
                    FILTER (isIRI(?o) && !isBlank(?o))
                }}
            }}
            LIMIT 100
            ) AS x
            """
    return podbc_query_database(query, params=params, user=user, password=password, dsn=dsn)


@mcp.tool(
    name="podbc_sparql_list_entity_types_detailed",
    description="This query retrieves all entity types in the RDF graph, along with their labels and comments if available. "
                "It filters out blank nodes and ensures that only IRI types are returned. "
                "The LIMIT clause is set to 100 to restrict the number of entity types returned."
)
def podbc_sparql_list_entity_types_detailed(graph_iri: str = "", user: str = "",
                                            password: str = "", dsn: str = "") -> str:
    """Execute a SPARQL query listing entity types with labels/comments.

    Args:
        graph_iri (Optional[str]): Optional graph IRI.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in requested format as string.
    """
    graph_clause = f'GRAPH `iri(??)`' if graph_iri else 'GRAPH ?g'
    params = [graph_iri] if graph_iri else None
    query = f"""
            SELECT * FROM (SPARQL
            PREFIX owl: <http://www.w3.org/2002/07/owl#>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT ?o, (SAMPLE(?label) AS ?label), (SAMPLE(?comment) AS ?comment)
            WHERE {{
                {graph_clause} {{
                    ?s a ?o .
                    OPTIONAL {{?o rdfs:label ?label .
                        FILTER (LANG(?label) = "en" || LANG(?label) = "")}}
                    OPTIONAL {{?o rdfs:comment ?comment .
                        FILTER (LANG(?comment) = "en" || LANG(?comment) = "")}}
                    FILTER (isIRI(?o) && !isBlank(?o))
                }}
            }}
            GROUP BY ?o
            ORDER BY ?o
            LIMIT 20
            ) AS results
            """
    return podbc_query_database(query, params=params, user=user, password=password, dsn=dsn)


@mcp.tool(
    name="podbc_sparql_list_entity_types_samples",
    description="This query retrieves samples of entities for each type in the RDF graph, along with their labels and counts. "
                "It groups by entity type and orders the results by sample count in descending order. "
                "Note: The LIMIT clause is set to 20 to restrict the number of entity types returned."
)
def podbc_sparql_list_entity_types_samples(graph_iri: str = "", user: str = "",
                                           password: str = "", dsn: str = "") -> str:
    """Execute a SPARQL query sampling entities per type.

    Args:
        graph_iri (Optional[str]): Optional graph IRI.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in requested format as string.
    """
    graph_clause = f'GRAPH `iri(??)`' if graph_iri else 'GRAPH ?g'
    params = [graph_iri] if graph_iri else None
    query = f"""
            SELECT * FROM (SPARQL
            PREFIX owl: <http://www.w3.org/2002/07/owl#>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT (SAMPLE(?s) AS ?sample), ?slabel, (COUNT(*) AS ?sampleCount), (?o AS ?entityType), ?olabel
            WHERE {{
                {graph_clause} {{
                    ?s a ?o .
                    OPTIONAL {{?s rdfs:label ?slabel .
                        FILTER (LANG(?slabel) = "en" || LANG(?slabel) = "")}}
                    FILTER (isIRI(?s) && !isBlank(?s))
                    OPTIONAL {{?o rdfs:label ?olabel .
                        FILTER (LANG(?olabel) = "en" || LANG(?olabel) = "")}}
                    FILTER (isIRI(?o) && !isBlank(?o))
                }}
            }}
            GROUP BY ?slabel ?o ?olabel
            ORDER BY DESC(?sampleCount) ?o ?slabel ?olabel
            LIMIT 20
            ) AS results
            """
    return podbc_query_database(query, params=params, user=user, password=password, dsn=dsn)


@mcp.tool(
    name="podbc_sparql_list_ontologies",
    description="This query retrieves all ontologies in the RDF graph, along with their labels and comments if available."
)
def podbc_sparql_list_ontologies(graph_iri: str = "", user: str = "",
                                 password: str = "", dsn: str = "") -> str:
    """Execute a SPARQL query listing owl:Ontology instances.

    Args:
        graph_iri (Optional[str]): Optional graph IRI.
        user (Optional[str]): Optional username.
        password (Optional[str]): Optional password.
        dsn (Optional[str]): Optional dsn name.

    Returns:
        str: Results in requested format as string.
    """
    graph_clause = f'GRAPH `iri(??)`' if graph_iri else 'GRAPH ?g'
    params = [graph_iri] if graph_iri else None
    query = f"""
            SELECT * FROM (SPARQL
            PREFIX owl: <http://www.w3.org/2002/07/owl#>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT ?s, ?label, ?comment
            WHERE {{
                {graph_clause} {{
                    ?s a owl:Ontology .
                    OPTIONAL {{
                        ?s rdfs:label ?label .
                        FILTER (LANG(?label) = "en" || LANG(?label) = "")
                    }}
                    OPTIONAL {{
                        ?s rdfs:comment ?comment .
                        FILTER (LANG(?comment) = "en" || LANG(?comment) = "")
                    }}
                    FILTER (isIRI(?s) && !isBlank(?s))
                }}
            }}
            LIMIT 100
            ) AS x
            """
    return podbc_query_database(query, params=params, user=user, password=password, dsn=dsn)


if __name__ == "__main__":
    mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/OpenLinkSoftware/mcp-pyodbc-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server