"""
ThinkingService - Sequential thinking with revision and branching
=================================================================
Based on Anthropic's sequential-thinking MCP (MIT Licensed)
Extended with session management, confidence tracking, and bridge integration.
This is the "think" in Think. Do. Done.
"""
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional

from ..config import get_config
from ..logger import get_logger
from ..utils.errors import ThinkingError, ValidationError, BranchError, RevisionError
from ..utils.responses import thinking_response, success_response, error_response
@dataclass
class Thought:
    """Represents a single thought in the thinking process.

    Attributes:
        thought_number: Number identifying the thought within its sequence.
        content: Text of the thought.
        timestamp: Creation time as a naive datetime expressed in UTC.
        is_revision: True when this thought revises an earlier one.
        revises_thought: Number of the thought being revised, if any.
        branch_id: Identifier of the branch the thought belongs to, if any.
        branch_from: Thought number the branch was forked from, if any.
        confidence: Optional confidence value attached to the thought.
        tags: Free-form labels attached to the thought.
    """
    thought_number: int
    content: str
    # Same naive-UTC value that datetime.utcnow() produced, obtained without
    # the call that is deprecated since Python 3.12.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    is_revision: bool = False
    revises_thought: Optional[int] = None
    branch_id: Optional[str] = None
    branch_from: Optional[int] = None
    confidence: Optional[float] = None
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        The number, content and ISO-formatted timestamp are always present.
        Revision, branch, confidence and tag entries are only included when
        they carry information.

        Returns:
            A new dictionary describing this thought.
        """
        result: Dict[str, Any] = {
            'thought_number': self.thought_number,
            'content': self.content,
            'timestamp': self.timestamp.isoformat(),
        }
        if self.is_revision:
            result['is_revision'] = True
            result['revises_thought'] = self.revises_thought
        if self.branch_id:
            result['branch_id'] = self.branch_id
            result['branch_from'] = self.branch_from
        # Explicit None check so a confidence of 0.0 is still reported.
        if self.confidence is not None:
            result['confidence'] = self.confidence
        if self.tags:
            # Copy so callers editing the dictionary cannot alter this thought.
            result['tags'] = list(self.tags)
        return result
@dataclass
class ThinkingSession:
    """Manages a thinking session with all its thoughts and branches.

    Thoughts on the main line live in ``thoughts``; thoughts on any other
    branch live in ``branches`` keyed by branch ID. A missing branch ID or one
    equal to ``main_branch`` always refers to the main line.
    """
    session_id: str
    problem: str
    thoughts: List[Thought] = field(default_factory=list)
    branches: Dict[str, List[Thought]] = field(default_factory=dict)
    total_thoughts_estimate: int = 5
    # Same naive-UTC value that datetime.utcnow() produced, obtained without
    # the call that is deprecated since Python 3.12.
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    completed: bool = False
    main_branch: str = "main"

    def _thoughts_for(self, branch_id: Optional[str]) -> Optional[List[Thought]]:
        """Return the thought list for a branch, or None if the branch is unknown."""
        if not branch_id or branch_id == self.main_branch:
            return self.thoughts
        return self.branches.get(branch_id)

    def get_current_thought_number(self, branch_id: Optional[str] = None) -> int:
        """Get the next thought number for a branch.

        Args:
            branch_id: Branch to inspect; None or the main branch name selects
                the main line.

        Returns:
            One more than the number of thoughts already stored on that
            branch. A branch that does not exist yet has no thoughts, so the
            result is 1.
        """
        existing = self._thoughts_for(branch_id)
        return len(existing or ()) + 1

    def add_thought(self, thought: Thought) -> None:
        """Add a thought to the session.

        The thought is appended to the branch named by ``thought.branch_id``,
        creating that branch on first use. Thoughts without a branch ID, or
        with the main branch name, are appended to the main line.
        """
        target = thought.branch_id
        if target and target != self.main_branch:
            self.branches.setdefault(target, []).append(thought)
        else:
            self.thoughts.append(thought)

    def get_thought(self, thought_number: int, branch_id: Optional[str] = None) -> Optional[Thought]:
        """Get a specific thought by number.

        Args:
            thought_number: Number of the thought to look up.
            branch_id: Branch to search; None or the main branch name selects
                the main line.

        Returns:
            The first stored thought with that number on the chosen branch, or
            None when there is no such thought or no such branch.
        """
        # An unknown branch yields no candidates instead of silently falling
        # back to the main line and returning a thought from the wrong branch.
        candidates = self._thoughts_for(branch_id) or ()
        return next(
            (t for t in candidates if t.thought_number == thought_number),
            None,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert session to dictionary.

        Returns:
            A dictionary with the session fields, every thought serialized via
            ``Thought.to_dict``, and summary counts. ``thought_count`` counts
            main-line thoughts only.
        """
        serialized_branches = {
            branch_id: [t.to_dict() for t in branch_thoughts]
            for branch_id, branch_thoughts in self.branches.items()
        }
        return {
            'session_id': self.session_id,
            'problem': self.problem,
            'thoughts': [t.to_dict() for t in self.thoughts],
            'branches': serialized_branches,
            'total_thoughts_estimate': self.total_thoughts_estimate,
            'created_at': self.created_at.isoformat(),
            'completed': self.completed,
            'thought_count': len(self.thoughts),
            'branch_count': len(self.branches),
        }
