We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lyellr88/marm-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Complete WebSocket handlers for all MCP protocol methods."""
from fastapi import WebSocket
from typing import Dict, Any
import sqlite3
import logging
from datetime import datetime, timezone
# Set up logging for security error tracking
logger = logging.getLogger(__name__)
from core.memory import memory
from core.events import events
from core.websocket_manager import websocket_manager as ws_manager
from core.response_limiter import MCPResponseLimiter
from utils.helpers import read_protocol_file
# ===== MEMORY HANDLERS =====
async def handle_smart_recall(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle smart recall requests via WebSocket.

    Reads ``query``, ``session_name`` and ``search_all`` from the request
    params, calls ``memory.recall_similar`` and replies with a JSON-RPC
    result containing the matches and their count.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session via ``ws_manager.get_client_session``.
        message: Decoded JSON-RPC request (``id`` and optional ``params``).

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603) so that no internal detail reaches the client.
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        query = params.get("query", "")
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))
        search_all = params.get("search_all", False)

        # When search_all is truthy, None is passed instead of a session name.
        results = await memory.recall_similar(query, None if search_all else session_name)

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "memories": results,
                "count": len(results),
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("smart_recall request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_contextual_log(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle contextual log requests via WebSocket.

    Stores the ``content`` param with ``memory.store_memory`` under the given
    (or the client's current) session and replies with the new memory id.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session.
        message: Decoded JSON-RPC request; ``params.content`` is required.

    A missing/empty ``content`` or any other failure is logged and answered
    with a generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        content = params.get("content", "")
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))

        if not content:
            raise ValueError("content parameter is required")

        memory_id = await memory.store_memory(content, session_name)

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": "π§ Memory stored with contextual processing",
                "memory_id": memory_id,
                "session_name": session_name,
                "content_length": len(content),
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("contextual_log request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
# ===== SESSION HANDLERS =====
async def handle_start(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle MARM start requests via WebSocket.

    Inserts or replaces the session row with ``marm_active`` set to TRUE,
    reads the protocol file, emits ``marm_started`` and replies with the
    protocol content.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply; also used to build
            the default session name ``ws_session_<client_id>``.
        message: Decoded JSON-RPC request; ``params.session_name`` is optional.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name", f"ws_session_{client_id}")
        now_iso = datetime.now(timezone.utc).isoformat()

        with memory.get_connection() as conn:
            conn.execute(
                '''
                INSERT OR REPLACE INTO sessions (session_name, marm_active, last_accessed)
                VALUES (?, TRUE, ?)
                ''',
                (session_name, now_iso),
            )
            conn.commit()

        # Read the current protocol from file
        protocol_content = await read_protocol_file()

        await events.emit('marm_started', {'session': session_name})

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": f"π MARM protocol activated for session '{session_name}'",
                "session_name": session_name,
                "marm_active": True,
                "protocol_content": protocol_content,
                "instructions": "The complete MARM protocol documentation has been loaded and is available for reference.",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("marm_start request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_refresh(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle MARM refresh requests via WebSocket.

    Updates ``last_accessed`` on the session row, re-reads the protocol file,
    emits ``marm_refreshed`` and replies with the protocol content. The
    UPDATE's row count is not checked, so the reply is "success" whether or
    not a matching session row exists.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session.
        message: Decoded JSON-RPC request; ``params.session_name`` is optional.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))
        now_iso = datetime.now(timezone.utc).isoformat()

        with memory.get_connection() as conn:
            conn.execute(
                '''
                UPDATE sessions SET last_accessed = ? WHERE session_name = ?
                ''',
                (now_iso, session_name),
            )
            conn.commit()

        # Read the current protocol from file to reaffirm adherence
        protocol_content = await read_protocol_file()

        await events.emit('marm_refreshed', {'session': session_name})

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": f"π MARM session '{session_name}' refreshed - protocol adherence reaffirmed",
                "session_name": session_name,
                "protocol_content": protocol_content,
                "instructions": "Protocol documentation refreshed. Please review the current MARM protocol specifications above.",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("marm_refresh request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
# ===== LOGGING HANDLERS =====
async def handle_log_entry(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle log entry requests via WebSocket.

    Stores the ``entry`` param with ``memory.store_memory`` and replies with
    the returned id, the session name and a UTC ISO-8601 timestamp taken when
    the reply is built.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session.
        message: Decoded JSON-RPC request; ``params.entry`` is required.

    A missing/empty ``entry`` or any other failure is logged and answered
    with a generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))
        entry = params.get("entry", "")

        if not entry:
            raise ValueError("Entry parameter is required")

        memory_id = await memory.store_memory(entry, session_name)

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "id": memory_id,
                "session_name": session_name,
                # Real timestamp instead of the former "current_time" placeholder.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("log_entry request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_log_session(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle log session requests via WebSocket.

    Inserts a session row with ``marm_active`` FALSE using ``INSERT OR
    IGNORE`` (an existing row is left untouched), emits
    ``log_session_created`` and replies with a success message.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply; also used to build
            the default session name ``ws_session_<client_id>``.
        message: Decoded JSON-RPC request; ``params.session_name`` is optional.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name", f"ws_session_{client_id}")
        now_iso = datetime.now(timezone.utc).isoformat()

        # Create the session if it doesn't exist
        with memory.get_connection() as conn:
            conn.execute(
                '''
                INSERT OR IGNORE INTO sessions (session_name, marm_active, last_accessed)
                VALUES (?, FALSE, ?)
                ''',
                (session_name, now_iso),
            )
            conn.commit()

        await events.emit('log_session_created', {'session': session_name})

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": f"π Log session '{session_name}' created and activated",
                "session_name": session_name,
                "instructions": "Session is ready for logging entries. Use log_entry to add content.",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("log_session request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_log_show(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle log show requests via WebSocket.

    With ``params.session_name`` set, replies with that session's most recent
    log entries. Without it, replies with a per-session overview (entry count
    and latest entry date). Both queries are capped by ``params.limit``
    (default 20).

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name")
        limit = params.get("limit", 20)

        with memory.get_connection() as conn:
            if session_name:
                # Show specific session
                cursor = conn.execute(
                    '''
                    SELECT entry_date, topic, summary, full_entry
                    FROM log_entries WHERE session_name = ?
                    ORDER BY entry_date DESC
                    LIMIT ?
                    ''',
                    (session_name, limit),
                )
                entries = cursor.fetchall()
                response_data = {
                    "status": "success",
                    "session_name": session_name,
                    "entries": [
                        {"date": e[0], "topic": e[1], "summary": e[2], "content": e[3]}
                        for e in entries
                    ],
                    "entry_count": len(entries),
                }
            else:
                # Show all sessions
                cursor = conn.execute(
                    '''
                    SELECT DISTINCT session_name, COUNT(*) as entry_count,
                    MAX(entry_date) as last_entry
                    FROM log_entries
                    GROUP BY session_name
                    ORDER BY last_entry DESC
                    LIMIT ?
                    ''',
                    (limit,),
                )
                sessions = cursor.fetchall()
                response_data = {
                    "status": "success",
                    "sessions": [
                        {"name": s[0], "entry_count": s[1], "last_entry": s[2]}
                        for s in sessions
                    ],
                    "session_count": len(sessions),
                }

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": response_data,
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("log_show request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_log_delete(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle log delete requests via WebSocket.

    With ``params.entry_id`` present, deletes that single log entry from the
    session. Without it, deletes every log entry of the session and the
    session row itself. The reply carries the number of log entries removed.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; ``params.session_name`` is required.

    A missing ``session_name`` or any other failure is logged and answered
    with a generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name")
        entry_id = params.get("entry_id")

        if not session_name:
            raise ValueError("session_name parameter is required")

        with memory.get_connection() as conn:
            # Test for presence, not truthiness: a falsy id such as 0 or ""
            # must never fall through to the "delete entire session" branch.
            if entry_id is not None:
                # Delete specific entry
                cursor = conn.execute(
                    '''
                    DELETE FROM log_entries WHERE session_name = ? AND id = ?
                    ''',
                    (session_name, entry_id),
                )
                deleted = cursor.rowcount
            else:
                # Delete entire session
                cursor = conn.execute(
                    '''
                    DELETE FROM log_entries WHERE session_name = ?
                    ''',
                    (session_name,),
                )
                deleted = cursor.rowcount
                # Also delete session record
                conn.execute(
                    '''
                    DELETE FROM sessions WHERE session_name = ?
                    ''',
                    (session_name,),
                )
            conn.commit()

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success" if deleted > 0 else "not_found",
                "message": f"ποΈ Deleted {deleted} entries" if deleted > 0 else "No entries found to delete",
                "deleted_count": deleted,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("log_delete request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
# ===== NOTEBOOK HANDLERS =====
async def handle_notebook_add(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook add requests via WebSocket.

    Inserts or replaces a notebook entry keyed by ``name``. When
    ``memory.encoder`` is available, an embedding of ``data`` is stored with
    it; a failed embedding is logged and the entry is saved without one.
    Emits ``notebook_entry_added`` after the write.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; ``params.name`` and ``params.data``
            are both required.

    Missing parameters or any other failure are logged and answered with a
    generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        name = params.get("name", "")
        data = params.get("data", "")

        if not name or not data:
            raise ValueError("Both name and data parameters are required")

        # Generate embedding if available
        embedding_bytes = None
        if memory.encoder:
            try:
                embedding_bytes = memory.encoder.encode(data).tobytes()
            except Exception as exc:
                # Best effort: the entry is still stored, just without a vector.
                logger.warning("Failed to generate embedding: %s", exc)

        # Add to notebook using same logic as HTTP endpoint
        with memory.get_connection() as conn:
            conn.execute(
                '''
                INSERT OR REPLACE INTO notebook_entries (name, data, embedding, updated_at)
                VALUES (?, ?, ?, ?)
                ''',
                (name, data, embedding_bytes, datetime.now(timezone.utc).isoformat()),
            )
            conn.commit()

        # Emit event
        await events.emit('notebook_entry_added', {
            'name': name,
            'data': data,
        })

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": f"π Notebook entry '{name}' added",
                "name": name,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_add request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_notebook_use(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook use requests via WebSocket.

    Splits ``params.name`` on commas, looks each name up in
    ``notebook_entries`` and replaces ``memory.active_notebook_entries`` with
    the entries that were found. Names with no matching row are skipped.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; ``params.name`` is required and may
            hold several comma-separated names.

    A missing ``name`` or any other failure is logged and answered with a
    generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        name = params.get("name", "")  # Updated from 'names' to 'name' for consistency

        if not name:
            raise ValueError("name parameter is required")

        requested = [part.strip() for part in name.split(',')]
        activated_entries = []

        with memory.get_connection() as conn:
            for entry_name in requested:
                row = conn.execute(
                    'SELECT name, data FROM notebook_entries WHERE name = ?',
                    (entry_name,),
                ).fetchone()
                if row:
                    activated_entries.append({"name": row[0], "data": row[1]})

        # Update active list in memory
        memory.active_notebook_entries = activated_entries

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": f"π§ Activated {len(activated_entries)} notebook entries",
                "activated_entries": [e["name"] for e in activated_entries],
                "entries": activated_entries,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_use request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_notebook_show(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook show requests via WebSocket.

    Replies with every notebook entry, newest ``updated_at`` first. Each
    entry carries a preview of its data: the first 100 characters followed by
    "..." when the data is longer than 100 characters, otherwise all of it.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        with memory.get_connection() as conn:
            cursor = conn.execute(
                'SELECT name, data, created_at, updated_at FROM notebook_entries ORDER BY updated_at DESC'
            )
            entries = [
                {
                    "name": row[0],
                    "preview": row[1][:100] + "..." if len(row[1]) > 100 else row[1],
                    "created_at": row[2],
                    "updated_at": row[3],
                }
                for row in cursor.fetchall()
            ]

        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {"status": "success", "entries": entries, "total_count": len(entries)},
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_show request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
async def handle_notebook_delete(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook delete requests via WebSocket.

    Deletes the notebook entry called ``params.name`` and replies with
    "success" when a row was removed, or "not_found" otherwise.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; ``params.name`` is required.

    A missing ``name`` or any other failure is logged and answered with a
    generic JSON-RPC internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        name = params.get("name", "")

        if not name:
            raise ValueError("name parameter is required")

        with memory.get_connection() as conn:
            cursor = conn.execute('DELETE FROM notebook_entries WHERE name = ?', (name,))
            deleted = cursor.rowcount
            conn.commit()

        was_deleted = deleted > 0
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success" if was_deleted else "not_found",
                "deleted": was_deleted,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_delete request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
async def handle_notebook_clear(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook clear requests via WebSocket.

    Resets ``memory.active_notebook_entries`` to an empty list and confirms.
    Stored notebook entries are not touched.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        memory.active_notebook_entries = []
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": "π§Ή Active notebook entries cleared",
                "active_count": 0,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_clear request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
async def handle_notebook_status(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle notebook status requests via WebSocket.

    Replies with the names in ``memory.active_notebook_entries`` and how many
    there are.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        active_names = [entry["name"] for entry in memory.active_notebook_entries]
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "active_entries": active_names,
                "active_count": len(active_names),
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("notebook_status request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
# ===== REASONING HANDLERS =====
async def handle_summary(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle summary requests via WebSocket.

    Builds a markdown summary of the most recent log entries of a session
    (``params.limit``, default 50). Entries are added one by one until the
    estimated response size would exceed ``MCPResponseLimiter.CONTENT_LIMIT``.
    When entries had to be left out, the result carries ``_mcp_truncated``
    metadata. A session with no entries gets a reply with status "empty".

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session.
        message: Decoded JSON-RPC request.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))
        limit = params.get("limit", 50)

        # Generate summary using same logic as HTTP endpoint
        with memory.get_connection() as conn:
            # Get total count first
            cursor = conn.execute(
                '''
                SELECT COUNT(*) FROM log_entries WHERE session_name = ?
                ''',
                (session_name,),
            )
            total_entries = cursor.fetchone()[0]

            # Get limited entries for summary
            cursor = conn.execute(
                '''
                SELECT entry_date, topic, summary, full_entry
                FROM log_entries WHERE session_name = ?
                ORDER BY entry_date DESC
                LIMIT ?
                ''',
                (session_name, limit),
            )
            entries = cursor.fetchall()

        if not entries:
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "status": "empty",
                    "message": f"No entries found in session '{session_name}'",
                },
            }
            await ws_manager.send_personal_message(response, client_id)
            return

        # Metadata used only to estimate the size of a candidate response.
        base_response = {
            "status": "success",
            "session_name": session_name,
            "entry_count": len(entries),
            "total_entries": total_entries,
        }

        # Header. The timestamp is labelled UTC, so it must be taken in UTC.
        generated_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
        summary_lines = [
            f"# MARM Session Summary: {session_name}",
            f"Generated: {generated_at}",
            "",
        ]

        if total_entries > len(entries):
            summary_lines.append(f"*Showing {len(entries)} most recent entries out of {total_entries} total*")
            summary_lines.append("")

        # Add entries one at a time, stopping at the first that does not fit.
        included_entries = []
        current_summary_lines = summary_lines.copy()

        for entry in entries:
            # A NULL summary column is rendered as empty text instead of
            # failing the whole request.
            entry_summary = entry[2] or ""
            # Truncate long summaries to prevent size explosion
            if len(entry_summary) > 200:
                entry_summary = entry_summary[:197] + "..."

            entry_line = f"**{entry[0]}** [{entry[1]}]: {entry_summary}"

            # Estimate the response size with this entry added
            test_response = base_response.copy()
            test_response["summary"] = "\n".join(current_summary_lines + [entry_line])
            response_size = MCPResponseLimiter.estimate_response_size(test_response)

            if response_size > MCPResponseLimiter.CONTENT_LIMIT:
                # Can't fit this entry, stop here
                break

            current_summary_lines.append(entry_line)
            included_entries.append(entry)

        summary_text = "\n".join(current_summary_lines)

        final_response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "session_name": session_name,
                "summary": summary_text,
                "entry_count": len(included_entries),
                "total_entries": total_entries,
            },
        }

        # Add truncation notice if we couldn't fit all entries
        if len(included_entries) < len(entries):
            result = final_response["result"]
            result["_mcp_truncated"] = True
            result["_truncation_reason"] = "Summary limited to 1MB for MCP compliance"
            result["_entries_shown"] = len(included_entries)
            result["_entries_available"] = len(entries)

        await ws_manager.send_personal_message(final_response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("summary request failed for client %s", client_id)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": "Request processing failed",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
async def handle_context_bridge(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle context bridge requests via WebSocket.

    Formats a short "Context Bridge" text from ``params.new_topic`` and the
    session name and sends it back. No storage is read or written here.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply and to look up the
            client's default session.
        message: Decoded JSON-RPC request.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        # ``or {}`` also tolerates an explicit ``"params": null``.
        params = message.get("params") or {}
        new_topic = params.get("new_topic", "")
        session_name = params.get("session_name", ws_manager.get_client_session(client_id))

        bridge_text = f"# Context Bridge: {new_topic}\nSession: {session_name}\n\nReady to proceed with focused work"
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "bridge_text": bridge_text,
                "session_name": session_name,
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("context_bridge request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
# ===== SYSTEM HANDLERS =====
async def handle_current_context(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle current context requests via WebSocket.

    Replies with the current UTC time as an ISO-8601 string.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        current_time = datetime.now(timezone.utc).isoformat()
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {"status": "success", "current_datetime": current_time},
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("current_context request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
async def handle_system_info(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle system info requests via WebSocket.

    Replies with a fixed server description and version string.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "status": "success",
                "message": "MARM MCP Server - WebSocket Protocol",
                "version": "2.2.6-beta",
            },
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("system_info request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )
async def handle_reload_docs(websocket: WebSocket, client_id: str, message: Dict[str, Any]):
    """Handle reload docs requests via WebSocket.

    Sends a fixed confirmation message. This handler itself performs no
    reload; it only acknowledges the request.

    Args:
        websocket: Connection the request arrived on. Unused here; the reply
            is sent through ``ws_manager``.
        client_id: Identifier used to address the reply.
        message: Decoded JSON-RPC request; only its ``id`` is read.

    Any exception is logged and reported to the client as a generic JSON-RPC
    internal error (-32603).
    """
    try:
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {"status": "success", "message": "π Documentation reloaded"},
        }
        await ws_manager.send_personal_message(response, client_id)
    except Exception:
        # Keep the traceback server-side; the client only sees a generic error.
        logger.exception("reload_docs request failed for client %s", client_id)
        await ws_manager.send_personal_message(
            {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "error": {"code": -32603, "message": "Request processing failed"},
            },
            client_id,
        )