"""
Session management tools for Claudmaster AI DM system.
This module provides MCP tools for starting and resuming Claudmaster sessions,
managing session state, and coordinating multi-agent gameplay.
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
from dm20_protocol.models import Campaign
from dm20_protocol.terminology import TermResolver, StyleTracker
from ..orchestrator import Orchestrator
from ..session import ClaudmasterSession
from ..config import ClaudmasterConfig
from ..base import AgentRole
from ..persistence import SessionSerializer, SessionMetadata
from ..agents.archivist import ArchivistAgent
from ..agents.narrator import NarratorAgent, NarrativeStyle
from ..agents.arbiter import ArbiterAgent
from ..agents.player_character import PlayerCharacterAgent
from ..companions import CompanionArchetype, CombatStyle
from ..consistency.fact_database import FactDatabase
from ..llm_client import AnthropicLLMClient, MockLLMClient, LLMDependencyError
from ..recovery.error_messages import ErrorMessageFormatter
from ..onboarding import detect_new_user, run_onboarding, OnboardingState
from ..vector_store import HAS_CHROMADB
logger = logging.getLogger("dm20-protocol")
# Error formatter for in-character error messages
_error_formatter = ErrorMessageFormatter()
# Module-level storage reference, set by main.py during initialization
_storage = None
def set_storage(storage_instance):
"""Set the storage instance for session tools.
Called by main.py during initialization to inject the DnDStorage instance
so that session tools can load campaigns and access quest data.
Args:
storage_instance: The DnDStorage instance from main.py
"""
global _storage
_storage = storage_instance
# ============================================================================
# Session State Models
# ============================================================================
class CampaignSummary(BaseModel):
"""Summary of campaign information for session state."""
campaign_id: str = Field(description="Campaign unique identifier")
campaign_name: str = Field(description="Campaign name")
character_count: int = Field(description="Number of player characters in the party")
npc_count: int = Field(description="Number of NPCs in the campaign")
class ModuleSummary(BaseModel):
"""Summary of loaded module information."""
module_id: Optional[str] = Field(default=None, description="Module unique identifier")
module_name: Optional[str] = Field(default=None, description="Module name")
is_loaded: bool = Field(description="Whether a module is currently loaded")
class GameStateSummary(BaseModel):
"""Summary of current game state."""
current_location: Optional[str] = Field(default=None, description="Current location of the party")
in_combat: bool = Field(description="Whether the party is currently in combat")
turn_count: int = Field(description="Number of turns in the current session")
class CharacterSummary(BaseModel):
"""Summary of a player character."""
character_id: str = Field(description="Character unique identifier")
character_name: str = Field(description="Character name")
character_class: Optional[str] = Field(default=None, description="Character class")
level: Optional[int] = Field(default=None, description="Character level")
class SessionState(BaseModel):
"""Complete state representation of a Claudmaster session."""
session_id: str = Field(description="Unique session identifier")
status: str = Field(description="Session status: active, paused, error")
campaign_info: CampaignSummary = Field(description="Campaign summary information")
module_info: ModuleSummary = Field(description="Module summary information")
game_state: GameStateSummary = Field(description="Game state summary")
party_info: list[CharacterSummary] = Field(description="Party members summary")
last_events: list[str] = Field(description="Recent game events")
context_budget: int = Field(description="Remaining context budget for LLM calls")
recap: Optional[str] = Field(default=None, description="'Previously on...' narrative recap for resumed sessions")
is_onboarding: bool = Field(default=False, description="True if this is a guided onboarding session for a new user")
onboarding: Optional[dict] = Field(default=None, description="Onboarding data: character_suggestions, step, onboarding_state")
error_message: Optional[str] = Field(default=None, description="Error message if status is error")
# ============================================================================
# Session Manager
# ============================================================================
class SessionManager:
"""
Manages the lifecycle of Claudmaster AI DM sessions.
This class handles session creation, persistence, resumption, and state tracking
for the multi-agent AI Game Master system.
"""
def __init__(self) -> None:
"""Initialize the SessionManager."""
self._active_sessions: dict[str, tuple[Orchestrator, ClaudmasterSession]] = {}
self._saved_sessions: dict[str, dict] = {}
self._fact_databases: dict[str, FactDatabase] = {} # Campaign path -> FactDatabase
self._term_resolvers: dict[str, TermResolver] = {} # session_id -> TermResolver
self._style_trackers: dict[str, StyleTracker] = {} # session_id -> StyleTracker
logger.info("SessionManager initialized")
async def start_session(
self,
campaign: Campaign,
config: Optional[ClaudmasterConfig] = None,
module_id: Optional[str] = None
) -> SessionState:
"""
Start a new Claudmaster AI DM session.
Args:
campaign: The campaign to run
config: Configuration for the session (uses defaults if not provided)
module_id: Optional module ID to load
Returns:
SessionState representing the newly created session
Raises:
ValueError: If campaign is invalid
"""
if not campaign:
raise ValueError("Campaign cannot be None")
# Use provided config or create default
session_config = config or ClaudmasterConfig()
# Create orchestrator
orchestrator = Orchestrator(campaign=campaign, config=session_config)
# Register Python agents (data retrieval only, no LLM calls)
archivist = ArchivistAgent(
campaign=campaign,
rules_lookup_fn=None, # TODO: Wire RAG rules lookup once vector store ready
cache_ttl=30.0
)
orchestrator.register_agent("archivist", archivist)
logger.info("[Hybrid Python] Registered ArchivistAgent for data retrieval")
# Create LLM clients for dual-agent architecture
narrator_llm, arbiter_llm = self._create_llm_clients(session_config)
# Narrator: fast narrative generation (Haiku by default)
narrator_style = NarrativeStyle(session_config.narrative_style) if session_config.narrative_style in [s.value for s in NarrativeStyle] else NarrativeStyle.DESCRIPTIVE
narrator = NarratorAgent(
llm=narrator_llm,
style=narrator_style,
max_tokens=session_config.narrator_max_tokens,
)
orchestrator.register_agent("narrator", narrator)
logger.info(f"[Dual-Agent] Registered NarratorAgent (model: {session_config.narrator_model})")
# Arbiter: thorough mechanical resolution (Sonnet by default)
arbiter = ArbiterAgent(
llm=arbiter_llm,
campaign=campaign,
max_tokens=session_config.arbiter_max_tokens,
)
orchestrator.register_agent("arbiter", arbiter)
logger.info(f"[Dual-Agent] Registered ArbiterAgent (model: {session_config.arbiter_model})")
# Register ModuleKeeperAgent if ChromaDB is available and a module is loaded
self._try_register_module_keeper(orchestrator, campaign, module_id)
# Register AI companion agents if SOLO mode with companions
self._register_ai_companions(orchestrator, campaign, session_config)
# Start the session
session = orchestrator.start_session()
# Initialize FactDatabase for consistency tracking
campaign_path = Path(_storage.data_dir) / "campaigns" / campaign.id if _storage else Path(f"/tmp/campaigns/{campaign.id}")
fact_db = FactDatabase(campaign_path)
self._fact_databases[session.session_id] = fact_db
logger.info(f"[Hybrid Python] Initialized FactDatabase at {campaign_path}")
# Initialize terminology system (TermResolver + StyleTracker)
term_resolver = TermResolver()
try:
# Load static core terms dictionary
core_terms_path = Path(__file__).parent.parent.parent / "terminology" / "data" / "core_terms.yaml"
term_resolver.load_yaml(core_terms_path)
logger.info(f"[Terminology] Loaded core terms dictionary from {core_terms_path}")
except Exception as e:
logger.warning(f"[Terminology] Failed to load core terms dictionary: {e}")
# TODO: Auto-index from loaded rulebooks when RulebookManager is wired
# if rulebook_manager:
# term_resolver.index_from_rulebook(rulebook_manager)
# Initialize fresh StyleTracker for new session
style_tracker = StyleTracker()
self._term_resolvers[session.session_id] = term_resolver
self._style_trackers[session.session_id] = style_tracker
logger.info(f"[Terminology] Initialized TermResolver and StyleTracker for session {session.session_id}")
# Store in active sessions
self._active_sessions[session.session_id] = (orchestrator, session)
logger.info(
f"Started new session {session.session_id} for campaign '{campaign.name}' "
f"(module_id: {module_id or 'none'})"
)
# Build and return session state
return self._build_session_state(
orchestrator=orchestrator,
session=session,
status="active",
module_id=module_id
)
async def resume_session(
self,
session_id: str,
campaign: Campaign
) -> SessionState:
"""
Resume a previously saved session.
Args:
session_id: The session ID to resume
campaign: The campaign associated with the session
Returns:
SessionState representing the resumed session
Raises:
ValueError: If session_id is not found in saved sessions
ValueError: If campaign is invalid
"""
if session_id not in self._saved_sessions:
raise ValueError(f"Session {session_id} not found in saved sessions")
if not campaign:
raise ValueError("Campaign cannot be None")
# Load saved session data
saved_data = self._saved_sessions[session_id]
# Recreate config from saved data
config = ClaudmasterConfig(**saved_data.get("config", {}))
# Create new orchestrator
orchestrator = Orchestrator(campaign=campaign, config=config)
# Register agents (same as in start_session)
archivist = ArchivistAgent(
campaign=campaign,
rules_lookup_fn=None,
cache_ttl=30.0
)
orchestrator.register_agent("archivist", archivist)
logger.info("[Hybrid Python] Registered ArchivistAgent for resumed session")
# Create LLM clients and register dual agents
narrator_llm, arbiter_llm = self._create_llm_clients(config)
narrator_style = NarrativeStyle(config.narrative_style) if config.narrative_style in [s.value for s in NarrativeStyle] else NarrativeStyle.DESCRIPTIVE
narrator = NarratorAgent(
llm=narrator_llm,
style=narrator_style,
max_tokens=config.narrator_max_tokens,
)
orchestrator.register_agent("narrator", narrator)
arbiter = ArbiterAgent(
llm=arbiter_llm,
campaign=campaign,
max_tokens=config.arbiter_max_tokens,
)
orchestrator.register_agent("arbiter", arbiter)
logger.info("[Dual-Agent] Registered NarratorAgent + ArbiterAgent for resumed session")
# Register ModuleKeeper for resumed session
module_id = saved_data.get("module_id")
self._try_register_module_keeper(orchestrator, campaign, module_id)
# Recreate session with saved state
session = ClaudmasterSession(
session_id=session_id,
campaign_id=campaign.id,
config=config,
started_at=datetime.fromisoformat(saved_data["started_at"]),
turn_count=saved_data.get("turn_count", 0),
conversation_history=saved_data.get("conversation_history", []),
active_agents=saved_data.get("active_agents", {}),
metadata=saved_data.get("metadata", {})
)
# Manually assign the session to orchestrator
orchestrator.session = session
# Load FactDatabase from disk
campaign_path = Path(_storage.data_dir) / "campaigns" / campaign.id if _storage else Path(f"/tmp/campaigns/{campaign.id}")
fact_db = FactDatabase(campaign_path)
# load() is called automatically in FactDatabase.__init__
self._fact_databases[session_id] = fact_db
logger.info(f"[Hybrid Python] Loaded FactDatabase for resumed session (facts: {len(fact_db.facts)})")
# Initialize terminology system (same as start_session)
term_resolver = TermResolver()
try:
core_terms_path = Path(__file__).parent.parent.parent / "terminology" / "data" / "core_terms.yaml"
term_resolver.load_yaml(core_terms_path)
logger.info(f"[Terminology] Loaded core terms dictionary")
except Exception as e:
logger.warning(f"[Terminology] Failed to load core terms dictionary: {e}")
# Restore StyleTracker from saved state
style_tracker = StyleTracker()
if "style_tracker_observations" in saved_data:
# Restore observations from saved state
style_tracker._observations = saved_data["style_tracker_observations"]
logger.info(f"[Terminology] Restored StyleTracker with {len(style_tracker._observations)} categories")
else:
logger.info(f"[Terminology] Initialized fresh StyleTracker (no saved state found)")
self._term_resolvers[session_id] = term_resolver
self._style_trackers[session_id] = style_tracker
# Store in active sessions
self._active_sessions[session_id] = (orchestrator, session)
logger.info(
f"Resumed session {session_id} for campaign '{campaign.name}' "
f"(turn count: {session.turn_count})"
)
# Generate "Previously on..." recap for the resumed session
recap_text = await self._generate_session_recap(
orchestrator=orchestrator,
session=session,
campaign=campaign,
)
# Store recap in session metadata to avoid regenerating
if recap_text:
session.metadata["last_recap"] = recap_text
# Build and return session state
return self._build_session_state(
orchestrator=orchestrator,
session=session,
status="active",
recap=recap_text,
)
def save_session(self, session_id: str) -> bool:
"""
Save a session for later resumption.
Args:
session_id: The session ID to save
Returns:
True if session was successfully saved, False otherwise
"""
if session_id not in self._active_sessions:
logger.warning(f"Cannot save session {session_id}: not found in active sessions")
return False
orchestrator, session = self._active_sessions[session_id]
# Save session data
saved_data = {
"session_id": session.session_id,
"campaign_id": session.campaign_id,
"config": session.config.model_dump(),
"started_at": session.started_at.isoformat(),
"turn_count": session.turn_count,
"conversation_history": session.conversation_history,
"active_agents": dict(session.active_agents),
"metadata": dict(session.metadata)
}
# Save StyleTracker observations if present
if session_id in self._style_trackers:
style_tracker = self._style_trackers[session_id]
# StyleTracker._observations is dict[str, dict[str, int]] - fully JSON-serializable
saved_data["style_tracker_observations"] = dict(style_tracker._observations)
logger.info(f"[Terminology] Saved StyleTracker observations for session {session_id}")
self._saved_sessions[session_id] = saved_data
# Save FactDatabase if present
if session_id in self._fact_databases:
fact_db = self._fact_databases[session_id]
fact_db.save()
logger.info(f"[Hybrid Python] Saved FactDatabase for session {session_id}")
logger.info(f"Saved session {session_id} (turn count: {session.turn_count})")
return True
def end_session(self, session_id: str) -> bool:
"""
End an active session.
Args:
session_id: The session ID to end
Returns:
True if session was successfully ended, False otherwise
"""
if session_id not in self._active_sessions:
logger.warning(f"Cannot end session {session_id}: not found in active sessions")
return False
orchestrator, session = self._active_sessions[session_id]
# Save FactDatabase before ending
if session_id in self._fact_databases:
fact_db = self._fact_databases[session_id]
fact_db.save()
logger.info(f"[Hybrid Python] Saved FactDatabase for session {session_id}")
del self._fact_databases[session_id]
# Clean up terminology system
if session_id in self._term_resolvers:
del self._term_resolvers[session_id]
if session_id in self._style_trackers:
del self._style_trackers[session_id]
logger.info(f"[Terminology] Cleaned up terminology system for session {session_id}")
# End session via orchestrator
try:
orchestrator.end_session()
except Exception as e:
logger.error(f"Error ending session {session_id}: {e}")
return False
# Remove from active sessions
del self._active_sessions[session_id]
logger.info(f"Ended session {session_id} (final turn count: {session.turn_count})")
return True
def get_session_state(self, session_id: str) -> Optional[SessionState]:
"""
Get the current state of an active session.
Args:
session_id: The session ID to query
Returns:
SessionState if session is active, None otherwise
"""
if session_id not in self._active_sessions:
return None
orchestrator, session = self._active_sessions[session_id]
return self._build_session_state(
orchestrator=orchestrator,
session=session,
status="active"
)
@staticmethod
def _create_llm_clients(config: ClaudmasterConfig) -> tuple:
"""
Create LLM clients for the dual-agent architecture.
Attempts to create real Anthropic clients. Falls back to MockLLMClient
if the anthropic package is not installed or API key is missing.
Args:
config: Session configuration with per-agent model settings.
Returns:
Tuple of (narrator_llm, arbiter_llm) client instances.
"""
try:
narrator_llm = AnthropicLLMClient(
model=config.narrator_model,
temperature=config.narrator_temperature,
default_max_tokens=config.narrator_max_tokens,
)
arbiter_llm = AnthropicLLMClient(
model=config.arbiter_model,
temperature=config.arbiter_temperature,
default_max_tokens=config.arbiter_max_tokens,
)
logger.info(
f"[Dual-Agent] Created Anthropic LLM clients: "
f"Narrator={config.narrator_model}, Arbiter={config.arbiter_model}"
)
return narrator_llm, arbiter_llm
except (LLMDependencyError, Exception) as e:
# Fallback to mock clients (useful for testing without API key)
logger.warning(
f"[Dual-Agent] Cannot create Anthropic clients ({e}). "
f"Using MockLLMClient as fallback."
)
narrator_llm = MockLLMClient(
default_response="The scene unfolds before you..."
)
arbiter_llm = MockLLMClient(
default_response='{"success": true, "dice_rolls": [], "state_changes": [], '
'"rules_applied": [], "narrative_hooks": ["The action resolves."], '
'"reasoning": "Mock resolution."}'
)
return narrator_llm, arbiter_llm
@staticmethod
def _try_register_module_keeper(
orchestrator: Orchestrator,
campaign: Campaign,
module_id: Optional[str],
) -> None:
"""Try to register a ModuleKeeperAgent with the orchestrator.
Only registers when ChromaDB is available and a module structure
can be obtained. Silently skips on any failure so gameplay is
never blocked by missing RAG infrastructure.
Args:
orchestrator: The orchestrator to register the agent with.
campaign: The campaign being played.
module_id: Optional module ID to load structure from.
"""
if not HAS_CHROMADB:
logger.info("[ModuleKeeper] ChromaDB not available — skipping registration")
return
try:
from ..vector_store import VectorStoreManager
from ..agents.module_keeper import ModuleKeeperAgent
from ..models.module import ModuleStructure
# Determine storage path for vector data
campaign_path = (
Path(_storage.data_dir) / "campaigns" / campaign.id
if _storage
else Path(f"/tmp/campaigns/{campaign.id}")
)
vector_dir = str(campaign_path / "claudmaster_sessions" / "vector_store")
# Initialize vector store
vector_store = VectorStoreManager(persist_directory=vector_dir)
# Build a minimal ModuleStructure from campaign data
# In the future this would come from a parsed PDF module
module_structure = ModuleStructure(
module_id=module_id or campaign.id,
title=campaign.name,
)
module_keeper = ModuleKeeperAgent(
vector_store=vector_store,
module_structure=module_structure,
current_location=campaign.game_state.current_location,
)
orchestrator.register_agent("module_keeper", module_keeper)
logger.info(
"[ModuleKeeper] Registered for campaign '%s' (vector_dir=%s)",
campaign.name, vector_dir,
)
except Exception as exc:
logger.warning("[ModuleKeeper] Registration failed: %s", exc)
def _register_ai_companions(
self,
orchestrator: Orchestrator,
campaign: Campaign,
config: ClaudmasterConfig,
) -> None:
"""Register PlayerCharacterAgent for each AI companion in the campaign.
Reads AI companion IDs from game_state notes and creates an agent
for each. Uses an economy LLM client for companion decisions.
Args:
orchestrator: The orchestrator to register agents with.
campaign: The campaign being played.
config: Session config for LLM client creation.
"""
# Parse AI companion IDs from game_state notes
notes = campaign.game_state.notes or ""
if "ai_companions:" not in notes:
return
# Extract companion IDs: "ai_companions:[id1,id2,...]"
import re
match = re.search(r'ai_companions:\[([^\]]*)\]', notes)
if not match:
return
companion_ids = [cid.strip() for cid in match.group(1).split(",") if cid.strip()]
if not companion_ids:
return
# Create a shared LLM client for all AI companions (economy tier)
try:
pc_llm, _ = self._create_llm_clients(config)
except Exception as e:
logger.warning(f"[AI Companions] Could not create LLM client: {e}")
return
# Map role archetypes from character class
class_archetype_map = {
"fighter": CompanionArchetype.TANK,
"barbarian": CompanionArchetype.TANK,
"paladin": CompanionArchetype.TANK,
"cleric": CompanionArchetype.HEALER,
"druid": CompanionArchetype.HEALER,
"wizard": CompanionArchetype.STRIKER,
"sorcerer": CompanionArchetype.STRIKER,
"warlock": CompanionArchetype.STRIKER,
"rogue": CompanionArchetype.STRIKER,
"ranger": CompanionArchetype.SUPPORT,
"bard": CompanionArchetype.SUPPORT,
"monk": CompanionArchetype.STRIKER,
}
registered = 0
for char_id in companion_ids:
# Find character by ID or name
character = None
for char_name, char in campaign.characters.items():
if char.id == char_id or char_name == char_id:
character = char
break
if not character:
logger.warning(f"[AI Companions] Character '{char_id}' not found in campaign")
continue
# Determine archetype from class
class_name = character.character_class.name.lower() if character.character_class else ""
archetype = class_archetype_map.get(class_name, CompanionArchetype.SUPPORT)
# Create and register the agent
pc_agent = PlayerCharacterAgent(
character=character,
llm=pc_llm,
archetype=archetype,
max_tokens=config.max_tokens // 4, # Use smaller token budget
)
orchestrator.register_agent(pc_agent.name, pc_agent)
registered += 1
logger.info(f"[AI Companions] Registered {pc_agent.name} (archetype: {archetype.value})")
if registered:
logger.info(f"[AI Companions] Total {registered} AI companion(s) registered")
@staticmethod
def _extract_recap_data(
campaign: Campaign,
session: ClaudmasterSession,
) -> dict[str, str]:
"""
Extract factual data from campaign and session for recap generation.
Gathers location, quests, recent events, and party status from the
persisted session data — no hallucination, only verified facts.
Args:
campaign: The campaign with current game state.
session: The restored session with conversation history.
Returns:
Dict with keys: location, active_quests, recent_events, party_status.
"""
# Location
location = campaign.game_state.current_location or "an unknown location"
# Active quests
quest_lines = []
for quest in campaign.quests.values():
if quest.status == "active":
quest_lines.append(f"- {quest.title} (given by {quest.giver})")
active_quests = "\n".join(quest_lines) if quest_lines else "None active"
# Recent events from conversation history (last 5 assistant messages)
event_lines = []
for msg in reversed(session.conversation_history):
if msg.get("role") == "assistant" and len(event_lines) < 5:
content = msg.get("content", "")
# Truncate long messages to key info
if len(content) > 150:
content = content[:147] + "..."
event_lines.append(f"- {content}")
recent_events = "\n".join(event_lines) if event_lines else "- No recent events recorded"
# Party status
party_parts = []
for char_id, character in campaign.characters.items():
hp_info = ""
if hasattr(character, "hit_points_current") and hasattr(character, "hit_points_max"):
hp_info = f", HP {character.hit_points_current}/{character.hit_points_max}"
party_parts.append(
f"{character.name} (L{character.character_class.level} "
f"{character.character_class.name}{hp_info})"
)
party_status = "; ".join(party_parts) if party_parts else "Unknown"
return {
"location": location,
"active_quests": active_quests,
"recent_events": recent_events,
"party_status": party_status,
}
async def _generate_session_recap(
self,
orchestrator: Orchestrator,
session: ClaudmasterSession,
campaign: Campaign,
) -> Optional[str]:
"""
Generate an atmospheric recap for a resumed session.
Extracts facts from session data and feeds them to the Narrator
agent for atmospheric formatting.
Args:
orchestrator: The orchestrator with registered agents.
session: The restored session.
campaign: The campaign with current state.
Returns:
Recap text string, or None if generation fails.
"""
try:
# Skip if no meaningful history to recap
if not session.conversation_history:
logger.info(f"[Recap] No conversation history — skipping recap")
return None
# Extract verified facts
recap_data = self._extract_recap_data(campaign, session)
# Get the Narrator agent for atmospheric formatting
narrator = orchestrator.agents.get("narrator")
if narrator is None:
logger.warning("[Recap] Narrator agent not available — skipping recap")
return None
recap_text = await narrator.generate_recap(**recap_data)
logger.info(
f"[Recap] Generated recap for session {session.session_id} "
f"({len(recap_text.split())} words)"
)
return recap_text
except Exception as e:
logger.warning(f"[Recap] Failed to generate recap: {e}")
return None
def _build_session_state(
self,
orchestrator: Orchestrator,
session: ClaudmasterSession,
status: str = "active",
module_id: Optional[str] = None,
recap: Optional[str] = None,
error_message: Optional[str] = None
) -> SessionState:
"""
Build a SessionState object from orchestrator and session.
Args:
orchestrator: The orchestrator managing the session
session: The session object
status: Session status (active, paused, error)
module_id: Optional module ID if a module is loaded
recap: Optional recap text for resumed sessions
error_message: Optional error message if status is error
Returns:
Complete SessionState object
"""
campaign = orchestrator.campaign
# Build campaign summary
campaign_info = CampaignSummary(
campaign_id=campaign.id,
campaign_name=campaign.name,
character_count=len(campaign.characters),
npc_count=len(campaign.npcs)
)
# Build module summary
module_info = ModuleSummary(
module_id=module_id,
module_name=None, # TODO: Lookup module name from module_id once module system is integrated
is_loaded=module_id is not None
)
# Build game state summary
game_state = GameStateSummary(
current_location=campaign.game_state.current_location,
in_combat=campaign.game_state.in_combat,
turn_count=session.turn_count
)
# Build party info
party_info: list[CharacterSummary] = []
for char_id, character in campaign.characters.items():
party_info.append(
CharacterSummary(
character_id=char_id,
character_name=character.name,
character_class=character.character_class.name,
level=character.character_class.level
)
)
# Extract last events from conversation history
last_events: list[str] = []
for msg in session.conversation_history[-5:]: # Last 5 messages
role = msg.get("role", "unknown")
content = msg.get("content", "")
# Truncate long messages
if len(content) > 100:
content = content[:97] + "..."
last_events.append(f"[{role}] {content}")
# Calculate context budget (simplified - assumes 4096 max tokens)
# This is a rough estimate based on conversation history length
used_tokens = sum(len(msg.get("content", "").split()) * 1.3 for msg in session.conversation_history)
context_budget = max(0, int(session.config.max_tokens - used_tokens))
return SessionState(
session_id=session.session_id,
status=status,
campaign_info=campaign_info,
module_info=module_info,
game_state=game_state,
party_info=party_info,
last_events=last_events,
context_budget=context_budget,
recap=recap,
error_message=error_message
)
# Module-level singleton
_session_manager = SessionManager()
# ============================================================================
# MCP Tool Function
# ============================================================================
async def start_claudmaster_session(
campaign_name: str,
module_id: Optional[str] = None,
session_id: Optional[str] = None,
resume: bool = False,
) -> dict:
"""
Start or resume a Claudmaster AI DM session.
This is the main MCP tool for initiating gameplay sessions with the
multi-agent AI Game Master system.
Args:
campaign_name: Name of the campaign to play
module_id: Optional D&D module to load (e.g., "lost-mine-of-phandelver")
session_id: Session ID to resume (required if resume=True)
resume: Whether to resume an existing session (default: False)
Returns:
Dictionary representation of SessionState with the following keys:
- session_id: Unique session identifier
- status: "active", "paused", or "error"
- campaign_info: Campaign summary (id, name, character_count, npc_count)
- module_info: Module summary (module_id, module_name, is_loaded)
- game_state: Game state summary (current_location, in_combat, turn_count)
- party_info: List of character summaries
- last_events: Recent game events
- context_budget: Remaining context budget
- error_message: Error description if status is "error"
Examples:
Start a new session:
>>> result = await start_claudmaster_session(campaign_name="Dragon Heist")
Resume an existing session:
>>> result = await start_claudmaster_session(
... campaign_name="Dragon Heist",
... session_id="abc123",
... resume=True
... )
Start with a specific module:
>>> result = await start_claudmaster_session(
... campaign_name="Starter Set",
... module_id="lost-mine-of-phandelver"
... )
"""
try:
# Validate inputs
if not campaign_name or not campaign_name.strip():
return {
"session_id": "",
"status": "error",
"error_message": _error_formatter.format_error(ValueError("Empty campaign name"), context={"player_facing": True}) if False else "*The DM looks at you expectantly*\n\nWhich campaign shall we embark upon, adventurer?"
}
if resume and not session_id:
return {
"session_id": "",
"status": "error",
"error_message": "*The DM searches through campaign notes*\n\nTo resume a session, I need to know which session to restore. Please provide the session identifier."
}
# Load campaign from storage
if _storage is None:
return {
"session_id": session_id or "",
"status": "error",
"error_message": (
"*The DM gestures at an empty shelf*\n\n"
"The realm's archives seem... inaccessible at the moment. "
"The storage vault has not been properly prepared for our journey."
)
}
try:
campaign = _storage.load_campaign(campaign_name)
except (FileNotFoundError, ValueError):
# Check if this is a new user — trigger onboarding
if not resume and detect_new_user(_storage):
return await _handle_onboarding(campaign_name)
return {
"session_id": session_id or "",
"status": "error",
"error_message": _error_formatter.format_missing_campaign(campaign_name)
}
# Get Claudmaster config for the campaign
try:
config = _storage.get_claudmaster_config()
except ValueError:
config = None # Will use default config
# Check for in-progress onboarding (session resumed after interruption)
onboarding_data = None
is_onboarding = False
# Start or resume the session
if resume:
state = await _session_manager.resume_session(session_id, campaign)
else:
state = await _session_manager.start_session(campaign, config, module_id)
# Check if this session has onboarding metadata (resumed onboarding)
if session_id and session_id in _session_manager._active_sessions:
_, sess = _session_manager._active_sessions[state.session_id]
if sess.metadata.get("onboarding_state"):
ob_state = OnboardingState.from_dict(sess.metadata["onboarding_state"])
if ob_state.step != "complete":
is_onboarding = True
onboarding_data = {
"step": ob_state.step,
"character_suggestions": sess.metadata.get("character_suggestions", ""),
"onboarding_state": ob_state.to_dict(),
}
result = state.model_dump()
if is_onboarding:
result["is_onboarding"] = True
result["onboarding"] = onboarding_data
return result
except ValueError as e:
logger.error(f"Validation error in start_claudmaster_session: {e}")
return {
"session_id": session_id or "",
"status": "error",
"error_message": _error_formatter.format_error(e)
}
except Exception as e:
logger.error(f"Unexpected error in start_claudmaster_session: {e}", exc_info=True)
return {
"session_id": session_id or "",
"status": "error",
"error_message": _error_formatter.format_error(e)
}
async def _handle_onboarding(campaign_name: str) -> dict:
"""Handle the onboarding flow for a brand-new user.
Called when no campaigns exist and the user tries to start a session.
Creates a campaign with defaults, starts a session, and returns
character suggestions from the Narrator.
Args:
campaign_name: The campaign name provided by the user.
Returns:
Dict with session state + onboarding data.
"""
try:
# Create a temporary config to get LLM clients for the Narrator
config = ClaudmasterConfig()
try:
config = _storage.get_claudmaster_config()
except (ValueError, AttributeError):
pass # Use defaults
narrator_llm, _ = SessionManager._create_llm_clients(config)
# Run the onboarding flow
onboarding_result = await run_onboarding(
storage=_storage,
campaign_name=campaign_name,
narrator=narrator_llm,
)
# Load the newly created campaign
campaign = _storage.load_campaign(onboarding_result.campaign_name)
# Start a session with the new campaign
state = await _session_manager.start_session(campaign, config)
# Store onboarding state in session metadata
_, session = _session_manager._active_sessions[state.session_id]
session.metadata["onboarding_state"] = onboarding_result.onboarding_state.to_dict()
session.metadata["character_suggestions"] = onboarding_result.character_suggestions
# Build response with onboarding data
result = state.model_dump()
result["is_onboarding"] = True
result["onboarding"] = {
"step": "character_creation",
"character_suggestions": onboarding_result.character_suggestions,
"onboarding_state": onboarding_result.onboarding_state.to_dict(),
}
logger.info(
f"[Onboarding] Started onboarding session {state.session_id} "
f"for campaign '{onboarding_result.campaign_name}'"
)
return result
except Exception as e:
logger.error(f"[Onboarding] Failed: {e}", exc_info=True)
return {
"session_id": "",
"status": "error",
"error_message": _error_formatter.format_error(e),
}
async def end_session(
session_id: str,
mode: str = "pause",
summary_notes: Optional[str] = None,
campaign_path: Optional[str] = None,
) -> dict:
"""
End or pause a Claudmaster session, saving all state.
This MCP tool cleanly terminates or pauses a Claudmaster session.
In "pause" mode, all state is persisted to disk so the session can
be resumed later. In "end" mode, state is saved as a final snapshot
and the session is terminated.
Args:
session_id: The session ID to end or pause
mode: "pause" (save for later resumption) or "end" (final termination)
summary_notes: Optional DM notes to save with the session snapshot
campaign_path: Optional path for disk persistence.
If provided, session state is written to disk under this path.
Returns:
Dictionary with the following keys:
- status: "paused" or "ended" on success, "error" on failure
- session_id: The session ID that was ended
- session_summary: Brief summary of the session
- save_path: Where state was persisted (if campaign_path provided)
- stats: Session statistics (duration, turn count, etc.)
- error_message: Error description if status is "error"
Examples:
Pause a session for later:
>>> result = await end_session(
... session_id="abc123",
... mode="pause",
... summary_notes="Party just entered the dungeon"
... )
End a session permanently:
>>> result = await end_session(session_id="abc123", mode="end")
Pause with disk persistence:
>>> result = await end_session(
... session_id="abc123",
... mode="pause",
... campaign_path="/data/campaigns/my-campaign"
... )
"""
try:
# Validate mode
if mode not in ("pause", "end"):
return {
"status": "error",
"session_id": session_id,
"error_message": f"*The DM tilts their head, puzzled*\n\nI'm not familiar with the '{mode}' command. When ending a session, please choose either 'pause' (to resume later) or 'end' (to conclude permanently).",
}
# Check session exists
if session_id not in _session_manager._active_sessions:
return {
"status": "error",
"session_id": session_id,
"error_message": _error_formatter.format_session_not_found(session_id),
}
# Get session info before ending
orchestrator, session = _session_manager._active_sessions[session_id]
turn_count = session.turn_count
started_at = session.started_at
campaign_id = session.campaign_id
# Calculate duration
duration_minutes = int((datetime.now() - started_at).total_seconds() / 60)
# Build stats
stats = {
"turn_count": turn_count,
"duration_minutes": duration_minutes,
"started_at": started_at.isoformat(),
"ended_at": datetime.now().isoformat(),
}
# Save to memory first (for resume capability)
_session_manager.save_session(session_id)
# Persist to disk if campaign_path provided
save_path = None
if campaign_path:
serializer = SessionSerializer(Path(campaign_path))
saved_data = _session_manager._saved_sessions.get(session_id, {})
save_dir = serializer.save_session(
session_data=saved_data,
mode=mode,
summary_notes=summary_notes,
)
save_path = str(save_dir)
# End the session (removes from active)
result_status = "paused" if mode == "pause" else "ended"
_session_manager.end_session(session_id)
# Build summary
session_summary = (
f"Session {session_id} for campaign '{campaign_id}': "
f"{turn_count} turns over {duration_minutes} minutes."
)
logger.info(f"MCP end_session: {result_status} session {session_id}")
return {
"status": result_status,
"session_id": session_id,
"session_summary": session_summary,
"save_path": save_path,
"stats": stats,
}
except Exception as e:
logger.error(f"Error in end_session for {session_id}: {e}", exc_info=True)
return {
"status": "error",
"session_id": session_id,
"error_message": _error_formatter.format_error(e),
}
async def get_session_state(
session_id: str,
detail_level: str = "standard",
include_history: bool = True,
history_limit: int = 10,
) -> dict:
"""
Get the current state of a Claudmaster session.
This MCP tool queries the state of an active session, returning
information about the game state, party, recent history, and
session metadata. Supports multiple detail levels.
Args:
session_id: The session ID to query
detail_level: How much detail to include:
- "minimal": Basic session info and status only
- "standard": Session info, game state, party status, recent history
- "full": Everything including complete context budget analysis
include_history: Whether to include action history in the response
history_limit: Maximum number of history entries to return (default: 10)
Returns:
Dictionary with the following keys:
- session_info: Basic session metadata (id, status, campaign, duration)
- game_state: Current game state (location, combat, turn count)
- party_status: List of character summaries with status
- recent_history: Last N actions (if include_history=True)
- active_quests: Current quest status (placeholder)
- context_usage: Context window utilization info
- error_message: Error description if session not found
Examples:
Get standard session state:
>>> result = await get_session_state(session_id="abc123")
Get minimal info (fast):
>>> result = await get_session_state(
... session_id="abc123",
... detail_level="minimal"
... )
Get full state with extended history:
>>> result = await get_session_state(
... session_id="abc123",
... detail_level="full",
... history_limit=50
... )
"""
try:
# Validate detail level
valid_levels = ("minimal", "standard", "full")
if detail_level not in valid_levels:
return {
"error_message": (
f"*The DM adjusts their spectacles*\n\n"
f"I don't recognize the '{detail_level}' level of detail. "
f"Please choose from: {', '.join(valid_levels)}."
)
}
# Get session state from manager
state = _session_manager.get_session_state(session_id)
if state is None:
return {
"error_message": _error_formatter.format_session_not_found(session_id)
}
# Get raw session for additional details
orchestrator, session = _session_manager._active_sessions[session_id]
# Calculate duration
duration_minutes = int((datetime.now() - session.started_at).total_seconds() / 60)
# Build session_info (always included)
session_info = {
"session_id": state.session_id,
"status": state.status,
"campaign_id": state.campaign_info.campaign_id,
"campaign_name": state.campaign_info.campaign_name,
"duration_minutes": duration_minutes,
"turn_count": session.turn_count,
}
# Minimal level: just session info
if detail_level == "minimal":
return {"session_info": session_info}
# Standard and full: add game state and party
game_state = state.game_state.model_dump()
party_status = [char.model_dump() for char in state.party_info]
# Build history if requested
recent_history: list[dict] = []
if include_history:
limit = history_limit if detail_level == "standard" else max(history_limit, 50)
for msg in session.conversation_history[-limit:]:
recent_history.append(msg)
# Context usage
context_usage = {
"context_budget_remaining": state.context_budget,
"max_tokens": session.config.max_tokens,
}
# Full level: add extra detail
if detail_level == "full":
context_usage["conversation_length"] = len(session.conversation_history)
context_usage["active_agents"] = dict(session.active_agents)
# Build active quests from campaign data
active_quests = []
if _storage:
campaign = _storage.get_current_campaign()
if campaign:
active_quests = [
{"title": quest.title, "status": quest.status, "giver": quest.giver}
for quest in campaign.quests.values()
if quest.status == "active"
]
return {
"session_info": session_info,
"game_state": game_state,
"party_status": party_status,
"recent_history": recent_history,
"active_quests": active_quests,
"context_usage": context_usage,
}
except Exception as e:
logger.error(f"Error in get_session_state for {session_id}: {e}", exc_info=True)
return {
"error_message": _error_formatter.format_error(e)
}
__all__ = [
"CampaignSummary",
"ModuleSummary",
"GameStateSummary",
"CharacterSummary",
"SessionState",
"SessionManager",
"SessionMetadata",
"set_storage",
"start_claudmaster_session",
"end_session",
"get_session_state",
]