# community_hub.py • 25.3 kB
"""
Community Framework for Katamari MCP - Phase 3
Enables collaborative capability development, sharing, and community-driven
improvement of the Katamari ecosystem.
"""
import asyncio
import copy
import hashlib
import json
import logging
import re
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Tuple

from ..utils.config import Config
from ..utils.error_handler import ErrorHandler
from ..devtools.validator import PackageValidator
from ..devtools.testing import CapabilityTester
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ContributionStatus(Enum):
    """Lifecycle states a community contribution can be in.

    Each member's value is the lowercase string written to the
    contribution's JSON record.
    """

    # Freshly submitted; automated validation has not started.
    PENDING = "pending"
    # Automated validation is in progress.
    VALIDATING = "validating"
    # Automated validation passed.
    APPROVED = "approved"
    # An automated validation stage failed or raised.
    REJECTED = "rejected"
    # Collected enough approvals; visible through discovery.
    PUBLISHED = "published"
    DEPRECATED = "deprecated"
class ContributionType(Enum):
    """Kinds of artifact a contributor can submit.

    Each member's value is the lowercase string written to the
    contribution's JSON record.
    """

    CAPABILITY = "capability"
    WORKFLOW = "workflow"
    TEMPLATE = "template"
    DOCUMENTATION = "documentation"
    BUG_FIX = "bug_fix"
    ENHANCEMENT = "enhancement"
class ReputationLevel(Enum):
    """Reputation tiers a contributor can hold, listed lowest to highest.

    Each member's value is the lowercase string written to the
    contributor's JSON profile.
    """

    NEWCOMER = "newcomer"
    CONTRIBUTOR = "contributor"
    TRUSTED = "trusted"
    EXPERT = "expert"
    MAINTAINER = "maintainer"
@dataclass
class Contributor:
    """Profile record for one member of the community.

    ``reputation_level`` is derived from ``reputation_score`` and is
    refreshed whenever ``update_reputation`` is called.
    """

    contributor_id: str
    username: str
    email: str
    reputation_score: int = 0
    reputation_level: ReputationLevel = ReputationLevel.NEWCOMER
    contributions_count: int = 0
    joined_date: datetime = field(default_factory=datetime.now)
    last_active: datetime = field(default_factory=datetime.now)
    bio: Optional[str] = None
    website: Optional[str] = None
    github_username: Optional[str] = None

    def update_reputation(self, score_change: int):
        """Apply ``score_change`` to the score and refresh derived fields.

        Besides adjusting the score, this recomputes ``reputation_level``
        and stamps ``last_active`` with the current time.
        """
        self.reputation_score += score_change
        self.reputation_level = self._calculate_level()
        self.last_active = datetime.now()

    def _calculate_level(self) -> ReputationLevel:
        """Return the highest tier whose minimum score has been reached."""
        # Ordered from the highest threshold down so the first match wins.
        tiers = (
            (1000, ReputationLevel.MAINTAINER),
            (500, ReputationLevel.EXPERT),
            (200, ReputationLevel.TRUSTED),
            (50, ReputationLevel.CONTRIBUTOR),
        )
        for minimum_score, level in tiers:
            if self.reputation_score >= minimum_score:
                return level
        return ReputationLevel.NEWCOMER
@dataclass
class Contribution:
    """Record describing a single community contribution.

    Groups the submission's identity, its payload, the outcome of each
    automated validation stage, community feedback counters and the
    review tallies.
    """

    # --- Identity and lifecycle ---
    contribution_id: str
    title: str
    description: str
    contribution_type: ContributionType
    contributor_id: str
    status: ContributionStatus = ContributionStatus.PENDING
    created_date: datetime = field(default_factory=datetime.now)
    updated_date: datetime = field(default_factory=datetime.now)
    published_date: Optional[datetime] = None

    # --- Payload ---
    content: Optional[Dict[str, Any]] = None
    files: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)

    # --- Results of the automated validation stages ---
    validation_results: Optional[Dict[str, Any]] = None
    test_results: Optional[Dict[str, Any]] = None
    security_scan_results: Optional[Dict[str, Any]] = None

    # --- Community feedback ---
    rating: Optional[float] = None
    downloads: int = 0
    forks: int = 0
    issues: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)

    # --- Review bookkeeping ---
    reviewers: List[str] = field(default_factory=list)
    approval_count: int = 0
    rejection_count: int = 0
