Crawlab MCP Server
Official MCP server by crawlab-team
crawlab_mcp/agents
import json
import logging
import re
import time
from typing import Any, Dict, List, Tuple
class TaskPlanner:
"""
Responsible for planning and executing a sequence of tasks to fulfill complex user queries.
Uses Chain of Thought and ReAct-style planning to break down complex queries into steps.
"""
def __init__(self, llm_provider, tools, session):
self.llm_provider = llm_provider
self.tools = tools
self.session = session
self.logger = logging.getLogger("mcp.planner")
self.logger.setLevel(logging.DEBUG)
async def create_plan(self, query: str) -> Dict[str, Any]:
"""
Create a plan to fulfill the user query by breaking it down into steps.
Returns a structured plan with steps, required tools, and reasoning.
"""
self.logger.info("Creating execution plan for query")
system_message = {
"role": "system",
"content": """You are a sophisticated planning agent for the Crawlab API.
Given a user query, your task is to:
1. Break down the query into logical steps that need to be executed sequentially
2. For each step, identify the specific tools needed from the available API
3. Provide a clear reasoning for each step
4. Return a structured JSON plan
Available API tools: {tool_names}
The plan should have this structure:
{{
"thought": "Your detailed analysis of the query and overall approach",
"steps": [
{{
"description": "Clear description of this step",
"reasoning": "Why this step is necessary",
"tools": ["tool1", "tool2"] or [] if no tools needed,
"requires_previous_step_result": true/false
}},
...more steps...
]
}}
Always use a step-by-step approach, especially for complex queries that require multiple API calls or data processing steps.""",
}
# Format the system message with available tools
tool_names = [tool.name for tool in self.tools]
system_message["content"] = system_message["content"].format(
tool_names=", ".join(tool_names)
)
user_message = {"role": "user", "content": query}
# Call the LLM to create a plan
try:
response = await self.llm_provider.chat_completion(
messages=[system_message, user_message],
temperature=0.2, # Low but not zero to allow some creativity
)
plan_text = response["choices"][0]["message"]["content"].strip()
# Extract the JSON plan from the response
# First try to parse the entire response as JSON
try:
plan = json.loads(plan_text)
except json.JSONDecodeError:
# If that fails, try to extract JSON from markdown code blocks
json_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", plan_text)
if json_match:
try:
plan = json.loads(json_match.group(1))
except json.JSONDecodeError:
raise ValueError("Could not parse plan JSON from LLM response")
else:
raise ValueError("Could not extract JSON plan from LLM response")
self.logger.info(f"Created plan with {len(plan.get('steps', []))} steps")
self.logger.debug(f"Plan: {json.dumps(plan, indent=2)}")
return plan
except Exception as e:
self.logger.error(f"Error creating plan: {str(e)}", exc_info=True)
# Return a minimal default plan on error
return {
"thought": f"Error creating plan: {str(e)}",
"steps": [
{
"description": "Process query directly without planning",
"reasoning": "Error occurred during planning",
"tools": [],
"requires_previous_step_result": False,
}
],
}
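    # For illustration, a plan for a query like "show my failed spider runs"
    # might come back shaped as follows (the tool names are hypothetical):
    #
    # {
    #     "thought": "The user wants recent failed tasks: list tasks, then filter by status.",
    #     "steps": [
    #         {
    #             "description": "Fetch the task list from Crawlab",
    #             "reasoning": "Task records are needed before any filtering",
    #             "tools": ["list_tasks"],
    #             "requires_previous_step_result": false
    #         },
    #         {
    #             "description": "Filter the tasks down to error statuses",
    #             "reasoning": "Only failed runs answer the query",
    #             "tools": [],
    #             "requires_previous_step_result": true
    #         }
    #     ]
    # }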
async def execute_plan(self, query: str, plan: Dict[str, Any]) -> str:
"""
Execute the plan by processing each step sequentially.
Returns the final response that addresses the user's query.
"""
self.logger.info(f"Executing plan with {len(plan.get('steps', []))} steps")
# Initialize variables to track execution
steps = plan.get("steps", [])
messages = [{"role": "user", "content": query}]
final_text = []
step_results = [] # Store results from each step for potential use in subsequent steps
# Add the plan thought as context
if "thought" in plan:
messages.append({"role": "assistant", "content": f"Planning: {plan['thought']}"})
# Execute each step in the plan
for i, step in enumerate(steps):
step_num = i + 1
self.logger.info(f"Executing step {step_num}/{len(steps)}: {step['description']}")
# Prepare context for this step
step_context = f"Step {step_num}: {step['description']}\nReasoning: {step['reasoning']}"
# Add the step context to messages
messages.append({"role": "assistant", "content": step_context})
# Add previous step results to context if required
if step.get("requires_previous_step_result", False) and step_results:
prev_results = "\n\n".join(
[f"Result from step {j + 1}: {result}" for j, result in enumerate(step_results)]
)
messages.append(
{"role": "assistant", "content": f"Previous results:\n{prev_results}"}
)
# Prepare tools for this step
step_tools = step.get("tools", [])
if step_tools:
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
for tool in self.tools
if tool.name in step_tools
]
tool_choice = "auto"
else:
available_tools = None
tool_choice = "none"
# Call the LLM for this step
step_start_time = time.time()
try:
response = await self.llm_provider.chat_completion(
messages=messages,
tools=available_tools,
tool_choice=tool_choice,
)
response_message = response["choices"][0]["message"]
content = response_message.get("content") or ""
# Process tool calls if present
if response_message.get("tool_calls"):
# Process tool calls using a helper method
step_text, tool_outputs = await self._process_tool_calls(
response_message, messages.copy()
)
final_text.append(f"[Step {step_num}] {step['description']}")
final_text.extend(step_text)
# Combine all tool outputs for this step
step_result = "\n".join(tool_outputs)
else:
# No tool calls, just use the content
final_text.append(f"[Step {step_num}] {step['description']}")
final_text.append(content)
step_result = content
# Store the result of this step
step_results.append(step_result)
# Update messages with the step's outcome
messages.append({"role": "assistant", "content": step_result})
step_time = time.time() - step_start_time
self.logger.info(f"Step {step_num} completed in {step_time:.2f} seconds")
except Exception as e:
error_msg = f"Error executing step {step_num}: {str(e)}"
self.logger.error(error_msg, exc_info=True)
final_text.append(f"[Step {step_num} Error] {error_msg}")
messages.append(
{"role": "assistant", "content": f"Error in step {step_num}: {str(e)}"}
)
# Generate final summary response
self.logger.info("Generating final response")
summary_message = {
"role": "system",
"content": """You are the final summarizer. Based on all the steps executed and their results,
provide a comprehensive and cohesive answer to the user's original query.
Focus on clarity and completeness.""",
}
try:
final_response = await self.llm_provider.chat_completion(
messages=messages + [summary_message],
temperature=0.3, # Slightly higher temp for more natural summary
)
summary = final_response["choices"][0]["message"]["content"]
final_text.append("\n--- Summary ---")
final_text.append(summary)
except Exception as e:
error_msg = f"Error generating summary: {str(e)}"
self.logger.error(error_msg, exc_info=True)
final_text.append(f"[Summary Error] {error_msg}")
return "\n\n".join(final_text)
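    # The returned transcript interleaves step headers, tool traces, and the
    # closing summary; an abridged, purely illustrative result looks like:
    #
    # [Step 1] Fetch the task list from Crawlab
    # [Tool Call] list_tasks({})
    # [Tool Result] ...
    #
    # --- Summary ---
    # Three of the last ten runs failed, all in the same spider...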
async def _process_tool_calls(
self, response_message: Dict[str, Any], current_messages: List[Dict[str, Any]]
) -> Tuple[List[str], List[str]]:
"""
Process tool calls from the LLM response and return the results.
Returns a tuple of (text_for_user, tool_outputs)
"""
tool_calls = response_message["tool_calls"]
text_parts = []
tool_outputs = []
for tool_call in tool_calls:
try:
function_name = tool_call["function"]["name"]
function_args = json.loads(tool_call["function"]["arguments"])
self.logger.info(f"Calling tool: {function_name}")
self.logger.debug(f"Tool arguments: {json.dumps(function_args)}")
# Execute the tool call
result = await self.session.call_tool(function_name, function_args)
text_parts.append(f"[Tool Call] {function_name}({json.dumps(function_args)})")
text_parts.append(f"[Tool Result] {result.content}")
tool_outputs.append(f"{function_name} result: {result.content}")
# Add tool call and result to messages
current_messages.append(
{
"role": "assistant",
"content": response_message.get("content"),
"tool_calls": [tool_call], # Just this tool call
}
)
                current_messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tool_call["id"],
                        # MCP call_tool returns structured content blocks;
                        # stringify them for the chat-completion payload
                        "content": str(result.content),
                    }
                )
# Get interpretative response from LLM
interpret_response = await self.llm_provider.chat_completion(
messages=current_messages
)
interpretation = interpret_response["choices"][0]["message"].get("content", "")
if interpretation:
text_parts.append(f"[Interpretation] {interpretation}")
tool_outputs.append(f"Interpretation: {interpretation}")
except Exception as e:
error_msg = f"Error with tool {tool_call.get('function', {}).get('name', 'unknown')}: {str(e)}"
self.logger.error(error_msg, exc_info=True)
text_parts.append(f"[Tool Error] {error_msg}")
tool_outputs.append(f"Error: {error_msg}")
return text_parts, tool_outputs
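

# A minimal usage sketch (the names below are placeholders, not part of this
# module): the caller is assumed to have already constructed an LLM provider
# wrapper, the MCP tool list, and a connected MCP ClientSession.
async def answer_query(query: str, llm_provider, tools, session) -> str:
    """Plan a query with TaskPlanner, then execute the plan end to end."""
    planner = TaskPlanner(llm_provider, tools, session)
    plan = await planner.create_plan(query)
    return await planner.execute_plan(query, plan)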