sequentialthinking
Process complex problems through sequential analysis using a coordinated team of AI agents for planning, research, critique, and synthesis in structured thought steps.
Instructions
Advanced sequential thinking tool with multi-agent coordination.
Processes thoughts through a specialized team of AI agents that coordinate to provide comprehensive analysis, planning, research, critique, and synthesis.
Args:
- `thought`: Content of the thinking step (required)
- `thought_number`: Sequence number starting from 1 (≥1)
- `total_thoughts`: Estimated total thoughts required (≥5)
- `next_needed`: Whether another thought step follows this one
- `is_revision`: Whether this thought revises a previous thought
- `revises_thought`: Thought number being revised (requires `is_revision=True`)
- `branch_from`: Thought number to branch from for alternative exploration
- `branch_id`: Unique identifier for the branch (required if `branch_from` is set)
- `needs_more`: Whether more thoughts are needed beyond the initial estimate
Returns:
- Synthesized response from the multi-agent team with guidance for next steps
Raises:
- `ProcessingError`: When thought processing fails
- `ValidationError`: When input validation fails
- `RuntimeError`: When server state is invalid
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| branch_from | No | Thought number to branch from for alternative exploration | |
| branch_id | No | Unique identifier for the branch (required if `branch_from` is set) | |
| is_revision | No | Whether this thought revises a previous thought | |
| needs_more | No | Whether more thoughts are needed beyond the initial estimate | |
| next_needed | Yes | Whether another thought step follows this one | |
| revises_thought | No | Thought number being revised (requires `is_revision=True`) | |
| thought | Yes | Content of the thinking step | |
| thought_number | Yes | Sequence number starting from 1 (≥1) | |
| total_thoughts | Yes | Estimated total thoughts required | |
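To make the schema's cross-field rules concrete, here is a minimal sketch of argument payloads a client might send. The `validate_args` helper and the example payloads are illustrative, not part of the server; they only mirror the constraints stated above (`revises_thought` requires `is_revision=True`, `branch_id` is required when `branch_from` is set).

```python
def validate_args(args: dict) -> list[str]:
    """Check the cross-field rules implied by the schema above (illustrative)."""
    errors = []
    for name in ("thought", "thought_number", "total_thoughts", "next_needed"):
        if name not in args:
            errors.append(f"missing required field: {name}")
    if args.get("revises_thought") is not None and not args.get("is_revision"):
        errors.append("revises_thought requires is_revision=True")
    if args.get("branch_from") is not None and not args.get("branch_id"):
        errors.append("branch_id is required when branch_from is set")
    if args.get("thought_number", 1) < 1:
        errors.append("thought_number must be >= 1")
    return errors

# A first thought in a sequence
first = {
    "thought": "Break the problem into sub-goals.",
    "thought_number": 1,
    "total_thoughts": 5,
    "next_needed": True,
}

# A branch exploring an alternative from thought 2
branch = {**first, "thought_number": 3, "branch_from": 2, "branch_id": "alt-1"}

print(validate_args(first))   # []
print(validate_args(branch))  # []
print(validate_args({**branch, "branch_id": None}))  # one branch_id error
```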
Implementation Reference
- The main MCP tool handler function `sequentialthinking` that sanitizes inputs, creates a validated `ThoughtData`, and delegates processing to the `ThoughtProcessor` with comprehensive error handling.

```python
@mcp.tool()
async def sequentialthinking(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: bool,
    branchFromThought: int | None,
    branchId: str | None,
    needsMoreThoughts: bool,
) -> str:
    """Advanced sequential thinking tool with multi-agent coordination.

    Processes thoughts through a specialized team of AI agents that coordinate
    to provide comprehensive analysis, planning, research, critique, and synthesis.

    Args:
        thought: Content of the thinking step (required)
        thoughtNumber: Sequence number starting from {ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE} (≥{ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE})
        totalThoughts: Estimated total thoughts required (≥1)
        nextThoughtNeeded: Whether another thought step follows this one
        isRevision: Whether this thought revises a previous thought
        branchFromThought: Thought number to branch from for alternative exploration
        branchId: Unique identifier for the branch (required if branchFromThought set)
        needsMoreThoughts: Whether more thoughts are needed beyond the initial estimate

    Returns:
        Synthesized response from the multi-agent team with guidance for next steps

    Raises:
        ProcessingError: When thought processing fails
        ValidationError: When input validation fails
        RuntimeError: When server state is invalid
    """
    # Capture server state locally to avoid async race conditions
    current_server_state = _server_state
    if current_server_state is None:
        return "Server Error: Server not initialized"

    try:
        # Create and validate thought data using refactored function
        thought_data = create_validated_thought_data(
            thought=thought,
            thoughtNumber=thoughtNumber,
            totalThoughts=totalThoughts,
            nextThoughtNeeded=nextThoughtNeeded,
            isRevision=isRevision,
            branchFromThought=branchFromThought,
            branchId=branchId,
            needsMoreThoughts=needsMoreThoughts,
        )

        # Use captured state directly to avoid race conditions
        global _thought_processor
        async with _processor_lock:
            if _thought_processor is None:
                logger.info(
                    "Initializing ThoughtProcessor with Multi-Thinking workflow"
                )
                _thought_processor = ThoughtProcessor(current_server_state.session)

        result = await _thought_processor.process_thought(thought_data)
        logger.info(f"Successfully processed thought #{thoughtNumber}")
        return result

    except ValidationError as e:
        error_msg = f"Input validation failed for thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        return f"Validation Error: {e}"
    except ThoughtProcessingError as e:
        error_msg = f"Processing failed for thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        if hasattr(e, "metadata") and e.metadata:
            logger.exception(f"Error metadata: {e.metadata}")
        return f"Processing Error: {e}"
    except Exception as e:
        error_msg = f"Unexpected error processing thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        return f"Unexpected Error: {e}"
```
- Pydantic model `ThoughtData` providing input schema validation and type determination for sequential thinking thoughts.

```python
class ThoughtData(BaseModel):
    """Streamlined thought data model with consolidated validation."""

    model_config = {"validate_assignment": True, "frozen": True}

    # Core fields
    thought: str = Field(
        ...,
        min_length=FieldLengthLimits.MIN_STRING_LENGTH,
        description="Content of the thought",
    )

    # MCP API compatibility - camelCase field names required
    thoughtNumber: ThoughtNumber = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Sequence number starting from 1",
    )
    totalThoughts: int = Field(  # noqa: N815
        ...,
        ge=1,
        description="Estimated total thoughts",
    )
    nextThoughtNeeded: bool = Field(  # noqa: N815
        ..., description="Whether another thought is needed"
    )

    # Required workflow fields
    isRevision: bool = Field(  # noqa: N815
        ..., description="Whether this revises a previous thought"
    )
    branchFromThought: ThoughtNumber | None = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Thought number to branch from",
    )
    branchId: BranchId | None = Field(  # noqa: N815
        ..., description="Unique branch identifier"
    )
    needsMoreThoughts: bool = Field(  # noqa: N815
        ..., description="Whether more thoughts are needed beyond estimate"
    )

    @property
    def thought_type(self) -> ThoughtType:
        """Determine the type of thought based on field values."""
        if self.isRevision:
            return ThoughtType.REVISION
        if self.branchFromThought is not None:
            return ThoughtType.BRANCH
        return ThoughtType.STANDARD

    @model_validator(mode="before")
    @classmethod
    def validate_thought_data(cls, data: dict[str, Any]) -> dict[str, Any]:
        """Consolidated validation with simplified logic."""
        if isinstance(data, dict):
            _validate_thought_relationships(data)
        return data

    def format_for_log(self) -> str:
        """Format thought for logging with optimized type-specific formatting."""
        # Use match statement for modern Python pattern matching
        match self.thought_type:
            case ThoughtType.REVISION:
                prefix = (
                    f"Revision {self.thoughtNumber}/{self.totalThoughts} "
                    f"(revising #{self.branchFromThought})"
                )
            case ThoughtType.BRANCH:
                prefix = (
                    f"Branch {self.thoughtNumber}/{self.totalThoughts} "
                    f"(from #{self.branchFromThought}, ID: {self.branchId})"
                )
            case _:  # ThoughtType.STANDARD
                prefix = f"Thought {self.thoughtNumber}/{self.totalThoughts}"

        # Use multiline string formatting for better readability
        return (
            f"{prefix}\n"
            f"  Content: {self.thought}\n"
            f"  Next: {self.nextThoughtNeeded}, More: {self.needsMoreThoughts}"
        )
```
- src/mcp_server_mas_sequential_thinking/main.py:147 (registration): the FastMCP `@mcp.tool()` decorator registering the `sequentialthinking` tool.

```python
@mcp.tool()
```
- `ThoughtProcessor.process_thought` method orchestrating the core sequential thinking workflow: logging, session management, context building, multi-thinking execution, and response formatting.

```python
async def process_thought(self, thought_data: ThoughtData) -> str:
    """Process a thought through the appropriate workflow with comprehensive error handling.

    This is the main public API method that maintains backward compatibility
    while using the new service-based architecture internally.

    Args:
        thought_data: The thought data to process

    Returns:
        Processed thought response

    Raises:
        ThoughtProcessingError: If processing fails
    """
    try:
        return await self._process_thought_internal(thought_data)
    except Exception as e:
        error_msg = (
            f"Failed to process {thought_data.thought_type.value} "
            f"thought #{thought_data.thoughtNumber}: {e}"
        )
        logger.error(error_msg, exc_info=True)
        metadata: ProcessingMetadata = {
            "error_count": ProcessingDefaults.ERROR_COUNT_INITIAL,
            "retry_count": ProcessingDefaults.RETRY_COUNT_INITIAL,
            "processing_time": ProcessingDefaults.PROCESSING_TIME_INITIAL,
        }
        raise ThoughtProcessingError(error_msg, metadata) from e

async def _process_thought_internal(self, thought_data: ThoughtData) -> str:
    """Internal thought processing logic using specialized services.

    Args:
        thought_data: The thought data to process

    Returns:
        Processed thought response
    """
    start_time = time.time()

    # Log thought data and add to session (now async for thread safety)
    self._log_thought_data(thought_data)
    await self._session.add_thought(thought_data)

    # Build context using specialized service (now async for thread safety)
    input_prompt = await self._context_builder.build_context_prompt(thought_data)
    await self._context_builder.log_context_building(thought_data, input_prompt)

    # Execute Multi-Thinking workflow using specialized service
    (
        content,
        workflow_result,
        total_time,
    ) = await self._workflow_executor.execute_workflow(
        thought_data, input_prompt, start_time
    )

    # Format response using specialized service
    final_response = self._response_formatter.format_response(content, thought_data)

    # Log workflow completion
    self._workflow_executor.log_workflow_completion(
        thought_data, workflow_result, total_time, final_response
    )

    return final_response
```
- `MultiThinkingSequentialProcessor.process_with_multi_thinking` implementing intelligent routing and parallel multi-agent thinking execution (factual, emotional, critical, optimistic, creative, synthesis).

```python
async def process_with_multi_thinking(
    self, thought_data: "ThoughtData", context_prompt: str = ""
) -> MultiThinkingProcessingResult:
    """Process thoughts using multi-thinking methodology with parallel execution."""
    start_time = time.time()

    logger.info("Multi-thinking processing started")
    if logger.isEnabledFor(logging.INFO):
        logger.info("Input preview: %s", thought_data.thought[:100])
        logger.info("Context length: %d chars", len(context_prompt))

    try:
        # Step 1: Intelligent routing decision
        routing_decision = await self.router.route_thought(thought_data)
        logger.info("Selected strategy: %s", routing_decision.strategy.name)
        if logger.isEnabledFor(logging.INFO):
            sequence = [
                direction.value
                for direction in routing_decision.strategy.thinking_sequence
            ]
            logger.info("Thinking sequence: %s", sequence)

        # Step 2: Execute processing based on complexity
        if routing_decision.strategy.complexity == ProcessingDepth.SINGLE:
            result = await self._process_single_direction(
                thought_data, context_prompt, routing_decision
            )
        elif routing_decision.strategy.complexity == ProcessingDepth.DOUBLE:
            result = await self._process_double_direction_sequence(
                thought_data, context_prompt, routing_decision
            )
        elif routing_decision.strategy.complexity == ProcessingDepth.TRIPLE:
            result = await self._process_triple_direction_sequence(
                thought_data, context_prompt, routing_decision
            )
        else:  # FULL
            result = await self._process_full_direction_sequence(
                thought_data, context_prompt, routing_decision
            )

        processing_time = time.time() - start_time

        # Create final result
        final_result = MultiThinkingProcessingResult(
            content=result["final_content"],
            strategy_used=routing_decision.strategy.name,
            thinking_sequence=[
                direction.value
                for direction in routing_decision.strategy.thinking_sequence
            ],
            processing_time=processing_time,
            complexity_score=routing_decision.complexity_metrics.complexity_score,
            cost_reduction=routing_decision.estimated_cost_reduction,
            individual_results=result.get("individual_results", {}),
            step_name="multi_thinking_processing",
        )

        logger.info(
            "Multi-thinking processing completed - Time: %.3fs, "
            "Cost reduction: %.1f%%, Output: %d chars",
            processing_time,
            routing_decision.estimated_cost_reduction,
            len(final_result.content),
        )

        return final_result
        # ... (excerpt ends here; the except clause of this try block is omitted)
```
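The complexity dispatch in step 2 amounts to mapping a `ProcessingDepth` to a processing strategy. A table-driven sketch of that idea, with stubbed handlers (the enum values and handler bodies below are illustrative, not the server's real implementations):

```python
import asyncio
from enum import Enum

class ProcessingDepth(Enum):
    SINGLE = 1
    DOUBLE = 2
    TRIPLE = 3
    FULL = 4

async def run(depth: ProcessingDepth, thought: str) -> dict:
    """Dispatch a thought to the right number of thinking directions (stubbed)."""
    async def handler(n: int) -> dict:
        # A real handler would invoke n thinking directions and merge results.
        return {"final_content": f"{thought} ({n} direction(s))"}

    directions = {
        ProcessingDepth.SINGLE: 1,
        ProcessingDepth.DOUBLE: 2,
        ProcessingDepth.TRIPLE: 3,
        # FULL covers factual, emotional, critical, optimistic, creative, synthesis
        ProcessingDepth.FULL: 6,
    }
    return await handler(directions[depth])

result = asyncio.run(run(ProcessingDepth.FULL, "Evaluate the plan"))
print(result["final_content"])  # Evaluate the plan (6 direction(s))
```

A dict lookup scales better than the if/elif chain if more depths are added, at the cost of making the per-depth logic less explicit.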