server.py•5.1 kB
import logging
from collections.abc import Sequence
from functools import lru_cache
import subprocess
from typing import Any
import traceback
from dotenv import load_dotenv
from mcp.server import Server
import threading
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)
import json
from . import gauth
from http.server import BaseHTTPRequestHandler,HTTPServer
from urllib.parse import (
    urlparse,
    parse_qs,
)
class OauthListener(BaseHTTPRequestHandler):
    """HTTP handler for the local OAuth2 redirect endpoint (``/code``).

    The handler serves exactly one purpose: receive the ``code`` query
    parameter of the OAuth redirect, hand it to ``gauth.get_credentials`` and
    then stop the HTTP server it is running in.
    """

    def do_GET(self):
        """Handle a GET request on the callback server.

        * any path other than ``/code`` -> 404, server keeps running
        * ``/code`` without a ``code`` query parameter -> 400, server keeps
          running
        * ``/code?code=...`` -> the code is passed to
          ``gauth.get_credentials``; the browser gets 200 on success or 500
          if that call raised, and in both cases the server is shut down.
        """
        url = urlparse(self.path)
        if url.path != "/code":
            self.send_response(404)
            self.end_headers()
            return

        query = parse_qs(url.query)
        if "code" not in query:
            self.send_response(400)
            self.end_headers()
            return

        try:
            try:
                # Exchange the code *before* answering, so the page shown to
                # the user reflects the real outcome.
                # NOTE(review): presumably get_credentials also persists the
                # credentials - confirm in gauth.
                gauth.get_credentials(authorization_code=query["code"][0], state={})
            except Exception:
                logging.exception("OAuth authorization code exchange failed")
                self._reply(500, "Auth failed! Check the server logs and try again.")
            else:
                self._reply(200, "Auth successful! You can close the tab!")
        finally:
            # Always stop the server, even when the exchange failed; otherwise
            # serve_forever() would never return to its caller.
            # shutdown() must run on another thread: it waits for the
            # serve_forever() loop to finish, and this handler is executing
            # inside that loop.
            stopper = threading.Thread(target=self.server.shutdown)
            stopper.daemon = True
            stopper.start()

    def _reply(self, status: int, message: str) -> None:
        """Send a complete plain-text UTF-8 response with the given status."""
        body = message.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
        self.wfile.flush()
        
# Load environment variables from a .env file. This runs before the
# package-local imports below.
# NOTE(review): presumably those modules read environment-based settings at
# import time, which is why the call precedes them - confirm before reordering.
load_dotenv()
from . import tools_gmail
from . import tools_calendar
from . import toolhandler
# Configure logging: root logger at INFO, plus a named logger for this server.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-gsuite")
def start_auth_flow(user_id: str):
    """Run the interactive OAuth flow for ``user_id`` and block until it ends.

    Opens the authorization URL in the browser and serves the local redirect
    endpoint on port 4100 until ``OauthListener`` shuts the server down.

    Args:
        user_id: Account identifier passed to ``gauth.get_authorization_url``.

    Raises:
        OSError: If port 4100 cannot be bound (e.g. it is already in use).
    """
    auth_url = gauth.get_authorization_url(user_id, state={})

    # Bind the callback port *before* opening the browser, so a bind failure
    # is reported without sending the user through a consent screen whose
    # redirect could never be received. The context manager closes the
    # listening socket once serve_forever() returns, so the port is free
    # for the next auth flow in this process.
    server_address = ('', 4100)
    with HTTPServer(server_address, OauthListener) as server:
        # NOTE(review): the `open` command is macOS-specific; other
        # platforms need a different launcher.
        subprocess.Popen(['open', auth_url])
        # Serve the code callback; returns after the listener calls shutdown().
        server.serve_forever()
