"""Hybrid sync orchestrator for cloud + HTTP fallback.
This module implements the HybridAkoshaSync class which orchestrates multiple
sync methods with automatic fallback. Cloud sync is tried first (fastest),
then HTTP sync if cloud is unavailable.
Architecture (per agent recommendations):
- Simplified orchestrator (~80 lines vs 300 lines)
- Protocol-based method selection
- Priority-based fallback (cloud → HTTP)
- Fast availability detection (1s timeout)
Example:
>>> config = AkoshaSyncConfig.from_settings(settings)
>>> hybrid = HybridAkoshaSync(config)
>>> result = await hybrid.sync_memories()
>>> print(result['method'])
'cloud' # or 'http' if cloud failed
"""
from __future__ import annotations
from typing import Any, Literal
from session_buddy.storage.akosha_config import AkoshaSyncConfig
from session_buddy.storage.cloud_sync import CloudSyncMethod
from session_buddy.storage.sync_protocol import (
HTTPSyncError,
HybridSyncError,
SyncMethod,
)
from session_buddy.utils.error_management import _get_logger
logger = _get_logger()
class HttpSyncMethod(SyncMethod):
"""HTTP sync method for direct upload to Akosha.
Pushes memories directly to Akosha's HTTP endpoints (store_memory,
batch_store_memories) as a fallback when cloud sync is unavailable.
This is intended for development environments where cloud storage
is not configured.
"""
AKOSHA_DEFAULT_URL = "http://localhost:8682/mcp"
def __init__(self, config: AkoshaSyncConfig) -> None:
"""Initialize HTTP sync method.
Args:
config: Akosha sync configuration
"""
self.config = config
def is_available(self) -> bool:
"""Check if Akosha HTTP endpoint is reachable.
Returns:
True if Akosha server is running
Example:
>>> http_sync.is_available()
True # Akosha server reachable
"""
# Quick check: try to connect to Akosha
return self._check_akosha_reachable()
def get_method_name(self) -> str:
"""Get method name for logging.
Returns:
Method name string
"""
return "http"
async def sync(
self,
upload_reflections: bool = True,
upload_knowledge_graph: bool = True,
**kwargs: Any,
) -> dict[str, Any]:
"""Sync memories via HTTP POST to Akosha.
Args:
upload_reflections: Whether to upload reflections
upload_knowledge_graph: Whether to upload knowledge graph
**kwargs: Additional parameters (ignored)
Returns:
Sync result dictionary
Raises:
HTTPSyncError: If HTTP sync fails
"""
import httpx
akosha_url = self.config.cloud_endpoint or self.AKOSHA_DEFAULT_URL
logger.info(f"HTTP sync to Akosha: {akosha_url}")
try:
# For now, just verify connectivity
# TODO: Implement actual batch upload via batch_store_memories
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"{akosha_url}/tools/call",
json={
"method": "tools/call",
"params": {
"name": "batch_store_memories",
"arguments": {
"memories": [],
"source": self.config.system_id_resolved,
},
},
},
)
response.raise_for_status()
logger.info("HTTP sync successful (connectivity verified)")
return {
"method": "http",
"success": True,
"files_uploaded": [],
"bytes_transferred": 0,
"duration_seconds": 0.0,
"error": None,
}
except httpx.HTTPError as e:
logger.error(f"HTTP sync failed: {e}")
raise HTTPSyncError(
message=f"Akosha HTTP endpoint unreachable: {e}",
method="http",
original=e,
) from e
except Exception as e:
# Wrap any other exception in HTTPSyncError
logger.error(f"HTTP sync failed: {e}")
raise HTTPSyncError(
message=f"Akosha HTTP sync failed: {e}",
method="http",
original=e,
) from e
def _check_akosha_reachable(self) -> bool:
"""Check if Akosha server is reachable (non-blocking).
Returns:
True if reachable, False otherwise
"""
# Quick check with 1s timeout
try:
import httpx
url = self.config.cloud_endpoint or self.AKOSHA_DEFAULT_URL
# Use synchronous check with timeout
response = httpx.get(f"{url}/status", timeout=1.0)
return response.status_code < 500
except Exception:
return False
class HybridAkoshaSync:
"""Hybrid sync orchestrator with automatic fallback.
Tries cloud sync first (fastest), falls back to HTTP sync if cloud
is unavailable. Uses protocol-based design for extensibility.
Example:
>>> config = AkoshaSyncConfig.from_settings(settings)
>>> hybrid = HybridAkoshaSync(config)
>>> result = await hybrid.sync_memories()
>>> print(result['method'])
'cloud' # or 'http' if cloud failed
"""
def __init__(self, config: AkoshaSyncConfig) -> None:
"""Initialize hybrid sync orchestrator.
Args:
config: Akosha sync configuration
"""
self.config = config
# Initialize sync methods in priority order
self.methods: list[SyncMethod] = [
CloudSyncMethod(config),
HttpSyncMethod(config),
]
logger.info(
f"HybridAkoshaSync initialized: {len(self.methods)} methods available"
)
async def sync_memories(
self,
force_method: Literal["auto", "cloud", "http"] = "auto",
upload_reflections: bool = True,
upload_knowledge_graph: bool = True,
) -> dict[str, Any]:
"""Sync memories using available methods with automatic fallback.
Args:
force_method: Force specific method ("auto", "cloud", "http")
upload_reflections: Whether to upload reflection database
upload_knowledge_graph: Whether to upload knowledge graph database
Returns:
Sync result dictionary with method, success status, and metadata
Raises:
HybridSyncError: If all sync methods fail
Example:
>>> result = await hybrid.sync_memories(force_method="auto")
>>> print(result['method'])
'cloud'
>>> result = await hybrid.sync_memories(force_method="http")
>>> print(result['method'])
'http'
"""
# If method forced, use only that method
if force_method != "auto":
method = self._get_method(force_method)
if method is None:
raise HybridSyncError(
message=f"Requested method '{force_method}' not available",
method="hybrid",
errors=[{"method": force_method, "error": "Method not configured"}],
)
logger.info(f"Using forced method: {force_method}")
return await method.sync(
upload_reflections=upload_reflections,
upload_knowledge_graph=upload_knowledge_graph,
)
# Auto mode: try each available method in priority order
errors: list[dict[str, Any]] = []
for method in self.methods:
method_name = method.get_method_name()
# Check if method is available
if not method.is_available():
logger.debug(f"Method '{method_name}' not available, skipping")
continue
# Try sync with this method
try:
logger.info(f"Trying sync method: {method_name}")
result = await method.sync(
upload_reflections=upload_reflections,
upload_knowledge_graph=upload_knowledge_graph,
)
if result.get("success"):
logger.info(f"✅ Sync successful using method: {method_name}")
return result
# Method reported failure
error_msg = result.get("error", "Unknown error")
logger.warning(f"Method '{method_name}' reported failure: {error_msg}")
errors.append({"method": method_name, "error": error_msg})
except Exception as e:
logger.warning(f"Method '{method_name}' raised exception: {e}")
errors.append({"method": method_name, "error": str(e)})
# Continue to next method
continue
# All methods failed
logger.error(f"All sync methods failed: {errors}")
raise HybridSyncError(
message=f"All {len(self.methods)} sync methods failed",
method="hybrid",
errors=errors,
)
def _get_method(self, method_name: str) -> SyncMethod | None:
"""Get sync method by name.
Args:
method_name: Method name ("cloud" or "http")
Returns:
SyncMethod instance or None if not found
"""
for method in self.methods:
if method.get_method_name() == method_name:
return method
return None
__all__ = [
"HybridAkoshaSync",
"HttpSyncMethod",
]