#!/usr/bin/env python3
"""
Python Client Library for MCP Self-Learning Server
Provides integration with Claudia Voice Assistant and other Python applications
"""
import json
import time
import asyncio
import aiohttp
import websockets
import logging
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
class LearningStatus(Enum):
INITIALIZING = "initializing"
ACTIVE = "active"
INACTIVE = "inactive"
ERROR = "error"
@dataclass
class LearningInsights:
"""Learning insights data structure"""
top_patterns: List[Dict[str, Any]]
performance_metrics: Dict[str, Any]
recommendations: List[str]
total_patterns: int
learning_cycles: int
@dataclass
class ServerStatus:
"""Server status data structure"""
running: bool
uptime: int
learning_status: LearningStatus
patterns_count: int
knowledge_count: int
memory_usage: Dict[str, Any]
class SelfLearningClient:
"""
Python client for MCP Self-Learning Server
Provides async/await interface for all learning tools and real-time monitoring
"""
def __init__(self, base_url: str = "http://localhost:8765", api_key: Optional[str] = None,
timeout: int = 30, retry_attempts: int = 3, logger: Optional[logging.Logger] = None):
"""
Initialize the client
Args:
base_url: Base URL of the MCP Self-Learning Server API
api_key: Optional API key for authentication
timeout: Request timeout in seconds
retry_attempts: Number of retry attempts for failed requests
logger: Optional logger instance
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.retry_attempts = retry_attempts
self.logger = logger or logging.getLogger(__name__)
# Connection tracking
self.is_connected = False
self.last_health_check = None
self.websocket = None
self.websocket_callbacks = {}
# Request tracking
self.request_count = 0
self.last_error = None
# Session management
self.session = None
async def __aenter__(self):
"""Async context manager entry"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.disconnect()
async def connect(self) -> bool:
"""
Initialize connection and verify server is accessible
Returns:
bool: True if connection successful, False otherwise
"""
try:
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout),
headers={'Content-Type': 'application/json'}
)
if self.api_key:
self.session.headers.update({'Authorization': f'Bearer {self.api_key}'})
health = await self.get_health()
self.is_connected = health.get('status') == 'healthy'
self.last_health_check = time.time()
if self.is_connected:
self.logger.info(f"Connected to MCP Self-Learning Server at {self.base_url}")
return self.is_connected
except Exception as e:
self.logger.error(f"Failed to connect to server: {e}")
self.last_error = str(e)
self.is_connected = False
return False
async def disconnect(self):
"""Clean up connections"""
if self.websocket:
await self.websocket.close()
self.websocket = None
if self.session:
await self.session.close()
self.session = None
self.is_connected = False
self.logger.info("Disconnected from MCP Self-Learning Server")
async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""
Make HTTP request with retry logic
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint
data: Optional request data
Returns:
Dict containing response data
Raises:
Exception: If request fails after all retry attempts
"""
if not self.session:
raise Exception("Not connected. Call connect() first.")
url = f"{self.base_url}{endpoint}"
last_error = None
for attempt in range(1, self.retry_attempts + 1):
try:
kwargs = {}
if data:
kwargs['json'] = data
async with self.session.request(method, url, **kwargs) as response:
self.request_count += 1
if response.status >= 400:
error_data = await response.json() if response.content_type == 'application/json' else {}
raise Exception(f"HTTP {response.status}: {error_data.get('error', response.reason)}")
result = await response.json()
return result
except Exception as e:
last_error = e
if attempt < self.retry_attempts:
wait_time = min(2 ** attempt, 10) # Exponential backoff, max 10s
self.logger.warning(f"Request failed (attempt {attempt}/{self.retry_attempts}), retrying in {wait_time}s: {e}")
await asyncio.sleep(wait_time)
else:
self.logger.error(f"Request failed after {self.retry_attempts} attempts: {e}")
self.last_error = str(last_error)
raise last_error
async def get_health(self) -> Dict[str, Any]:
"""Get server health status"""
return await self._make_request('GET', '/health')
async def get_status(self) -> ServerStatus:
"""Get detailed server status"""
data = await self._make_request('GET', '/status')
learning_status = LearningStatus.INACTIVE
if data.get('learning', {}).get('active'):
learning_status = LearningStatus.ACTIVE
elif 'error' in data:
learning_status = LearningStatus.ERROR
return ServerStatus(
running=data.get('running', False),
uptime=data.get('uptime', 0),
learning_status=learning_status,
patterns_count=data.get('learning', {}).get('patterns', 0),
knowledge_count=data.get('learning', {}).get('knowledge', 0),
memory_usage=data.get('memory', {})
)
async def analyze_pattern(self, interaction: Dict[str, Any]) -> Dict[str, Any]:
"""
Analyze an interaction pattern for learning
Args:
interaction: Dictionary containing interaction data with keys:
- type: str (interaction type)
- input: str (input data)
- output: str (output data)
- success: bool (whether interaction was successful)
- context: dict (optional context data)
- performance: dict (optional performance metrics)
Returns:
Dict containing analysis results
"""
required_fields = ['type', 'input', 'output', 'success']
for field in required_fields:
if field not in interaction:
raise ValueError(f"Missing required field: {field}")
return await self._make_request('POST', '/analyze', {'interaction': interaction})
async def get_insights(self) -> LearningInsights:
"""Get learning insights and analytics"""
data = await self._make_request('GET', '/insights')
insights = data.get('insights', {})
return LearningInsights(
top_patterns=insights.get('topPatterns', []),
performance_metrics=insights.get('performanceMetrics', {}),
recommendations=insights.get('recommendations', []),
total_patterns=len(insights.get('topPatterns', [])),
learning_cycles=insights.get('performanceMetrics', {}).get('learningCycles', 0)
)
async def trigger_learning(self) -> Dict[str, Any]:
"""Trigger a learning cycle manually"""
return await self._make_request('POST', '/learn')
async def export_knowledge(self, format: str = 'json') -> Dict[str, Any]:
"""
Export learned knowledge
Args:
format: Export format ('json' or 'markdown')
Returns:
Dict containing export information including download URL
"""
return await self._make_request('GET', f'/export?format={format}')
async def import_knowledge(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Import knowledge from external source
Args:
data: Knowledge data to import
Returns:
Dict containing import results
"""
return await self._make_request('POST', '/import', data)
async def get_optimizations(self, tool_name: Optional[str] = None) -> Dict[str, Any]:
"""
Get optimization suggestions
Args:
tool_name: Optional specific tool to get optimizations for
Returns:
Dict containing optimization suggestions
"""
endpoint = '/optimize'
if tool_name:
endpoint += f'?tool_name={tool_name}'
return await self._make_request('GET', endpoint)
async def predict_next_action(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Predict next actions based on context
Args:
context: Context information for prediction
Returns:
Dict containing predictions
"""
return await self._make_request('POST', '/predict', {'context': context})
async def get_performance_metrics(self, tool_name: Optional[str] = None) -> Dict[str, Any]:
"""
Get performance metrics
Args:
tool_name: Optional specific tool to get metrics for
Returns:
Dict containing performance metrics
"""
endpoint = '/metrics'
if tool_name:
endpoint += f'?tool_name={tool_name}'
return await self._make_request('GET', endpoint)
async def start_websocket_monitoring(self, callback: Callable[[Dict], None]):
"""
Start WebSocket connection for real-time monitoring
Args:
callback: Function to call when updates are received
"""
if self.websocket:
return # Already connected
ws_url = self.base_url.replace('http', 'ws') + '/ws'
try:
self.websocket = await websockets.connect(ws_url)
self.logger.info("WebSocket connection established")
# Listen for messages
async def listen():
try:
async for message in self.websocket:
try:
data = json.loads(message)
callback(data)
except json.JSONDecodeError as e:
self.logger.error(f"Invalid WebSocket message: {e}")
except Exception as e:
self.logger.error(f"WebSocket callback error: {e}")
except websockets.exceptions.ConnectionClosed:
self.logger.info("WebSocket connection closed")
self.websocket = None
# Start listening in background
asyncio.create_task(listen())
except Exception as e:
self.logger.error(f"Failed to connect WebSocket: {e}")
raise
async def stop_websocket_monitoring(self):
"""Stop WebSocket monitoring"""
if self.websocket:
await self.websocket.close()
self.websocket = None
self.logger.info("WebSocket monitoring stopped")
async def monitor_learning(self, callback: Callable[[Optional[Exception], Optional[Dict]], None],
interval: float = 5.0) -> Callable[[], None]:
"""
Monitor learning activity with periodic updates
Args:
callback: Function to call with updates (error, data)
interval: Update interval in seconds
Returns:
Function to stop monitoring
"""
monitoring = True
async def monitor():
while monitoring:
try:
status = await self.get_status()
insights = await self.get_insights()
callback(None, {
'status': status,
'insights': insights,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
callback(e, None)
await asyncio.sleep(interval)
# Start monitoring task
task = asyncio.create_task(monitor())
def stop_monitoring():
nonlocal monitoring
monitoring = False
task.cancel()
return stop_monitoring
async def wait_for_learning_milestone(self, condition: Callable[[LearningInsights], bool],
timeout: float = 60.0) -> LearningInsights:
"""
Wait for specific learning milestone
Args:
condition: Function that returns True when milestone is reached
timeout: Maximum time to wait in seconds
Returns:
LearningInsights when milestone is reached
Raises:
asyncio.TimeoutError: If timeout is reached
"""
start_time = time.time()
while time.time() - start_time < timeout:
insights = await self.get_insights()
if condition(insights):
return insights
await asyncio.sleep(1.0)
raise asyncio.TimeoutError("Timeout waiting for learning milestone")
async def analyze_voice_interaction(self, user_input: str, assistant_output: str,
context: Dict[str, Any], success: bool = True) -> Dict[str, Any]:
"""
Analyze voice interaction for learning (Claudia-specific)
Args:
user_input: What the user said
assistant_output: What the assistant responded
context: Context information (intent, confidence, etc.)
success: Whether the interaction was successful
Returns:
Dict containing analysis results
"""
interaction = {
'type': 'voice_interaction',
'input': user_input,
'output': assistant_output,
'success': success,
'context': {
**context,
'interaction_type': 'voice',
'timestamp': datetime.now().isoformat()
},
'performance': {
'duration': context.get('response_time', 0)
}
}
return await self.analyze_pattern(interaction)
def create_learning_session(self, session_id: str) -> 'LearningSession':
"""Create a learning session for related interactions"""
return LearningSession(self, session_id)
class LearningSession:
"""Learning session for grouping related interactions"""
def __init__(self, client: SelfLearningClient, session_id: str):
self.client = client
self.session_id = session_id
self.interactions = []
self.start_time = time.time()
def add_interaction(self, interaction: Dict[str, Any]) -> Dict[str, Any]:
"""Add interaction to session"""
session_interaction = {
**interaction,
'session_id': self.session_id,
'session_time': time.time() - self.start_time
}
self.interactions.append(session_interaction)
return session_interaction
async def analyze_session(self) -> Dict[str, Any]:
"""Analyze all interactions in session"""
results = []
for interaction in self.interactions:
try:
result = await self.client.analyze_pattern(interaction)
results.append({
'success': True,
'data': result,
'interaction': interaction
})
except Exception as e:
results.append({
'success': False,
'error': str(e),
'interaction': interaction
})
return {
'session_id': self.session_id,
'total_interactions': len(self.interactions),
'duration': time.time() - self.start_time,
'results': results
}
def get_summary(self) -> Dict[str, Any]:
"""Get session summary"""
return {
'session_id': self.session_id,
'interactions': len(self.interactions),
'duration': time.time() - self.start_time,
'types': list(set(i.get('type', 'unknown') for i in self.interactions)),
'success_rate': sum(1 for i in self.interactions if i.get('success', False)) / len(self.interactions) if self.interactions else 0
}
# Convenience functions for common use cases
async def quick_analyze(interaction: Dict[str, Any], server_url: str = "http://localhost:8765") -> Dict[str, Any]:
"""Quick analysis of a single interaction"""
async with SelfLearningClient(server_url) as client:
return await client.analyze_pattern(interaction)
async def get_learning_status(server_url: str = "http://localhost:8765") -> ServerStatus:
"""Quick status check"""
async with SelfLearningClient(server_url) as client:
return await client.get_status()
# Example usage for Claudia integration
class ClaudiaLearningIntegration:
"""Integration helper for Claudia voice assistant"""
def __init__(self, mcp_server_url: str = "http://localhost:8765"):
self.client = SelfLearningClient(mcp_server_url)
self.current_session = None
async def start(self):
"""Start learning integration"""
await self.client.connect()
async def stop(self):
"""Stop learning integration"""
await self.client.disconnect()
async def log_voice_command(self, user_input: str, assistant_response: str,
intent: str, confidence: float, success: bool = True):
"""Log voice command for learning"""
context = {
'intent': intent,
'confidence': confidence,
'voice_assistant': 'claudia'
}
return await self.client.analyze_voice_interaction(
user_input, assistant_response, context, success
)
async def get_voice_optimizations(self) -> Dict[str, Any]:
"""Get optimizations specific to voice interactions"""
return await self.client.get_optimizations('voice_interaction')
async def predict_user_intent(self, partial_input: str) -> Dict[str, Any]:
"""Predict user intent from partial input"""
context = {
'partial_input': partial_input,
'prediction_type': 'intent'
}
return await self.client.predict_next_action(context)