
Sequential Thinking Multi-Agent System

by FradSer

sequentialthinking

Process complex problems through sequential analysis using a coordinated team of AI agents for planning, research, critique, and synthesis in structured thought steps.

Instructions

Advanced sequential thinking tool with multi-agent coordination.

Processes thoughts through a specialized team of AI agents that coordinate to provide comprehensive analysis, planning, research, critique, and synthesis.

Args:
    thought: Content of the thinking step (required)
    thought_number: Sequence number starting from 1 (≥1)
    total_thoughts: Estimated total thoughts required (≥5)
    next_needed: Whether another thought step follows this one
    is_revision: Whether this thought revises a previous thought
    revises_thought: Thought number being revised (requires is_revision=True)
    branch_from: Thought number to branch from for alternative exploration
    branch_id: Unique identifier for the branch (required if branch_from set)
    needs_more: Whether more thoughts are needed beyond the initial estimate

Returns: Synthesized response from the multi-agent team with guidance for next steps

Raises:
    ProcessingError: When thought processing fails
    ValidationError: When input validation fails
    RuntimeError: When server state is invalid
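
The relationship rules in the argument list (a revision target needs is_revision, a branch needs an ID) can be checked on the client before calling the tool. The following is a minimal sketch of such a check, not the server's own validation; the function name `check_thought_args` and the wording of its messages are hypothetical. The minimum for total_thoughts is left unchecked because the description and the implementation reference state different lower bounds.

```python
def check_thought_args(args: dict) -> list[str]:
    """Return a list of problems; an empty list means the arguments look consistent."""
    problems = []
    if not args.get("thought"):
        problems.append("thought is required")
    if args.get("thought_number", 0) < 1:
        problems.append("thought_number must be >= 1")
    # A revision target only makes sense when the thought is flagged as a revision.
    if args.get("revises_thought") is not None and not args.get("is_revision"):
        problems.append("revises_thought requires is_revision=True")
    # A branch must carry an identifier so later thoughts can refer to it.
    if args.get("branch_from") is not None and not args.get("branch_id"):
        problems.append("branch_id is required when branch_from is set")
    return problems


ok = check_thought_args({"thought": "Outline the plan", "thought_number": 1})
bad = check_thought_args({"thought": "Try another path", "thought_number": 2, "branch_from": 1})
```

Here `ok` is empty, while `bad` reports the missing branch_id.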

Input Schema

Name             Required  Description  Default
branch_from      No
branch_id        No
is_revision      No
needs_more       No
next_needed      Yes
revises_thought  No
thought          Yes
thought_number   Yes
total_thoughts   Yes
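
As an illustration of the schema, the sketch below builds an argument payload containing the four required fields and serializes it to JSON. The helper `missing_required` and the sample values are assumptions made for this example; how the payload is sent depends on the MCP client in use and is not shown.

```python
import json

# Fields marked "Yes" in the Required column of the input schema.
REQUIRED = ("thought", "thought_number", "total_thoughts", "next_needed")


def missing_required(arguments: dict) -> list[str]:
    """List required field names that are absent from the arguments."""
    return [name for name in REQUIRED if name not in arguments]


arguments = {
    "thought": "Identify the main risks of the migration",
    "thought_number": 1,
    "total_thoughts": 5,
    "next_needed": True,
}

# Serialized form that a client could place in a tool-call request.
payload = json.dumps(arguments, sort_keys=True)
```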

Implementation Reference

  • The main MCP tool handler function 'sequentialthinking' that sanitizes inputs, creates validated ThoughtData, and delegates processing to the ThoughtProcessor with comprehensive error handling.
    @mcp.tool()
    async def sequentialthinking(
        thought: str,
        thoughtNumber: int,
        totalThoughts: int,
        nextThoughtNeeded: bool,
        isRevision: bool,
        branchFromThought: int | None,
        branchId: str | None,
        needsMoreThoughts: bool,
    ) -> str:
        """Advanced sequential thinking tool with multi-agent coordination.
    
        Processes thoughts through a specialized team of AI agents that coordinate
        to provide comprehensive analysis, planning, research, critique, and synthesis.
    
        Args:
            thought: Content of the thinking step (required)
            thoughtNumber: Sequence number starting from {ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE} (≥{ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE})
            totalThoughts: Estimated total thoughts required (≥1)
            nextThoughtNeeded: Whether another thought step follows this one
            isRevision: Whether this thought revises a previous thought
            branchFromThought: Thought number to branch from for alternative exploration
            branchId: Unique identifier for the branch (required if branchFromThought set)
            needsMoreThoughts: Whether more thoughts are needed beyond the initial estimate
    
        Returns:
            Synthesized response from the multi-agent team with guidance for next steps
    
        Raises:
            ProcessingError: When thought processing fails
            ValidationError: When input validation fails
            RuntimeError: When server state is invalid
        """
        # Capture server state locally to avoid async race conditions
        current_server_state = _server_state
        if current_server_state is None:
            return "Server Error: Server not initialized"
    
        try:
            # Create and validate thought data using refactored function
            thought_data = create_validated_thought_data(
                thought=thought,
                thoughtNumber=thoughtNumber,
                totalThoughts=totalThoughts,
                nextThoughtNeeded=nextThoughtNeeded,
                isRevision=isRevision,
                branchFromThought=branchFromThought,
                branchId=branchId,
                needsMoreThoughts=needsMoreThoughts,
            )
    
            # Use captured state directly to avoid race conditions
            global _thought_processor
            async with _processor_lock:
                if _thought_processor is None:
                    logger.info(
                        "Initializing ThoughtProcessor with Multi-Thinking workflow"
                    )
                    _thought_processor = ThoughtProcessor(current_server_state.session)
    
            result = await _thought_processor.process_thought(thought_data)
    
            logger.info(f"Successfully processed thought #{thoughtNumber}")
            return result
    
        except ValidationError as e:
            error_msg = f"Input validation failed for thought #{thoughtNumber}: {e}"
            logger.exception(error_msg)
            return f"Validation Error: {e}"
    
        except ThoughtProcessingError as e:
            error_msg = f"Processing failed for thought #{thoughtNumber}: {e}"
            logger.exception(error_msg)
            if hasattr(e, "metadata") and e.metadata:
                logger.exception(f"Error metadata: {e.metadata}")
            return f"Processing Error: {e}"
    
        except Exception as e:
            error_msg = f"Unexpected error processing thought #{thoughtNumber}: {e}"
            logger.exception(error_msg)
            return f"Unexpected Error: {e}"
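
The handler above creates its processor lazily while holding an async lock. The pattern can be sketched in isolation as follows, assuming Python 3.10 or later; the `Processor` class, `get_processor`, and `handle` are stand-ins invented for this example, not names from the project.

```python
import asyncio


class Processor:
    """Stand-in for an expensive object that should be built only once."""

    instances = 0

    def __init__(self) -> None:
        Processor.instances += 1

    async def process(self, text: str) -> str:
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        return f"processed: {text}"


_processor = None
_lock = asyncio.Lock()


async def get_processor() -> Processor:
    """Create the shared processor on first use; later calls reuse it."""
    global _processor
    # The check and the assignment both happen while the lock is held,
    # so concurrent first calls cannot construct two instances.
    async with _lock:
        if _processor is None:
            _processor = Processor()
    return _processor


async def handle(text: str) -> str:
    processor = await get_processor()
    return await processor.process(text)


async def main() -> list[str]:
    return await asyncio.gather(*(handle(f"t{i}") for i in range(5)))


results = asyncio.run(main())
```

Only the initialization is guarded; the processing call itself runs outside the lock, so concurrent requests are not serialized.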
  • Pydantic model 'ThoughtData' providing input schema validation and type determination for sequential thinking thoughts.
    class ThoughtData(BaseModel):
        """Streamlined thought data model with consolidated validation."""
    
        model_config = {"validate_assignment": True, "frozen": True}
    
        # Core fields
        thought: str = Field(
            ...,
            min_length=FieldLengthLimits.MIN_STRING_LENGTH,
            description="Content of the thought",
        )
        # MCP API compatibility - camelCase field names required
        thoughtNumber: ThoughtNumber = Field(  # noqa: N815
            ...,
            ge=ValidationLimits.MIN_THOUGHT_NUMBER,
            description="Sequence number starting from 1",
        )
        totalThoughts: int = Field(  # noqa: N815
            ...,
            ge=1,
            description="Estimated total thoughts",
        )
        nextThoughtNeeded: bool = Field(  # noqa: N815
            ..., description="Whether another thought is needed"
        )
    
        # Required workflow fields
        isRevision: bool = Field(  # noqa: N815
            ..., description="Whether this revises a previous thought"
        )
        branchFromThought: ThoughtNumber | None = Field(  # noqa: N815
            ...,
            ge=ValidationLimits.MIN_THOUGHT_NUMBER,
            description="Thought number to branch from",
        )
        branchId: BranchId | None = Field(  # noqa: N815
            ..., description="Unique branch identifier"
        )
        needsMoreThoughts: bool = Field(  # noqa: N815
            ..., description="Whether more thoughts are needed beyond estimate"
        )
    
        @property
        def thought_type(self) -> ThoughtType:
            """Determine the type of thought based on field values."""
            if self.isRevision:
                return ThoughtType.REVISION
            if self.branchFromThought is not None:
                return ThoughtType.BRANCH
            return ThoughtType.STANDARD
    
        @model_validator(mode="before")
        @classmethod
        def validate_thought_data(cls, data: dict[str, Any]) -> dict[str, Any]:
            """Consolidated validation with simplified logic."""
            if isinstance(data, dict):
                _validate_thought_relationships(data)
            return data
    
        def format_for_log(self) -> str:
            """Format thought for logging with optimized type-specific formatting."""
            # Use match statement for modern Python pattern matching
            match self.thought_type:
                case ThoughtType.REVISION:
                    prefix = (
                        f"Revision {self.thoughtNumber}/{self.totalThoughts} "
                        f"(revising #{self.branchFromThought})"
                    )
                case ThoughtType.BRANCH:
                    prefix = (
                        f"Branch {self.thoughtNumber}/{self.totalThoughts} "
                        f"(from #{self.branchFromThought}, ID: {self.branchId})"
                    )
                case _:  # ThoughtType.STANDARD
                    prefix = f"Thought {self.thoughtNumber}/{self.totalThoughts}"
    
            # Use multiline string formatting for better readability
            return (
                f"{prefix}\n"
                f"  Content: {self.thought}\n"
                f"  Next: {self.nextThoughtNeeded}, More: {self.needsMoreThoughts}"
            )
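
The precedence used by `thought_type` (revision first, then branch, then standard) and the effect of a frozen model can be illustrated with the standard library alone. This sketch uses a frozen dataclass as a stand-in for the Pydantic model; the `Thought` class and the enum values are assumptions made for the example.

```python
from dataclasses import FrozenInstanceError, dataclass
from enum import Enum
from typing import Optional


class ThoughtType(Enum):
    # The string values are assumed; the excerpt does not show them.
    STANDARD = "standard"
    REVISION = "revision"
    BRANCH = "branch"


@dataclass(frozen=True)
class Thought:
    thought: str
    thoughtNumber: int
    isRevision: bool = False
    branchFromThought: Optional[int] = None

    @property
    def thought_type(self) -> ThoughtType:
        # A revision takes precedence even if a branch origin is also set.
        if self.isRevision:
            return ThoughtType.REVISION
        if self.branchFromThought is not None:
            return ThoughtType.BRANCH
        return ThoughtType.STANDARD


standard = Thought("Define the goal", 1)
branch = Thought("Explore an alternative", 2, branchFromThought=1)
revision = Thought("Correct step 1", 3, isRevision=True, branchFromThought=1)

# Frozen instances reject attribute assignment after construction.
try:
    standard.thought = "changed"
    mutated = True
except FrozenInstanceError:
    mutated = False
```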
  • FastMCP @mcp.tool() decorator registering the 'sequentialthinking' tool.
    @mcp.tool()
  • ThoughtProcessor.process_thought method orchestrating the core sequential thinking workflow: logging, session management, context building, multi-thinking execution, response formatting.
    async def process_thought(self, thought_data: ThoughtData) -> str:
        """Process a thought through the appropriate workflow with comprehensive error handling.
    
        This is the main public API method that maintains backward compatibility
        while using the new service-based architecture internally.
    
        Args:
            thought_data: The thought data to process
    
        Returns:
            Processed thought response
    
        Raises:
            ThoughtProcessingError: If processing fails
        """
        try:
            return await self._process_thought_internal(thought_data)
        except Exception as e:
            error_msg = f"Failed to process {thought_data.thought_type.value} thought #{thought_data.thoughtNumber}: {e}"
            logger.error(error_msg, exc_info=True)
            metadata: ProcessingMetadata = {
                "error_count": ProcessingDefaults.ERROR_COUNT_INITIAL,
                "retry_count": ProcessingDefaults.RETRY_COUNT_INITIAL,
                "processing_time": ProcessingDefaults.PROCESSING_TIME_INITIAL,
            }
            raise ThoughtProcessingError(error_msg, metadata) from e
    
    async def _process_thought_internal(self, thought_data: ThoughtData) -> str:
        """Internal thought processing logic using specialized services.
    
        Args:
            thought_data: The thought data to process
    
        Returns:
            Processed thought response
        """
        start_time = time.time()
    
        # Log thought data and add to session (now async for thread safety)
        self._log_thought_data(thought_data)
        await self._session.add_thought(thought_data)
    
        # Build context using specialized service (now async for thread safety)
        input_prompt = await self._context_builder.build_context_prompt(thought_data)
        await self._context_builder.log_context_building(thought_data, input_prompt)
    
        # Execute Multi-Thinking workflow using specialized service
        (
            content,
            workflow_result,
            total_time,
        ) = await self._workflow_executor.execute_workflow(
            thought_data, input_prompt, start_time
        )
    
        # Format response using specialized service
        final_response = self._response_formatter.format_response(content, thought_data)
    
        # Log workflow completion
        self._workflow_executor.log_workflow_completion(
            thought_data, workflow_result, total_time, final_response
        )
    
        return final_response
  • MultiThinkingSequentialProcessor.process_with_multi_thinking implementing intelligent routing and parallel multi-agent thinking execution (factual, emotional, critical, optimistic, creative, synthesis).
    async def process_with_multi_thinking(
        self, thought_data: "ThoughtData", context_prompt: str = ""
    ) -> MultiThinkingProcessingResult:
        """Process thoughts using multi-thinking methodology with parallel execution."""
        start_time = time.time()
    
        logger.info("Multi-thinking processing started")
        if logger.isEnabledFor(logging.INFO):
            logger.info("Input preview: %s", thought_data.thought[:100])
            logger.info("Context length: %d chars", len(context_prompt))
    
        try:
            # Step 1: Intelligent routing decision
            routing_decision = await self.router.route_thought(thought_data)
    
            logger.info("Selected strategy: %s", routing_decision.strategy.name)
            if logger.isEnabledFor(logging.INFO):
                sequence = [
                    direction.value
                    for direction in routing_decision.strategy.thinking_sequence
                ]
                logger.info("Thinking sequence: %s", sequence)
    
            # Step 2: Execute processing based on complexity
            if routing_decision.strategy.complexity == ProcessingDepth.SINGLE:
                result = await self._process_single_direction(
                    thought_data, context_prompt, routing_decision
                )
            elif routing_decision.strategy.complexity == ProcessingDepth.DOUBLE:
                result = await self._process_double_direction_sequence(
                    thought_data, context_prompt, routing_decision
                )
            elif routing_decision.strategy.complexity == ProcessingDepth.TRIPLE:
                result = await self._process_triple_direction_sequence(
                    thought_data, context_prompt, routing_decision
                )
            else:  # FULL
                result = await self._process_full_direction_sequence(
                    thought_data, context_prompt, routing_decision
                )
    
            processing_time = time.time() - start_time
    
            # Create final result
            final_result = MultiThinkingProcessingResult(
                content=result["final_content"],
                strategy_used=routing_decision.strategy.name,
                thinking_sequence=[
                    direction.value
                    for direction in routing_decision.strategy.thinking_sequence
                ],
                processing_time=processing_time,
                complexity_score=routing_decision.complexity_metrics.complexity_score,
                cost_reduction=routing_decision.estimated_cost_reduction,
                individual_results=result.get("individual_results", {}),
                step_name="multi_thinking_processing",
            )
    
            logger.info(
                "Multi-thinking processing completed - Time: %.3fs, Cost reduction: %.1f%%, Output: %d chars",
                processing_time,
                routing_decision.estimated_cost_reduction,
                len(final_result.content),
            )
    
            return final_result
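
The excerpt reads `final_content` and `individual_results` from the result of whichever direction sequence was selected. One way such a result could be produced with concurrent execution is sketched below using `asyncio.gather`; this is an illustration under assumptions, not the project's implementation, and `run_direction` and `process_sequence` are hypothetical names.

```python
import asyncio


async def run_direction(direction: str, thought: str) -> str:
    """Placeholder for one thinking direction; a real one would call a model."""
    await asyncio.sleep(0)
    return f"{direction}: {thought}"


async def process_sequence(thought: str, sequence: list[str]) -> dict:
    """Run every direction concurrently and collect the outputs."""
    # gather returns results in the order the awaitables were passed in.
    outputs = await asyncio.gather(
        *(run_direction(direction, thought) for direction in sequence)
    )
    return {
        "final_content": " | ".join(outputs),
        "individual_results": dict(zip(sequence, outputs)),
    }


result = asyncio.run(process_sequence("ship it?", ["factual", "critical"]))
```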

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FradSer/mcp-server-mas-sequential-thinking'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.