Skip to main content
Glama
aech-ai

Teams Messenger MCP App

by aech-ai
server.py15.1 kB
""" Ensure the project root is on sys.path so imports like 'teams' work correctly when this script is executed as a standalone file. """ import os import sys # Add project root (parent directory of this file) to Python path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) from mcp.server.fastmcp import FastMCP, Context from teams.graph import GraphClient from db.schema import init_db, insert_chat, insert_message import asyncio import time import json from config import POLL_INTERVAL, DEMO_MODE # Numerical library and HTTP client import numpy as np import httpx """ Embedding backend: always use OpenAI API for embeddings. """ mcp = FastMCP("Teams Messenger MCP App", log_level="DEBUG") graph = GraphClient() # Initialize database connection only when not in demo mode to avoid lock conflicts db_con = init_db() if not DEMO_MODE else None print(f"mcp_server: FastMCP instance created, persistence={'enabled' if db_con else 'disabled'}", file=sys.stderr) import threading # Lazy embedder instance _embedder = None _embedder_lock = threading.Lock() class OpenAIEmbedder: """Embedder using OpenAI API for embeddings.""" def __init__(self, model: str = "text-embedding-ada-002"): self.model = model self.api_key = os.getenv("OPENAI_API_KEY") def encode(self, texts: list[str]) -> np.ndarray: url = "https://api.openai.com/v1/embeddings" headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} payload = {"input": texts, "model": self.model} resp = httpx.post(url, json=payload, headers=headers, timeout=30) resp.raise_for_status() data = resp.json().get("data", []) embeddings = [item.get("embedding", []) for item in data] return np.array(embeddings, dtype=np.float32) def get_embedder(): """Return a singleton OpenAI embedder.""" global _embedder if _embedder is None: with _embedder_lock: if _embedder is None: _embedder = OpenAIEmbedder() return _embedder # Track last message time per chat last_message_time = {} # Track chat updates last_chat_updates = {} # Error handling decorator def handle_api_errors(func): """Decorator to handle API errors gracefully.""" import functools @functools.wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except Exception as e: error_msg = str(e) if "401" in error_msg or "unauthorized" in error_msg.lower(): return {"error": "Authentication error. Please login again or refresh token.", "details": error_msg} elif "404" in error_msg or "not found" in error_msg.lower(): return {"error": "Resource not found. Check the ID or permissions.", "details": error_msg} elif "429" in error_msg or "too many requests" in error_msg.lower(): return {"error": "Rate limit exceeded. Please try again later.", "details": error_msg} else: return {"error": f"API error: {error_msg}", "details": error_msg} return wrapper @mcp.tool() @handle_api_errors async def list_chats() -> list: """List all Teams chats the agent is a member of and store in DuckDB.""" print("mcp_server: tool list_chats invoked", file=sys.stderr) chats = await graph.list_chats() for chat in chats: # Store chat in DB if persistence enabled if db_con: insert_chat(db_con, chat) return chats @mcp.resource("mcp://chats") def all_chats(): """Live resource representing all chats the agent is a member of. Subscribe to receive updates when chats are created or modified.""" # This resource is updated by the polling loop return [] @mcp.tool() @handle_api_errors async def get_messages(chat_id: str) -> list: """Get recent messages from a specified chat, store in DuckDB with embeddings.""" if not chat_id: return {"error": "Chat ID is required"} messages = await graph.get_messages(chat_id) for msg in messages: msg["chatId"] = chat_id content = msg["body"].get("content", "") embedding = None # Only generate embeddings if content is non-empty and embedder is available if content.strip(): try: ed = get_embedder() if ed: emb_arr = ed.encode([content]) # take first vector, cast and serialize embedding = emb_arr[0].astype(np.float32).tobytes() except Exception as e: import sys print(f"Error generating embedding: {e}", file=sys.stderr) # Store message in DB if persistence enabled if db_con: insert_message(db_con, msg, embedding) return messages @mcp.tool() @handle_api_errors async def send_message(chat_id: str, text: str) -> dict: """Send a message to a specified chat.""" if not chat_id: return {"error": "Chat ID is required"} if not text or not text.strip(): return {"error": "Message text is required"} return await graph.send_message(chat_id, text) @mcp.tool() def search_messages(query: str, mode: str = "hybrid", top_k: int = 10) -> list: """Hybrid search over messages: mode can be 'bm25', 'vector', or 'hybrid'.""" if not query: return {"error": "Search query is required"} try: # Check for valid mode if mode not in ["bm25", "vector", "hybrid"]: return {"error": f"Invalid search mode: {mode}. Use 'bm25', 'vector', or 'hybrid'."} results = [] if mode == "bm25": sql = """ SELECT id, chat_id, sender_name, content, created, rank FROM ( SELECT *, bm25(messages_fts) AS rank FROM messages_fts WHERE messages_fts MATCH ? ORDER BY rank LIMIT ? ) JOIN messages ON messages.id = messages_fts.rowid """ results = db_con.execute(sql, [query, top_k]).fetchall() elif mode == "vector": # vector search: compute embedding lazily ed = get_embedder() q_emb = None if ed: q_arr = ed.encode([query]) q_emb = q_arr[0].astype(np.float32).tobytes() sql = """ SELECT id, chat_id, sender_name, content, created, vector_l2_distance(embedding, ?) AS dist FROM messages WHERE embedding IS NOT NULL ORDER BY dist ASC LIMIT ? """ results = db_con.execute(sql, [q_emb, top_k]).fetchall() else: # hybrid # First get BM25 results sql = """ SELECT messages.id, chat_id, sender_name, content, created, bm25(messages_fts) AS rank FROM messages_fts JOIN messages ON messages.id = messages_fts.rowid WHERE messages_fts MATCH ? ORDER BY rank LIMIT ? """ bm25_results = db_con.execute(sql, [query, top_k * 2]).fetchall() if not bm25_results: return [] # Then re-rank using vector similarity if embedder available ed = get_embedder() if ed: texts = [row[3] for row in bm25_results] embs = ed.encode(texts) q_arr = ed.encode([query]) q_vec = q_arr[0] dists = np.linalg.norm(embs - q_vec, axis=1) sorted_idx = np.argsort(dists)[:top_k] results = [bm25_results[i] for i in sorted_idx] else: # fallback to BM25 ordering results = bm25_results[:top_k] # Format results for return return [ { "id": row[0], "chat_id": row[1], "sender_name": row[2], "content": row[3], "created": row[4], "score": row[5], } for row in results ] except Exception as e: return {"error": f"Search error: {str(e)}"} @mcp.tool() def search_messages_llm_rerank(query: str, top_k: int = 10, ctx: Context = None) -> list: """Advanced search with LLM reranking: first retrieves candidates with hybrid search, then optionally uses LLM to rerank results based on query relevance.""" # Get initial candidates using hybrid search candidates = search_messages(query, mode="hybrid", top_k=top_k * 2) if not candidates or not ctx: return candidates[:top_k] # Simulate LLM reranking with locally available information # In a real implementation, this would call a deployed LLM for reranking # For now, we'll use a simple heuristic based on word overlap query_words = set(query.lower().split()) for candidate in candidates: content = candidate["content"].lower() word_overlap = sum(1 for word in query_words if word in content) candidate["llm_score"] = word_overlap / len(query_words) if query_words else 0 # Sort by reranked score candidates.sort(key=lambda x: x["llm_score"], reverse=True) return candidates[:top_k] # Connect to the IR server via simple HTTP requests (since MCP client in container isn't working properly) import os import httpx # Direct HTTP connection to simulated API endpoint IR_SERVER_HOST = os.environ.get("IR_SERVER_HOST", "ir_server") IR_SERVER_PORT = os.environ.get("IR_SERVER_PORT", "8090") IR_SERVER_URL = f"http://{IR_SERVER_HOST}:{IR_SERVER_PORT}" # IR integration tools that use HTTP requests @mcp.tool() async def ir_search(query: str, search_type: str = "hybrid", limit: int = 10) -> dict: """Perform information retrieval search using the IR server.""" import sys print(f"Making HTTP request to IR server: {IR_SERVER_URL}/api/tools/search", file=sys.stderr) try: async with httpx.AsyncClient() as client: response = await client.post( f"{IR_SERVER_URL}/api/tools/search", json={ "query": query, "search_type": search_type, "limit": limit }, timeout=30.0 ) if response.status_code == 200: result = response.json() return result.get("result", {"error": "No result returned", "results": []}) else: return { "error": f"IR server returned status code {response.status_code}", "results": [] } except Exception as e: print(f"Error calling IR search: {e}", file=sys.stderr) return {"error": str(e), "results": []} @mcp.resource("mcp://messages/incoming") def incoming_messages(): """Live resource for incoming messages (updated by polling). Subscribe to receive real-time notifications when new messages arrive in any chat.""" # This resource is updated by the polling loop return [] async def poll_new_messages(): """Background task that polls for new messages and chat updates.""" while True: try: chats = await graph.list_chats() # Check for chat updates for chat in chats: chat_id = chat["id"] last_updated = chat.get("lastUpdatedDateTime") # Detect chat updates (new or modified chats) if last_updated and (chat_id not in last_chat_updates or last_updated > last_chat_updates[chat_id]): # Store chat update in DB if persistence enabled if db_con: insert_chat(db_con, chat) last_chat_updates[chat_id] = last_updated # Emit event on chats resource await mcp.emit_resource_event("mcp://chats", chat) # Check for new messages in each chat for chat in chats: chat_id = chat["id"] messages = await graph.get_messages(chat_id) # Only process new messages for msg in messages: msg["chatId"] = chat_id created = msg.get("createdDateTime") if not created: continue if chat_id not in last_message_time or created > last_message_time[chat_id]: content = msg["body"].get("content", "") embedding = None # Only generate embeddings if model is loaded and content is non-empty if content.strip(): try: ed = get_embedder() if ed: emb_arr = ed.encode([content]) embedding = emb_arr[0].astype(np.float32).tobytes() except Exception as e: import sys print(f"Error generating embedding during polling: {e}", file=sys.stderr) # Store incoming message in DB if persistence enabled if db_con: insert_message(db_con, msg, embedding) last_message_time[chat_id] = created # Emit event on messages/incoming resource await mcp.emit_resource_event("mcp://messages/incoming", msg) except Exception as e: import sys error_msg = str(e) if "401" in error_msg or "unauthorized" in error_msg.lower(): print(f"Authentication error during polling: {e}", file=sys.stderr) # Wait longer before retrying on auth errors (could be token issue) await asyncio.sleep(POLL_INTERVAL * 2) else: print(f"Polling error: {e}", file=sys.stderr) await asyncio.sleep(POLL_INTERVAL) continue await asyncio.sleep(POLL_INTERVAL) @mcp.tool() @handle_api_errors async def create_chat(user_id_or_email: str) -> dict: """Create a 1:1 chat with a user by email or user ID.""" if not user_id_or_email: return {"error": "User ID or email is required"} chat = await graph.create_chat(user_id_or_email) # Store new chat in DuckDB if persistence enabled if db_con: insert_chat(db_con, chat) # Also emit as event on chats resource await mcp.emit_resource_event("mcp://chats", chat) return chat # TODO: Add Teams integration, tools, resources, and events if __name__ == "__main__": mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aech-ai/mcp-teams'

If you have feedback or need assistance with the MCP directory API, please join our Discord server