skills_intelligence.py (20.5 kB)
#!/usr/bin/env python3
"""
Skills Intelligence Script

This script loads OPAL skill documentation from markdown files and indexes
them using ParadeDB BM25 for fast full-text search. No external API
dependencies required.

Usage:
    python skills_intelligence.py --help
    python skills_intelligence.py --force   # Clean rebuild
    python skills_intelligence.py           # Incremental update
"""

import asyncio
import logging
import os
import sys
import argparse
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any

import asyncpg
from dotenv import load_dotenv


# ANSI color codes for terminal output
class Colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    RESET = '\033[0m'
    BG_GREEN = '\033[102m'
    BG_YELLOW = '\033[103m'
    BG_BLUE = '\033[104m'


class ColoredFormatter(logging.Formatter):
    """Custom formatter that adds colors to log messages."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.level_colors = {
            logging.DEBUG: Colors.CYAN,
            logging.INFO: Colors.WHITE,
            logging.WARNING: Colors.YELLOW,
            logging.ERROR: Colors.RED,
            logging.CRITICAL: Colors.RED + Colors.BOLD
        }
        self.event_patterns = {
            'Successfully loaded': Colors.GREEN,
            'Skipping': Colors.YELLOW,
            'Updated': Colors.BLUE + Colors.BOLD,
            'Created': Colors.GREEN + Colors.BOLD,
            'Failed': Colors.RED + Colors.BOLD,
            'Error': Colors.RED + Colors.BOLD,
        }

    def format(self, record):
        message = super().format(record)
        level_color = self.level_colors.get(record.levelno, Colors.WHITE)

        colored_message = message
        for pattern, color in self.event_patterns.items():
            if pattern in message:
                colored_message = colored_message.replace(
                    pattern, f"{color}{pattern}{Colors.RESET}"
                )

        if colored_message == message:
            colored_message = f"{level_color}{message}{Colors.RESET}"

        return colored_message


# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class SkillsIntelligenceLoader:
    """Loads skill documentation from markdown files into PostgreSQL with BM25 indexing."""

    def __init__(self, skills_dir: Optional[str] = None):
        self.db_pool = None

        # Find skills directory
        if skills_dir:
            self.skills_dir = Path(skills_dir)
        else:
            # Default to skills/ in project root
            script_dir = Path(__file__).parent
            self.skills_dir = script_dir.parent / 'skills'

        if not self.skills_dir.exists():
            raise ValueError(f"Skills directory not found: {self.skills_dir}")

        # Statistics
        self.stats = {
            'skills_found': 0,
            'skills_created': 0,
            'skills_updated': 0,
            'skills_skipped': 0,
            'skills_failed': 0,
        }

        # Force mode flag
        self.force_mode = False

    async def initialize_database(self) -> None:
        """Initialize database connection and ensure schema exists."""
        db_password = os.getenv('SEMANTIC_GRAPH_PASSWORD')
        if not db_password:
            raise ValueError("SEMANTIC_GRAPH_PASSWORD environment variable must be set")

        db_config = {
            'host': os.getenv('POSTGRES_HOST', 'localhost'),
            'port': int(os.getenv('POSTGRES_PORT', '5432')),
            'database': os.getenv('POSTGRES_DB', 'semantic_graph'),
            'user': os.getenv('POSTGRES_USER', 'semantic_graph'),
            'password': db_password
        }

        try:
            self.db_pool = await asyncpg.create_pool(**db_config, min_size=1, max_size=5)
            logger.info("Database connection established")

            # Create schema if it doesn't exist
            await self.ensure_schema_exists()
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    async def ensure_schema_exists(self) -> None:
        """Ensure the skills intelligence schema exists."""
        script_dir = Path(__file__).parent
        schema_path = script_dir.parent / 'sql' / 'skills_intelligence_schema.sql'

        with open(schema_path, 'r') as f:
            schema_sql = f.read()

        async with self.db_pool.acquire() as conn:
            await conn.execute(schema_sql)
            logger.info("Skills intelligence schema created/verified")

    def parse_skill_frontmatter(self, content: str) -> Dict[str, Any]:
        """
        Parse YAML frontmatter from skill markdown file.

        Expected format:
        ---
        name: skill-name
        description: Skill description text
        category: Aggregation  # optional
        difficulty: intermediate  # optional
        tags: ['opal', 'aggregation', 'statsby']  # optional
        ---
        """
        frontmatter_pattern = r'^---\s*\n(.*?)\n---\s*\n'
        match = re.match(frontmatter_pattern, content, re.DOTALL)

        if not match:
            return {}

        frontmatter_text = match.group(1)
        metadata = {}

        # Parse simple YAML (we only need basic key: value and key: [list])
        for line in frontmatter_text.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            if ':' in line:
                key, value = line.split(':', 1)
                key = key.strip()
                value = value.strip()

                # Parse lists
                if value.startswith('[') and value.endswith(']'):
                    # Simple list parsing: ['item1', 'item2']
                    items = value[1:-1].split(',')
                    metadata[key] = [item.strip().strip("'\"") for item in items if item.strip()]
                else:
                    # Remove quotes
                    metadata[key] = value.strip('"').strip("'")

        return metadata

    def extract_skill_content(self, content: str) -> str:
        """Extract main content after frontmatter."""
        frontmatter_pattern = r'^---\s*\n.*?\n---\s*\n'
        content_without_frontmatter = re.sub(frontmatter_pattern, '', content, count=1, flags=re.DOTALL)
        return content_without_frontmatter.strip()

    def categorize_skill(self, skill_name: str, description: str, content: str) -> str:
        """Auto-categorize skill based on content analysis."""
        content_lower = (skill_name + ' ' + description + ' ' + content[:500]).lower()

        # Category patterns
        if any(term in content_lower for term in ['aggregate', 'statsby', 'group_by', 'count', 'sum']):
            return 'Aggregation'
        elif any(term in content_lower for term in ['filter', 'search', 'contains', 'match']):
            return 'Filtering'
        elif any(term in content_lower for term in ['tdigest', 'percentile', 'latency', 'duration']):
            return 'Analysis'
        elif any(term in content_lower for term in ['window', 'lag', 'lead', 'row_number']):
            return 'Window Functions'
        elif any(term in content_lower for term in ['join', 'lookup', 'union', 'subquery']):
            return 'Data Combination'
        elif any(term in content_lower for term in ['parse', 'extract', 'regex']):
            return 'Parsing'
        elif any(term in content_lower for term in ['timechart', 'time-series', 'temporal']):
            return 'Time Series'
        elif any(term in content_lower for term in ['interval', 'span', 'duration']):
            return 'Intervals'
        elif any(term in content_lower for term in ['resource', 'reference', 'table']):
            return 'Resources'
        else:
            return 'General'

    def detect_difficulty(self, content: str) -> str:
        """Detect skill difficulty based on content complexity."""
        content_lower = content.lower()

        # Advanced indicators
        advanced_terms = ['subquery', 'union', 'window()', 'tdigest_combine', 'nested', 'complex']
        if any(term in content_lower for term in advanced_terms):
            return 'advanced'

        # Beginner indicators
        if any(term in content_lower for term in ['basic', 'simple', 'introduction', 'getting started']):
            return 'beginner'

        # Default to intermediate
        return 'intermediate'

    def extract_tags(self, skill_name: str, description: str, content: str) -> List[str]:
        """Extract relevant tags from skill content."""
        tags = set()

        # Add skill name parts as tags
        name_parts = skill_name.replace('-', ' ').replace('_', ' ').split()
        tags.update(name_parts)

        # Common OPAL verbs and functions
        opal_keywords = [
            'statsby', 'filter', 'make_col', 'timechart', 'window',
            'aggregate', 'align', 'lookup', 'union', 'join',
            'parse', 'extract_regex', 'percentile', 'tdigest',
            'group_by', 'sort', 'limit', 'topk', 'contains',
            'count', 'sum', 'avg', 'min', 'max'
        ]

        content_lower = (description + ' ' + content[:1000]).lower()
        for keyword in opal_keywords:
            if keyword in content_lower:
                tags.add(keyword)

        return sorted(list(tags))[:15]  # Limit to 15 most relevant tags

    async def load_skill_file(self, skill_path: Path) -> Optional[Dict[str, Any]]:
        """Load and parse a single skill markdown file."""
        try:
            with open(skill_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Parse frontmatter
            metadata = self.parse_skill_frontmatter(content)

            # Extract content
            skill_content = self.extract_skill_content(content)

            # Get skill info
            skill_name = metadata.get('name', skill_path.parent.name)
            description = metadata.get('description', 'OPAL skill documentation')

            # Auto-categorize if not specified
            category = metadata.get('category') or self.categorize_skill(skill_name, description, skill_content)

            # Auto-detect difficulty if not specified
            difficulty = metadata.get('difficulty') or self.detect_difficulty(skill_content)

            # Extract or use provided tags
            tags = metadata.get('tags', [])
            if not tags:
                tags = self.extract_tags(skill_name, description, skill_content)

            return {
                'skill_id': skill_name,
                'skill_name': skill_name.replace('-', ' ').replace('_', ' ').title(),
                'description': description,
                'content': skill_content,
                'category': category,
                'tags': tags,
                'difficulty': difficulty,
            }

        except Exception as e:
            logger.error(f"Failed to load skill file {skill_path}: {e}")
            return None

    async def check_skill_needs_update(self, skill_id: str) -> bool:
        """Check if a skill needs to be updated."""
        if self.force_mode:
            return True

        try:
            async with self.db_pool.acquire() as conn:
                result = await conn.fetchrow(
                    "SELECT updated_at FROM skills_intelligence WHERE skill_id = $1",
                    skill_id
                )

                if not result:
                    return True  # New skill

                # Check if updated recently (within 1 hour)
                updated_at = result['updated_at']
                hours_since_update = (datetime.now() - updated_at).total_seconds() / 3600

                if hours_since_update < 1:
                    logger.debug(f"Skipping {skill_id} - updated {hours_since_update:.1f} hours ago")
                    self.stats['skills_skipped'] += 1
                    return False

                return True

        except Exception as e:
            logger.warning(f"Error checking update status for {skill_id}: {e}")
            return True  # Update on error

    async def store_skill(self, skill_data: Dict[str, Any]) -> None:
        """Store or update skill in database."""
        try:
            async with self.db_pool.acquire() as conn:
                # Check if skill exists
                exists = await conn.fetchval(
                    "SELECT EXISTS(SELECT 1 FROM skills_intelligence WHERE skill_id = $1)",
                    skill_data['skill_id']
                )

                if exists:
                    # Update existing
                    await conn.execute("""
                        UPDATE skills_intelligence SET
                            skill_name = $2,
                            description = $3,
                            content = $4,
                            category = $5,
                            tags = $6,
                            difficulty = $7,
                            updated_at = NOW()
                        WHERE skill_id = $1
                    """,
                        skill_data['skill_id'],
                        skill_data['skill_name'],
                        skill_data['description'],
                        skill_data['content'],
                        skill_data['category'],
                        skill_data['tags'],
                        skill_data['difficulty']
                    )
                    self.stats['skills_updated'] += 1
                    logger.info(f"Updated skill: {skill_data['skill_name']}")
                else:
                    # Insert new
                    await conn.execute("""
                        INSERT INTO skills_intelligence (
                            skill_id, skill_name, description, content,
                            category, tags, difficulty
                        ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                    """,
                        skill_data['skill_id'],
                        skill_data['skill_name'],
                        skill_data['description'],
                        skill_data['content'],
                        skill_data['category'],
                        skill_data['tags'],
                        skill_data['difficulty']
                    )
                    self.stats['skills_created'] += 1
                    logger.info(f"Created skill: {skill_data['skill_name']}")

        except Exception as e:
            logger.error(f"Failed to store skill {skill_data['skill_id']}: {e}")
            self.stats['skills_failed'] += 1

    async def load_all_skills(self) -> None:
        """Load all skills from the skills directory."""
        logger.info("")
        logger.info("╔═══════════════════════════════════════════════════════════════╗")
        logger.info("║ 📚 Skills Intelligence Loader ║")
        logger.info("║ ║")
        logger.info("║ Loading OPAL skill documentation for BM25 search ║")
        if self.force_mode:
            logger.info("║ 🧹 FORCE MODE ENABLED 🧹 ║")
        logger.info("╚═══════════════════════════════════════════════════════════════╝")
        logger.info("")

        logger.info(f"🔍 Scanning skills directory: {self.skills_dir}")

        # Find all SKILL.md files
        skill_files = list(self.skills_dir.glob('*/SKILL.md'))

        if not skill_files:
            logger.warning(f"No skill files found in {self.skills_dir}")
            return

        self.stats['skills_found'] = len(skill_files)
        logger.info(f"📝 Found {len(skill_files)} skill files")

        # Process each skill
        for i, skill_file in enumerate(skill_files, 1):
            logger.info(f"Processing {i}/{len(skill_files)}: {skill_file.parent.name}")

            # Check if update needed
            if not await self.check_skill_needs_update(skill_file.parent.name):
                continue

            # Load skill data
            skill_data = await self.load_skill_file(skill_file)

            if skill_data:
                # Store in database
                await self.store_skill(skill_data)
            else:
                self.stats['skills_failed'] += 1

        logger.info("Skills loading completed")
        self.print_statistics()

    def print_statistics(self) -> None:
        """Print loading statistics."""
        logger.info("")
        logger.info("╔═══════════════════════════════════════════════════════════════╗")
        logger.info("║ 📊 Loading Statistics ║")
        logger.info("╠═══════════════════════════════════════════════════════════════╣")
        logger.info(f"║ Skills found: {self.stats['skills_found']:>43} ║")
        logger.info(f"║ Skills created: {self.stats['skills_created']:>41} ║")
        logger.info(f"║ Skills updated: {self.stats['skills_updated']:>41} ║")
        logger.info(f"║ Skills skipped: {self.stats['skills_skipped']:>41} ║")
        logger.info(f"║ Skills failed: {self.stats['skills_failed']:>42} ║")
        logger.info("╚═══════════════════════════════════════════════════════════════╝")
        logger.info("")

    async def clear_database(self) -> None:
        """Clear all skills for fresh start."""
        async with self.db_pool.acquire() as conn:
            await conn.execute("TRUNCATE TABLE skills_intelligence RESTART IDENTITY CASCADE")
            logger.info("🧹 Cleared all skills from database")

            try:
                await conn.execute("VACUUM ANALYZE skills_intelligence")
                logger.info("🧹 Refreshed indexes and statistics")
            except Exception as e:
                logger.warning(f"Failed to refresh indexes: {e} (non-critical)")

    async def cleanup(self) -> None:
        """Cleanup resources."""
        if self.db_pool:
            await self.db_pool.close()


async def main():
    parser = argparse.ArgumentParser(description="Load OPAL skills for BM25 search")
    parser.add_argument('--skills-dir', type=str, help='Path to skills directory')
    parser.add_argument('--force', action='store_true',
                        help='Force clean database and reload all skills')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    # Configure colored logging
    handler = logging.StreamHandler()
    formatter = ColoredFormatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(handler)
    root_logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

    loader = SkillsIntelligenceLoader(skills_dir=args.skills_dir)
    loader.force_mode = args.force

    try:
        await loader.initialize_database()

        if args.force:
            logger.info("🧹 Force mode enabled - clearing skills database...")
            await loader.clear_database()

        await loader.load_all_skills()

    except KeyboardInterrupt:
        logger.info("Loading interrupted by user")
    except Exception as e:
        logger.error(f"Loading failed: {e}")
        raise
    finally:
        await loader.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
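The script delegates table and index creation to sql/skills_intelligence_schema.sql, which is not shown on this page. Below is a minimal companion sketch of what that schema and a matching search query might look like, inferred from the columns the loader reads and writes. The DDL, the index name, and the pg_search syntax (USING bm25, the @@@ match operator, paradedb.score) are assumptions based on ParadeDB's documented interface, not taken from this repository.

# Hypothetical companion sketch -- NOT part of skills_intelligence.py.
import asyncpg

# Assumed DDL; the real sql/skills_intelligence_schema.sql may differ.
ASSUMED_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS skills_intelligence (
    skill_id    TEXT PRIMARY KEY,
    skill_name  TEXT NOT NULL,
    description TEXT NOT NULL,
    content     TEXT NOT NULL,
    category    TEXT,
    tags        TEXT[],          -- asyncpg maps Python lists to arrays
    difficulty  TEXT,
    created_at  TIMESTAMP DEFAULT NOW(),
    updated_at  TIMESTAMP DEFAULT NOW()
);

-- ParadeDB (pg_search) BM25 index; key_field must be a unique column.
CREATE INDEX IF NOT EXISTS skills_intelligence_bm25_idx
    ON skills_intelligence
    USING bm25 (skill_id, skill_name, description, content)
    WITH (key_field = 'skill_id');
"""


async def search_skills(pool: asyncpg.Pool, query: str, limit: int = 5):
    """Rank skills against a free-text query with ParadeDB's @@@ match operator."""
    async with pool.acquire() as conn:
        return await conn.fetch(
            """
            SELECT skill_id, skill_name, paradedb.score(skill_id) AS score
            FROM skills_intelligence
            WHERE content @@@ $1
            ORDER BY score DESC
            LIMIT $2
            """,
            query,
            limit,
        )

Note that a plain TIMESTAMP (rather than TIMESTAMPTZ) for updated_at keeps the naive datetime.now() arithmetic in check_skill_needs_update working without timezone conversion.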

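For context, here is a hypothetical skills/statsby-basics/SKILL.md that load_all_skills would discover via the */SKILL.md glob. The directory name, description, and OPAL snippet are illustrative only; the frontmatter keys follow the format documented in parse_skill_frontmatter.

---
name: statsby-basics
description: Aggregate events into summary statistics with statsby
category: Aggregation
difficulty: beginner
tags: ['opal', 'aggregation', 'statsby']
---

# Statsby Basics

Group rows and compute aggregates, for example:

    statsby error_count:count(), group_by(service_name)

Because category, difficulty, and tags are all supplied in the frontmatter, the auto-detection paths (categorize_skill, detect_difficulty, extract_tags) are skipped for this file.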