Skip to main content
Glama

Multi-Agent Communication Platform (MCP)

message_service.py11.1 kB
from typing import List, Dict, Optional, Any from datetime import datetime from mcp_server.models.database import get_db from mcp_server.utils.database import generate_uuid, dict_from_row, get_next_sequence_number, parse_mentions from mcp_server.services.agent_service import validate_agent_in_channel, get_agent_by_username import logging logger = logging.getLogger(__name__) class MessageError(Exception): """Message-related errors.""" pass async def send_message(channel_id: str, agent_id: str, content: str) -> Dict[str, Any]: """Send a message to a channel with @ mention support.""" # Validate content length if not content or len(content) > 4000: raise MessageError("Message content must be 1-4000 characters") # Validate agent is in channel agent = await validate_agent_in_channel(agent_id, channel_id) # Parse mentions mentions = parse_mentions(content) async with get_db() as db: # Validate mentioned users exist in channel for username in mentions: mentioned_agent = await get_agent_by_username(channel_id, username) if not mentioned_agent: raise MessageError(f"Mentioned user @{username} not found in channel") # Get next sequence number sequence_number = await get_next_sequence_number(db, channel_id) message_id = generate_uuid() # Insert message await db.execute( """INSERT INTO messages (message_id, channel_id, agent_id, content, sequence_number) VALUES (?, ?, ?, ?, ?)""", (message_id, channel_id, agent_id, content, sequence_number) ) # Insert mentions for username in mentions: await db.execute( """INSERT INTO message_mentions (message_id, mentioned_username) VALUES (?, ?)""", (message_id, username) ) # Mark as read by sender await db.execute( """INSERT INTO read_status (agent_id, message_id) VALUES (?, ?)""", (agent_id, message_id) ) await db.commit() logger.info(f"Message {message_id} sent by {agent['username']} in channel {channel_id}") return { "message_id": message_id, "timestamp": datetime.utcnow().isoformat() + 'Z', "sequence_number": sequence_number } async def get_new_messages(channel_id: str, agent_id: str, limit: int = 50) -> List[Dict[str, Any]]: """Retrieve unread messages and automatically mark them as read.""" # Validate agent is in channel agent = await validate_agent_in_channel(agent_id, channel_id) async with get_db() as db: # Get unread messages cursor = await db.execute( """SELECT m.*, a.username as sender_username FROM messages m JOIN agents a ON m.agent_id = a.agent_id WHERE m.channel_id = ? AND m.message_id NOT IN ( SELECT message_id FROM read_status WHERE agent_id = ? ) AND m.created_at >= ( SELECT joined_at FROM agents WHERE agent_id = ? ) ORDER BY m.sequence_number ASC LIMIT ?""", (channel_id, agent_id, agent_id, limit) ) messages = [] message_ids = [] async for row in cursor: message_dict = dict_from_row(row) message_ids.append(message_dict['message_id']) # Get mentions mention_cursor = await db.execute( "SELECT mentioned_username FROM message_mentions WHERE message_id = ?", (message_dict['message_id'],) ) mentions = [r['mentioned_username'] async for r in mention_cursor] # Get read_by info read_cursor = await db.execute( """SELECT r.agent_id, a.username, r.read_at FROM read_status r JOIN agents a ON r.agent_id = a.agent_id WHERE r.message_id = ?""", (message_dict['message_id'],) ) read_by = [] async for r in read_cursor: read_by.append({ "agent_id": r['agent_id'], "username": r['username'], "read_at": r['read_at'] }) messages.append({ "message_id": message_dict['message_id'], "sender": { "agent_id": message_dict['agent_id'], "username": message_dict['sender_username'] }, "content": message_dict['content'], "mentions": mentions, "timestamp": message_dict['created_at'], "sequence_number": message_dict['sequence_number'], "read_by": read_by }) # Mark messages as read atomically if message_ids: placeholders = ','.join(['?'] * len(message_ids)) await db.executemany( "INSERT OR IGNORE INTO read_status (agent_id, message_id) VALUES (?, ?)", [(agent_id, mid) for mid in message_ids] ) await db.commit() logger.info(f"Marked {len(message_ids)} messages as read for agent {agent_id}") return messages async def get_message_history(channel_id: str, agent_id: str, limit: int = 50, before_sequence: Optional[int] = None) -> List[Dict[str, Any]]: """Retrieve message history and mark any unread messages as read.""" # Validate agent is in channel agent = await validate_agent_in_channel(agent_id, channel_id) async with get_db() as db: # Build query based on before_sequence if before_sequence: query = """SELECT m.*, a.username as sender_username FROM messages m JOIN agents a ON m.agent_id = a.agent_id WHERE m.channel_id = ? AND m.sequence_number < ? ORDER BY m.sequence_number DESC LIMIT ?""" params = (channel_id, before_sequence, limit) else: query = """SELECT m.*, a.username as sender_username FROM messages m JOIN agents a ON m.agent_id = a.agent_id WHERE m.channel_id = ? ORDER BY m.sequence_number DESC LIMIT ?""" params = (channel_id, limit) cursor = await db.execute(query, params) messages = [] unread_message_ids = [] async for row in cursor: message_dict = dict_from_row(row) # Check if unread read_check = await db.execute( "SELECT 1 FROM read_status WHERE agent_id = ? AND message_id = ?", (agent_id, message_dict['message_id']) ) if not await read_check.fetchone(): unread_message_ids.append(message_dict['message_id']) # Get mentions mention_cursor = await db.execute( "SELECT mentioned_username FROM message_mentions WHERE message_id = ?", (message_dict['message_id'],) ) mentions = [r['mentioned_username'] async for r in mention_cursor] # Get read_by info read_cursor = await db.execute( """SELECT r.agent_id, a.username, r.read_at FROM read_status r JOIN agents a ON r.agent_id = a.agent_id WHERE r.message_id = ?""", (message_dict['message_id'],) ) read_by = [] async for r in read_cursor: read_by.append({ "agent_id": r['agent_id'], "username": r['username'], "read_at": r['read_at'] }) messages.append({ "message_id": message_dict['message_id'], "sender": { "agent_id": message_dict['agent_id'], "username": message_dict['sender_username'] }, "content": message_dict['content'], "mentions": mentions, "timestamp": message_dict['created_at'], "sequence_number": message_dict['sequence_number'], "read_by": read_by }) # Mark unread messages as read if unread_message_ids: await db.executemany( "INSERT OR IGNORE INTO read_status (agent_id, message_id) VALUES (?, ?)", [(agent_id, mid) for mid in unread_message_ids] ) await db.commit() logger.info(f"Marked {len(unread_message_ids)} historical messages as read for agent {agent_id}") # Reverse to get chronological order messages.reverse() return messages async def get_agent_messages(channel_id: str, target_username: str, limit: int = 20) -> List[Dict[str, Any]]: """Get messages from a specific agent.""" # Get target agent target_agent = await get_agent_by_username(channel_id, target_username) if not target_agent: raise MessageError(f"Agent @{target_username} not found in channel") async with get_db() as db: cursor = await db.execute( """SELECT m.*, a.username as sender_username FROM messages m JOIN agents a ON m.agent_id = a.agent_id WHERE m.channel_id = ? AND m.agent_id = ? ORDER BY m.sequence_number DESC LIMIT ?""", (channel_id, target_agent['agent_id'], limit) ) messages = [] async for row in cursor: message_dict = dict_from_row(row) # Get mentions mention_cursor = await db.execute( "SELECT mentioned_username FROM message_mentions WHERE message_id = ?", (message_dict['message_id'],) ) mentions = [r['mentioned_username'] async for r in mention_cursor] messages.append({ "message_id": message_dict['message_id'], "sender": { "agent_id": message_dict['agent_id'], "username": message_dict['sender_username'] }, "content": message_dict['content'], "mentions": mentions, "timestamp": message_dict['created_at'], "sequence_number": message_dict['sequence_number'] }) messages.reverse() return messages

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/thiagovictorino/chat-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server