@dataclass
class Review:
    """One reviewer's verdict on a contribution.

    ``created_date`` defaults to the moment the review object is built and
    ``helpful_count`` starts at zero.
    """

    review_id: str
    contribution_id: str
    reviewer_id: str
    # Score on a 1-5 scale.
    rating: int
    comment: str
    approved: bool
    created_date: datetime = field(default_factory=datetime.now)
    helpful_count: int = 0
class CommunityHub:
    """Central hub for community contributions and collaboration.

    Contributors and contributions are held in in-memory dictionaries and
    mirrored to one JSON file per record under
    ``<workspace_root>/.katamari/community``. Reviews are kept in memory
    only; nothing in this class writes them to disk.
    """

    def __init__(self, config: Config):
        """Create the storage layout, the validation tools and load saved data.

        Args:
            config: Project configuration. ``workspace_root`` is read from it
                (defaulting to ``'.'``) to locate the hub directory.
        """
        self.config = config
        self.error_handler = ErrorHandler()

        # Storage paths
        workspace_root = config.get('workspace_root', '.')
        self.hub_dir = Path(workspace_root) / '.katamari' / 'community'
        self.contributions_dir = self.hub_dir / 'contributions'
        self.contributors_dir = self.hub_dir / 'contributors'
        self.registry_file = self.hub_dir / 'registry.json'
        self.index_file = self.hub_dir / 'index.json'

        # Create directories
        self.hub_dir.mkdir(parents=True, exist_ok=True)
        self.contributions_dir.mkdir(parents=True, exist_ok=True)
        self.contributors_dir.mkdir(parents=True, exist_ok=True)

        # In-memory storage
        self.contributions: Dict[str, Contribution] = {}
        self.contributors: Dict[str, Contributor] = {}
        self.reviews: Dict[str, Review] = {}

        # Validation tools
        self.validator = PackageValidator(config)
        self.tester = CapabilityTester(isolated=True)

        # Load existing data
        self._load_registry()
        self._load_contributors()
        self._load_contributions()
        logger.info("Community hub initialized")

    def _load_registry(self):
        """Read the registry file, if present, and log how many entries it has.

        The parsed registry is not retained. Failures are logged, not raised.
        """
        if not self.registry_file.exists():
            return
        try:
            with open(self.registry_file, 'r', encoding='utf-8') as f:
                registry = json.load(f)
            logger.info(f"Loaded community registry with {len(registry)} entries")
        except Exception as e:
            logger.error(f"Failed to load registry: {e}")

    def _load_contributors(self):
        """Load every contributor profile found in the contributors directory.

        A file that cannot be read or parsed is logged and skipped so one bad
        record does not prevent the rest from loading.
        """
        for contributor_file in self.contributors_dir.glob('*.json'):
            try:
                with open(contributor_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Convert date strings back to datetime objects
                data['joined_date'] = datetime.fromisoformat(data['joined_date'])
                data['last_active'] = datetime.fromisoformat(data['last_active'])
                data['reputation_level'] = ReputationLevel(data['reputation_level'])
                contributor = Contributor(**data)
                self.contributors[contributor.contributor_id] = contributor
            except Exception as e:
                logger.error(f"Failed to load contributor {contributor_file}: {e}")

    def _load_contributions(self):
        """Load every contribution record found in the contributions directory.

        A file that cannot be read or parsed is logged and skipped.
        """
        for contribution_file in self.contributions_dir.glob('*.json'):
            try:
                with open(contribution_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Convert enums and dates
                data['contribution_type'] = ContributionType(data['contribution_type'])
                data['status'] = ContributionStatus(data['status'])
                data['created_date'] = datetime.fromisoformat(data['created_date'])
                data['updated_date'] = datetime.fromisoformat(data['updated_date'])
                if data.get('published_date'):
                    data['published_date'] = datetime.fromisoformat(data['published_date'])
                contribution = Contribution(**data)
                self.contributions[contribution.contribution_id] = contribution
            except Exception as e:
                logger.error(f"Failed to load contribution {contribution_file}: {e}")

    def _save_contributor(self, contributor: Contributor):
        """Write a contributor profile to ``<contributor_id>.json``.

        Best-effort: a failure is logged and not raised.
        """
        contributor_file = self.contributors_dir / f"{contributor.contributor_id}.json"
        try:
            data = asdict(contributor)
            # Dates and enums are not JSON-serializable; store their text forms.
            data['joined_date'] = contributor.joined_date.isoformat()
            data['last_active'] = contributor.last_active.isoformat()
            data['reputation_level'] = contributor.reputation_level.value
            with open(contributor_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save contributor {contributor.contributor_id}: {e}")

    def _save_contribution(self, contribution: Contribution):
        """Write a contribution record to ``<contribution_id>.json``.

        Best-effort: a failure is logged and not raised.
        """
        contribution_file = self.contributions_dir / f"{contribution.contribution_id}.json"
        try:
            data = asdict(contribution)
            # Dates and enums are not JSON-serializable; store their text forms.
            data['contribution_type'] = contribution.contribution_type.value
            data['status'] = contribution.status.value
            data['created_date'] = contribution.created_date.isoformat()
            data['updated_date'] = contribution.updated_date.isoformat()
            if contribution.published_date:
                data['published_date'] = contribution.published_date.isoformat()
            with open(contribution_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save contribution {contribution.contribution_id}: {e}")

    async def register_contributor(
        self,
        username: str,
        email: str,
        bio: Optional[str] = None,
        github_username: Optional[str] = None
    ) -> str:
        """Register a new contributor and persist the profile.

        Returns:
            The generated contributor id (a UUID4 string).
        """
        contributor_id = str(uuid.uuid4())
        contributor = Contributor(
            contributor_id=contributor_id,
            username=username,
            email=email,
            bio=bio,
            github_username=github_username
        )
        self.contributors[contributor_id] = contributor
        self._save_contributor(contributor)
        logger.info(f"Registered new contributor: {username}")
        return contributor_id

    async def submit_contribution(
        self,
        contributor_id: str,
        title: str,
        description: str,
        contribution_type: ContributionType,
        content: Dict[str, Any],
        files: List[str],
        tags: Optional[List[str]] = None
    ) -> str:
        """Submit a new contribution and run automated validation on it.

        Returns:
            The generated contribution id (a UUID4 string).

        Raises:
            ValueError: If ``contributor_id`` is not a registered contributor.
        """
        if contributor_id not in self.contributors:
            raise ValueError(f"Contributor {contributor_id} not found")

        contribution_id = str(uuid.uuid4())
        contribution = Contribution(
            contribution_id=contribution_id,
            title=title,
            description=description,
            contribution_type=contribution_type,
            contributor_id=contributor_id,
            content=content,
            files=files,
            tags=tags or []
        )
        self.contributions[contribution_id] = contribution
        self._save_contribution(contribution)

        # Update contributor stats
        contributor = self.contributors[contributor_id]
        contributor.contributions_count += 1
        self._save_contributor(contributor)

        # Start validation process
        await self._validate_contribution(contribution_id)
        logger.info(f"Submitted new contribution: {title}")
        return contribution_id

    def _reject_contribution(self, contribution: Contribution):
        """Mark a contribution rejected, bump its rejection tally and save it."""
        contribution.status = ContributionStatus.REJECTED
        contribution.rejection_count += 1
        self._save_contribution(contribution)

    async def _validate_contribution(self, contribution_id: str):
        """Run the security, code and test stages in order.

        Each stage's result dict is stored on the contribution. The first
        stage whose result lacks a truthy ``'passed'`` rejects the
        contribution and stops the pipeline. If every stage passes, the
        contribution becomes APPROVED and its approval count is incremented.
        Any exception raised along the way also rejects the contribution.
        """
        contribution = self.contributions[contribution_id]
        contribution.status = ContributionStatus.VALIDATING
        contribution.updated_date = datetime.now()

        # (attribute that stores the result, coroutine producing it), in run order.
        stages = (
            ('security_scan_results', self._security_scan),
            ('validation_results', self._validate_code),
            ('test_results', self._run_tests),
        )
        try:
            for result_attr, run_stage in stages:
                results = await run_stage(contribution)
                setattr(contribution, result_attr, results)
                if not results.get('passed', False):
                    self._reject_contribution(contribution)
                    return

            # All validations passed
            contribution.status = ContributionStatus.APPROVED
            contribution.approval_count += 1
            self._save_contribution(contribution)
            logger.info(f"Contribution {contribution_id} passed validation")
        except Exception as e:
            logger.error(f"Validation failed for {contribution_id}: {e}")
            self._reject_contribution(contribution)

    async def _security_scan(self, contribution: Contribution) -> Dict[str, Any]:
        """Perform a security scan on a contribution.

        Currently a placeholder: it inspects nothing and reports a pass.

        Returns:
            A dict with ``'passed'``, ``'issues'`` and ``'scan_time'`` keys.
        """
        try:
            # This would integrate with the secret scanning system
            # For now, return a basic scan result
            return {
                'passed': True,
                'issues': [],
                'scan_time': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Security scan failed: {e}")
            return {
                'passed': False,
                'issues': [f"Scan error: {e}"],
                'scan_time': datetime.now().isoformat()
            }

    async def _validate_code(self, contribution: Contribution) -> Dict[str, Any]:
        """Validate the contribution's content with the package validator.

        A contribution without content passes trivially. An exception from
        the validator is reported as a failed result rather than raised.

        Returns:
            The validator's result when content is present; otherwise a dict
            with ``'passed'``, ``'issues'`` and ``'validation_time'`` keys.
        """
        try:
            # Use the package validator
            if contribution.content:
                return await self.validator.validate_package(contribution.content)
            return {
                'passed': True,
                'issues': [],
                'validation_time': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Code validation failed: {e}")
            return {
                'passed': False,
                'issues': [f"Validation error: {e}"],
                'validation_time': datetime.now().isoformat()
            }

    async def _run_tests(self, contribution: Contribution) -> Dict[str, Any]:
        """Run the contribution's files through the capability tester.

        A contribution without files passes trivially. An exception from the
        tester is reported as a failed result rather than raised.

        Returns:
            The tester's result when files are present; otherwise a dict with
            ``'passed'``, ``'test_results'`` and ``'test_time'`` keys.
        """
        try:
            # Use the isolated tester
            if contribution.files:
                return await self.tester.test_package(contribution.files)
            return {
                'passed': True,
                'test_results': [],
                'test_time': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Testing failed: {e}")
            return {
                'passed': False,
                'issues': [f"Test error: {e}"],
                'test_time': datetime.now().isoformat()
            }

    async def review_contribution(
        self,
        contribution_id: str,
        reviewer_id: str,
        rating: int,
        comment: str,
        approved: bool
    ) -> str:
        """Record a review and update the contribution and the reviewer.

        The review is stored in memory, the contribution's approval or
        rejection tally is bumped, the reviewer earns reputation (5 for an
        approval, 2 otherwise) and the contribution's status is re-evaluated.

        Returns:
            The generated review id (a UUID4 string).

        Raises:
            ValueError: If the contribution or reviewer is unknown, or if
                ``rating`` is outside the 1-5 range.
        """
        if contribution_id not in self.contributions:
            raise ValueError(f"Contribution {contribution_id} not found")
        if reviewer_id not in self.contributors:
            raise ValueError(f"Reviewer {reviewer_id} not found")
        # Reject out-of-range ratings before any state is mutated.
        if not 1 <= rating <= 5:
            raise ValueError(f"Rating must be between 1 and 5, got {rating}")

        review_id = str(uuid.uuid4())
        review = Review(
            review_id=review_id,
            contribution_id=contribution_id,
            reviewer_id=reviewer_id,
            rating=rating,
            comment=comment,
            approved=approved
        )
        self.reviews[review_id] = review

        # Update contribution
        contribution = self.contributions[contribution_id]
        contribution.reviewers.append(reviewer_id)
        if approved:
            contribution.approval_count += 1
        else:
            contribution.rejection_count += 1

        # Update contributor reputation
        reviewer = self.contributors[reviewer_id]
        reputation_change = 5 if approved else 2
        reviewer.update_reputation(reputation_change)
        self._save_contributor(reviewer)

        # Update contribution status based on reviews
        await self._update_contribution_status(contribution_id)
        logger.info(f"Reviewed contribution {contribution_id}: {'approved' if approved else 'rejected'}")
        return review_id

    async def _update_contribution_status(self, contribution_id: str):
        """Publish an APPROVED contribution once it has at least 3 approvals.

        On publication the author receives 50 reputation points. The
        contribution's ``updated_date`` is refreshed and the record saved
        whether or not it was published.
        """
        contribution = self.contributions[contribution_id]

        # Need at least 3 approvals to publish
        if contribution.approval_count >= 3 and contribution.status == ContributionStatus.APPROVED:
            contribution.status = ContributionStatus.PUBLISHED
            contribution.published_date = datetime.now()

            # Award reputation to contributor. The author's profile may be
            # absent (for example if its file failed to load); publishing
            # must not fail because of that.
            contributor = self.contributors.get(contribution.contributor_id)
            if contributor is not None:
                contributor.update_reputation(50)  # 50 points for published contribution
                self._save_contributor(contributor)
            else:
                logger.warning(
                    f"Contributor {contribution.contributor_id} not found; "
                    f"no reputation awarded for contribution {contribution_id}"
                )

        contribution.updated_date = datetime.now()
        self._save_contribution(contribution)

    async def discover_contributions(
        self,
        contribution_type: Optional[ContributionType] = None,
        tags: Optional[List[str]] = None,
        rating_threshold: float = 0.0,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """List published contributions that match the given filters.

        Args:
            contribution_type: If given, keep only this type.
            tags: If given, keep contributions carrying at least one of them.
            rating_threshold: Rated contributions below this are dropped;
                unrated contributions are always kept.
            limit: Maximum number of results returned.

        Returns:
            Summary dicts sorted by rating, then downloads, highest first.
            Unrated contributions sort as if rated 0.
        """
        contributions = []
        for contribution in self.contributions.values():
            # Filter by type
            if contribution_type and contribution.contribution_type != contribution_type:
                continue
            # Filter by status (only show published)
            if contribution.status != ContributionStatus.PUBLISHED:
                continue
            # Filter by rating. Compare against None explicitly so that a
            # rating of 0.0 is still held to the threshold.
            if contribution.rating is not None and contribution.rating < rating_threshold:
                continue
            # Filter by tags
            if tags and not any(tag in contribution.tags for tag in tags):
                continue

            # Get contributor info
            contributor = self.contributors.get(contribution.contributor_id)
            contribution_data = {
                'contribution_id': contribution.contribution_id,
                'title': contribution.title,
                'description': contribution.description,
                'type': contribution.contribution_type.value,
                'contributor': contributor.username if contributor else 'Unknown',
                'rating': contribution.rating,
                'downloads': contribution.downloads,
                'forks': contribution.forks,
                'tags': contribution.tags,
                'published_date': contribution.published_date.isoformat() if contribution.published_date else None,
                'files': contribution.files
            }
            contributions.append(contribution_data)

        # Sort by rating and downloads. 'rating' is always present but may be
        # None, and None cannot be ordered against a float, so map it to 0.
        contributions.sort(
            key=lambda item: (item['rating'] or 0, item['downloads']),
            reverse=True
        )
        return contributions[:limit]

    async def download_contribution(self, contribution_id: str) -> Optional[Dict[str, Any]]:
        """Return a contribution's payload and count the download.

        Returns:
            A dict with the id, title, content, files, dependencies and a
            ``'download_date'`` timestamp, or ``None`` if the id is unknown.
        """
        if contribution_id not in self.contributions:
            return None
        contribution = self.contributions[contribution_id]

        # NOTE(review): status is not checked here, so unpublished or rejected
        # contributions can be downloaded too; confirm that is intended.
        # Increment download count
        contribution.downloads += 1
        self._save_contribution(contribution)
        return {
            'contribution_id': contribution.contribution_id,
            'title': contribution.title,
            'content': contribution.content,
            'files': contribution.files,
            'dependencies': contribution.dependencies,
            'download_date': datetime.now().isoformat()
        }

    async def fork_contribution(self, contribution_id: str, contributor_id: str) -> Optional[str]:
        """Create a copy of a contribution owned by another contributor.

        The fork starts in the default PENDING status and carries an extra
        ``'fork'`` tag. The original's fork counter is incremented.

        Returns:
            The new contribution's id, or ``None`` if either the contribution
            or the contributor is unknown.
        """
        if contribution_id not in self.contributions:
            return None
        # A fork owned by an unregistered contributor would be an orphan record.
        if contributor_id not in self.contributors:
            logger.warning(
                f"Cannot fork contribution {contribution_id}: "
                f"contributor {contributor_id} not found"
            )
            return None
        original = self.contributions[contribution_id]

        # Create forked contribution
        fork_id = str(uuid.uuid4())
        forked = Contribution(
            contribution_id=fork_id,
            title=f"Fork of {original.title}",
            description=f"Forked from: {original.description}",
            contribution_type=original.contribution_type,
            contributor_id=contributor_id,
            # Deep copy so nested content is not shared with the original.
            content=copy.deepcopy(original.content) if original.content else None,
            files=original.files.copy(),
            dependencies=original.dependencies.copy(),
            tags=original.tags.copy() + ['fork']
        )
        self.contributions[fork_id] = forked
        self._save_contribution(forked)

        # Increment fork count on original
        original.forks += 1
        self._save_contribution(original)
        logger.info(f"Forked contribution {contribution_id} as {fork_id}")
        return fork_id

    def get_contributor_stats(self, contributor_id: str) -> Optional[Dict[str, Any]]:
        """Summarize one contributor's profile and contribution totals.

        Returns:
            A stats dict, or ``None`` if the contributor is unknown.
        """
        if contributor_id not in self.contributors:
            return None
        contributor = self.contributors[contributor_id]

        # Count contributions by status
        contributions = [c for c in self.contributions.values() if c.contributor_id == contributor_id]
        published_count = sum(1 for c in contributions if c.status == ContributionStatus.PUBLISHED)
        pending_count = sum(1 for c in contributions if c.status == ContributionStatus.PENDING)

        # Calculate total downloads
        total_downloads = sum(c.downloads for c in contributions)
        total_forks = sum(c.forks for c in contributions)
        return {
            'contributor_id': contributor.contributor_id,
            'username': contributor.username,
            'reputation_score': contributor.reputation_score,
            'reputation_level': contributor.reputation_level.value,
            'contributions_count': contributor.contributions_count,
            'published_contributions': published_count,
            'pending_contributions': pending_count,
            'total_downloads': total_downloads,
            'total_forks': total_forks,
            'joined_date': contributor.joined_date.isoformat(),
            'last_active': contributor.last_active.isoformat()
        }

    def get_community_stats(self) -> Dict[str, Any]:
        """Summarize the whole community.

        Returns:
            A dict with contribution and contributor totals, contributions
            per type, contributors per reputation level, and total downloads
            and forks.
        """
        total_contributions = len(self.contributions)
        published_contributions = sum(
            1 for c in self.contributions.values() if c.status == ContributionStatus.PUBLISHED
        )
        total_contributors = len(self.contributors)

        # Contributions by type
        contributions_by_type = {}
        for contribution in self.contributions.values():
            type_name = contribution.contribution_type.value
            contributions_by_type[type_name] = contributions_by_type.get(type_name, 0) + 1

        # Reputation distribution
        reputation_distribution = {}
        for contributor in self.contributors.values():
            level = contributor.reputation_level.value
            reputation_distribution[level] = reputation_distribution.get(level, 0) + 1

        return {
            'total_contributions': total_contributions,
            'published_contributions': published_contributions,
            'total_contributors': total_contributors,
            'contributions_by_type': contributions_by_type,
            'reputation_distribution': reputation_distribution,
            'total_downloads': sum(c.downloads for c in self.contributions.values()),
            'total_forks': sum(c.forks for c in self.contributions.values())
        }