#!/usr/bin/env python3
"""
MCP Talk - Inter-agent messaging server via Model Context Protocol
A lightweight MCP server for real-time messaging between AI agents (Claude, Codex, Gemini).
Messages are stored as JSON files for persistence and visibility.
Usage:
# Run as MCP server (stdio transport)
python server.py
# Or with uvx
uvx mcp_talk
Tools provided:
- send: Send a message to another agent
- chk: Check messages for an agent (shortcut for check)
- check: Check messages for an agent
- ack: Acknowledge/delete a processed message
- broadcast: Send to all agents
- list: List all pending messages (PM visibility)
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import uuid
# MCP SDK imports
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
except ImportError:
print("MCP SDK not installed. Run: pip install mcp", file=sys.stderr)
sys.exit(1)
# Configuration
DEFAULT_QUEUE_DIR = Path.home() / ".mcp_talk" / "q"
BASE_QUEUE_DIR = Path(os.getenv("MCP_TALK_QUEUE", DEFAULT_QUEUE_DIR))
# Known agents (extensible)
KNOWN_AGENTS = {"claude", "codex", "gemini", "devin", "pm", "all"}
def get_queue_dir(namespace: str | None = None) -> Path:
"""Get the queue directory, optionally namespaced."""
if namespace:
# Sanitize namespace to prevent path traversal
safe_ns = "".join(c for c in namespace if c.isalnum() or c in "-_.")
safe_ns = safe_ns.strip(".")
if safe_ns:
return BASE_QUEUE_DIR / safe_ns
return BASE_QUEUE_DIR
# Pagination / preview defaults
DEFAULT_PAGE_SIZE = 5
MAX_PAGE_SIZE = 50
PREVIEW_CHARS = 160
# Cleanup / guardrails
AUTO_CLEAN_MAX_AGE_HOURS = int(os.getenv("MCP_TALK_AUTO_CLEAN_HOURS", "24") or 0)
MAX_MESSAGE_CHARS = int(os.getenv("MCP_TALK_MAX_MESSAGE_CHARS", "2000") or 0)
def ensure_queue_dir(namespace: str | None = None):
"""Ensure the message queue directory exists."""
get_queue_dir(namespace).mkdir(parents=True, exist_ok=True)
def generate_message_id() -> str:
"""Generate a unique message ID."""
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
short_uuid = uuid.uuid4().hex[:8]
return f"{ts}_{short_uuid}"
def send_message(to_agent: str, from_agent: str, message: str, msg_type: str = "direct", namespace: str | None = None) -> dict:
"""Send a message to an agent."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
if MAX_MESSAGE_CHARS and len(message) > MAX_MESSAGE_CHARS:
raise ValueError(
f"Message too long: {len(message)} chars exceeds limit of {MAX_MESSAGE_CHARS}. "
"Consider sharing a file path or repo link instead."
)
msg_id = generate_message_id()
msg_data = {
"id": msg_id,
"from": from_agent.lower(),
"to": to_agent.lower(),
"type": msg_type,
"created": datetime.now(timezone.utc).isoformat(),
"message": message,
}
if namespace:
msg_data["namespace"] = namespace
filename = f"{msg_id}_{from_agent.lower()}_to_{to_agent.lower()}.json"
filepath = queue_dir / filename
filepath.write_text(json.dumps(msg_data, indent=2))
result = {"status": "sent", "id": msg_id, "to": to_agent, "file": filename}
if namespace:
result["namespace"] = namespace
return result
def get_messages(for_agent: str, include_broadcasts: bool = True, namespace: str | None = None) -> list[dict]:
"""Get all pending messages for an agent."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
_auto_clean_queue(namespace)
messages = []
agent_lower = for_agent.lower()
for filepath in sorted(queue_dir.glob("*.json")):
try:
data = json.loads(filepath.read_text())
to = data.get("to", "").lower()
# Include if addressed to this agent or broadcast
if to == agent_lower or (include_broadcasts and to == "all"):
data["_file"] = filepath.name
messages.append(data)
except (json.JSONDecodeError, OSError):
continue
return messages
def ack_message(message_id: str, namespace: str | None = None) -> dict:
"""Acknowledge and delete a message by ID."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
for filepath in queue_dir.glob("*.json"):
try:
data = json.loads(filepath.read_text())
if data.get("id") == message_id:
filepath.unlink()
return {"status": "deleted", "id": message_id}
except (json.JSONDecodeError, OSError):
continue
return {"status": "not_found", "id": message_id}
def reply_to_message(message_id: str, from_agent: str, reply_text: str, namespace: str | None = None) -> dict:
"""Reply to a message - sends response back to original sender."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
# Find the original message to get sender info
original = None
for filepath in queue_dir.glob("*.json"):
try:
data = json.loads(filepath.read_text())
if data.get("id") == message_id:
original = data
# Delete original after reading
filepath.unlink()
break
except (json.JSONDecodeError, OSError):
continue
if not original:
return {"status": "not_found", "id": message_id}
# Send reply back to original sender (in same namespace)
original_sender = original.get("from", "unknown")
try:
result = send_message(
to_agent=original_sender,
from_agent=from_agent,
message=reply_text,
msg_type="reply",
namespace=namespace
)
except ValueError as exc:
return {"status": "error", "error": str(exc), "reply_to": message_id}
result["reply_to"] = message_id
result["original_from"] = original_sender
return result
def list_all_messages(namespace: str | None = None) -> list[dict]:
"""List all messages in the queue (PM visibility)."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
_auto_clean_queue(namespace)
messages = []
for filepath in sorted(queue_dir.glob("*.json")):
try:
data = json.loads(filepath.read_text())
data["_file"] = filepath.name
messages.append(data)
except (json.JSONDecodeError, OSError):
continue
return messages
def clear_old_messages(max_age_hours: int = 24, namespace: str | None = None) -> dict:
"""Clear messages older than max_age_hours."""
ensure_queue_dir(namespace)
queue_dir = get_queue_dir(namespace)
now = datetime.now(timezone.utc)
deleted = 0
for filepath in queue_dir.glob("*.json"):
try:
data = json.loads(filepath.read_text())
created = datetime.fromisoformat(data.get("created", "").replace("Z", "+00:00"))
age_hours = (now - created).total_seconds() / 3600
if age_hours > max_age_hours:
filepath.unlink()
deleted += 1
except (json.JSONDecodeError, OSError, ValueError):
continue
return {"status": "cleaned", "deleted": deleted}
def _auto_clean_queue(namespace: str | None = None):
"""Remove stale messages if auto-clean is enabled."""
if AUTO_CLEAN_MAX_AGE_HOURS > 0:
clear_old_messages(AUTO_CLEAN_MAX_AGE_HOURS, namespace)
def _normalize_limit(limit_value: Any) -> int:
"""Clamp limit to a reasonable range."""
if limit_value is None:
return DEFAULT_PAGE_SIZE
try:
limit = int(limit_value)
except (TypeError, ValueError):
return DEFAULT_PAGE_SIZE
if limit <= 0:
return DEFAULT_PAGE_SIZE
return min(limit, MAX_PAGE_SIZE)
def _normalize_offset(offset_value: Any) -> int:
"""Ensure offset is a non-negative integer."""
try:
offset = int(offset_value or 0)
except (TypeError, ValueError):
return 0
return max(offset, 0)
def _preview(text: str, limit: int = PREVIEW_CHARS) -> str:
"""Return a truncated preview of the message body."""
if len(text) <= limit:
return text
return text[:limit].rstrip() + "…"
def _render_messages(
messages: list[dict],
total: int,
offset: int,
limit: int,
include_body: bool,
header: str,
) -> str:
"""Create a concise textual representation of messages."""
if not messages:
return header + "\n(No messages in this range.)"
lines = [
f"{header}",
f"Showing {len(messages)} of {total} pending | offset {offset} | limit {limit}",
]
for idx, msg in enumerate(messages, start=offset + 1):
created = msg.get("created", "?")
msg_type = msg.get("type", "direct")
line = (
f"{idx}. {msg.get('id', 'unknown')} | {created} | "
f"{msg.get('from', 'unknown')} -> {msg.get('to', 'unknown')} ({msg_type})"
)
lines.append(line)
body = msg.get("message", "")
if include_body:
lines.append(f" body ({len(body)} chars): {body}")
else:
preview = _preview(body)
lines.append(f" preview ({len(body)} chars): {preview}")
lines.append(f" file: {msg.get('_file', 'n/a')}")
remaining = total - (offset + len(messages))
if remaining > 0:
lines.append(
f"… {remaining} more message(s) available. "
f"Call again with offset={offset + len(messages)} to continue."
)
if not include_body:
lines.append("Set include_body=true to see full text for this page.")
return "\n".join(lines)
# MCP Server Setup
server = Server("mcp-talk")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="send",
description="Send a message to another agent. Usage: send(to='claude', message='Hello!', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "Recipient agent (claude, codex, gemini, devin, pm, all)"
},
"message": {
"type": "string",
"description": "Message content"
},
"from_agent": {
"type": "string",
"description": "Sender name (optional, defaults to 'unknown')"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
}
},
"required": ["to", "message"]
}
),
Tool(
name="chk",
description="Check messages for an agent (shortcut). Usage: chk(agent='claude', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"agent": {
"type": "string",
"description": "Agent name to check messages for"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
},
"limit": {
"type": "integer",
"description": f"Maximum number of messages to return (default: {DEFAULT_PAGE_SIZE})"
},
"offset": {
"type": "integer",
"description": "Start at this message index (default: 0)"
},
"include_body": {
"type": "boolean",
"description": "Include full message text (default: false)"
},
"auto_ack": {
"type": "boolean",
"description": "Delete messages after returning them (default: false)"
}
},
"required": ["agent"]
}
),
Tool(
name="check",
description="Check messages for an agent. Usage: check(agent='claude', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"agent": {
"type": "string",
"description": "Agent name to check messages for"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
},
"include_broadcasts": {
"type": "boolean",
"description": "Include broadcast messages (default: true)"
},
"limit": {
"type": "integer",
"description": f"Maximum number of messages to return (default: {DEFAULT_PAGE_SIZE})"
},
"offset": {
"type": "integer",
"description": "Start at this message index (default: 0)"
},
"include_body": {
"type": "boolean",
"description": "Include full message text (default: false)"
},
"auto_ack": {
"type": "boolean",
"description": "Delete messages after returning them (default: false)"
}
},
"required": ["agent"]
}
),
Tool(
name="ack",
description="Acknowledge/delete a message after processing. Usage: ack(id='msg_id', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Message ID to acknowledge"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
}
},
"required": ["id"]
}
),
Tool(
name="broadcast",
description="Send a message to all agents. Usage: broadcast(message='Team update', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "Message to broadcast"
},
"from_agent": {
"type": "string",
"description": "Sender name (optional)"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
}
},
"required": ["message"]
}
),
Tool(
name="list",
description="List all pending messages in the queue (PM visibility). Usage: list(namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
},
"limit": {
"type": "integer",
"description": f"Maximum number of messages to return (default: {DEFAULT_PAGE_SIZE})"
},
"offset": {
"type": "integer",
"description": "Start at this message index (default: 0)"
},
"include_body": {
"type": "boolean",
"description": "Include full message text (default: false)"
}
},
}
),
Tool(
name="clean",
description="Clean up old messages. Usage: clean(hours=24, namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"hours": {
"type": "integer",
"description": "Delete messages older than this many hours (default: 24)"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
}
}
}
),
Tool(
name="reply",
description="Reply to a message (acks original and sends response to sender). Usage: reply(id='msg_id', message='Got it!', namespace='myproject')",
inputSchema={
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Message ID to reply to"
},
"message": {
"type": "string",
"description": "Reply message content"
},
"from_agent": {
"type": "string",
"description": "Your agent name (optional)"
},
"namespace": {
"type": "string",
"description": "Project namespace for message isolation (optional, defaults to shared queue)"
}
},
"required": ["id", "message"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""
namespace = arguments.get("namespace")
if name == "send":
to = arguments.get("to", "")
message = arguments.get("message", "")
from_agent = arguments.get("from_agent", "unknown")
try:
result = send_message(to, from_agent, message, namespace=namespace)
except ValueError as exc:
result = {"status": "error", "error": str(exc)}
elif name in ("chk", "check"):
agent = arguments.get("agent", "")
include_broadcasts = arguments.get("include_broadcasts", True)
include_body = bool(arguments.get("include_body", False))
limit = _normalize_limit(arguments.get("limit"))
offset = _normalize_offset(arguments.get("offset"))
auto_ack = bool(arguments.get("auto_ack", False))
messages = get_messages(agent, include_broadcasts, namespace=namespace)
total = len(messages)
page = messages[offset:offset + limit]
ns_label = f" [{namespace}]" if namespace else ""
header = f"Messages for {agent or 'unknown'}{ns_label}"
rendered = _render_messages(page, total, offset, limit, include_body, header)
if auto_ack and page:
for msg in page:
msg_id = msg.get("id")
if msg_id:
ack_message(msg_id, namespace=namespace)
rendered += "\nAuto-acknowledged returned messages."
result = {"status": "ok", "text": rendered}
elif name == "ack":
msg_id = arguments.get("id", "")
result = ack_message(msg_id, namespace=namespace)
elif name == "broadcast":
message = arguments.get("message", "")
from_agent = arguments.get("from_agent", "unknown")
try:
result = send_message("all", from_agent, message, msg_type="broadcast", namespace=namespace)
except ValueError as exc:
result = {"status": "error", "error": str(exc)}
elif name == "list":
include_body = bool(arguments.get("include_body", False))
limit = _normalize_limit(arguments.get("limit"))
offset = _normalize_offset(arguments.get("offset"))
messages = list_all_messages(namespace=namespace)
total = len(messages)
page = messages[offset:offset + limit]
ns_label = f" [{namespace}]" if namespace else ""
header = f"All pending messages{ns_label}"
rendered = _render_messages(page, total, offset, limit, include_body, header)
result = {"status": "ok", "text": rendered}
elif name == "clean":
hours = arguments.get("hours", 24)
result = clear_old_messages(hours, namespace=namespace)
elif name == "reply":
msg_id = arguments.get("id", "")
message = arguments.get("message", "")
from_agent = arguments.get("from_agent", "unknown")
result = reply_to_message(msg_id, from_agent, message, namespace=namespace)
else:
result = {"error": f"Unknown tool: {name}"}
if isinstance(result, dict) and "text" in result and result.keys() <= {"status", "text"}:
output_text = result["text"]
else:
output_text = json.dumps(result, indent=2)
return [TextContent(type="text", text=output_text)]
async def _async_main():
"""Run the MCP server (async)."""
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
def main():
"""Entry point for mcp-talk command."""
import asyncio
asyncio.run(_async_main())
if __name__ == "__main__":
main()