class ThinkingService:
    """
    Sequential thinking service with revision and branching capabilities

    Features:
    - Step-by-step reasoning with explicit thought tracking
    - Revision capability: "wait, thought #3 was wrong"
    - Branching: explore alternative approaches
    - Dynamic expansion: can extend thinking when scope unclear
    - Confidence tracking per thought
    - Session management for complex problems

    Sessions are kept in memory in ``self.sessions``, keyed by session ID.
    """

    def __init__(self):
        """Initialize the thinking service from the application configuration."""
        self.config = get_config()
        self.logger = get_logger('ThinkingService')

        # Configuration
        self.max_thoughts = self.config.thinking.max_thoughts
        self.max_branches = self.config.thinking.max_branches
        self.max_revisions = self.config.thinking.max_revisions_per_thought

        # Active sessions
        self.sessions: Dict[str, ThinkingSession] = {}

        self.logger.info(
            f"ThinkingService initialized: max_thoughts={self.max_thoughts}, "
            f"max_branches={self.max_branches}"
        )

    def start_thinking(
        self,
        problem: str,
        initial_estimate: int = 5
    ) -> Dict[str, Any]:
        """
        Start a new thinking session

        Args:
            problem: The problem or question to think through
            initial_estimate: Initial estimate of thoughts needed; values
                below 1 or above the configured maximum are clamped

        Returns:
            Response with session information

        Raises:
            ValidationError: If the problem is not a string or is blank
            ThinkingError: If the session could not be created
        """
        try:
            # Validate before slicing the problem for the log line, so bad
            # input surfaces as ValidationError instead of a raw TypeError.
            if not isinstance(problem, str):
                raise ValidationError("Problem statement must be a string")
            if not problem.strip():
                raise ValidationError("Problem statement cannot be empty")

            self.logger.info(f"Starting thinking session for: {problem[:50]}...")

            if initial_estimate < 1:
                initial_estimate = 1
            elif initial_estimate > self.max_thoughts:
                initial_estimate = self.max_thoughts

            # Create session
            session_id = f"think_{uuid.uuid4().hex[:12]}"
            self.sessions[session_id] = ThinkingSession(
                session_id=session_id,
                problem=problem.strip(),
                total_thoughts_estimate=initial_estimate
            )
            self.logger.info(f"Created thinking session: {session_id}")

            return success_response(
                "Thinking session started",
                data={
                    'session_id': session_id,
                    'problem': problem,
                    'estimated_thoughts': initial_estimate,
                    'instructions': (
                        "Use 'think' to add thoughts. "
                        "Use 'revise' to revise previous thoughts. "
                        "Use 'branch' to explore alternatives. "
                        "Use 'conclude' when done thinking."
                    )
                }
            )
        except ValidationError:
            raise
        except Exception as e:
            self.logger.exception(f"Failed to start thinking session: {e}")
            raise ThinkingError(f"Failed to start thinking: {str(e)}") from e

    def think(
        self,
        session_id: str,
        thought: str,
        thought_number: Optional[int] = None,
        total_thoughts: Optional[int] = None,
        confidence: Optional[float] = None,
        tags: Optional[List[str]] = None,
        next_thought_needed: bool = True
    ) -> Dict[str, Any]:
        """
        Add a thought to the main line of a thinking session

        Args:
            session_id: The session ID
            thought: The thought content
            thought_number: Override thought number (auto-increments by default)
            total_thoughts: Update total thoughts estimate; clamped to the
                range 1..max_thoughts
            confidence: Confidence level for this thought (0-1)
            tags: Optional tags for categorization
            next_thought_needed: Whether more thinking is needed

        Returns:
            Thinking response with thought details

        Raises:
            ValidationError: If the session does not exist
            ThinkingError: If the thought limit is reached or the thought
                could not be added
        """
        self.logger.debug(f"Adding thought to session {session_id}")
        try:
            session = self._get_session(session_id)

            # Normalise the content first so unusable input fails before any
            # session state is modified.
            content = thought.strip()

            # Auto-increment thought number if not provided
            if thought_number is None:
                thought_number = session.get_current_thought_number()

            # Check limits before touching the estimate, so a rejected call
            # leaves the session unchanged.
            if thought_number > self.max_thoughts:
                raise ThinkingError(
                    f"Maximum thoughts ({self.max_thoughts}) reached",
                    suggestion="Consider concluding or branching to a new approach"
                )

            # Update total estimate if provided, with the same lower bound
            # start_thinking applies.
            if total_thoughts is not None:
                session.total_thoughts_estimate = max(
                    1, min(total_thoughts, self.max_thoughts)
                )

            # Copy the tags so later changes to the caller's list do not leak
            # into the stored thought.
            session.add_thought(Thought(
                thought_number=thought_number,
                content=content,
                confidence=confidence,
                tags=list(tags) if tags else []
            ))

            self.logger.info(
                f"Session {session_id}: Thought {thought_number}/{session.total_thoughts_estimate}"
            )

            return thinking_response(
                thought_number=thought_number,
                total_thoughts=session.total_thoughts_estimate,
                thought=thought,
                next_thought_needed=next_thought_needed,
                confidence=confidence,
                metadata={
                    'session_id': session_id,
                    'tags': tags
                }
            )
        except (ValidationError, ThinkingError):
            raise
        except Exception as e:
            self.logger.exception(f"Failed to add thought: {e}")
            raise ThinkingError(f"Failed to add thought: {str(e)}") from e

    def revise(
        self,
        session_id: str,
        revises_thought: int,
        revised_content: str,
        reason: Optional[str] = None,
        confidence: Optional[float] = None
    ) -> Dict[str, Any]:
        """
        Revise a previous thought

        The revision is appended to the main line as a new thought that
        points back at the one it revises; the original thought is kept.

        Args:
            session_id: The session ID
            revises_thought: The thought number being revised
            revised_content: The revised thought content
            reason: Reason for revision
            confidence: Confidence in the revision

        Returns:
            Thinking response with revision details

        Raises:
            ValidationError: If the session does not exist
            RevisionError: If the thought does not exist or has already been
                revised the maximum number of times
            ThinkingError: If the revision could not be recorded
        """
        self.logger.info(f"Revising thought {revises_thought} in session {session_id}")
        try:
            session = self._get_session(session_id)

            # Check if thought exists
            if not session.get_thought(revises_thought):
                raise RevisionError(
                    f"Thought {revises_thought} not found",
                    details={'available_thoughts': len(session.thoughts)}
                )

            # Count revisions for this thought
            revision_count = sum(
                1 for t in session.thoughts
                if t.is_revision and t.revises_thought == revises_thought
            )
            if revision_count >= self.max_revisions:
                raise RevisionError(
                    f"Maximum revisions ({self.max_revisions}) for thought {revises_thought} reached",
                    suggestion="Consider branching to explore a different approach"
                )

            # Create revision thought
            thought_number = session.get_current_thought_number()
            session.add_thought(Thought(
                thought_number=thought_number,
                content=revised_content.strip(),
                is_revision=True,
                revises_thought=revises_thought,
                confidence=confidence,
                tags=['revision', f'revises_{revises_thought}']
            ))

            self.logger.info(
                f"Session {session_id}: Revised thought {revises_thought} "
                f"(revision {revision_count + 1}/{self.max_revisions})"
            )

            return thinking_response(
                thought_number=thought_number,
                total_thoughts=session.total_thoughts_estimate,
                thought=revised_content,
                next_thought_needed=True,
                is_revision=True,
                revises_thought=revises_thought,
                confidence=confidence,
                metadata={
                    'session_id': session_id,
                    'reason': reason,
                    'revision_number': revision_count + 1
                }
            )
        except (RevisionError, ValidationError):
            raise
        except Exception as e:
            self.logger.exception(f"Failed to revise thought: {e}")
            raise ThinkingError(f"Failed to revise thought: {str(e)}") from e

    def branch(
        self,
        session_id: str,
        branch_from: int,
        branch_name: Optional[str] = None,
        first_thought: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Create a new branch to explore an alternative approach

        Args:
            session_id: The session ID
            branch_from: The main-line thought number to branch from
            branch_name: Optional name for the branch; must not be the main
                branch name or the name of an existing branch. When omitted,
                an unused ``branch_N`` name is generated.
            first_thought: Optional first thought in the new branch

        Returns:
            Response with branch information

        Raises:
            ValidationError: If the session does not exist
            BranchError: If the branch limit is reached, the branch point
                does not exist, or the branch name is already taken
            ThinkingError: If the branch could not be created
        """
        self.logger.info(f"Creating branch from thought {branch_from} in session {session_id}")
        try:
            session = self._get_session(session_id)

            # Check limits
            if len(session.branches) >= self.max_branches:
                raise BranchError(
                    f"Maximum branches ({self.max_branches}) reached",
                    suggestion="Consider concluding an existing branch first"
                )

            # Verify branch point exists
            if not session.get_thought(branch_from):
                raise BranchError(
                    f"Thought {branch_from} not found - cannot branch from non-existent thought"
                )

            # Create branch ID
            if branch_name:
                # Reusing a name would replace that branch's thought list and
                # lose its contents; the main branch name is reserved because
                # the session routes it to the main line.
                if branch_name == session.main_branch or branch_name in session.branches:
                    raise BranchError(
                        f"Branch '{branch_name}' already exists",
                        suggestion="Choose a different branch name"
                    )
                branch_id = branch_name
            else:
                # Skip generated names that a caller already used explicitly.
                index = len(session.branches) + 1
                branch_id = f"branch_{index}"
                while branch_id in session.branches:
                    index += 1
                    branch_id = f"branch_{index}"

            # Normalise the first thought before registering the branch, so a
            # failure here does not leave an empty branch behind.
            first_content = first_thought.strip() if first_thought else None

            # Initialize branch
            session.branches[branch_id] = []
            response_data = {
                'session_id': session_id,
                'branch_id': branch_id,
                'branch_from': branch_from,
                'total_branches': len(session.branches)
            }

            # Add first thought if provided
            if first_thought:
                opening = Thought(
                    thought_number=1,
                    content=first_content,
                    branch_id=branch_id,
                    branch_from=branch_from
                )
                session.branches[branch_id].append(opening)
                response_data['first_thought'] = opening.to_dict()

            self.logger.info(f"Created branch '{branch_id}' from thought {branch_from}")

            return success_response(
                f"Branch '{branch_id}' created from thought {branch_from}",
                data=response_data
            )
        except (BranchError, ValidationError):
            raise
        except Exception as e:
            self.logger.exception(f"Failed to create branch: {e}")
            raise ThinkingError(f"Failed to create branch: {str(e)}") from e

    def conclude(
        self,
        session_id: str,
        conclusion: str,
        selected_branch: Optional[str] = None,
        confidence: Optional[float] = None
    ) -> Dict[str, Any]:
        """
        Conclude the thinking session with a final synthesis

        The conclusion is appended to the main line as a final thought and
        the session is then marked completed.

        Args:
            session_id: The session ID
            conclusion: The final conclusion/synthesis
            selected_branch: If branched, which branch to follow; reported
                back in the summary as given
            confidence: Overall confidence in the conclusion

        Returns:
            Response with complete thinking summary

        Raises:
            ValidationError: If the session does not exist (propagated
                unchanged, as in the other methods)
            ThinkingError: If the session could not be concluded
        """
        self.logger.info(f"Concluding thinking session {session_id}")
        try:
            session = self._get_session(session_id)

            # Create conclusion thought
            session.add_thought(Thought(
                thought_number=session.get_current_thought_number(),
                content=conclusion.strip(),
                confidence=confidence,
                tags=['conclusion', 'final']
            ))

            # Mark as completed only once the conclusion is recorded, so a
            # failure above cannot leave a completed session without one.
            session.completed = True

            # Naive UTC "now", matching the naive UTC created_at timestamp.
            now = datetime.now(timezone.utc).replace(tzinfo=None)

            # Build summary
            summary = {
                'session_id': session_id,
                'problem': session.problem,
                'total_thoughts': len(session.thoughts),
                'branches_explored': len(session.branches),
                'selected_branch': selected_branch,
                'conclusion': conclusion,
                'confidence': confidence,
                'thoughts': [t.to_dict() for t in session.thoughts],
                'duration_seconds': (now - session.created_at).total_seconds()
            }
            if session.branches:
                summary['branches'] = {
                    bid: [t.to_dict() for t in branch_thoughts]
                    for bid, branch_thoughts in session.branches.items()
                }

            self.logger.info(
                f"Session {session_id} concluded: {len(session.thoughts)} thoughts, "
                f"{len(session.branches)} branches"
            )

            return success_response(
                "Thinking session concluded",
                data=summary,
                metadata={'ready_for_action': True}
            )
        except (ValidationError, ThinkingError):
            raise
        except Exception as e:
            self.logger.exception(f"Failed to conclude session: {e}")
            raise ThinkingError(f"Failed to conclude session: {str(e)}") from e

    def get_session(self, session_id: str) -> Dict[str, Any]:
        """
        Get the current state of a thinking session

        Args:
            session_id: The session ID

        Returns:
            Session state

        Raises:
            ValidationError: If the session does not exist
        """
        session = self._get_session(session_id)
        return success_response(
            f"Session {session_id} state",
            data=session.to_dict()
        )

    def list_sessions(self, include_completed: bool = False) -> Dict[str, Any]:
        """
        List all thinking sessions

        Args:
            include_completed: Include completed sessions

        Returns:
            List of sessions, each with its problem shortened to 50
            characters plus an ellipsis when longer
        """
        sessions = [
            {
                'session_id': session.session_id,
                'problem': (
                    session.problem[:50] + '...'
                    if len(session.problem) > 50
                    else session.problem
                ),
                'thought_count': len(session.thoughts),
                'branch_count': len(session.branches),
                'completed': session.completed,
                'created_at': session.created_at.isoformat()
            }
            for session in self.sessions.values()
            if include_completed or not session.completed
        ]
        return success_response(
            f"Found {len(sessions)} sessions",
            data={'sessions': sessions}
        )

    def _get_session(self, session_id: str) -> ThinkingSession:
        """Get a session by ID or raise ValidationError if it is unknown."""
        if session_id not in self.sessions:
            raise ValidationError(
                f"Session not found: {session_id}",
                suggestion="Use 'start_thinking' to create a new session"
            )
        return self.sessions[session_id]