Skip to main content
Glama

Katamari MCP Server

by ciphernaut
community_hub.pyโ€ข25.3 kB
""" Community Framework for Katamari MCP - Phase 3 Enables collaborative capability development, sharing, and community-driven improvement of the Katamari ecosystem. """ import asyncio import json import logging import hashlib from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass, field, asdict from enum import Enum import uuid import re from ..utils.config import Config from ..utils.error_handler import ErrorHandler from ..devtools.validator import PackageValidator from ..devtools.testing import CapabilityTester logger = logging.getLogger(__name__) class ContributionStatus(Enum): """Status of community contributions.""" PENDING = "pending" VALIDATING = "validating" APPROVED = "approved" REJECTED = "rejected" PUBLISHED = "published" DEPRECATED = "deprecated" class ContributionType(Enum): """Types of community contributions.""" CAPABILITY = "capability" WORKFLOW = "workflow" TEMPLATE = "template" DOCUMENTATION = "documentation" BUG_FIX = "bug_fix" ENHANCEMENT = "enhancement" class ReputationLevel(Enum): """Reputation levels for contributors.""" NEWCOMER = "newcomer" CONTRIBUTOR = "contributor" TRUSTED = "trusted" EXPERT = "expert" MAINTAINER = "maintainer" @dataclass class Contributor: """Community contributor profile.""" contributor_id: str username: str email: str reputation_score: int = 0 reputation_level: ReputationLevel = ReputationLevel.NEWCOMER contributions_count: int = 0 joined_date: datetime = field(default_factory=datetime.now) last_active: datetime = field(default_factory=datetime.now) bio: Optional[str] = None website: Optional[str] = None github_username: Optional[str] = None def update_reputation(self, score_change: int): """Update contributor reputation.""" self.reputation_score += score_change self.reputation_level = self._calculate_level() self.last_active = datetime.now() def _calculate_level(self) -> ReputationLevel: """Calculate reputation level based on score.""" if self.reputation_score >= 1000: return ReputationLevel.MAINTAINER elif self.reputation_score >= 500: return ReputationLevel.EXPERT elif self.reputation_score >= 200: return ReputationLevel.TRUSTED elif self.reputation_score >= 50: return ReputationLevel.CONTRIBUTOR else: return ReputationLevel.NEWCOMER @dataclass class Contribution: """A community contribution.""" contribution_id: str title: str description: str contribution_type: ContributionType contributor_id: str status: ContributionStatus = ContributionStatus.PENDING created_date: datetime = field(default_factory=datetime.now) updated_date: datetime = field(default_factory=datetime.now) published_date: Optional[datetime] = None # Content content: Optional[Dict[str, Any]] = None files: List[str] = field(default_factory=list) dependencies: List[str] = field(default_factory=list) # Validation validation_results: Optional[Dict[str, Any]] = None test_results: Optional[Dict[str, Any]] = None security_scan_results: Optional[Dict[str, Any]] = None # Community feedback rating: Optional[float] = None downloads: int = 0 forks: int = 0 issues: List[str] = field(default_factory=list) tags: List[str] = field(default_factory=list) # Review process reviewers: List[str] = field(default_factory=list) approval_count: int = 0 rejection_count: int = 0 @dataclass class Review: """A review of a contribution.""" review_id: str contribution_id: str reviewer_id: str rating: int # 1-5 comment: str approved: bool created_date: datetime = field(default_factory=datetime.now) helpful_count: int = 0 class CommunityHub: """Central hub for community contributions and collaboration.""" def __init__(self, config: Config): self.config = config self.error_handler = ErrorHandler() # Storage paths workspace_root = config.get('workspace_root', '.') self.hub_dir = Path(workspace_root) / '.katamari' / 'community' self.contributions_dir = self.hub_dir / 'contributions' self.contributors_dir = self.hub_dir / 'contributors' self.registry_file = self.hub_dir / 'registry.json' self.index_file = self.hub_dir / 'index.json' # Create directories self.hub_dir.mkdir(parents=True, exist_ok=True) self.contributions_dir.mkdir(parents=True, exist_ok=True) self.contributors_dir.mkdir(parents=True, exist_ok=True) # In-memory storage self.contributions: Dict[str, Contribution] = {} self.contributors: Dict[str, Contributor] = {} self.reviews: Dict[str, Review] = {} # Validation tools self.validator = PackageValidator(config) self.tester = CapabilityTester(isolated=True) # Load existing data self._load_registry() self._load_contributors() self._load_contributions() logger.info("Community hub initialized") def _load_registry(self): """Load the community registry.""" if self.registry_file.exists(): try: with open(self.registry_file, 'r') as f: registry = json.load(f) logger.info(f"Loaded community registry with {len(registry)} entries") except Exception as e: logger.error(f"Failed to load registry: {e}") def _load_contributors(self): """Load contributor profiles.""" for contributor_file in self.contributors_dir.glob('*.json'): try: with open(contributor_file, 'r') as f: data = json.load(f) # Convert date strings back to datetime objects data['joined_date'] = datetime.fromisoformat(data['joined_date']) data['last_active'] = datetime.fromisoformat(data['last_active']) data['reputation_level'] = ReputationLevel(data['reputation_level']) contributor = Contributor(**data) self.contributors[contributor.contributor_id] = contributor except Exception as e: logger.error(f"Failed to load contributor {contributor_file}: {e}") def _load_contributions(self): """Load community contributions.""" for contribution_file in self.contributions_dir.glob('*.json'): try: with open(contribution_file, 'r') as f: data = json.load(f) # Convert enums and dates data['contribution_type'] = ContributionType(data['contribution_type']) data['status'] = ContributionStatus(data['status']) data['created_date'] = datetime.fromisoformat(data['created_date']) data['updated_date'] = datetime.fromisoformat(data['updated_date']) if data.get('published_date'): data['published_date'] = datetime.fromisoformat(data['published_date']) contribution = Contribution(**data) self.contributions[contribution.contribution_id] = contribution except Exception as e: logger.error(f"Failed to load contribution {contribution_file}: {e}") def _save_contributor(self, contributor: Contributor): """Save contributor profile to disk.""" contributor_file = self.contributors_dir / f"{contributor.contributor_id}.json" try: data = asdict(contributor) data['joined_date'] = contributor.joined_date.isoformat() data['last_active'] = contributor.last_active.isoformat() data['reputation_level'] = contributor.reputation_level.value with open(contributor_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save contributor {contributor.contributor_id}: {e}") def _save_contribution(self, contribution: Contribution): """Save contribution to disk.""" contribution_file = self.contributions_dir / f"{contribution.contribution_id}.json" try: data = asdict(contribution) data['contribution_type'] = contribution.contribution_type.value data['status'] = contribution.status.value data['created_date'] = contribution.created_date.isoformat() data['updated_date'] = contribution.updated_date.isoformat() if contribution.published_date: data['published_date'] = contribution.published_date.isoformat() with open(contribution_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save contribution {contribution.contribution_id}: {e}") async def register_contributor( self, username: str, email: str, bio: Optional[str] = None, github_username: Optional[str] = None ) -> str: """Register a new contributor.""" contributor_id = str(uuid.uuid4()) contributor = Contributor( contributor_id=contributor_id, username=username, email=email, bio=bio, github_username=github_username ) self.contributors[contributor_id] = contributor self._save_contributor(contributor) logger.info(f"Registered new contributor: {username}") return contributor_id async def submit_contribution( self, contributor_id: str, title: str, description: str, contribution_type: ContributionType, content: Dict[str, Any], files: List[str], tags: Optional[List[str]] = None ) -> str: """Submit a new contribution.""" if contributor_id not in self.contributors: raise ValueError(f"Contributor {contributor_id} not found") contribution_id = str(uuid.uuid4()) contribution = Contribution( contribution_id=contribution_id, title=title, description=description, contribution_type=contribution_type, contributor_id=contributor_id, content=content, files=files, tags=tags or [] ) self.contributions[contribution_id] = contribution self._save_contribution(contribution) # Update contributor stats contributor = self.contributors[contributor_id] contributor.contributions_count += 1 self._save_contributor(contributor) # Start validation process await self._validate_contribution(contribution_id) logger.info(f"Submitted new contribution: {title}") return contribution_id async def _validate_contribution(self, contribution_id: str): """Validate a contribution.""" contribution = self.contributions[contribution_id] contribution.status = ContributionStatus.VALIDATING contribution.updated_date = datetime.now() try: # Security validation security_results = await self._security_scan(contribution) contribution.security_scan_results = security_results if not security_results.get('passed', False): contribution.status = ContributionStatus.REJECTED contribution.rejection_count += 1 self._save_contribution(contribution) return # Code validation validation_results = await self._validate_code(contribution) contribution.validation_results = validation_results if not validation_results.get('passed', False): contribution.status = ContributionStatus.REJECTED contribution.rejection_count += 1 self._save_contribution(contribution) return # Run tests test_results = await self._run_tests(contribution) contribution.test_results = test_results if not test_results.get('passed', False): contribution.status = ContributionStatus.REJECTED contribution.rejection_count += 1 self._save_contribution(contribution) return # All validations passed contribution.status = ContributionStatus.APPROVED contribution.approval_count += 1 self._save_contribution(contribution) logger.info(f"Contribution {contribution_id} passed validation") except Exception as e: logger.error(f"Validation failed for {contribution_id}: {e}") contribution.status = ContributionStatus.REJECTED contribution.rejection_count += 1 self._save_contribution(contribution) async def _security_scan(self, contribution: Contribution) -> Dict[str, Any]: """Perform security scan on contribution.""" try: # This would integrate with the secret scanning system # For now, return a basic scan result return { 'passed': True, 'issues': [], 'scan_time': datetime.now().isoformat() } except Exception as e: logger.error(f"Security scan failed: {e}") return { 'passed': False, 'issues': [f"Scan error: {e}"], 'scan_time': datetime.now().isoformat() } async def _validate_code(self, contribution: Contribution) -> Dict[str, Any]: """Validate contribution code.""" try: # Use the package validator if contribution.content: validation_result = await self.validator.validate_package(contribution.content) return validation_result return { 'passed': True, 'issues': [], 'validation_time': datetime.now().isoformat() } except Exception as e: logger.error(f"Code validation failed: {e}") return { 'passed': False, 'issues': [f"Validation error: {e}"], 'validation_time': datetime.now().isoformat() } async def _run_tests(self, contribution: Contribution) -> Dict[str, Any]: """Run tests on contribution.""" try: # Use the isolated tester if contribution.files: test_result = await self.tester.test_package(contribution.files) return test_result return { 'passed': True, 'test_results': [], 'test_time': datetime.now().isoformat() } except Exception as e: logger.error(f"Testing failed: {e}") return { 'passed': False, 'issues': [f"Test error: {e}"], 'test_time': datetime.now().isoformat() } async def review_contribution( self, contribution_id: str, reviewer_id: str, rating: int, comment: str, approved: bool ) -> str: """Review a contribution.""" if contribution_id not in self.contributions: raise ValueError(f"Contribution {contribution_id} not found") if reviewer_id not in self.contributors: raise ValueError(f"Reviewer {reviewer_id} not found") review_id = str(uuid.uuid4()) review = Review( review_id=review_id, contribution_id=contribution_id, reviewer_id=reviewer_id, rating=rating, comment=comment, approved=approved ) self.reviews[review_id] = review # Update contribution contribution = self.contributions[contribution_id] contribution.reviewers.append(reviewer_id) if approved: contribution.approval_count += 1 else: contribution.rejection_count += 1 # Update contributor reputation reviewer = self.contributors[reviewer_id] reputation_change = 5 if approved else 2 reviewer.update_reputation(reputation_change) self._save_contributor(reviewer) # Update contribution status based on reviews await self._update_contribution_status(contribution_id) logger.info(f"Reviewed contribution {contribution_id}: {'approved' if approved else 'rejected'}") return review_id async def _update_contribution_status(self, contribution_id: str): """Update contribution status based on reviews.""" contribution = self.contributions[contribution_id] # Need at least 3 approvals to publish if contribution.approval_count >= 3 and contribution.status == ContributionStatus.APPROVED: contribution.status = ContributionStatus.PUBLISHED contribution.published_date = datetime.now() # Award reputation to contributor contributor = self.contributors[contribution.contributor_id] contributor.update_reputation(50) # 50 points for published contribution self._save_contributor(contributor) contribution.updated_date = datetime.now() self._save_contribution(contribution) async def discover_contributions( self, contribution_type: Optional[ContributionType] = None, tags: Optional[List[str]] = None, rating_threshold: float = 0.0, limit: int = 50 ) -> List[Dict[str, Any]]: """Discover community contributions.""" contributions = [] for contribution in self.contributions.values(): # Filter by type if contribution_type and contribution.contribution_type != contribution_type: continue # Filter by status (only show published) if contribution.status != ContributionStatus.PUBLISHED: continue # Filter by rating if contribution.rating and contribution.rating < rating_threshold: continue # Filter by tags if tags and not any(tag in contribution.tags for tag in tags): continue # Get contributor info contributor = self.contributors.get(contribution.contributor_id) contribution_data = { 'contribution_id': contribution.contribution_id, 'title': contribution.title, 'description': contribution.description, 'type': contribution.contribution_type.value, 'contributor': contributor.username if contributor else 'Unknown', 'rating': contribution.rating, 'downloads': contribution.downloads, 'forks': contribution.forks, 'tags': contribution.tags, 'published_date': contribution.published_date.isoformat() if contribution.published_date else None, 'files': contribution.files } contributions.append(contribution_data) # Sort by rating and downloads contributions.sort(key=lambda x: (x.get('rating', 0), x.get('downloads', 0)), reverse=True) return contributions[:limit] async def download_contribution(self, contribution_id: str) -> Optional[Dict[str, Any]]: """Download a contribution.""" if contribution_id not in self.contributions: return None contribution = self.contributions[contribution_id] # Increment download count contribution.downloads += 1 self._save_contribution(contribution) return { 'contribution_id': contribution.contribution_id, 'title': contribution.title, 'content': contribution.content, 'files': contribution.files, 'dependencies': contribution.dependencies, 'download_date': datetime.now().isoformat() } async def fork_contribution(self, contribution_id: str, contributor_id: str) -> Optional[str]: """Fork a contribution.""" if contribution_id not in self.contributions: return None original = self.contributions[contribution_id] # Create forked contribution fork_id = str(uuid.uuid4()) forked = Contribution( contribution_id=fork_id, title=f"Fork of {original.title}", description=f"Forked from: {original.description}", contribution_type=original.contribution_type, contributor_id=contributor_id, content=original.content.copy() if original.content else None, files=original.files.copy(), dependencies=original.dependencies.copy(), tags=original.tags.copy() + ['fork'] ) self.contributions[fork_id] = forked self._save_contribution(forked) # Increment fork count on original original.forks += 1 self._save_contribution(original) logger.info(f"Forked contribution {contribution_id} as {fork_id}") return fork_id def get_contributor_stats(self, contributor_id: str) -> Optional[Dict[str, Any]]: """Get contributor statistics.""" if contributor_id not in self.contributors: return None contributor = self.contributors[contributor_id] # Count contributions by status contributions = [c for c in self.contributions.values() if c.contributor_id == contributor_id] published_count = sum(1 for c in contributions if c.status == ContributionStatus.PUBLISHED) pending_count = sum(1 for c in contributions if c.status == ContributionStatus.PENDING) # Calculate total downloads total_downloads = sum(c.downloads for c in contributions) total_forks = sum(c.forks for c in contributions) return { 'contributor_id': contributor.contributor_id, 'username': contributor.username, 'reputation_score': contributor.reputation_score, 'reputation_level': contributor.reputation_level.value, 'contributions_count': contributor.contributions_count, 'published_contributions': published_count, 'pending_contributions': pending_count, 'total_downloads': total_downloads, 'total_forks': total_forks, 'joined_date': contributor.joined_date.isoformat(), 'last_active': contributor.last_active.isoformat() } def get_community_stats(self) -> Dict[str, Any]: """Get overall community statistics.""" total_contributions = len(self.contributions) published_contributions = sum(1 for c in self.contributions.values() if c.status == ContributionStatus.PUBLISHED) total_contributors = len(self.contributors) # Contributions by type contributions_by_type = {} for contribution in self.contributions.values(): type_name = contribution.contribution_type.value contributions_by_type[type_name] = contributions_by_type.get(type_name, 0) + 1 # Reputation distribution reputation_distribution = {} for contributor in self.contributors.values(): level = contributor.reputation_level.value reputation_distribution[level] = reputation_distribution.get(level, 0) + 1 return { 'total_contributions': total_contributions, 'published_contributions': published_contributions, 'total_contributors': total_contributors, 'contributions_by_type': contributions_by_type, 'reputation_distribution': reputation_distribution, 'total_downloads': sum(c.downloads for c in self.contributions.values()), 'total_forks': sum(c.forks for c in self.contributions.values()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server