Skip to main content
Glama

MCP Brain Service

by jomapps
cleanup_invalid_nodes.py10.4 kB
#!/usr/bin/env python3 """ Cleanup Invalid Nodes Script This script removes invalid/irrelevant nodes from the Neo4j knowledge graph. It targets nodes with error messages, empty content, or other invalid patterns. Usage: # Clean all projects python scripts/cleanup_invalid_nodes.py # Clean specific project python scripts/cleanup_invalid_nodes.py --project-id my-project-123 # Dry run (preview what would be deleted) python scripts/cleanup_invalid_nodes.py --dry-run # Custom patterns python scripts/cleanup_invalid_nodes.py --patterns "Error:" "undefined" "null" """ import asyncio import argparse import logging import sys import os from typing import List, Dict, Any, Optional from datetime import datetime # Add parent directory to path to import from src sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from src.lib.neo4j_client import get_neo4j_client, Neo4jClient # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Default invalid patterns to search for DEFAULT_INVALID_PATTERNS = [ "Error:", "error:", "no user message", "No user message", "undefined", "null", "NULL", "[object Object]", "NaN", ] async def find_invalid_nodes( neo4j_client: Neo4jClient, patterns: List[str], project_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Find nodes matching invalid patterns Args: neo4j_client: Neo4j client instance patterns: List of patterns to search for project_id: Optional project ID to filter by Returns: List of matching nodes with their details """ all_matches = [] for pattern in patterns: logger.info(f"Searching for nodes containing pattern: '{pattern}'") # Build query query = """ MATCH (n:Document) WHERE n.content CONTAINS $pattern """ if project_id: query += " AND n.project_id = $project_id" query += """ RETURN n.id as id, n.content as content, n.project_id as project_id, n.document_type as type, n.created_at as created_at LIMIT 1000 """ params = {"pattern": pattern} if project_id: params["project_id"] = project_id try: results = await neo4j_client.run_query(query, params) if results: logger.info(f" Found {len(results)} nodes matching '{pattern}'") all_matches.extend(results) else: logger.info(f" No nodes found matching '{pattern}'") except Exception as e: logger.error(f" Error searching for pattern '{pattern}': {e}") # Remove duplicates based on node ID unique_matches = {node['id']: node for node in all_matches}.values() return list(unique_matches) async def delete_invalid_nodes( neo4j_client: Neo4jClient, patterns: List[str], project_id: Optional[str] = None, dry_run: bool = False ) -> Dict[str, Any]: """ Delete nodes matching invalid patterns Args: neo4j_client: Neo4j client instance patterns: List of patterns to search for project_id: Optional project ID to filter by dry_run: If True, only preview deletions without executing Returns: Dictionary with deletion statistics """ stats = { "total_found": 0, "total_deleted": 0, "by_pattern": {}, "by_project": {}, "errors": [] } for pattern in patterns: logger.info(f"Processing pattern: '{pattern}'") # Build query if dry_run: query = """ MATCH (n:Document) WHERE n.content CONTAINS $pattern """ if project_id: query += " AND n.project_id = $project_id" query += """ RETURN count(n) as count, collect(n.id)[0..5] as sample_ids """ else: query = """ MATCH (n:Document) WHERE n.content CONTAINS $pattern """ if project_id: query += " AND n.project_id = $project_id" query += """ WITH n DETACH DELETE n RETURN count(n) as count """ params = {"pattern": pattern} if project_id: params["project_id"] = project_id try: results = await neo4j_client.run_query(query, params) if results: count = results[0].get("count", 0) stats["by_pattern"][pattern] = count if dry_run and count > 0: sample_ids = results[0].get("sample_ids", []) logger.info(f" Would delete {count} nodes") logger.info(f" Sample IDs: {sample_ids}") elif count > 0: logger.info(f" Deleted {count} nodes") stats["total_deleted"] += count else: logger.info(f" No nodes found") stats["total_found"] += count except Exception as e: error_msg = f"Error processing pattern '{pattern}': {e}" logger.error(f" {error_msg}") stats["errors"].append(error_msg) return stats async def get_project_stats(neo4j_client: Neo4jClient) -> Dict[str, int]: """Get node count by project""" query = """ MATCH (n:Document) RETURN n.project_id as project_id, count(n) as count ORDER BY count DESC """ try: results = await neo4j_client.run_query(query, {}) return {r["project_id"]: r["count"] for r in results if r.get("project_id")} except Exception as e: logger.error(f"Failed to get project stats: {e}") return {} async def main(): """Main cleanup function""" parser = argparse.ArgumentParser( description="Clean up invalid nodes from the knowledge graph", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__ ) parser.add_argument( "--project-id", type=str, help="Filter by specific project ID" ) parser.add_argument( "--patterns", nargs="+", default=DEFAULT_INVALID_PATTERNS, help="Custom patterns to search for (default: common error patterns)" ) parser.add_argument( "--dry-run", action="store_true", help="Preview what would be deleted without actually deleting" ) parser.add_argument( "--list-projects", action="store_true", help="List all projects and their node counts" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Connect to Neo4j logger.info("Connecting to Neo4j...") try: neo4j_client = await get_neo4j_client() logger.info("✅ Connected to Neo4j successfully") except Exception as e: logger.error(f"❌ Failed to connect to Neo4j: {e}") logger.error("Please check your NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD environment variables") return 1 # List projects if requested if args.list_projects: logger.info("\n" + "="*60) logger.info("PROJECT STATISTICS") logger.info("="*60) project_stats = await get_project_stats(neo4j_client) if project_stats: for project_id, count in project_stats.items(): logger.info(f" {project_id}: {count} nodes") else: logger.info(" No projects found") logger.info("="*60 + "\n") return 0 # Run cleanup logger.info("\n" + "="*60) logger.info("CLEANUP INVALID NODES") logger.info("="*60) logger.info(f"Mode: {'DRY RUN (preview only)' if args.dry_run else 'LIVE DELETION'}") logger.info(f"Project filter: {args.project_id or 'All projects'}") logger.info(f"Patterns: {', '.join(args.patterns)}") logger.info("="*60 + "\n") if not args.dry_run: logger.warning("⚠️ This will permanently delete nodes from the database!") logger.warning("⚠️ Press Ctrl+C within 5 seconds to cancel...") await asyncio.sleep(5) # Execute cleanup start_time = datetime.now() stats = await delete_invalid_nodes( neo4j_client=neo4j_client, patterns=args.patterns, project_id=args.project_id, dry_run=args.dry_run ) end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Print summary logger.info("\n" + "="*60) logger.info("CLEANUP SUMMARY") logger.info("="*60) logger.info(f"Total nodes found: {stats['total_found']}") if args.dry_run: logger.info(f"Would delete: {stats['total_found']} nodes") else: logger.info(f"Total deleted: {stats['total_deleted']}") logger.info(f"\nBy pattern:") for pattern, count in stats['by_pattern'].items(): logger.info(f" '{pattern}': {count}") if stats['errors']: logger.info(f"\nErrors encountered: {len(stats['errors'])}") for error in stats['errors']: logger.error(f" {error}") logger.info(f"\nDuration: {duration:.2f} seconds") logger.info("="*60 + "\n") if args.dry_run: logger.info("✅ Dry run completed. Run without --dry-run to actually delete nodes.") else: logger.info("✅ Cleanup completed successfully!") return 0 if __name__ == "__main__": try: exit_code = asyncio.run(main()) sys.exit(exit_code) except KeyboardInterrupt: logger.info("\n\n❌ Cleanup cancelled by user") sys.exit(1) except Exception as e: logger.error(f"\n\n❌ Unexpected error: {e}", exc_info=True) sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jomapps/mcp-brain-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server