sqlite-explorer-fastmcp-mcp-server

by hannesrudolph
Verified
import logging from collections.abc import Sequence from functools import lru_cache import subprocess from typing import Any import traceback from dotenv import load_dotenv from mcp.server import Server import threading from mcp.types import ( Tool, TextContent, ImageContent, EmbeddedResource, ) import json from . import gauth from http.server import BaseHTTPRequestHandler,HTTPServer from urllib.parse import ( urlparse, parse_qs, ) class OauthListener(BaseHTTPRequestHandler): def do_GET(self): url = urlparse(self.path) if url.path != "/code": self.send_response(404) self.end_headers() return query = parse_qs(url.query) if "code" not in query: self.send_response(400) self.end_headers() return self.send_response(200) self.end_headers() self.wfile.write("Auth successful! You can close the tab!".encode("utf-8")) self.wfile.flush() storage = {} creds = gauth.get_credentials(authorization_code=query["code"][0], state=storage) t = threading.Thread(target = self.server.shutdown) t.daemon = True t.start() load_dotenv() from . import tools_gmail from . import tools_calendar from . import toolhandler # Load environment variables # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("mcp-gsuite") def start_auth_flow(user_id: str): auth_url = gauth.get_authorization_url(user_id, state={}) subprocess.Popen(['open', auth_url]) # start server for code callback server_address = ('', 4100) server = HTTPServer(server_address, OauthListener) server.serve_forever() def setup_oauth2(user_id: str): accounts = gauth.get_account_info() if len(accounts) == 0: raise RuntimeError("No accounts specified in .gauth.json") if user_id not in [a.email for a in accounts]: raise RuntimeError(f"Account for email: {user_id} not specified in .gauth.json") credentials = gauth.get_stored_credentials(user_id=user_id) if not credentials: start_auth_flow(user_id=user_id) else: if credentials.access_token_expired: logger.error("credentials expired. try refresh") # this call refreshes access token user_info = gauth.get_user_info(credentials=credentials) #logging.error(f"User info: {json.dumps(user_info)}") gauth.store_credentials(credentials=credentials, user_id=user_id) app = Server("mcp-gsuite") tool_handlers = {} def add_tool_handler(tool_class: toolhandler.ToolHandler): global tool_handlers tool_handlers[tool_class.name] = tool_class def get_tool_handler(name: str) -> toolhandler.ToolHandler | None: if name not in tool_handlers: return None return tool_handlers[name] add_tool_handler(tools_gmail.QueryEmailsToolHandler()) add_tool_handler(tools_gmail.GetEmailByIdToolHandler()) add_tool_handler(tools_gmail.CreateDraftToolHandler()) add_tool_handler(tools_gmail.DeleteDraftToolHandler()) add_tool_handler(tools_gmail.ReplyEmailToolHandler()) add_tool_handler(tools_gmail.GetAttachmentToolHandler()) add_tool_handler(tools_gmail.BulkGetEmailsByIdsToolHandler()) add_tool_handler(tools_gmail.BulkSaveAttachmentsToolHandler()) add_tool_handler(tools_calendar.ListCalendarsToolHandler()) add_tool_handler(tools_calendar.GetCalendarEventsToolHandler()) add_tool_handler(tools_calendar.CreateCalendarEventToolHandler()) add_tool_handler(tools_calendar.DeleteCalendarEventToolHandler()) @app.list_tools() async def list_tools() -> list[Tool]: """List available tools.""" return [th.get_tool_description() for th in tool_handlers.values()] @app.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]: try: if not isinstance(arguments, dict): raise RuntimeError("arguments must be dictionary") if toolhandler.USER_ID_ARG not in arguments: raise RuntimeError("user_id argument is missing in dictionary.") setup_oauth2(user_id=arguments.get(toolhandler.USER_ID_ARG, "")) tool_handler = get_tool_handler(name) if not tool_handler: raise ValueError(f"Unknown tool: {name}") return tool_handler.run_tool(arguments) except Exception as e: logging.error(traceback.format_exc()) logging.error(f"Error during call_tool: str(e)") raise RuntimeError(f"Caught Exception. Error: {str(e)}") async def main(): accounts = gauth.get_account_info() for account in accounts: creds = gauth.get_stored_credentials(user_id=account.email) if creds: logging.info(f"found credentials for {account.email}") from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() )