
Agent MCP

routes.py (35.8 kB)
# Agent-MCP/mcp_template/mcp_server_src/app/routes.py
import os
import json
import datetime
import sqlite3
from pathlib import Path
from typing import Callable, List, Dict, Any  # Added List, Dict, Any

from starlette.routing import Route, Mount
from starlette.staticfiles import StaticFiles
from starlette.responses import JSONResponse, Response, PlainTextResponse
from starlette.requests import Request

# Project-specific imports
from ..core.config import logger
from ..core import globals as g
from ..core.auth import verify_token, get_agent_id as auth_get_agent_id
from ..utils.json_utils import get_sanitized_json_body
from ..db.connection import get_db_connection
from ..db.actions.agent_actions_db import log_agent_action_to_db
from ..features.dashboard.api import (
    fetch_graph_data_logic,
    fetch_task_tree_data_logic
)
from ..features.dashboard.styles import get_node_style

# Import tool implementations that these dashboard APIs will call
from ..tools.admin_tools import (
    create_agent_tool_impl,
    terminate_agent_tool_impl
)
import mcp.types as mcp_types  # For handling the result from tool_impl


# --- Dashboard and API Endpoints ---

async def simple_status_api_route(request: Request) -> JSONResponse:
    # Handle OPTIONS for CORS preflight
    if request.method == 'OPTIONS':
        return await handle_options(request)
    try:
        # Get system status
        from ..db.actions.agent_db import get_all_active_agents_from_db
        from ..db.actions.task_db import get_all_tasks_from_db

        agents = get_all_active_agents_from_db()
        tasks = get_all_tasks_from_db()

        # Count task statuses
        pending_tasks = len([t for t in tasks if t.get('status') == 'pending'])
        completed_tasks = len([t for t in tasks if t.get('status') == 'completed'])

        return JSONResponse({
            "server_running": True,
            "total_agents": len(agents),
            "active_agents": len([a for a in agents if a.get('status') == 'active']),
            "total_tasks": len(tasks),
            "pending_tasks": pending_tasks,
            "completed_tasks": completed_tasks,
            "last_updated": datetime.datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in simple_status_api_route: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to get simple status: {str(e)}"}, status_code=500)


async def graph_data_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    try:
        data = await fetch_graph_data_logic(g.file_map.copy())
        return JSONResponse(data)
    except Exception as e:
        logger.error(f"Error serving graph data: {e}", exc_info=True)
        return JSONResponse({'nodes': [], 'edges': [], 'error': str(e)}, status_code=500)


async def task_tree_data_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    try:
        data = await fetch_task_tree_data_logic()
        return JSONResponse(data)
    except Exception as e:
        logger.error(f"Error serving task tree data: {e}", exc_info=True)
        return JSONResponse({'nodes': [], 'edges': [], 'error': str(e)}, status_code=500)


async def node_details_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    node_id = request.query_params.get('node_id')
    if not node_id:
        return JSONResponse({'error': 'Missing node_id parameter'}, status_code=400)
    details: Dict[str, Any] = {'id': node_id, 'type': 'unknown', 'data': {}, 'actions': [], 'related': {}}
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        parts = node_id.split('_', 1)
        node_type_from_id = parts[0] if len(parts) > 1 else node_id
        actual_id_from_node = parts[1] if len(parts) > 1 else (node_id if node_type_from_id != 'admin' else 'admin')
        details['type'] = node_type_from_id

        if node_type_from_id == 'agent':
            cursor.execute("SELECT * FROM agents WHERE agent_id = ?", (actual_id_from_node,))
            row = cursor.fetchone()
            if row: details['data'] = dict(row)
            cursor.execute("SELECT timestamp, action_type, task_id, details FROM agent_actions WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 10", (actual_id_from_node,))
            details['actions'] = [dict(r) for r in cursor.fetchall()]
            cursor.execute("SELECT task_id, title, status, priority FROM tasks WHERE assigned_to = ? ORDER BY created_at DESC LIMIT 10", (actual_id_from_node,))
            details['related']['assigned_tasks'] = [dict(r) for r in cursor.fetchall()]
        elif node_type_from_id == 'task':
            cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (actual_id_from_node,))
            row = cursor.fetchone()
            if row: details['data'] = dict(row)
            cursor.execute("SELECT timestamp, agent_id, action_type, details FROM agent_actions WHERE task_id = ? ORDER BY timestamp DESC LIMIT 10", (actual_id_from_node,))
            details['actions'] = [dict(r) for r in cursor.fetchall()]
        elif node_type_from_id == 'context':
            cursor.execute("SELECT * FROM project_context WHERE context_key = ?", (actual_id_from_node,))
            row = cursor.fetchone()
            if row: details['data'] = dict(row)
            cursor.execute("SELECT timestamp, agent_id, action_type FROM agent_actions WHERE (action_type = 'updated_context' OR action_type = 'update_project_context') AND details LIKE ? ORDER BY timestamp DESC LIMIT 5", (f'%"{actual_id_from_node}"%',))
            details['actions'] = [dict(r) for r in cursor.fetchall()]
        elif node_type_from_id == 'file':
            details['data'] = {'filepath': actual_id_from_node, 'info': g.file_map.get(actual_id_from_node, {})}
            cursor.execute("SELECT timestamp, agent_id, action_type, details FROM agent_actions WHERE (action_type LIKE '%_file' OR action_type LIKE 'claim_file_%' OR action_type = 'release_file') AND details LIKE ? ORDER BY timestamp DESC LIMIT 5", (f'%"{actual_id_from_node}"%',))
            details['actions'] = [dict(r) for r in cursor.fetchall()]
        elif node_type_from_id == 'admin':
            details['data'] = {'name': 'Admin User / System'}
            cursor.execute("SELECT timestamp, action_type, task_id, details FROM agent_actions WHERE agent_id = 'admin' ORDER BY timestamp DESC LIMIT 10")
            details['actions'] = [dict(r) for r in cursor.fetchall()]

        if not details.get('data') and node_type_from_id not in ['admin']:
            return JSONResponse({'error': 'Node data not found or type unrecognized'}, status_code=404)
    except Exception as e:
        logger.error(f"Error fetching details for node {node_id}: {e}", exc_info=True)
        return JSONResponse({'error': f'Failed to fetch node details: {str(e)}'}, status_code=500)
    finally:
        if conn: conn.close()
    return JSONResponse(details)


async def agents_list_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    agents_list_data: List[Dict[str, Any]] = []
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        admin_style = get_node_style('admin')
        agents_list_data.append({
            'agent_id': 'Admin',
            'status': 'system',
            'color': admin_style.get('color', '#607D8B'),
            'created_at': 'N/A',
            'current_task': 'N/A'
        })
        cursor.execute("SELECT agent_id, status, color, created_at, current_task FROM agents ORDER BY created_at DESC")
        for row in cursor.fetchall():
            agents_list_data.append(dict(row))
    except Exception as e:
        logger.error(f"Error fetching agents list: {e}", exc_info=True)
        return JSONResponse({'error': f'Failed to fetch agents list: {str(e)}'}, status_code=500)
    finally:
        if conn: conn.close()
    return JSONResponse(agents_list_data)


async def tokens_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    try:
        agent_tokens_list = []
        for token, data in g.active_agents.items():
            if data.get("status") != "terminated":
                agent_tokens_list.append({"agent_id": data.get("agent_id"), "token": token})
        return JSONResponse({"admin_token": g.admin_token, "agent_tokens": agent_tokens_list})
    except Exception as e:
        logger.error(f"Error retrieving tokens for dashboard: {e}", exc_info=True)
        return JSONResponse({"error": f"Error retrieving tokens: {str(e)}"}, status_code=500)


async def all_tasks_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
        tasks_data = [dict(row) for row in cursor.fetchall()]
        return JSONResponse(tasks_data)
    except Exception as e:
        logger.error(f"Error fetching all tasks: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to fetch all tasks: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


async def update_task_details_api_route(request: Request) -> JSONResponse:
    # // ... (implementation from previous response)
    if request.method != 'POST':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)
    conn = None
    try:
        data = await get_sanitized_json_body(request)
        admin_auth_token = data.get('token'); task_id_to_update = data.get('task_id'); new_status = data.get('status')
        if not task_id_to_update or not new_status:
            return JSONResponse({"error": "task_id and status are required fields."}, status_code=400)
        if not verify_token(admin_auth_token, required_role='admin'):
            return JSONResponse({"error": "Invalid admin token"}, status_code=403)
        requesting_admin_id = auth_get_agent_id(admin_auth_token)

        conn = get_db_connection(); cursor = conn.cursor()
        cursor.execute("SELECT notes FROM tasks WHERE task_id = ?", (task_id_to_update,)); task_row = cursor.fetchone()
        if not task_row:
            return JSONResponse({"error": "Task not found"}, status_code=404)
        existing_notes_str = task_row["notes"]

        update_fields: List[str] = []; params: List[Any] = []; log_details: Dict[str, Any] = {"status_updated_to": new_status}
        update_fields.append("status = ?"); params.append(new_status)
        update_fields.append("updated_at = ?"); params.append(datetime.datetime.now().isoformat())
        if 'title' in data and data['title'] is not None:
            update_fields.append("title = ?"); params.append(data['title']); log_details["title_changed"] = True
        if 'description' in data and data['description'] is not None:
            update_fields.append("description = ?"); params.append(data['description']); log_details["description_changed"] = True
        if 'priority' in data and data['priority']:
            update_fields.append("priority = ?"); params.append(data['priority']); log_details["priority_changed"] = True
        if 'notes' in data and data['notes'] and isinstance(data['notes'], str) and data['notes'].strip():
            try:
                current_notes_list = json.loads(existing_notes_str or "[]")
            except json.JSONDecodeError:
                current_notes_list = []
            new_note_entry = {"timestamp": datetime.datetime.now().isoformat(), "author": requesting_admin_id, "content": data['notes'].strip()}
            current_notes_list.append(new_note_entry); update_fields.append("notes = ?"); params.append(json.dumps(current_notes_list)); log_details["notes_added"] = True

        params.append(task_id_to_update)
        if update_fields:
            placeholders = ', '.join(update_fields)
            query = f"UPDATE tasks SET {placeholders} WHERE task_id = ?"
            cursor.execute(query, tuple(params))
            log_agent_action_to_db(cursor, requesting_admin_id, "updated_task_dashboard", task_id=task_id_to_update, details=log_details); conn.commit()

            if task_id_to_update in g.tasks:
                cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id_to_update,)); updated_task_for_cache = cursor.fetchone()
                if updated_task_for_cache:
                    g.tasks[task_id_to_update] = dict(updated_task_for_cache)
                    for field_key in ["child_tasks", "depends_on_tasks", "notes"]:
                        if isinstance(g.tasks[task_id_to_update].get(field_key), str):
                            try:
                                g.tasks[task_id_to_update][field_key] = json.loads(g.tasks[task_id_to_update][field_key] or "[]")
                            except json.JSONDecodeError:
                                g.tasks[task_id_to_update][field_key] = []
                else:
                    del g.tasks[task_id_to_update]

        return JSONResponse({"success": True, "message": "Task updated successfully via dashboard."})
    except ValueError as e_val:
        return JSONResponse({"error": str(e_val)}, status_code=400)
    except sqlite3.Error as e_sql:
        if conn: conn.rollback()
        logger.error(f"DB error updating task via dashboard: {e_sql}", exc_info=True)
        return JSONResponse({"error": f"Failed to update task (DB): {str(e_sql)}"}, status_code=500)
    except Exception as e:
        if conn: conn.rollback()
        logger.error(f"Error updating task via dashboard: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to update task: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


# --- ADDED: Dashboard-specific Agent Management API Endpoints ---

# Original: main.py lines 2022-2058 (create_agent_api function)
async def create_agent_dashboard_api_route(request: Request) -> JSONResponse:
    """Dashboard API endpoint to create an agent. Calls the admin tool internally."""
    if request.method != 'POST':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)
    try:
        data = await get_sanitized_json_body(request)
        admin_auth_token = data.get("token")
        agent_id = data.get("agent_id")
        capabilities = data.get("capabilities", [])  # Optional
        working_directory = data.get("working_directory")  # Optional

        # This endpoint itself requires admin authentication
        if not verify_token(admin_auth_token, "admin"):
            return JSONResponse({"message": "Unauthorized: Invalid admin token for API call"}, status_code=401)
        if not agent_id:
            return JSONResponse({"message": "Agent ID is required"}, status_code=400)

        # Prepare arguments for the create_agent_tool_impl
        tool_args = {
            "token": admin_auth_token,  # The tool_impl will verify this again
            "agent_id": agent_id,
            "capabilities": capabilities,
            "working_directory": working_directory
        }

        # Call the already refactored tool implementation
        result_list: List[mcp_types.TextContent] = await create_agent_tool_impl(tool_args)

        # Process the result from tool_impl to form a JSONResponse.
        # The tool_impl returns a list of TextContent objects.
        # The original API returned a simple JSON message.
        if result_list and result_list[0].text.startswith(f"Agent '{agent_id}' created successfully."):
            # Extract token if possible for dashboard convenience (original API did this).
            # This is a bit fragile as it relies on string parsing of the tool's output.
            agent_token_from_result = None
            for line in result_list[0].text.split('\n'):
                if line.startswith("Token: "):
                    agent_token_from_result = line.split("Token: ", 1)[1]
                    break
            return JSONResponse({
                "message": f"Agent '{agent_id}' created successfully via dashboard API.",
                "agent_token": agent_token_from_result  # May be None if not parsed
            })
        else:
            # Return the error message from the tool
            error_message = result_list[0].text if result_list else "Unknown error creating agent."
            # Determine appropriate status code based on error message
            status_code = 400  # Default bad request
            if "Unauthorized" in error_message: status_code = 401
            if "already exists" in error_message: status_code = 409  # Conflict
            return JSONResponse({"message": error_message}, status_code=status_code)
    except ValueError as e_val:  # From get_sanitized_json_body
        return JSONResponse({"message": str(e_val)}, status_code=400)
    except Exception as e:
        logger.error(f"Error in create_agent_dashboard_api_route: {e}", exc_info=True)
        return JSONResponse({"message": f"Error creating agent via dashboard API: {str(e)}"}, status_code=500)


# Original: main.py lines 2061-2099 (terminate_agent_api function)
async def terminate_agent_dashboard_api_route(request: Request) -> JSONResponse:
    """Dashboard API endpoint to terminate an agent. Calls the admin tool internally."""
    if request.method != 'POST':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)
    try:
        data = await get_sanitized_json_body(request)
        admin_auth_token = data.get("token")
        agent_id_to_terminate = data.get("agent_id")

        if not verify_token(admin_auth_token, "admin"):
            return JSONResponse({"message": "Unauthorized: Invalid admin token for API call"}, status_code=401)
        if not agent_id_to_terminate:
            return JSONResponse({"message": "Agent ID to terminate is required"}, status_code=400)

        tool_args = {
            "token": admin_auth_token,  # Tool impl will verify again
            "agent_id": agent_id_to_terminate
        }
        result_list: List[mcp_types.TextContent] = await terminate_agent_tool_impl(tool_args)

        if result_list and result_list[0].text.startswith(f"Agent '{agent_id_to_terminate}' terminated"):
            return JSONResponse({"message": f"Agent '{agent_id_to_terminate}' terminated successfully via dashboard API."})
        else:
            error_message = result_list[0].text if result_list else "Unknown error terminating agent."
            status_code = 400
            if "Unauthorized" in error_message: status_code = 401
            if "not found" in error_message: status_code = 404
            return JSONResponse({"message": error_message}, status_code=status_code)
    except ValueError as e_val:  # From get_sanitized_json_body
        return JSONResponse({"message": str(e_val)}, status_code=400)
    except Exception as e:
        logger.error(f"Error in terminate_agent_dashboard_api_route: {e}", exc_info=True)
        return JSONResponse({"message": f"Error terminating agent via dashboard API: {str(e)}"}, status_code=500)


# --- Comprehensive Data Endpoint ---
async def all_data_api_route(request: Request) -> JSONResponse:
    """Get all data in one call for caching on frontend"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Get all agents with their tokens
        cursor.execute("SELECT * FROM agents ORDER BY created_at DESC")
        agents_data = []
        for row in cursor.fetchall():
            agent_dict = dict(row)
            agent_id = agent_dict['agent_id']
            # Find token for this agent from active_agents
            agent_token = None
            for token, data in g.active_agents.items():
                if data.get("agent_id") == agent_id and data.get("status") != "terminated":
                    agent_token = token
                    break
            agent_dict['auth_token'] = agent_token
            agents_data.append(agent_dict)

        # Add admin as special agent
        agents_data.insert(0, {
            'agent_id': 'Admin',
            'status': 'system',
            'auth_token': g.admin_token,
            'created_at': 'N/A',
            'current_task': 'N/A'
        })

        # Get all tasks
        cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
        tasks_data = [dict(row) for row in cursor.fetchall()]

        # Get all context entries
        cursor.execute("SELECT * FROM project_context ORDER BY last_updated DESC")
        context_data = [dict(row) for row in cursor.fetchall()]

        # Get recent agent actions (last 100)
        cursor.execute("""
            SELECT * FROM agent_actions
            ORDER BY timestamp DESC
            LIMIT 100
        """)
        actions_data = [dict(row) for row in cursor.fetchall()]

        # Get file metadata
        cursor.execute("SELECT * FROM file_metadata")
        file_metadata = [dict(row) for row in cursor.fetchall()]

        response_data = {
            "agents": agents_data,
            "tasks": tasks_data,
            "context": context_data,
            "actions": actions_data,
            "file_metadata": file_metadata,
            "file_map": g.file_map,
            "admin_token": g.admin_token,
            "timestamp": datetime.datetime.now().isoformat()
        }

        return JSONResponse(
            response_data,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': 'Content-Type'
            }
        )
    except Exception as e:
        logger.error(f"Error fetching all data: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to fetch all data: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


async def context_data_api_route(request: Request) -> JSONResponse:
    """Get only context data"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Get all context entries
        cursor.execute("SELECT * FROM project_context ORDER BY last_updated DESC")
        context_data = [dict(row) for row in cursor.fetchall()]

        return JSONResponse(
            context_data,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': 'Content-Type'
            }
        )
    except Exception as e:
        logger.error(f"Error fetching context data: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to fetch context data: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


# --- CORS Preflight Handler ---
async def handle_options(request: Request) -> Response:
    """Handle OPTIONS requests for CORS preflight"""
    return PlainTextResponse(
        '',
        headers={
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
            'Access-Control-Allow-Headers': '*',
            'Access-Control-Max-Age': '86400',
        }
    )


# --- Route Definitions List ---
routes = [
    Route('/api/all-data', endpoint=all_data_api_route, name="all_data_api", methods=['GET', 'OPTIONS']),
    Route('/api/status', endpoint=simple_status_api_route, name="simple_status_api", methods=['GET', 'OPTIONS']),
    Route('/api/graph-data', endpoint=graph_data_api_route, name="graph_data_api", methods=['GET', 'OPTIONS']),
    Route('/api/task-tree-data', endpoint=task_tree_data_api_route, name="task_tree_data_api", methods=['GET', 'OPTIONS']),
    Route('/api/node-details', endpoint=node_details_api_route, name="node_details_api", methods=['GET', 'OPTIONS']),
    Route('/api/agents', endpoint=agents_list_api_route, name="agents_list_api", methods=['GET', 'OPTIONS']),
    Route('/api/agents-list', endpoint=agents_list_api_route, name="agents_list_api_legacy", methods=['GET', 'OPTIONS']),
    Route('/api/tokens', endpoint=tokens_api_route, name="tokens_api", methods=['GET', 'OPTIONS']),
    Route('/api/tasks', endpoint=all_tasks_api_route, name="all_tasks_api", methods=['GET', 'OPTIONS']),
    Route('/api/tasks-all', endpoint=all_tasks_api_route, name="all_tasks_api_legacy", methods=['GET', 'OPTIONS']),
    Route('/api/update-task-dashboard', endpoint=update_task_details_api_route, name="update_task_dashboard_api", methods=['POST', 'OPTIONS']),
    # Added back for 1-to-1 dashboard compatibility
    Route('/api/create-agent', endpoint=create_agent_dashboard_api_route, name="create_agent_dashboard_api", methods=['POST', 'OPTIONS']),
    Route('/api/terminate-agent', endpoint=terminate_agent_dashboard_api_route, name="terminate_agent_dashboard_api", methods=['POST', 'OPTIONS']),
    # Catch-all OPTIONS handler for any API route
    Route('/api/{path:path}', endpoint=handle_options, methods=['OPTIONS']),
]


# --- Test/Demo Data Endpoint ---
async def create_sample_memories_route(request: Request) -> JSONResponse:
    """Create sample memory entries for testing"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Sample memory entries
        sample_memories = [
            {
                'context_key': 'api.config.base_url',
                'value': json.dumps('https://api.example.com'),
                'description': 'Main API base URL for external services',
                'updated_by': 'system'
            },
            {
                'context_key': 'app.settings.theme',
                'value': json.dumps({'theme': 'dark', 'accent': 'blue'}),
                'description': 'Application theme preferences',
                'updated_by': 'admin'
            },
            {
                'context_key': 'database.connection.timeout',
                'value': json.dumps(30),
                'description': 'Database connection timeout in seconds',
                'updated_by': 'system'
            },
            {
                'context_key': 'cache.redis.config',
                'value': json.dumps({
                    'host': 'localhost',
                    'port': 6379,
                    'ttl': 3600
                }),
                'description': 'Redis cache configuration',
                'updated_by': 'admin'
            }
        ]

        current_time = datetime.datetime.now().isoformat()
        created_count = 0

        for memory in sample_memories:
            cursor.execute("""
                INSERT OR REPLACE INTO project_context
                (context_key, value, last_updated, updated_by, description)
                VALUES (?, ?, ?, ?, ?)
            """, (
                memory['context_key'],
                memory['value'],
                current_time,
                memory['updated_by'],
                memory['description']
            ))
            created_count += 1

        conn.commit()

        return JSONResponse({
            "success": True,
            "message": f"Created {created_count} sample memory entries",
            "created_count": created_count
        })
    except Exception as e:
        if conn: conn.rollback()
        logger.error(f"Error creating sample memories: {e}", exc_info=True)
        return JSONResponse({
            "success": False,
            "error": str(e)
        }, status_code=500)
    finally:
        if conn: conn.close()


# Memory CRUD API endpoints
async def create_memory_api_route(request: Request) -> JSONResponse:
    """Create a new memory entry"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    if request.method != 'POST':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)
    conn = None
    try:
        data = await get_sanitized_json_body(request)
        admin_token = data.get('token')
        context_key = data.get('context_key')
        context_value = data.get('context_value')
        description = data.get('description')

        if not verify_token(admin_token, required_role='admin'):
            return JSONResponse({"error": "Invalid admin token"}, status_code=403)
        if not context_key:
            return JSONResponse({"error": "context_key is required"}, status_code=400)

        requesting_admin_id = auth_get_agent_id(admin_token)
        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if key already exists
        cursor.execute("SELECT context_key FROM project_context WHERE context_key = ?", (context_key,))
        if cursor.fetchone():
            return JSONResponse({"error": "Memory with this key already exists"}, status_code=409)

        current_time = datetime.datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO project_context
            (context_key, value, last_updated, updated_by, description)
            VALUES (?, ?, ?, ?, ?)
        """, (
            context_key,
            json.dumps(context_value),
            current_time,
            requesting_admin_id,
            description
        ))

        # Log the action
        log_agent_action_to_db(cursor, requesting_admin_id, "created_memory", details={"context_key": context_key})
        conn.commit()

        return JSONResponse({
            "success": True,
            "message": f"Memory '{context_key}' created successfully"
        })
    except ValueError as e:
        return JSONResponse({"error": str(e)}, status_code=400)
    except Exception as e:
        if conn: conn.rollback()
        logger.error(f"Error creating memory: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to create memory: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


async def update_memory_api_route(request: Request) -> JSONResponse:
    """Update an existing memory entry"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    if request.method != 'PUT':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)

    # Extract context_key from URL path
    path_parts = request.url.path.split('/')
    if len(path_parts) < 4 or not path_parts[-1]:
        return JSONResponse({"error": "context_key is required in URL"}, status_code=400)
    context_key = path_parts[-1]

    conn = None
    try:
        data = await get_sanitized_json_body(request)
        admin_token = data.get('token')
        context_value = data.get('context_value')
        description = data.get('description')

        if not verify_token(admin_token, required_role='admin'):
            return JSONResponse({"error": "Invalid admin token"}, status_code=403)

        requesting_admin_id = auth_get_agent_id(admin_token)
        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if memory exists
        cursor.execute("SELECT context_key FROM project_context WHERE context_key = ?", (context_key,))
        if not cursor.fetchone():
            return JSONResponse({"error": "Memory not found"}, status_code=404)

        current_time = datetime.datetime.now().isoformat()

        # Build update query dynamically
        update_fields = ["last_updated = ?", "updated_by = ?"]
        params = [current_time, requesting_admin_id]

        if context_value is not None:
            update_fields.append("value = ?")
            params.append(json.dumps(context_value))
        if description is not None:
            update_fields.append("description = ?")
            params.append(description)

        params.append(context_key)
        query = f"UPDATE project_context SET {', '.join(update_fields)} WHERE context_key = ?"
        cursor.execute(query, params)

        # Log the action
        log_agent_action_to_db(cursor, requesting_admin_id, "updated_memory", details={"context_key": context_key})
        conn.commit()

        return JSONResponse({
            "success": True,
            "message": f"Memory '{context_key}' updated successfully"
        })
    except ValueError as e:
        return JSONResponse({"error": str(e)}, status_code=400)
    except Exception as e:
        if conn: conn.rollback()
        logger.error(f"Error updating memory: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to update memory: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


async def delete_memory_api_route(request: Request) -> JSONResponse:
    """Delete a memory entry"""
    if request.method == 'OPTIONS':
        return await handle_options(request)
    if request.method != 'DELETE':
        return JSONResponse({"error": "Method not allowed"}, status_code=405)

    # Extract context_key from URL path
    path_parts = request.url.path.split('/')
    if len(path_parts) < 4 or not path_parts[-1]:
        return JSONResponse({"error": "context_key is required in URL"}, status_code=400)
    context_key = path_parts[-1]

    conn = None
    try:
        data = await get_sanitized_json_body(request)
        admin_token = data.get('token')

        if not verify_token(admin_token, required_role='admin'):
            return JSONResponse({"error": "Invalid admin token"}, status_code=403)

        requesting_admin_id = auth_get_agent_id(admin_token)
        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if memory exists
        cursor.execute("SELECT context_key FROM project_context WHERE context_key = ?", (context_key,))
        if not cursor.fetchone():
            return JSONResponse({"error": "Memory not found"}, status_code=404)

        # Delete the memory
        cursor.execute("DELETE FROM project_context WHERE context_key = ?", (context_key,))

        # Log the action
        log_agent_action_to_db(cursor, requesting_admin_id, "deleted_memory", details={"context_key": context_key})
        conn.commit()

        return JSONResponse({
            "success": True,
            "message": f"Memory '{context_key}' deleted successfully"
        })
    except ValueError as e:
        return JSONResponse({"error": str(e)}, status_code=400)
    except Exception as e:
        if conn: conn.rollback()
        logger.error(f"Error deleting memory: {e}", exc_info=True)
        return JSONResponse({"error": f"Failed to delete memory: {str(e)}"}, status_code=500)
    finally:
        if conn: conn.close()


# Add the memory CRUD routes
routes.extend([
    Route('/api/memories', endpoint=create_memory_api_route, name="create_memory_api", methods=['POST', 'OPTIONS']),
    Route('/api/memories/{context_key}', endpoint=update_memory_api_route, name="update_memory_api", methods=['PUT', 'OPTIONS']),
    Route('/api/memories/{context_key}', endpoint=delete_memory_api_route, name="delete_memory_api", methods=['DELETE', 'OPTIONS']),
    Route('/api/context-data', endpoint=context_data_api_route, name="context_data_api", methods=['GET', 'OPTIONS']),
])

# Add the sample data route
routes.append(Route('/api/create-sample-memories', endpoint=create_sample_memories_route, name="create_sample_memories", methods=['POST', 'OPTIONS']))
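Note that this module only builds the exported `routes` list; the ASGI application that serves it lives elsewhere in the package and is not shown here. As a rough, hypothetical sketch of how the list could be mounted (the import path, host, and port below are assumptions for illustration, not the project's actual app factory):

# Hypothetical wiring sketch -- not part of routes.py.
# The import path, host, and port are illustrative assumptions.
import uvicorn
from starlette.applications import Starlette

from mcp_server_src.app.routes import routes  # assumed package layout

app = Starlette(routes=routes)

if __name__ == "__main__":
    # CORS preflight is already answered per-route by handle_options,
    # so no extra CORS middleware is strictly needed for these endpoints.
    uvicorn.run(app, host="127.0.0.1", port=8080)

A dashboard client would then call, for example, GET /api/status for the summary counters, or POST /api/create-agent with the admin token, agent_id, and optional capabilities in the JSON body.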
