knowledge_transfer.pyโข41.2 kB
"""
Cross-Component Learning System - Phase 3
Advanced knowledge transfer system that enables learning and adaptation
to propagate across different capabilities and components.
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import uuid
import logging
import hashlib
from .data_models import (
FeedbackEvent, PerformanceSnapshot, CapabilityProfile,
LearningRecord, AdaptationProposal, AdaptationType
)
logger = logging.getLogger(__name__)
class KnowledgeType(Enum):
"""Types of knowledge that can be transferred"""
HEURISTIC_WEIGHTS = "heuristic_weights"
PERFORMANCE_PATTERNS = "performance_patterns"
ERROR_SOLUTIONS = "error_solutions"
OPTIMIZATION_STRATEGIES = "optimization_strategies"
BEST_PRACTICES = "best_practices"
CONFIGURATION_TEMPLATES = "configuration_templates"
SECURITY_POLICIES = "security_policies"
RESOURCE_ALLOCATION = "resource_allocation"
class TransferMethod(Enum):
"""Methods for knowledge transfer"""
DIRECT_COPY = "direct_copy"
ADAPTIVE_MAPPING = "adaptive_mapping"
PATTERN_MATCHING = "pattern_matching"
STATISTICAL_INFERENCE = "statistical_inference"
HYBRID = "hybrid"
class TransferConfidence(Enum):
"""Confidence levels for knowledge transfer"""
VERY_LOW = 0.2
LOW = 0.4
MEDIUM = 0.6
HIGH = 0.8
VERY_HIGH = 0.95
@dataclass
class KnowledgeArtifact:
"""A piece of transferable knowledge"""
artifact_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
knowledge_type: KnowledgeType = KnowledgeType.HEURISTIC_WEIGHTS
# Source information
source_capability_id: str = ""
source_component: str = ""
source_context: Dict[str, Any] = field(default_factory=dict)
# Knowledge content
content: Dict[str, Any] = field(default_factory=dict)
content_hash: str = ""
# Metadata
tags: List[str] = field(default_factory=list)
version: str = "1.0.0"
validity_period: Optional[timedelta] = None
expires_at: Optional[datetime] = None
# Performance metrics
success_rate: float = 0.0
usage_count: int = 0
effectiveness_score: float = 0.0
# Transfer history
transfer_history: List[Dict[str, Any]] = field(default_factory=list)
# Status
status: str = "active" # active, deprecated, expired, invalid
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""Calculate content hash after initialization"""
if self.content:
content_str = json.dumps(self.content, sort_keys=True)
self.content_hash = hashlib.sha256(content_str.encode()).hexdigest()
@dataclass
class TransferProposal:
"""Proposal for knowledge transfer"""
proposal_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
# Transfer details
source_artifact_id: str = ""
target_capability_id: str = ""
target_component: str = ""
transfer_method: TransferMethod = TransferMethod.DIRECT_COPY
# Compatibility analysis
compatibility_score: float = 0.0 # 0-1
required_adaptations: List[str] = field(default_factory=list)
adaptation_complexity: str = "low" # low, medium, high
# Expected impact
expected_benefit: str = ""
expected_improvement: float = 0.0 # percentage
confidence_level: TransferConfidence = TransferConfidence.MEDIUM
# Risk assessment
risk_factors: List[str] = field(default_factory=list)
risk_score: float = 0.0 # 0-1
rollback_plan: str = ""
# Validation
validation_method: str = ""
validation_criteria: List[str] = field(default_factory=list)
testing_required: bool = True
# Status
status: str = "proposed" # proposed, approved, rejected, applied, rolled_back
reviewed_by: Optional[str] = None
reviewed_at: Optional[datetime] = None
applied_at: Optional[datetime] = None
# Results
actual_impact: Optional[Dict[str, float]] = None
side_effects: List[str] = field(default_factory=list)
success_metrics: Optional[Dict[str, Any]] = None
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LearningPathway:
"""A pathway for learning between capabilities"""
pathway_id: str = field(default_factory=lambda: str(uuid.uuid4()))
created_at: datetime = field(default_factory=datetime.now)
# Pathway definition
source_capability_id: str = ""
target_capability_id: str = ""
knowledge_types: List[KnowledgeType] = field(default_factory=list)
# Pathway strength
transfer_frequency: int = 0
success_rate: float = 0.0
average_improvement: float = 0.0
# Similarity metrics
structural_similarity: float = 0.0 # Code/structure similarity
functional_similarity: float = 0.0 # Functional similarity
contextual_similarity: float = 0.0 # Usage context similarity
# Pathway optimization
optimal_transfer_method: TransferMethod = TransferMethod.DIRECT_COPY
adaptation_patterns: Dict[str, str] = field(default_factory=dict)
# Status
status: str = "active" # active, inactive, deprecated
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class CrossComponentInsight:
"""Insight derived from cross-component analysis"""
insight_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
# Insight details
insight_type: str = ""
title: str = ""
description: str = ""
# Scope
involved_capabilities: List[str] = field(default_factory=list)
affected_components: List[str] = field(default_factory=list)
# Evidence
supporting_data: List[Dict[str, Any]] = field(default_factory=list)
statistical_significance: float = 0.0
confidence_score: float = 0.0
# Recommendations
recommendations: List[str] = field(default_factory=list)
action_items: List[Dict[str, Any]] = field(default_factory=list)
# Impact
potential_improvement: float = 0.0 # percentage
implementation_effort: str = "medium" # low, medium, high
# Status
status: str = "new" # new, reviewed, implemented, dismissed
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
class KnowledgeTransferEngine:
"""Advanced cross-component learning and knowledge transfer system"""
def __init__(self):
# Storage
self.artifacts: Dict[str, KnowledgeArtifact] = {}
self.proposals: Dict[str, TransferProposal] = {}
self.pathways: Dict[str, LearningPathway] = {}
self.insights: List[CrossComponentInsight] = []
# Capability profiles for similarity analysis
self.capability_profiles: Dict[str, Dict[str, Any]] = {}
self.component_signatures: Dict[str, Dict[str, Any]] = {}
# Transfer cache and optimization
self.transfer_cache: Dict[str, Any] = {}
self.similarity_cache: Dict[str, float] = {}
self.learning_patterns: Dict[str, List[str]] = {}
# Analysis state
self.analysis_lock = asyncio.Lock()
self.active_transfers: Dict[str, asyncio.Task] = {}
# Configuration
self.min_similarity_threshold = 0.3
self.auto_transfer_threshold = 0.8
self.max_concurrent_transfers = 5
logger.info("KnowledgeTransferEngine initialized")
async def register_capability_profile(
self,
capability_id: str,
profile: Dict[str, Any]
):
"""Register a capability profile for similarity analysis"""
self.capability_profiles[capability_id] = profile
# Update learning pathways
await self._update_learning_pathways(capability_id)
logger.info(f"Registered profile for capability: {capability_id}")
async def extract_knowledge(
self,
capability_id: str,
component: str,
learning_record: LearningRecord
) -> Optional[KnowledgeArtifact]:
"""Extract knowledge from a learning record"""
# Determine knowledge type based on adaptation
knowledge_type = self._classify_knowledge_type(learning_record)
# Extract content based on type
content = await self._extract_knowledge_content(learning_record, knowledge_type)
if not content:
return None
# Create artifact
artifact = KnowledgeArtifact(
knowledge_type=knowledge_type,
source_capability_id=capability_id,
source_component=component,
content=content,
tags=self._generate_tags(learning_record, content),
source_context={
"adaptation_type": learning_record.adaptation_type.value,
"confidence": learning_record.confidence,
"based_on_samples": learning_record.based_on_samples
}
)
# Store artifact
self.artifacts[artifact.artifact_id] = artifact
# Trigger transfer analysis
asyncio.create_task(self._analyze_transfer_opportunities(artifact))
logger.info(f"Extracted knowledge artifact: {artifact.artifact_id}")
return artifact
def _classify_knowledge_type(self, learning_record: LearningRecord) -> KnowledgeType:
"""Classify the type of knowledge based on learning record"""
adaptation_type = learning_record.adaptation_type
if adaptation_type == AdaptationType.WEIGHT_ADJUSTMENT:
return KnowledgeType.HEURISTIC_WEIGHTS
elif adaptation_type == AdaptationType.THRESHOLD_CHANGE:
return KnowledgeType.PERFORMANCE_PATTERNS
elif adaptation_type == AdaptationType.RULE_MODIFICATION:
return KnowledgeType.ERROR_SOLUTIONS
elif adaptation_type == AdaptationType.TAG_RECLASSIFICATION:
return KnowledgeType.SECURITY_POLICIES
else:
return KnowledgeType.BEST_PRACTICES
async def _extract_knowledge_content(
self,
learning_record: LearningRecord,
knowledge_type: KnowledgeType
) -> Optional[Dict[str, Any]]:
"""Extract content based on knowledge type"""
content = {
"adaptation_type": learning_record.adaptation_type.value,
"target_component": learning_record.target_component,
"target_parameter": learning_record.target_parameter,
"old_value": learning_record.old_value,
"new_value": learning_record.new_value,
"adaptation_reason": learning_record.adaptation_reason,
"confidence": learning_record.confidence,
"based_on_samples": learning_record.based_on_samples
}
# Add type-specific content
if knowledge_type == KnowledgeType.HEURISTIC_WEIGHTS:
content["weight_adjustments"] = {
learning_record.target_parameter: {
"old": learning_record.old_value,
"new": learning_record.new_value,
"change_magnitude": abs(
float(learning_record.new_value or 0) - float(learning_record.old_value or 0)
)
}
}
elif knowledge_type == KnowledgeType.PERFORMANCE_PATTERNS:
content["performance_pattern"] = {
"metric": learning_record.target_parameter,
"threshold_change": {
"old": learning_record.old_value,
"new": learning_record.new_value
},
"improvement_expected": learning_record.expected_impact
}
elif knowledge_type == KnowledgeType.ERROR_SOLUTIONS:
content["error_solution"] = {
"error_type": learning_record.target_parameter,
"solution": learning_record.new_value,
"previous_approach": learning_record.old_value,
"success_rate": learning_record.confidence
}
return content
def _generate_tags(
self,
learning_record: LearningRecord,
content: Dict[str, Any]
) -> List[str]:
"""Generate tags for knowledge artifact"""
tags = [
learning_record.adaptation_type.value,
learning_record.target_component,
learning_record.capability_id
]
# Add content-based tags
if "weight_adjustments" in content:
tags.append("weights")
if "performance_pattern" in content:
tags.append("performance")
if "error_solution" in content:
tags.append("error_handling")
# Add confidence-based tags
if learning_record.confidence > 0.8:
tags.append("high_confidence")
elif learning_record.confidence > 0.6:
tags.append("medium_confidence")
return list(set(tags))
async def _analyze_transfer_opportunities(self, artifact: KnowledgeArtifact):
"""Analyze opportunities for knowledge transfer"""
async with self.analysis_lock:
source_capability = artifact.source_capability_id
# Find similar capabilities
similar_capabilities = await self._find_similar_capabilities(source_capability)
# Generate transfer proposals
for target_capability in similar_capabilities:
if target_capability == source_capability:
continue
proposal = await self._create_transfer_proposal(artifact, target_capability)
if proposal and proposal.compatibility_score > self.min_similarity_threshold:
self.proposals[proposal.proposal_id] = proposal
# Auto-apply if high confidence
if (proposal.confidence_level == TransferConfidence.VERY_HIGH and
proposal.risk_score < 0.2):
asyncio.create_task(self._apply_transfer(proposal))
async def _find_similar_capabilities(
self,
capability_id: str,
min_similarity: float = 0.3
) -> List[str]:
"""Find capabilities similar to the given capability"""
similar = []
if capability_id not in self.capability_profiles:
return similar
source_profile = self.capability_profiles[capability_id]
for other_id, other_profile in self.capability_profiles.items():
if other_id == capability_id:
continue
# Calculate similarity
similarity = await self._calculate_capability_similarity(
capability_id, other_id
)
if similarity >= min_similarity:
similar.append(other_id)
# Sort by similarity (highest first)
similar.sort(key=lambda x: self.similarity_cache.get(f"{capability_id}-{x}", 0), reverse=True)
return similar
async def _calculate_capability_similarity(
self,
capability1: str,
capability2: str
) -> float:
"""Calculate similarity between two capabilities"""
cache_key = f"{capability1}-{capability2}"
if cache_key in self.similarity_cache:
return self.similarity_cache[cache_key]
if capability1 not in self.capability_profiles or capability2 not in self.capability_profiles:
return 0.0
profile1 = self.capability_profiles[capability1]
profile2 = self.capability_profiles[capability2]
# Calculate different aspects of similarity
structural_sim = self._calculate_structural_similarity(profile1, profile2)
functional_sim = self._calculate_functional_similarity(profile1, profile2)
contextual_sim = self._calculate_contextual_similarity(profile1, profile2)
# Weighted average
overall_similarity = (
structural_sim * 0.4 +
functional_sim * 0.4 +
contextual_sim * 0.2
)
# Cache result
self.similarity_cache[cache_key] = overall_similarity
return overall_similarity
def _calculate_structural_similarity(self, profile1: Dict, profile2: Dict) -> float:
"""Calculate structural similarity between profiles"""
# Compare component structures, dependencies, etc.
components1 = set(profile1.get("components", []))
components2 = set(profile2.get("components", []))
if not components1 and not components2:
return 1.0
intersection = len(components1.intersection(components2))
union = len(components1.union(components2))
return intersection / union if union > 0 else 0.0
def _calculate_functional_similarity(self, profile1: Dict, profile2: Dict) -> float:
"""Calculate functional similarity between profiles"""
# Compare functionality, capabilities, purposes
func1 = set(profile1.get("functions", []))
func2 = set(profile2.get("functions", []))
if not func1 and not func2:
return 1.0
intersection = len(func1.intersection(func2))
union = len(func1.union(func2))
return intersection / union if union > 0 else 0.0
def _calculate_contextual_similarity(self, profile1: Dict, profile2: Dict) -> float:
"""Calculate contextual similarity between profiles"""
# Compare usage patterns, contexts, environments
context1 = set(profile1.get("contexts", []))
context2 = set(profile2.get("contexts", []))
if not context1 and not context2:
return 1.0
intersection = len(context1.intersection(context2))
union = len(context1.union(context2))
return intersection / union if union > 0 else 0.0
async def _create_transfer_proposal(
self,
artifact: KnowledgeArtifact,
target_capability_id: str
) -> Optional[TransferProposal]:
"""Create a transfer proposal for an artifact"""
# Calculate compatibility
compatibility = await self._calculate_transfer_compatibility(
artifact, target_capability_id
)
if compatibility < self.min_similarity_threshold:
return None
# Determine transfer method
transfer_method = await self._determine_transfer_method(
artifact, target_capability_id, compatibility
)
# Calculate confidence
confidence = await self._calculate_transfer_confidence(
artifact, target_capability_id, compatibility
)
# Assess risks
risk_factors, risk_score = await self._assess_transfer_risks(
artifact, target_capability_id
)
proposal = TransferProposal(
source_artifact_id=artifact.artifact_id,
target_capability_id=target_capability_id,
target_component=artifact.source_component, # Assume same component type
transfer_method=transfer_method,
compatibility_score=compatibility,
confidence_level=confidence,
risk_factors=risk_factors,
risk_score=risk_score,
expected_benefit=f"Transfer {artifact.knowledge_type.value} from {artifact.source_capability_id}",
expected_improvement=compatibility * artifact.effectiveness_score * 100
)
return proposal
async def _calculate_transfer_compatibility(
self,
artifact: KnowledgeArtifact,
target_capability_id: str
) -> float:
"""Calculate compatibility between artifact and target capability"""
# Base similarity
similarity = await self._calculate_capability_similarity(
artifact.source_capability_id, target_capability_id
)
# Adjust for knowledge type compatibility
type_compatibility = await self._check_type_compatibility(
artifact.knowledge_type, target_capability_id
)
# Adjust for context compatibility
context_compatibility = await self._check_context_compatibility(
artifact.source_context, target_capability_id
)
# Weighted combination
compatibility = similarity * 0.5 + type_compatibility * 0.3 + context_compatibility * 0.2
return min(1.0, compatibility)
async def _check_type_compatibility(
self,
knowledge_type: KnowledgeType,
target_capability_id: str
) -> float:
"""Check if knowledge type is compatible with target capability"""
# Simple heuristic - most types are generally compatible
# In a real implementation, this would be more sophisticated
type_scores = {
KnowledgeType.HEURISTIC_WEIGHTS: 0.9,
KnowledgeType.PERFORMANCE_PATTERNS: 0.8,
KnowledgeType.ERROR_SOLUTIONS: 0.7,
KnowledgeType.OPTIMIZATION_STRATEGIES: 0.8,
KnowledgeType.BEST_PRACTICES: 0.9,
KnowledgeType.CONFIGURATION_TEMPLATES: 0.6,
KnowledgeType.SECURITY_POLICIES: 0.5,
KnowledgeType.RESOURCE_ALLOCATION: 0.7
}
return type_scores.get(knowledge_type, 0.5)
async def _check_context_compatibility(
self,
source_context: Dict[str, Any],
target_capability_id: str
) -> float:
"""Check context compatibility for transfer"""
# Simple heuristic based on confidence and sample size
confidence = source_context.get("confidence", 0.5)
samples = source_context.get("based_on_samples", 0)
# Higher confidence and more samples = better context compatibility
context_score = confidence * 0.7 + min(1.0, samples / 100) * 0.3
return context_score
async def _determine_transfer_method(
self,
artifact: KnowledgeArtifact,
target_capability_id: str,
compatibility: float
) -> TransferMethod:
"""Determine the best transfer method"""
if compatibility > 0.9:
return TransferMethod.DIRECT_COPY
elif compatibility > 0.7:
return TransferMethod.ADAPTIVE_MAPPING
elif compatibility > 0.5:
return TransferMethod.PATTERN_MATCHING
else:
return TransferMethod.STATISTICAL_INFERENCE
async def _calculate_transfer_confidence(
self,
artifact: KnowledgeArtifact,
target_capability_id: str,
compatibility: float
) -> TransferConfidence:
"""Calculate confidence level for transfer"""
# Base confidence from compatibility
base_confidence = compatibility
# Adjust for artifact effectiveness
artifact_confidence = artifact.effectiveness_score
# Adjust for source confidence
source_confidence = artifact.source_context.get("confidence", 0.5)
# Combined confidence
combined_confidence = (base_confidence + artifact_confidence + source_confidence) / 3
# Map to confidence levels
if combined_confidence >= 0.9:
return TransferConfidence.VERY_HIGH
elif combined_confidence >= 0.7:
return TransferConfidence.HIGH
elif combined_confidence >= 0.5:
return TransferConfidence.MEDIUM
elif combined_confidence >= 0.3:
return TransferConfidence.LOW
else:
return TransferConfidence.VERY_LOW
async def _assess_transfer_risks(
self,
artifact: KnowledgeArtifact,
target_capability_id: str
) -> Tuple[List[str], float]:
"""Assess risks associated with knowledge transfer"""
risk_factors = []
risk_score = 0.0
# Risk 1: Low compatibility
compatibility = await self._calculate_transfer_compatibility(
artifact, target_capability_id
)
if compatibility < 0.5:
risk_factors.append("Low compatibility between source and target")
risk_score += 0.3
# Risk 2: Low artifact effectiveness
if artifact.effectiveness_score < 0.6:
risk_factors.append("Low artifact effectiveness")
risk_score += 0.2
# Risk 3: Limited usage data
if artifact.usage_count < 5:
risk_factors.append("Limited usage data for artifact")
risk_score += 0.1
# Risk 4: High complexity adaptation
if artifact.knowledge_type in [KnowledgeType.CONFIGURATION_TEMPLATES, KnowledgeType.SECURITY_POLICIES]:
risk_factors.append("Complex knowledge type requires careful adaptation")
risk_score += 0.2
# Risk 5: Target capability criticality
if target_capability_id in self.capability_profiles:
profile = self.capability_profiles[target_capability_id]
if profile.get("criticality", "medium") == "high":
risk_factors.append("Target capability has high criticality")
risk_score += 0.2
return risk_factors, min(1.0, risk_score)
async def _apply_transfer(self, proposal: TransferProposal):
"""Apply a knowledge transfer proposal"""
if proposal.proposal_id not in self.proposals:
return
if len(self.active_transfers) >= self.max_concurrent_transfers:
logger.warning("Maximum concurrent transfers reached, queuing transfer")
await asyncio.sleep(1.0)
return await self._apply_transfer(proposal)
# Mark as in progress
proposal.status = "applying"
# Create transfer task
transfer_task = asyncio.create_task(self._execute_transfer(proposal))
self.active_transfers[proposal.proposal_id] = transfer_task
try:
result = await transfer_task
if result:
proposal.status = "applied"
proposal.applied_at = datetime.now()
logger.info(f"Successfully applied transfer {proposal.proposal_id}")
else:
proposal.status = "failed"
logger.error(f"Failed to apply transfer {proposal.proposal_id}")
except Exception as e:
proposal.status = "failed"
logger.error(f"Transfer {proposal.proposal_id} failed with error: {e}")
finally:
# Clean up
if proposal.proposal_id in self.active_transfers:
del self.active_transfers[proposal.proposal_id]
async def _execute_transfer(self, proposal: TransferProposal) -> bool:
"""Execute the actual knowledge transfer"""
if proposal.source_artifact_id not in self.artifacts:
return False
artifact = self.artifacts[proposal.source_artifact_id]
try:
# Simulate transfer execution based on method
if proposal.transfer_method == TransferMethod.DIRECT_COPY:
success = await self._direct_copy_transfer(artifact, proposal)
elif proposal.transfer_method == TransferMethod.ADAPTIVE_MAPPING:
success = await self._adaptive_mapping_transfer(artifact, proposal)
elif proposal.transfer_method == TransferMethod.PATTERN_MATCHING:
success = await self._pattern_matching_transfer(artifact, proposal)
else:
success = await self._statistical_inference_transfer(artifact, proposal)
if success:
# Update artifact usage
artifact.usage_count += 1
# Record transfer in artifact history
artifact.transfer_history.append({
"target_capability": proposal.target_capability_id,
"transfer_method": proposal.transfer_method.value,
"timestamp": datetime.now().isoformat(),
"proposal_id": proposal.proposal_id
})
# Update learning pathway
await self._update_learning_pathway_stats(
artifact.source_capability_id,
proposal.target_capability_id,
success
)
return success
except Exception as e:
logger.error(f"Transfer execution failed: {e}")
return False
async def _direct_copy_transfer(self, artifact: KnowledgeArtifact, proposal: TransferProposal) -> bool:
"""Execute direct copy transfer"""
# Simulate direct application of knowledge
await asyncio.sleep(0.1)
# In a real implementation, this would directly apply the artifact content
# to the target capability's configuration/heuristics
return True
async def _adaptive_mapping_transfer(self, artifact: KnowledgeArtifact, proposal: TransferProposal) -> bool:
"""Execute adaptive mapping transfer"""
# Simulate adaptive mapping with some transformation
await asyncio.sleep(0.2)
# In a real implementation, this would adapt the artifact content
# to fit the target capability's context
return True
async def _pattern_matching_transfer(self, artifact: KnowledgeArtifact, proposal: TransferProposal) -> bool:
"""Execute pattern matching transfer"""
# Simulate pattern matching and extraction
await asyncio.sleep(0.3)
# In a real implementation, this would identify patterns in the artifact
# and apply similar patterns to the target capability
return True
async def _statistical_inference_transfer(self, artifact: KnowledgeArtifact, proposal: TransferProposal) -> bool:
"""Execute statistical inference transfer"""
# Simulate statistical analysis and inference
await asyncio.sleep(0.4)
# In a real implementation, this would use statistical methods
# to infer applicable knowledge for the target capability
return True
async def _update_learning_pathway_stats(self, source_capability: str, target_capability: str, success: bool):
"""Update learning pathways based on transfer results"""
pathway_key = f"{source_capability}-{target_capability}"
if pathway_key not in self.pathways:
# Create new pathway
pathway = LearningPathway(
source_capability_id=source_capability,
target_capability_id=target_capability,
structural_similarity=await self._calculate_capability_similarity(source_capability, target_capability)
)
self.pathways[pathway_key] = pathway
else:
pathway = self.pathways[pathway_key]
# Update pathway statistics
pathway.transfer_frequency += 1
if success:
pathway.success_rate = (
(pathway.success_rate * (pathway.transfer_frequency - 1) + 1.0) /
pathway.transfer_frequency
)
else:
pathway.success_rate = (
(pathway.success_rate * (pathway.transfer_frequency - 1)) /
pathway.transfer_frequency
)
async def _update_learning_pathways(self, capability_id: str):
"""Update learning pathways when a new capability is registered"""
for other_capability in self.capability_profiles:
if other_capability == capability_id:
continue
# Calculate similarity
similarity = await self._calculate_capability_similarity(capability_id, other_capability)
# Create pathway if similarity is significant
if similarity > self.min_similarity_threshold:
pathway_key = f"{capability_id}-{other_capability}"
if pathway_key not in self.pathways:
pathway = LearningPathway(
source_capability_id=capability_id,
target_capability_id=other_capability,
structural_similarity=similarity
)
self.pathways[pathway_key] = pathway
async def generate_cross_component_insights(self) -> List[CrossComponentInsight]:
"""Generate insights from cross-component analysis"""
insights = []
# Analyze successful transfers
successful_transfers = [
proposal for proposal in self.proposals.values()
if proposal.status == "applied" and proposal.actual_impact
]
if successful_transfers:
# Insight about effective transfer patterns
effective_patterns = self._analyze_effective_transfer_patterns(successful_transfers)
if effective_patterns:
insight = CrossComponentInsight(
insight_type="transfer_patterns",
title="Effective Knowledge Transfer Patterns Identified",
description="Analysis of successful transfers reveals common patterns",
involved_capabilities=list(set(
[p.target_capability_id for p in successful_transfers] +
[self.artifacts[p.source_artifact_id].source_capability_id for p in successful_transfers if p.source_artifact_id in self.artifacts]
)),
supporting_data=effective_patterns,
statistical_significance=0.85,
confidence_score=0.8,
recommendations=[
"Prioritize transfers with similar patterns",
"Document successful transfer strategies",
"Create transfer templates for common patterns"
]
)
insights.append(insight)
# Analyze learning pathways
strong_pathways = [
pathway for pathway in self.pathways.values()
if pathway.success_rate > 0.8 and pathway.transfer_frequency > 3
]
if strong_pathways:
insight = CrossComponentInsight(
insight_type="learning_pathways",
title="Strong Learning Pathways Detected",
description=f"Found {len(strong_pathways)} pathways with high success rates",
involved_capabilities=list(set(
[p.source_capability_id for p in strong_pathways] +
[p.target_capability_id for p in strong_pathways]
)),
supporting_data=[
{
"pathway": f"{p.source_capability_id} -> {p.target_capability_id}",
"success_rate": p.success_rate,
"frequency": p.transfer_frequency
}
for p in strong_pathways
],
statistical_significance=0.9,
confidence_score=0.85,
recommendations=[
"Automate transfers along strong pathways",
"Investigate why these pathways are successful",
"Apply similar patterns to other capability pairs"
]
)
insights.append(insight)
# Store insights
self.insights.extend(insights)
return insights
def _analyze_effective_transfer_patterns(self, transfers: List[TransferProposal]) -> List[Dict[str, Any]]:
"""Analyze patterns in effective transfers"""
patterns = []
# Analyze transfer methods
method_counts = {}
method_success = {}
for transfer in transfers:
method = transfer.transfer_method.value
method_counts[method] = method_counts.get(method, 0) + 1
if transfer.actual_impact and transfer.actual_impact.get("improvement", 0) > 0:
method_success[method] = method_success.get(method, 0) + 1
if method_counts:
patterns.append({
"pattern_type": "transfer_methods",
"data": {
"method_distribution": method_counts,
"success_rates": {
method: method_success.get(method, 0) / count
for method, count in method_counts.items()
}
}
})
# Analyze knowledge types
type_counts = {}
for transfer in transfers:
if transfer.source_artifact_id in self.artifacts:
artifact = self.artifacts[transfer.source_artifact_id]
ktype = artifact.knowledge_type.value
type_counts[ktype] = type_counts.get(ktype, 0) + 1
if type_counts:
patterns.append({
"pattern_type": "knowledge_types",
"data": {"type_distribution": type_counts}
})
return patterns
async def get_transfer_analytics(self) -> Dict[str, Any]:
"""Get comprehensive knowledge transfer analytics"""
total_artifacts = len(self.artifacts)
total_proposals = len(self.proposals)
total_pathways = len(self.pathways)
# Transfer statistics
applied_transfers = len([p for p in self.proposals.values() if p.status == "applied"])
transfer_success_rate = applied_transfers / total_proposals if total_proposals > 0 else 0
# Knowledge type distribution
type_counts = {}
for artifact in self.artifacts.values():
ktype = artifact.knowledge_type.value
type_counts[ktype] = type_counts.get(ktype, 0) + 1
# Pathway statistics
pathway_stats = {
"total": total_pathways,
"active": len([p for p in self.pathways.values() if p.status == "active"]),
"high_success": len([p for p in self.pathways.values() if p.success_rate > 0.8])
}
# Recent activity
recent_artifacts = len([
a for a in self.artifacts.values()
if (datetime.now() - a.timestamp).days < 7
])
recent_transfers = len([
p for p in self.proposals.values()
if p.applied_at and (datetime.now() - p.applied_at).days < 7
])
analytics = {
"summary": {
"total_artifacts": total_artifacts,
"total_proposals": total_proposals,
"total_pathways": total_pathways,
"transfer_success_rate": transfer_success_rate,
"recent_artifacts": recent_artifacts,
"recent_transfers": recent_transfers
},
"knowledge_types": type_counts,
"pathways": pathway_stats,
"insights": {
"total": len(self.insights),
"new": len([i for i in self.insights if i.status == "new"]),
"implemented": len([i for i in self.insights if i.status == "implemented"])
}
}
return analytics
async def approve_transfer(self, proposal_id: str, reviewer: str) -> bool:
"""Approve a transfer proposal"""
if proposal_id not in self.proposals:
return False
proposal = self.proposals[proposal_id]
proposal.status = "approved"
proposal.reviewed_by = reviewer
proposal.reviewed_at = datetime.now()
# Apply the transfer
await self._apply_transfer(proposal)
return True
async def reject_transfer(self, proposal_id: str, reviewer: str, reason: str) -> bool:
"""Reject a transfer proposal"""
if proposal_id not in self.proposals:
return False
proposal = self.proposals[proposal_id]
proposal.status = "rejected"
proposal.reviewed_by = reviewer
proposal.reviewed_at = datetime.now()
proposal.metadata["rejection_reason"] = reason
return True