def setup_oauth2(user_id: str):
    """Make sure credentials are available for ``user_id``.

    Validates that ``user_id`` is one of the configured account emails, then
    either starts the interactive auth flow (no stored credentials) or
    re-validates and re-stores the existing credentials.

    Raises:
        RuntimeError: If no accounts are configured, or ``user_id`` is not
            among the configured account emails.
    """
    configured = gauth.get_account_info()
    if len(configured) == 0:
        raise RuntimeError("No accounts specified in .gauth.json")

    known_emails = [entry.email for entry in configured]
    if user_id not in known_emails:
        raise RuntimeError(f"Account for email: {user_id} not specified in .gauth.json")

    stored = gauth.get_stored_credentials(user_id=user_id)
    if not stored:
        # Nothing stored yet: run the interactive flow and stop here.
        start_auth_flow(user_id=user_id)
        return

    if stored.access_token_expired:
        logger.error("credentials expired. try refresh")
    # Per the original author's note, this call refreshes the access token;
    # its return value is not needed here.
    gauth.get_user_info(credentials=stored)
    gauth.store_credentials(credentials=stored, user_id=user_id)
# The MCP server instance; its decorators register the tool endpoints below.
app = Server("mcp-gsuite")
# Registry of tool handlers, keyed by each handler's `name` attribute.
tool_handlers = {}
def add_tool_handler(tool_class: toolhandler.ToolHandler):
    """Register ``tool_class`` in the module registry under its ``name``.

    A handler registered later under the same name replaces the earlier one.
    """
    # The registry dict is mutated in place, so no `global` statement is needed.
    tool_handlers.update({tool_class.name: tool_class})
def get_tool_handler(name: str) -> toolhandler.ToolHandler | None:
    """Return the handler registered under ``name``, or ``None`` if there is none."""
    return tool_handlers.get(name)
# Instantiate and register every built-in tool handler, in this order.
for _handler_factory in (
    tools_gmail.QueryEmailsToolHandler,
    tools_gmail.GetEmailByIdToolHandler,
    tools_gmail.CreateDraftToolHandler,
    tools_gmail.DeleteDraftToolHandler,
    tools_gmail.ReplyEmailToolHandler,
    tools_gmail.GetAttachmentToolHandler,
    tools_gmail.BulkGetEmailsByIdsToolHandler,
    tools_gmail.BulkSaveAttachmentsToolHandler,
    tools_calendar.ListCalendarsToolHandler,
    tools_calendar.GetCalendarEventsToolHandler,
    tools_calendar.CreateCalendarEventToolHandler,
    tools_calendar.DeleteCalendarEventToolHandler,
):
    add_tool_handler(_handler_factory())
# Do not leave the loop variable behind in the module namespace.
del _handler_factory
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool description of every registered handler."""
    descriptions = []
    for handler in tool_handlers.values():
        descriptions.append(handler.get_tool_description())
    return descriptions
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Dispatch an MCP tool call to the handler registered under ``name``.

    Args:
        name: Name of the tool to run.
        arguments: Tool arguments; must be a dict containing the
            ``toolhandler.USER_ID_ARG`` key.

    Returns:
        Whatever the handler's ``run_tool`` returns.

    Raises:
        RuntimeError: On any failure (invalid arguments, unknown tool, auth
            or handler error). The original exception is attached as
            ``__cause__``.
    """
    try:
        if not isinstance(arguments, dict):
            raise RuntimeError("arguments must be dictionary")

        if toolhandler.USER_ID_ARG not in arguments:
            raise RuntimeError("user_id argument is missing in dictionary.")

        # Make sure credentials exist for the account before running the tool.
        # NOTE(review): this call is synchronous and may run the interactive
        # auth flow, during which this coroutine does not yield.
        setup_oauth2(user_id=arguments[toolhandler.USER_ID_ARG])

        tool_handler = get_tool_handler(name)
        if not tool_handler:
            raise ValueError(f"Unknown tool: {name}")

        return tool_handler.run_tool(arguments)
    except Exception as e:
        logging.error(traceback.format_exc())
        # Interpolate the exception; the message previously logged the
        # literal text "str(e)".
        logging.error(f"Error during call_tool: {e}")
        # Chain the cause so the original traceback stays attached.
        raise RuntimeError(f"Caught Exception. Error: {str(e)}") from e
async def main():
    """Log which configured accounts have stored credentials, then serve MCP over stdio."""
    for account in gauth.get_account_info():
        if gauth.get_stored_credentials(user_id=account.email):
            logging.info(f"found credentials for {account.email}")

    # Imported here, as in the original, rather than at module import time.
    from mcp.server.stdio import stdio_server

    async with stdio_server() as streams:
        reader, writer = streams
        await app.run(reader, writer, app.create_initialization_options())