Katamari MCP Server

by ciphernaut
knowledge_transfer.py • 41.2 kB

"""
Cross-Component Learning System - Phase 3

Advanced knowledge transfer system that enables learning and adaptation
to propagate across different capabilities and components.
"""

import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import uuid
import logging
import hashlib

from .data_models import (
    FeedbackEvent, PerformanceSnapshot, CapabilityProfile,
    LearningRecord, AdaptationProposal, AdaptationType
)

logger = logging.getLogger(__name__)


class KnowledgeType(Enum):
    """Types of knowledge that can be transferred"""
    HEURISTIC_WEIGHTS = "heuristic_weights"
    PERFORMANCE_PATTERNS = "performance_patterns"
    ERROR_SOLUTIONS = "error_solutions"
    OPTIMIZATION_STRATEGIES = "optimization_strategies"
    BEST_PRACTICES = "best_practices"
    CONFIGURATION_TEMPLATES = "configuration_templates"
    SECURITY_POLICIES = "security_policies"
    RESOURCE_ALLOCATION = "resource_allocation"


class TransferMethod(Enum):
    """Methods for knowledge transfer"""
    DIRECT_COPY = "direct_copy"
    ADAPTIVE_MAPPING = "adaptive_mapping"
    PATTERN_MATCHING = "pattern_matching"
    STATISTICAL_INFERENCE = "statistical_inference"
    HYBRID = "hybrid"


class TransferConfidence(Enum):
    """Confidence levels for knowledge transfer"""
    VERY_LOW = 0.2
    LOW = 0.4
    MEDIUM = 0.6
    HIGH = 0.8
    VERY_HIGH = 0.95


@dataclass
class KnowledgeArtifact:
    """A piece of transferable knowledge"""
    artifact_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)
    knowledge_type: KnowledgeType = KnowledgeType.HEURISTIC_WEIGHTS

    # Source information
    source_capability_id: str = ""
    source_component: str = ""
    source_context: Dict[str, Any] = field(default_factory=dict)

    # Knowledge content
    content: Dict[str, Any] = field(default_factory=dict)
    content_hash: str = ""

    # Metadata
    tags: List[str] = field(default_factory=list)
    version: str = "1.0.0"
    validity_period: Optional[timedelta] = None
    expires_at: Optional[datetime] = None

    # Performance metrics
    success_rate: float = 0.0
    usage_count: int = 0
    effectiveness_score: float = 0.0

    # Transfer history
    transfer_history: List[Dict[str, Any]] = field(default_factory=list)

    # Status
    status: str = "active"  # active, deprecated, expired, invalid

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Calculate content hash after initialization"""
        if self.content:
            content_str = json.dumps(self.content, sort_keys=True)
            self.content_hash = hashlib.sha256(content_str.encode()).hexdigest()


@dataclass
class TransferProposal:
    """Proposal for knowledge transfer"""
    proposal_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Transfer details
    source_artifact_id: str = ""
    target_capability_id: str = ""
    target_component: str = ""
    transfer_method: TransferMethod = TransferMethod.DIRECT_COPY

    # Compatibility analysis
    compatibility_score: float = 0.0  # 0-1
    required_adaptations: List[str] = field(default_factory=list)
    adaptation_complexity: str = "low"  # low, medium, high

    # Expected impact
    expected_benefit: str = ""
    expected_improvement: float = 0.0  # percentage
    confidence_level: TransferConfidence = TransferConfidence.MEDIUM

    # Risk assessment
    risk_factors: List[str] = field(default_factory=list)
    risk_score: float = 0.0  # 0-1
    rollback_plan: str = ""

    # Validation
    validation_method: str = ""
    validation_criteria: List[str] = field(default_factory=list)
    testing_required: bool = True

    # Status
    status: str = "proposed"  # proposed, approved, rejected, applied, rolled_back
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None
    applied_at: Optional[datetime] = None

    # Results
    actual_impact: Optional[Dict[str, float]] = None
    side_effects: List[str] = field(default_factory=list)
    success_metrics: Optional[Dict[str, Any]] = None

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class LearningPathway:
    """A pathway for learning between capabilities"""
    pathway_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)

    # Pathway definition
    source_capability_id: str = ""
    target_capability_id: str = ""
    knowledge_types: List[KnowledgeType] = field(default_factory=list)

    # Pathway strength
    transfer_frequency: int = 0
    success_rate: float = 0.0
    average_improvement: float = 0.0

    # Similarity metrics
    structural_similarity: float = 0.0  # Code/structure similarity
    functional_similarity: float = 0.0  # Functional similarity
    contextual_similarity: float = 0.0  # Usage context similarity

    # Pathway optimization
    optimal_transfer_method: TransferMethod = TransferMethod.DIRECT_COPY
    adaptation_patterns: Dict[str, str] = field(default_factory=dict)

    # Status
    status: str = "active"  # active, inactive, deprecated

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class CrossComponentInsight:
    """Insight derived from cross-component analysis"""
    insight_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Insight details
    insight_type: str = ""
    title: str = ""
    description: str = ""

    # Scope
    involved_capabilities: List[str] = field(default_factory=list)
    affected_components: List[str] = field(default_factory=list)

    # Evidence
    supporting_data: List[Dict[str, Any]] = field(default_factory=list)
    statistical_significance: float = 0.0
    confidence_score: float = 0.0

    # Recommendations
    recommendations: List[str] = field(default_factory=list)
    action_items: List[Dict[str, Any]] = field(default_factory=list)

    # Impact
    potential_improvement: float = 0.0  # percentage
    implementation_effort: str = "medium"  # low, medium, high

    # Status
    status: str = "new"  # new, reviewed, implemented, dismissed

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


class KnowledgeTransferEngine:
    """Advanced cross-component learning and knowledge transfer system"""

    def __init__(self):
        # Storage
        self.artifacts: Dict[str, KnowledgeArtifact] = {}
        self.proposals: Dict[str, TransferProposal] = {}
        self.pathways: Dict[str, LearningPathway] = {}
        self.insights: List[CrossComponentInsight] = []

        # Capability profiles for similarity analysis
        self.capability_profiles: Dict[str, Dict[str, Any]] = {}
        self.component_signatures: Dict[str, Dict[str, Any]] = {}

        # Transfer cache and optimization
        self.transfer_cache: Dict[str, Any] = {}
        self.similarity_cache: Dict[str, float] = {}
        self.learning_patterns: Dict[str, List[str]] = {}

        # Analysis state
        self.analysis_lock = asyncio.Lock()
        self.active_transfers: Dict[str, asyncio.Task] = {}

        # Configuration
        self.min_similarity_threshold = 0.3
        self.auto_transfer_threshold = 0.8
        self.max_concurrent_transfers = 5

        logger.info("KnowledgeTransferEngine initialized")

    async def register_capability_profile(
        self,
        capability_id: str,
        profile: Dict[str, Any]
    ):
        """Register a capability profile for similarity analysis"""
        self.capability_profiles[capability_id] = profile

        # Update learning pathways
        await self._update_learning_pathways(capability_id)

        logger.info(f"Registered profile for capability: {capability_id}")

    async def extract_knowledge(
        self,
        capability_id: str,
        component: str,
        learning_record: LearningRecord
    ) -> Optional[KnowledgeArtifact]:
        """Extract knowledge from a learning record"""

        # Determine knowledge type based on adaptation
        knowledge_type = self._classify_knowledge_type(learning_record)

        # Extract content based on type
        content = await self._extract_knowledge_content(learning_record, knowledge_type)

        if not content:
            return None

        # Create artifact
        artifact = KnowledgeArtifact(
            knowledge_type=knowledge_type,
            source_capability_id=capability_id,
            source_component=component,
            content=content,
            tags=self._generate_tags(learning_record, content),
            source_context={
                "adaptation_type": learning_record.adaptation_type.value,
                "confidence": learning_record.confidence,
                "based_on_samples": learning_record.based_on_samples
            }
        )

        # Store artifact
        self.artifacts[artifact.artifact_id] = artifact

        # Trigger transfer analysis
        asyncio.create_task(self._analyze_transfer_opportunities(artifact))

        logger.info(f"Extracted knowledge artifact: {artifact.artifact_id}")
        return artifact

    def _classify_knowledge_type(self, learning_record: LearningRecord) -> KnowledgeType:
        """Classify the type of knowledge based on learning record"""
        adaptation_type = learning_record.adaptation_type

        if adaptation_type == AdaptationType.WEIGHT_ADJUSTMENT:
            return KnowledgeType.HEURISTIC_WEIGHTS
        elif adaptation_type == AdaptationType.THRESHOLD_CHANGE:
            return KnowledgeType.PERFORMANCE_PATTERNS
        elif adaptation_type == AdaptationType.RULE_MODIFICATION:
            return KnowledgeType.ERROR_SOLUTIONS
        elif adaptation_type == AdaptationType.TAG_RECLASSIFICATION:
            return KnowledgeType.SECURITY_POLICIES
        else:
            return KnowledgeType.BEST_PRACTICES

    async def _extract_knowledge_content(
        self,
        learning_record: LearningRecord,
        knowledge_type: KnowledgeType
    ) -> Optional[Dict[str, Any]]:
        """Extract content based on knowledge type"""
        content = {
            "adaptation_type": learning_record.adaptation_type.value,
            "target_component": learning_record.target_component,
            "target_parameter": learning_record.target_parameter,
            "old_value": learning_record.old_value,
            "new_value": learning_record.new_value,
            "adaptation_reason": learning_record.adaptation_reason,
            "confidence": learning_record.confidence,
            "based_on_samples": learning_record.based_on_samples
        }

        # Add type-specific content
        if knowledge_type == KnowledgeType.HEURISTIC_WEIGHTS:
            content["weight_adjustments"] = {
                learning_record.target_parameter: {
                    "old": learning_record.old_value,
                    "new": learning_record.new_value,
                    "change_magnitude": abs(
                        float(learning_record.new_value or 0) -
                        float(learning_record.old_value or 0)
                    )
                }
            }
        elif knowledge_type == KnowledgeType.PERFORMANCE_PATTERNS:
            content["performance_pattern"] = {
                "metric": learning_record.target_parameter,
                "threshold_change": {
                    "old": learning_record.old_value,
                    "new": learning_record.new_value
                },
                "improvement_expected": learning_record.expected_impact
            }
        elif knowledge_type == KnowledgeType.ERROR_SOLUTIONS:
            content["error_solution"] = {
                "error_type": learning_record.target_parameter,
                "solution": learning_record.new_value,
                "previous_approach": learning_record.old_value,
                "success_rate": learning_record.confidence
            }

        return content

    def _generate_tags(
        self,
        learning_record: LearningRecord,
        content: Dict[str, Any]
    ) -> List[str]:
        """Generate tags for knowledge artifact"""
        tags = [
            learning_record.adaptation_type.value,
            learning_record.target_component,
            learning_record.capability_id
        ]

        # Add content-based tags
        if "weight_adjustments" in content:
            tags.append("weights")
        if "performance_pattern" in content:
            tags.append("performance")
        if "error_solution" in content:
            tags.append("error_handling")

        # Add confidence-based tags
        if learning_record.confidence > 0.8:
            tags.append("high_confidence")
        elif learning_record.confidence > 0.6:
            tags.append("medium_confidence")

        return list(set(tags))

    async def _analyze_transfer_opportunities(self, artifact: KnowledgeArtifact):
        """Analyze opportunities for knowledge transfer"""
        async with self.analysis_lock:
            source_capability = artifact.source_capability_id

            # Find similar capabilities
            similar_capabilities = await self._find_similar_capabilities(source_capability)

            # Generate transfer proposals
            for target_capability in similar_capabilities:
                if target_capability == source_capability:
                    continue

                proposal = await self._create_transfer_proposal(artifact, target_capability)
                if proposal and proposal.compatibility_score > self.min_similarity_threshold:
                    self.proposals[proposal.proposal_id] = proposal

                    # Auto-apply if high confidence
                    if (proposal.confidence_level == TransferConfidence.VERY_HIGH and
                            proposal.risk_score < 0.2):
                        asyncio.create_task(self._apply_transfer(proposal))

    async def _find_similar_capabilities(
        self,
        capability_id: str,
        min_similarity: float = 0.3
    ) -> List[str]:
        """Find capabilities similar to the given capability"""
        similar = []

        if capability_id not in self.capability_profiles:
            return similar

        source_profile = self.capability_profiles[capability_id]

        for other_id, other_profile in self.capability_profiles.items():
            if other_id == capability_id:
                continue

            # Calculate similarity
            similarity = await self._calculate_capability_similarity(
                capability_id, other_id
            )

            if similarity >= min_similarity:
                similar.append(other_id)

        # Sort by similarity (highest first)
        similar.sort(key=lambda x: self.similarity_cache.get(f"{capability_id}-{x}", 0),
                     reverse=True)

        return similar

    async def _calculate_capability_similarity(
        self,
        capability1: str,
        capability2: str
    ) -> float:
        """Calculate similarity between two capabilities"""
        cache_key = f"{capability1}-{capability2}"

        if cache_key in self.similarity_cache:
            return self.similarity_cache[cache_key]

        if capability1 not in self.capability_profiles or capability2 not in self.capability_profiles:
            return 0.0

        profile1 = self.capability_profiles[capability1]
        profile2 = self.capability_profiles[capability2]

        # Calculate different aspects of similarity
        structural_sim = self._calculate_structural_similarity(profile1, profile2)
        functional_sim = self._calculate_functional_similarity(profile1, profile2)
        contextual_sim = self._calculate_contextual_similarity(profile1, profile2)

        # Weighted average
        overall_similarity = (
            structural_sim * 0.4 +
            functional_sim * 0.4 +
            contextual_sim * 0.2
        )

        # Cache result
        self.similarity_cache[cache_key] = overall_similarity

        return overall_similarity

    def _calculate_structural_similarity(self, profile1: Dict, profile2: Dict) -> float:
        """Calculate structural similarity between profiles"""
        # Compare component structures, dependencies, etc.
        components1 = set(profile1.get("components", []))
        components2 = set(profile2.get("components", []))

        if not components1 and not components2:
            return 1.0

        intersection = len(components1.intersection(components2))
        union = len(components1.union(components2))

        return intersection / union if union > 0 else 0.0

    def _calculate_functional_similarity(self, profile1: Dict, profile2: Dict) -> float:
        """Calculate functional similarity between profiles"""
        # Compare functionality, capabilities, purposes
        func1 = set(profile1.get("functions", []))
        func2 = set(profile2.get("functions", []))

        if not func1 and not func2:
            return 1.0

        intersection = len(func1.intersection(func2))
        union = len(func1.union(func2))

        return intersection / union if union > 0 else 0.0

    def _calculate_contextual_similarity(self, profile1: Dict, profile2: Dict) -> float:
        """Calculate contextual similarity between profiles"""
        # Compare usage patterns, contexts, environments
        context1 = set(profile1.get("contexts", []))
        context2 = set(profile2.get("contexts", []))

        if not context1 and not context2:
            return 1.0

        intersection = len(context1.intersection(context2))
        union = len(context1.union(context2))

        return intersection / union if union > 0 else 0.0

    async def _create_transfer_proposal(
        self,
        artifact: KnowledgeArtifact,
        target_capability_id: str
    ) -> Optional[TransferProposal]:
        """Create a transfer proposal for an artifact"""

        # Calculate compatibility
        compatibility = await self._calculate_transfer_compatibility(
            artifact, target_capability_id
        )

        if compatibility < self.min_similarity_threshold:
            return None

        # Determine transfer method
        transfer_method = await self._determine_transfer_method(
            artifact, target_capability_id, compatibility
        )

        # Calculate confidence
        confidence = await self._calculate_transfer_confidence(
            artifact, target_capability_id, compatibility
        )

        # Assess risks
        risk_factors, risk_score = await self._assess_transfer_risks(
            artifact, target_capability_id
        )

        proposal = TransferProposal(
            source_artifact_id=artifact.artifact_id,
            target_capability_id=target_capability_id,
            target_component=artifact.source_component,  # Assume same component type
            transfer_method=transfer_method,
            compatibility_score=compatibility,
            confidence_level=confidence,
            risk_factors=risk_factors,
            risk_score=risk_score,
            expected_benefit=f"Transfer {artifact.knowledge_type.value} from {artifact.source_capability_id}",
            expected_improvement=compatibility * artifact.effectiveness_score * 100
        )

        return proposal

    async def _calculate_transfer_compatibility(
        self,
        artifact: KnowledgeArtifact,
        target_capability_id: str
    ) -> float:
        """Calculate compatibility between artifact and target capability"""

        # Base similarity
        similarity = await self._calculate_capability_similarity(
            artifact.source_capability_id, target_capability_id
        )

        # Adjust for knowledge type compatibility
        type_compatibility = await self._check_type_compatibility(
            artifact.knowledge_type, target_capability_id
        )

        # Adjust for context compatibility
        context_compatibility = await self._check_context_compatibility(
            artifact.source_context, target_capability_id
        )

        # Weighted combination
        compatibility = similarity * 0.5 + type_compatibility * 0.3 + context_compatibility * 0.2

        return min(1.0, compatibility)

    async def _check_type_compatibility(
        self,
        knowledge_type: KnowledgeType,
        target_capability_id: str
    ) -> float:
        """Check if knowledge type is compatible with target capability"""
        # Simple heuristic - most types are generally compatible
        # In a real implementation, this would be more sophisticated
        type_scores = {
            KnowledgeType.HEURISTIC_WEIGHTS: 0.9,
            KnowledgeType.PERFORMANCE_PATTERNS: 0.8,
            KnowledgeType.ERROR_SOLUTIONS: 0.7,
            KnowledgeType.OPTIMIZATION_STRATEGIES: 0.8,
            KnowledgeType.BEST_PRACTICES: 0.9,
            KnowledgeType.CONFIGURATION_TEMPLATES: 0.6,
            KnowledgeType.SECURITY_POLICIES: 0.5,
            KnowledgeType.RESOURCE_ALLOCATION: 0.7
        }

        return type_scores.get(knowledge_type, 0.5)

    async def _check_context_compatibility(
        self,
        source_context: Dict[str, Any],
        target_capability_id: str
    ) -> float:
        """Check context compatibility for transfer"""
        # Simple heuristic based on confidence and sample size
        confidence = source_context.get("confidence", 0.5)
        samples = source_context.get("based_on_samples", 0)

        # Higher confidence and more samples = better context compatibility
        context_score = confidence * 0.7 + min(1.0, samples / 100) * 0.3

        return context_score

    async def _determine_transfer_method(
        self,
        artifact: KnowledgeArtifact,
        target_capability_id: str,
        compatibility: float
    ) -> TransferMethod:
        """Determine the best transfer method"""
        if compatibility > 0.9:
            return TransferMethod.DIRECT_COPY
        elif compatibility > 0.7:
            return TransferMethod.ADAPTIVE_MAPPING
        elif compatibility > 0.5:
            return TransferMethod.PATTERN_MATCHING
        else:
            return TransferMethod.STATISTICAL_INFERENCE

    async def _calculate_transfer_confidence(
        self,
        artifact: KnowledgeArtifact,
        target_capability_id: str,
        compatibility: float
    ) -> TransferConfidence:
        """Calculate confidence level for transfer"""

        # Base confidence from compatibility
        base_confidence = compatibility

        # Adjust for artifact effectiveness
        artifact_confidence = artifact.effectiveness_score

        # Adjust for source confidence
        source_confidence = artifact.source_context.get("confidence", 0.5)

        # Combined confidence
        combined_confidence = (base_confidence + artifact_confidence + source_confidence) / 3

        # Map to confidence levels
        if combined_confidence >= 0.9:
            return TransferConfidence.VERY_HIGH
        elif combined_confidence >= 0.7:
            return TransferConfidence.HIGH
        elif combined_confidence >= 0.5:
            return TransferConfidence.MEDIUM
        elif combined_confidence >= 0.3:
            return TransferConfidence.LOW
        else:
            return TransferConfidence.VERY_LOW

    async def _assess_transfer_risks(
        self,
        artifact: KnowledgeArtifact,
        target_capability_id: str
    ) -> Tuple[List[str], float]:
        """Assess risks associated with knowledge transfer"""
        risk_factors = []
        risk_score = 0.0

        # Risk 1: Low compatibility
        compatibility = await self._calculate_transfer_compatibility(
            artifact, target_capability_id
        )
        if compatibility < 0.5:
            risk_factors.append("Low compatibility between source and target")
            risk_score += 0.3

        # Risk 2: Low artifact effectiveness
        if artifact.effectiveness_score < 0.6:
            risk_factors.append("Low artifact effectiveness")
            risk_score += 0.2

        # Risk 3: Limited usage data
        if artifact.usage_count < 5:
            risk_factors.append("Limited usage data for artifact")
            risk_score += 0.1

        # Risk 4: High complexity adaptation
        if artifact.knowledge_type in [KnowledgeType.CONFIGURATION_TEMPLATES,
                                       KnowledgeType.SECURITY_POLICIES]:
            risk_factors.append("Complex knowledge type requires careful adaptation")
            risk_score += 0.2

        # Risk 5: Target capability criticality
        if target_capability_id in self.capability_profiles:
            profile = self.capability_profiles[target_capability_id]
            if profile.get("criticality", "medium") == "high":
                risk_factors.append("Target capability has high criticality")
                risk_score += 0.2

        return risk_factors, min(1.0, risk_score)

    async def _apply_transfer(self, proposal: TransferProposal):
        """Apply a knowledge transfer proposal"""
        if proposal.proposal_id not in self.proposals:
            return

        if len(self.active_transfers) >= self.max_concurrent_transfers:
            logger.warning("Maximum concurrent transfers reached, queuing transfer")
            await asyncio.sleep(1.0)
            return await self._apply_transfer(proposal)

        # Mark as in progress
        proposal.status = "applying"

        # Create transfer task
        transfer_task = asyncio.create_task(self._execute_transfer(proposal))
        self.active_transfers[proposal.proposal_id] = transfer_task

        try:
            result = await transfer_task

            if result:
                proposal.status = "applied"
                proposal.applied_at = datetime.now()
                logger.info(f"Successfully applied transfer {proposal.proposal_id}")
            else:
                proposal.status = "failed"
                logger.error(f"Failed to apply transfer {proposal.proposal_id}")

        except Exception as e:
            proposal.status = "failed"
            logger.error(f"Transfer {proposal.proposal_id} failed with error: {e}")

        finally:
            # Clean up
            if proposal.proposal_id in self.active_transfers:
                del self.active_transfers[proposal.proposal_id]

    async def _execute_transfer(self, proposal: TransferProposal) -> bool:
        """Execute the actual knowledge transfer"""
        if proposal.source_artifact_id not in self.artifacts:
            return False

        artifact = self.artifacts[proposal.source_artifact_id]

        try:
            # Simulate transfer execution based on method
            if proposal.transfer_method == TransferMethod.DIRECT_COPY:
                success = await self._direct_copy_transfer(artifact, proposal)
            elif proposal.transfer_method == TransferMethod.ADAPTIVE_MAPPING:
                success = await self._adaptive_mapping_transfer(artifact, proposal)
            elif proposal.transfer_method == TransferMethod.PATTERN_MATCHING:
                success = await self._pattern_matching_transfer(artifact, proposal)
            else:
                success = await self._statistical_inference_transfer(artifact, proposal)

            if success:
                # Update artifact usage
                artifact.usage_count += 1

                # Record transfer in artifact history
                artifact.transfer_history.append({
                    "target_capability": proposal.target_capability_id,
                    "transfer_method": proposal.transfer_method.value,
                    "timestamp": datetime.now().isoformat(),
                    "proposal_id": proposal.proposal_id
                })

                # Update learning pathway
                await self._update_learning_pathway_stats(
                    artifact.source_capability_id,
                    proposal.target_capability_id,
                    success
                )

            return success

        except Exception as e:
            logger.error(f"Transfer execution failed: {e}")
            return False

    async def _direct_copy_transfer(self, artifact: KnowledgeArtifact,
                                    proposal: TransferProposal) -> bool:
        """Execute direct copy transfer"""
        # Simulate direct application of knowledge
        await asyncio.sleep(0.1)
        # In a real implementation, this would directly apply the artifact content
        # to the target capability's configuration/heuristics
        return True

    async def _adaptive_mapping_transfer(self, artifact: KnowledgeArtifact,
                                         proposal: TransferProposal) -> bool:
        """Execute adaptive mapping transfer"""
        # Simulate adaptive mapping with some transformation
        await asyncio.sleep(0.2)
        # In a real implementation, this would adapt the artifact content
        # to fit the target capability's context
        return True

    async def _pattern_matching_transfer(self, artifact: KnowledgeArtifact,
                                         proposal: TransferProposal) -> bool:
        """Execute pattern matching transfer"""
        # Simulate pattern matching and extraction
        await asyncio.sleep(0.3)
        # In a real implementation, this would identify patterns in the artifact
        # and apply similar patterns to the target capability
        return True

    async def _statistical_inference_transfer(self, artifact: KnowledgeArtifact,
                                              proposal: TransferProposal) -> bool:
        """Execute statistical inference transfer"""
        # Simulate statistical analysis and inference
        await asyncio.sleep(0.4)
        # In a real implementation, this would use statistical methods
        # to infer applicable knowledge for the target capability
        return True

    async def _update_learning_pathway_stats(self, source_capability: str,
                                             target_capability: str, success: bool):
        """Update learning pathways based on transfer results"""
        pathway_key = f"{source_capability}-{target_capability}"

        if pathway_key not in self.pathways:
            # Create new pathway
            pathway = LearningPathway(
                source_capability_id=source_capability,
                target_capability_id=target_capability,
                structural_similarity=await self._calculate_capability_similarity(
                    source_capability, target_capability)
            )
            self.pathways[pathway_key] = pathway
        else:
            pathway = self.pathways[pathway_key]

        # Update pathway statistics
        pathway.transfer_frequency += 1

        if success:
            pathway.success_rate = (
                (pathway.success_rate * (pathway.transfer_frequency - 1) + 1.0) /
                pathway.transfer_frequency
            )
        else:
            pathway.success_rate = (
                (pathway.success_rate * (pathway.transfer_frequency - 1)) /
                pathway.transfer_frequency
            )

    async def _update_learning_pathways(self, capability_id: str):
        """Update learning pathways when a new capability is registered"""
        for other_capability in self.capability_profiles:
            if other_capability == capability_id:
                continue

            # Calculate similarity
            similarity = await self._calculate_capability_similarity(capability_id, other_capability)

            # Create pathway if similarity is significant
            if similarity > self.min_similarity_threshold:
                pathway_key = f"{capability_id}-{other_capability}"
                if pathway_key not in self.pathways:
                    pathway = LearningPathway(
                        source_capability_id=capability_id,
                        target_capability_id=other_capability,
                        structural_similarity=similarity
                    )
                    self.pathways[pathway_key] = pathway

    async def generate_cross_component_insights(self) -> List[CrossComponentInsight]:
        """Generate insights from cross-component analysis"""
        insights = []

        # Analyze successful transfers
        successful_transfers = [
            proposal for proposal in self.proposals.values()
            if proposal.status == "applied" and proposal.actual_impact
        ]

        if successful_transfers:
            # Insight about effective transfer patterns
            effective_patterns = self._analyze_effective_transfer_patterns(successful_transfers)
            if effective_patterns:
                insight = CrossComponentInsight(
                    insight_type="transfer_patterns",
                    title="Effective Knowledge Transfer Patterns Identified",
                    description="Analysis of successful transfers reveals common patterns",
                    involved_capabilities=list(set(
                        [p.target_capability_id for p in successful_transfers] +
                        [self.artifacts[p.source_artifact_id].source_capability_id
                         for p in successful_transfers
                         if p.source_artifact_id in self.artifacts]
                    )),
                    supporting_data=effective_patterns,
                    statistical_significance=0.85,
                    confidence_score=0.8,
                    recommendations=[
                        "Prioritize transfers with similar patterns",
                        "Document successful transfer strategies",
                        "Create transfer templates for common patterns"
                    ]
                )
                insights.append(insight)

        # Analyze learning pathways
        strong_pathways = [
            pathway for pathway in self.pathways.values()
            if pathway.success_rate > 0.8 and pathway.transfer_frequency > 3
        ]

        if strong_pathways:
            insight = CrossComponentInsight(
                insight_type="learning_pathways",
                title="Strong Learning Pathways Detected",
                description=f"Found {len(strong_pathways)} pathways with high success rates",
                involved_capabilities=list(set(
                    [p.source_capability_id for p in strong_pathways] +
                    [p.target_capability_id for p in strong_pathways]
                )),
                supporting_data=[
                    {
                        "pathway": f"{p.source_capability_id} -> {p.target_capability_id}",
                        "success_rate": p.success_rate,
                        "frequency": p.transfer_frequency
                    }
                    for p in strong_pathways
                ],
                statistical_significance=0.9,
                confidence_score=0.85,
                recommendations=[
                    "Automate transfers along strong pathways",
                    "Investigate why these pathways are successful",
                    "Apply similar patterns to other capability pairs"
                ]
            )
            insights.append(insight)

        # Store insights
        self.insights.extend(insights)

        return insights

    def _analyze_effective_transfer_patterns(self, transfers: List[TransferProposal]) -> List[Dict[str, Any]]:
        """Analyze patterns in effective transfers"""
        patterns = []

        # Analyze transfer methods
        method_counts = {}
        method_success = {}

        for transfer in transfers:
            method = transfer.transfer_method.value
            method_counts[method] = method_counts.get(method, 0) + 1
            if transfer.actual_impact and transfer.actual_impact.get("improvement", 0) > 0:
                method_success[method] = method_success.get(method, 0) + 1

        if method_counts:
            patterns.append({
                "pattern_type": "transfer_methods",
                "data": {
                    "method_distribution": method_counts,
                    "success_rates": {
                        method: method_success.get(method, 0) / count
                        for method, count in method_counts.items()
                    }
                }
            })

        # Analyze knowledge types
        type_counts = {}
        for transfer in transfers:
            if transfer.source_artifact_id in self.artifacts:
                artifact = self.artifacts[transfer.source_artifact_id]
                ktype = artifact.knowledge_type.value
                type_counts[ktype] = type_counts.get(ktype, 0) + 1

        if type_counts:
            patterns.append({
                "pattern_type": "knowledge_types",
                "data": {"type_distribution": type_counts}
            })

        return patterns

    async def get_transfer_analytics(self) -> Dict[str, Any]:
        """Get comprehensive knowledge transfer analytics"""
        total_artifacts = len(self.artifacts)
        total_proposals = len(self.proposals)
        total_pathways = len(self.pathways)

        # Transfer statistics
        applied_transfers = len([p for p in self.proposals.values() if p.status == "applied"])
        transfer_success_rate = applied_transfers / total_proposals if total_proposals > 0 else 0

        # Knowledge type distribution
        type_counts = {}
        for artifact in self.artifacts.values():
            ktype = artifact.knowledge_type.value
            type_counts[ktype] = type_counts.get(ktype, 0) + 1

        # Pathway statistics
        pathway_stats = {
            "total": total_pathways,
            "active": len([p for p in self.pathways.values() if p.status == "active"]),
            "high_success": len([p for p in self.pathways.values() if p.success_rate > 0.8])
        }

        # Recent activity
        recent_artifacts = len([
            a for a in self.artifacts.values()
            if (datetime.now() - a.timestamp).days < 7
        ])
        recent_transfers = len([
            p for p in self.proposals.values()
            if p.applied_at and (datetime.now() - p.applied_at).days < 7
        ])

        analytics = {
            "summary": {
                "total_artifacts": total_artifacts,
                "total_proposals": total_proposals,
                "total_pathways": total_pathways,
                "transfer_success_rate": transfer_success_rate,
                "recent_artifacts": recent_artifacts,
                "recent_transfers": recent_transfers
            },
            "knowledge_types": type_counts,
            "pathways": pathway_stats,
            "insights": {
                "total": len(self.insights),
                "new": len([i for i in self.insights if i.status == "new"]),
                "implemented": len([i for i in self.insights if i.status == "implemented"])
            }
        }

        return analytics

    async def approve_transfer(self, proposal_id: str, reviewer: str) -> bool:
        """Approve a transfer proposal"""
        if proposal_id not in self.proposals:
            return False

        proposal = self.proposals[proposal_id]
        proposal.status = "approved"
        proposal.reviewed_by = reviewer
        proposal.reviewed_at = datetime.now()

        # Apply the transfer
        await self._apply_transfer(proposal)

        return True

    async def reject_transfer(self, proposal_id: str, reviewer: str, reason: str) -> bool:
        """Reject a transfer proposal"""
        if proposal_id not in self.proposals:
            return False

        proposal = self.proposals[proposal_id]
        proposal.status = "rejected"
        proposal.reviewed_by = reviewer
        proposal.reviewed_at = datetime.now()
        proposal.metadata["rejection_reason"] = reason

        return True
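
Below is a minimal usage sketch of the engine defined above, included for orientation only. The package import paths and the LearningRecord constructor arguments are assumptions inferred from the attributes this module reads; the actual dataclass lives in the sibling data_models module (not shown here), so real field names and the package layout may differ.

# usage_sketch.py: illustrative only; import paths and LearningRecord fields are assumed.
import asyncio

from katamari_mcp.data_models import AdaptationType, LearningRecord    # placeholder path
from katamari_mcp.knowledge_transfer import KnowledgeTransferEngine    # placeholder path


async def main() -> None:
    engine = KnowledgeTransferEngine()

    # Register two overlapping capability profiles so a learning pathway can form.
    # Profiles are plain dicts with "components", "functions", and "contexts" keys,
    # matching what the similarity calculations above read.
    await engine.register_capability_profile(
        "web_search",
        {"components": ["fetcher", "ranker"], "functions": ["search"], "contexts": ["cli"]},
    )
    await engine.register_capability_profile(
        "doc_search",
        {"components": ["fetcher", "indexer"], "functions": ["search"], "contexts": ["cli"]},
    )

    # A weight adjustment learned by web_search. Field names mirror the attributes
    # knowledge_transfer.py reads from LearningRecord; the real constructor may differ.
    record = LearningRecord(
        capability_id="web_search",
        adaptation_type=AdaptationType.WEIGHT_ADJUSTMENT,
        target_component="ranker",
        target_parameter="recency_weight",
        old_value=0.3,
        new_value=0.5,
        adaptation_reason="stale results reported by users",
        confidence=0.85,
        based_on_samples=40,
    )

    # Extract an artifact; this also schedules background transfer analysis.
    artifact = await engine.extract_knowledge("web_search", "ranker", record)
    print("artifact:", artifact.artifact_id if artifact else None)

    # Give the background analysis task a moment, then inspect the analytics,
    # which should now report one artifact and any generated transfer proposals.
    await asyncio.sleep(0.5)
    print(await engine.get_transfer_analytics())


if __name__ == "__main__":
    asyncio.run(main())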
