Agent MCP

file_metadata_tools.py (11.8 kB)
# Agent-MCP/mcp_template/mcp_server_src/tools/file_metadata_tools.py
import json
import datetime
import sqlite3
import os
from pathlib import Path
from typing import List, Dict, Any, Optional

import mcp.types as mcp_types

from .registry import register_tool
from ..core.config import logger
from ..core import globals as g  # For agent_working_dirs
from ..core.auth import get_agent_id, verify_token
from ..utils.audit_utils import log_audit
from ..db.connection import get_db_connection
from ..db.actions.agent_actions_db import log_agent_action_to_db


def _normalize_filepath(filepath_arg: str, agent_id_for_wd: Optional[str]) -> str:
    """
    Resolves a filepath to an absolute, normalized POSIX path.
    Uses the agent's working directory if the path is relative.
    """
    if not os.path.isabs(filepath_arg):
        working_dir = os.getcwd()  # Default to CWD if no agent context
        if agent_id_for_wd and agent_id_for_wd in g.agent_working_dirs:
            working_dir = g.agent_working_dirs[agent_id_for_wd]
        elif agent_id_for_wd:  # Agent ID provided but not in map
            logger.warning(
                f"Agent '{agent_id_for_wd}' not found in agent_working_dirs for path resolution. Using CWD."
            )
        resolved_path = Path(working_dir) / filepath_arg
    else:
        resolved_path = Path(filepath_arg)
    return (
        resolved_path.resolve().as_posix()
    )  # Resolve to absolute and normalize to POSIX


# --- view_file_metadata tool ---
# Original logic from main.py: lines 1503-1533 (view_file_metadata_tool function)
async def view_file_metadata_tool_impl(
    arguments: Dict[str, Any],
) -> List[mcp_types.TextContent]:
    agent_auth_token = arguments.get("token")
    filepath_arg = arguments.get("filepath")

    requesting_agent_id = get_agent_id(agent_auth_token)  # main.py:1506
    if not requesting_agent_id:
        return [
            mcp_types.TextContent(
                type="text", text="Unauthorized: Valid token required"
            )
        ]

    if not filepath_arg or not isinstance(filepath_arg, str):
        return [
            mcp_types.TextContent(
                type="text", text="Error: filepath is required and must be a string."
            )
        ]

    # Resolve and normalize path (main.py:1509-1515)
    normalized_filepath_str = _normalize_filepath(filepath_arg, requesting_agent_id)

    # Log audit (main.py:1517)
    log_audit(
        requesting_agent_id,
        "view_file_metadata",
        {"filepath_normalized": normalized_filepath_str, "original_path": filepath_arg},
    )

    conn = None
    response_message: str = ""
    try:
        conn = get_db_connection()
        cursor = conn.cursor()  # main.py:1521

        cursor.execute(
            "SELECT metadata, updated_by, last_updated, content_hash FROM file_metadata WHERE filepath = ?",
            (normalized_filepath_str,),
        )
        row = cursor.fetchone()

        if row:  # main.py:1523-1525
            try:
                metadata_parsed = json.loads(
                    row["metadata"]
                )  # Metadata is stored as JSON string
            except json.JSONDecodeError:
                logger.warning(
                    f"Failed to parse JSON metadata for file '{normalized_filepath_str}'. Raw: {row['metadata']}"
                )
                metadata_parsed = {
                    "error": "Could not parse stored metadata string.",
                    "raw_value": row["metadata"],
                }

            response_data = {
                "filepath": normalized_filepath_str,
                "metadata": metadata_parsed,
                "last_updated_by": row["updated_by"],
                "last_updated_at": row["last_updated"],
                "content_hash": (
                    row["content_hash"] if "content_hash" in row.keys() else "N/A"
                ),  # content_hash was added later
            }
            response_message = f"Metadata for file '{filepath_arg}' (normalized: {normalized_filepath_str}):\n\n{json.dumps(response_data, indent=2, ensure_ascii=False)}"
        else:  # main.py:1527
            response_message = f"No metadata found for file '{filepath_arg}' (normalized: {normalized_filepath_str})."

    except sqlite3.Error as e_sql:  # main.py:1529
        logger.error(
            f"Database error viewing file metadata for '{normalized_filepath_str}': {e_sql}",
            exc_info=True,
        )
        response_message = f"Database error viewing file metadata: {e_sql}"
    except (
        json.JSONDecodeError
    ) as e_json:  # Should be caught per-item, but as fallback (main.py:1532)
        logger.error(
            f"Error decoding JSON from file_metadata table for '{normalized_filepath_str}': {e_json}",
            exc_info=True,
        )
        response_message = "Error decoding stored file metadata."
    except Exception as e:
        logger.error(
            f"Unexpected error viewing file metadata for '{normalized_filepath_str}': {e}",
            exc_info=True,
        )
        response_message = f"An unexpected error occurred: {e}"
    finally:
        if conn:
            conn.close()

    return [mcp_types.TextContent(type="text", text=response_message)]


# --- update_file_metadata tool ---
# Original logic from main.py: lines 1536-1569 (update_file_metadata_tool function)
async def update_file_metadata_tool_impl(
    arguments: Dict[str, Any],
) -> List[mcp_types.TextContent]:
    admin_auth_token = arguments.get("token")
    filepath_arg = arguments.get("filepath")
    metadata_to_set = arguments.get("metadata")  # This is a Dict[str, Any]

    if not verify_token(
        admin_auth_token, "admin"
    ):  # main.py:1539 (Restricted to admin)
        return [
            mcp_types.TextContent(
                type="text",
                text="Unauthorized: Admin token required for updating file metadata.",
            )
        ]

    requesting_admin_id = get_agent_id(admin_auth_token)  # main.py:1542
    if not requesting_admin_id:  # Should be "admin"
        logger.error(
            "Admin token verified but could not get admin_id. This is unexpected."
        )
        return [
            mcp_types.TextContent(type="text", text="Internal authorization error.")
        ]

    if (
        not filepath_arg
        or not isinstance(filepath_arg, str)
        or metadata_to_set is None
        or not isinstance(metadata_to_set, dict)
    ):
        return [
            mcp_types.TextContent(
                type="text",
                text="Error: filepath (string) and metadata (dictionary) are required.",
            )
        ]

    # Resolve and normalize path (main.py:1545-1549)
    normalized_filepath_str = _normalize_filepath(
        filepath_arg, requesting_admin_id
    )  # Use admin's context for WD if needed

    # Log audit (main.py:1551)
    log_audit(
        requesting_admin_id,
        "update_file_metadata",
        {
            "filepath_normalized": normalized_filepath_str,
            "original_path": filepath_arg,
            "metadata_keys": list(metadata_to_set.keys()),
        },
    )

    conn = None
    try:
        # Ensure metadata is JSON serializable (main.py:1555-1558)
        metadata_json_str = json.dumps(metadata_to_set)
    except TypeError as e_type:
        logger.error(
            f"Metadata provided for file '{normalized_filepath_str}' is not JSON serializable: {e_type}"
        )
        return [
            mcp_types.TextContent(
                type="text",
                text=f"Error: Provided metadata is not JSON serializable: {e_type}",
            )
        ]

    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        updated_at_iso = datetime.datetime.now().isoformat()

        # The original did not explicitly handle content_hash here.
        # If metadata updates should also update/clear content_hash, that logic would be added.
        # For now, it only updates metadata, last_updated, updated_by.
        # (main.py:1561-1565)
        cursor.execute(
            """
            INSERT OR REPLACE INTO file_metadata (filepath, metadata, last_updated, updated_by)
            VALUES (?, ?, ?, ?)
            """,
            (
                normalized_filepath_str,
                metadata_json_str,
                updated_at_iso,
                requesting_admin_id,
            ),
        )

        log_agent_action_to_db(
            cursor,
            requesting_admin_id,
            "updated_file_metadata",
            details={"filepath": normalized_filepath_str, "action": "set/update"},
        )
        conn.commit()

        logger.info(
            f"File metadata for '{normalized_filepath_str}' updated by '{requesting_admin_id}'."
        )
        return [
            mcp_types.TextContent(
                type="text",
                text=f"File metadata updated successfully for '{filepath_arg}' (normalized: {normalized_filepath_str}).",
            )
        ]
    except sqlite3.Error as e_sql:  # main.py:1566
        if conn:
            conn.rollback()
        logger.error(
            f"Database error updating file metadata for '{normalized_filepath_str}': {e_sql}",
            exc_info=True,
        )
        return [
            mcp_types.TextContent(
                type="text", text=f"Database error updating file metadata: {e_sql}"
            )
        ]
    except Exception as e:
        if conn:
            conn.rollback()
        logger.error(
            f"Unexpected error updating file metadata for '{normalized_filepath_str}': {e}",
            exc_info=True,
        )
        return [
            mcp_types.TextContent(
                type="text", text=f"Unexpected error updating file metadata: {e}"
            )
        ]
    finally:
        if conn:
            conn.close()


# --- Register file metadata tools ---
def register_file_metadata_tools():
    register_tool(
        name="view_file_metadata",  # main.py:1841 (schema name)
        description="View stored metadata (e.g., purpose, components) for a specific file path.",
        input_schema={  # From main.py:1842-1852
            "type": "object",
            "properties": {
                "token": {"type": "string", "description": "Authentication token"},
                "filepath": {
                    "type": "string",
                    "description": "Path to the file (can be relative to agent's CWD or absolute)",
                },
            },
            "required": ["token", "filepath"],
            "additionalProperties": False,
        },
        implementation=view_file_metadata_tool_impl,
    )

    register_tool(
        name="update_file_metadata",  # main.py:1854 (schema name)
        description="Add or replace the entire metadata object for a specific file path. Admin only.",
        input_schema={  # From main.py:1855-1867
            "type": "object",
            "properties": {
                "token": {
                    "type": "string",
                    "description": "Admin authentication token",
                },
                "filepath": {
                    "type": "string",
                    "description": "Path to the file (can be relative or absolute)",
                },
                "metadata": {
                    "type": "object",
                    "description": "A JSON object containing the metadata to set for the file.",
                },
            },
            "required": ["token", "filepath", "metadata"],
            "additionalProperties": False,
        },
        implementation=update_file_metadata_tool_impl,
    )


# Call registration when this module is imported
register_file_metadata_tools()
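For orientation, the sketch below shows one plausible shape of the file_metadata table these tools read and write, inferred only from the columns referenced above (filepath, metadata, last_updated, updated_by, content_hash). The actual DDL lives elsewhere in Agent-MCP's database setup and may differ; in particular, the PRIMARY KEY on filepath is an assumption based on the INSERT OR REPLACE being keyed on the normalized path.

# Sketch only: an assumed shape of the file_metadata table, inferred from the
# queries in this module. The real schema is created elsewhere in Agent-MCP and
# may differ (extra columns, indexes, or constraints).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS file_metadata (
        filepath     TEXT PRIMARY KEY,  -- normalized POSIX path (see _normalize_filepath); key is assumed
        metadata     TEXT NOT NULL,     -- JSON-encoded metadata object
        last_updated TEXT NOT NULL,     -- ISO-8601 timestamp string
        updated_by   TEXT NOT NULL,     -- agent/admin id that wrote the row
        content_hash TEXT               -- optional; added later, may be NULL
    )
    """
)
conn.close()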

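A minimal usage sketch follows, assuming the Agent-MCP server's database and token configuration are already initialized and that the package layout in the header comment makes mcp_server_src importable; the import path, token strings, file path, and metadata payload are placeholders for illustration, not values confirmed by the source. In the running server these arguments arrive through an MCP tool-call request rather than a direct function call.

# Hypothetical direct invocation of the tool implementations (sketch only).
# Tokens, paths, and metadata below are made-up placeholders.
import asyncio

from mcp_server_src.tools.file_metadata_tools import (  # assumed import path
    update_file_metadata_tool_impl,
    view_file_metadata_tool_impl,
)


async def demo() -> None:
    # Admin-only: set or replace the whole metadata object for a file.
    update_result = await update_file_metadata_tool_impl(
        {
            "token": "admin-token-placeholder",
            "filepath": "src/app/main.py",  # relative paths resolve against the agent's working dir
            "metadata": {"purpose": "entry point", "components": ["cli", "server"]},
        }
    )
    print(update_result[0].text)

    # Any agent with a valid token can read the stored metadata back.
    view_result = await view_file_metadata_tool_impl(
        {"token": "agent-token-placeholder", "filepath": "src/app/main.py"}
    )
    print(view_result[0].text)


asyncio.run(demo())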