"""
Base class for simple MCP tools.
Simple tools follow a straightforward pattern:
1. Receive request
2. Prepare prompt (with files, context, etc.)
3. Call AI model
4. Format and return response
They use the shared SchemaBuilder for consistent schema generation
and inherit all the conversation, file processing, and model handling
capabilities from BaseTool.
"""
from abc import abstractmethod
from typing import Any, Optional
from tools.shared.base_models import ToolRequest
from tools.shared.base_tool import BaseTool
from tools.shared.schema_builders import SchemaBuilder
from mcp.types import TextContent
from utils.client_info import get_current_session_fingerprint, get_cached_client_info, format_client_info
class SimpleTool(BaseTool):
"""
Base class for simple (non-workflow) tools.
Simple tools are request/response tools that don't require multi-step workflows.
They benefit from:
- Automatic schema generation using SchemaBuilder
- Inherited conversation handling and file processing
- Standardized model integration
- Consistent error handling and response formatting
To create a simple tool:
1. Inherit from SimpleTool
2. Implement get_tool_fields() to define tool-specific fields
3. Implement prepare_prompt() for prompt preparation
4. Optionally override format_response() for custom formatting
5. Optionally override get_required_fields() for custom requirements
Example:
class ChatTool(SimpleTool):
def get_name(self) -> str:
return "chat"
def get_tool_fields(self) -> dict[str, dict[str, Any]]:
return {
"prompt": {
"type": "string",
"description": "Your question or idea...",
},
"files": SimpleTool.FILES_FIELD,
}
def get_required_fields(self) -> list[str]:
return ["prompt"]
"""
# Common field definitions that simple tools can reuse
FILES_FIELD = SchemaBuilder.SIMPLE_FIELD_SCHEMAS["files"]
IMAGES_FIELD = SchemaBuilder.COMMON_FIELD_SCHEMAS["images"]
@abstractmethod
def get_tool_fields(self) -> dict[str, dict[str, Any]]:
"""
Return tool-specific field definitions.
This method should return a dictionary mapping field names to their
JSON schema definitions. Common fields (model, temperature, etc.)
are added automatically by the base class.
Returns:
Dict mapping field names to JSON schema objects
Example:
return {
"prompt": {
"type": "string",
"description": "The user's question or request",
},
"files": SimpleTool.FILES_FIELD, # Reuse common field
"max_tokens": {
"type": "integer",
"minimum": 1,
"description": "Maximum tokens for response",
}
}
"""
pass
def get_required_fields(self) -> list[str]:
"""
Return list of required field names.
Override this to specify which fields are required for your tool.
The model field is automatically added if in auto mode.
Returns:
List of required field names
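Example:
return ["prompt"]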
"""
return []
def get_annotations(self) -> Optional[dict[str, Any]]:
"""
Return tool annotations. Simple tools are read-only by default.
All simple tools perform operations without modifying the environment.
They may call external AI models for analysis or conversation, but they
don't write files or make system changes.
Override this method if your simple tool needs different annotations.
Returns:
Dictionary with readOnlyHint set to True
"""
return {"readOnlyHint": True}
def format_response(self, response: str, request, model_info: Optional[dict] = None) -> str:
"""
Format the AI response before returning to the client.
This is a hook method that subclasses can override to customize
response formatting. The default implementation returns the response as-is.
Args:
response: The raw response from the AI model
request: The validated request object
model_info: Optional model information dictionary
Returns:
Formatted response string
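Example (a minimal sketch of a custom override):
def format_response(self, response, request, model_info=None):
return f"## {self.get_name().title()} Result\n\n{response}"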
"""
return response
def get_input_schema(self) -> dict[str, Any]:
"""
Generate the complete input schema using SchemaBuilder.
This method automatically combines:
- Tool-specific fields from get_tool_fields()
- Common fields (temperature, thinking_mode, etc.)
- Model field with proper auto-mode handling
- Required fields from get_required_fields()
Tools can override this method for custom schema generation while
still benefiting from SimpleTool's convenience methods.
Returns:
Complete JSON schema for the tool
"""
return SchemaBuilder.build_schema(
tool_specific_fields=self.get_tool_fields(),
required_fields=self.get_required_fields(),
model_field_schema=self.get_model_field_schema(),
auto_mode=self.is_effective_auto_mode(),
)
def get_request_model(self):
"""
Return the request model class.
Simple tools use the base ToolRequest by default.
Override this if your tool needs a custom request model.
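Example (hypothetical subclass with a custom Pydantic request model):
def get_request_model(self):
return ChatRequest  # a ToolRequest subclass defined by the tool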
"""
return ToolRequest
# Hook methods for safe attribute access on request objects (override for custom handling)
def get_request_model_name(self, request) -> Optional[str]:
"""Get model name from request. Override for custom model name handling."""
try:
return request.model
except AttributeError:
return None
def get_request_images(self, request) -> list:
"""Get images from request. Override for custom image handling."""
try:
return request.images if request.images is not None else []
except AttributeError:
return []
def get_request_continuation_id(self, request) -> Optional[str]:
"""Get continuation_id from request. Override for custom continuation handling."""
try:
return request.continuation_id
except AttributeError:
return None
def get_request_prompt(self, request) -> str:
"""Get prompt from request. Override for custom prompt handling."""
try:
return request.prompt
except AttributeError:
return ""
def get_request_temperature(self, request) -> Optional[float]:
"""Get temperature from request. Override for custom temperature handling."""
try:
return request.temperature
except AttributeError:
return None
def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
"""
Get temperature from request and validate it against model constraints.
This is a convenience method that combines temperature extraction and validation
for simple tools. It ensures temperature is within valid range for the model.
Args:
request: The request object containing temperature
model_context: Model context object containing model info
Returns:
Tuple of (validated_temperature, warning_messages)
"""
temperature = self.get_request_temperature(request)
if temperature is None:
temperature = self.get_default_temperature()
return self.validate_and_correct_temperature(temperature, model_context)
def get_request_thinking_mode(self, request) -> Optional[str]:
"""Get thinking_mode from request. Override for custom thinking mode handling."""
try:
return request.thinking_mode
except AttributeError:
return None
def get_request_files(self, request) -> list:
"""Get files from request. Override for custom file handling."""
try:
return request.files if request.files is not None else []
except AttributeError:
return []
def get_request_use_websearch(self, request) -> bool:
"""Get use_websearch from request, falling back to env default.
EX_WEBSEARCH_DEFAULT_ON controls default when request doesn't specify.
"""
try:
val = getattr(request, "use_websearch", None)
if val is not None:
return bool(val)
import os as __os
return __os.getenv("EX_WEBSEARCH_DEFAULT_ON", "true").strip().lower() == "true"
except AttributeError:
return True
def get_request_as_dict(self, request) -> dict:
"""Convert request to dictionary. Override for custom serialization."""
try:
# Try Pydantic v2 method first
return request.model_dump()
except AttributeError:
try:
# Fall back to Pydantic v1 method
return request.dict()
except AttributeError:
# Last resort - convert to dict manually
return {"prompt": self.get_request_prompt(request)}
def set_request_files(self, request, files: list) -> None:
"""Set files on request. Override for custom file setting."""
try:
request.files = files
except AttributeError:
# If request doesn't support file setting, ignore silently
pass
def get_actually_processed_files(self) -> list:
"""Get actually processed files. Override for custom file tracking."""
try:
return self._actually_processed_files
except AttributeError:
return []
async def execute(self, arguments: dict[str, Any]) -> list:
"""
Execute the simple tool using the comprehensive flow from old base.py.
This method replicates the proven execution pattern while using SimpleTool hooks.
"""
import json
import logging
from tools.models import ToolOutput
logger = logging.getLogger(f"tools.{self.get_name()}")
try:
# Store arguments for access by helper methods
self._current_arguments = arguments
logger.info(f"đź”§ {self.get_name()} tool called with arguments: {list(arguments.keys())}")
try:
from utils.progress import send_progress
send_progress(f"{self.get_name()}: Starting execution")
except Exception:
pass
# Validate request using the tool's Pydantic model
request_model = self.get_request_model()
request = request_model(**arguments)
logger.debug(f"Request validation successful for {self.get_name()}")
try:
from utils.progress import send_progress
send_progress(f"{self.get_name()}: Request validated")
except Exception:
pass
# Validate file paths for security
# This prevents path traversal attacks and ensures proper access control
path_error = self._validate_file_paths(request)
if path_error:
error_output = ToolOutput(
status="error",
content=path_error,
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]
# Handle model resolution like old base.py
model_name = self.get_request_model_name(request)
if not model_name:
from config import DEFAULT_MODEL
model_name = DEFAULT_MODEL
# Store the current model name for later use
self._current_model_name = model_name
# Handle model context from arguments (for in-process testing)
if "_model_context" in arguments:
self._model_context = arguments["_model_context"]
logger.debug(f"{self.get_name()}: Using model context from arguments")
else:
# Create model context if not provided
from utils.model_context import ModelContext
self._model_context = ModelContext(model_name)
logger.debug(f"{self.get_name()}: Created model context for {model_name}")
try:
from utils.progress import send_progress
send_progress(f"{self.get_name()}: Model/context ready: {model_name}")
except Exception:
pass
# Get images if present
images = self.get_request_images(request)
continuation_id = self.get_request_continuation_id(request)
# Handle conversation history and prompt preparation
if continuation_id:
# Check if conversation history is already embedded
field_value = self.get_request_prompt(request)
if "=== CONVERSATION HISTORY ===" in field_value:
# Use pre-embedded history
prompt = field_value
logger.debug(f"{self.get_name()}: Using pre-embedded conversation history")
else:
# No embedded history - reconstruct it (for in-process calls)
logger.debug(f"{self.get_name()}: No embedded history found, reconstructing conversation")
# Get thread context
from utils.conversation_memory import add_turn, build_conversation_history, get_thread
thread_context = get_thread(continuation_id)
if thread_context:
# Add user's new input to conversation
user_prompt = self.get_request_prompt(request)
user_files = self.get_request_files(request)
if user_prompt:
add_turn(continuation_id, "user", user_prompt, files=user_files)
# Get updated thread context after adding the turn
thread_context = get_thread(continuation_id)
logger.debug(
f"{self.get_name()}: Retrieved updated thread with {len(thread_context.turns)} turns"
)
# Build conversation history with updated thread context
conversation_history, conversation_tokens = build_conversation_history(
thread_context, self._model_context
)
# Get the base prompt from the tool
base_prompt = await self.prepare_prompt(request)
# Combine with conversation history
if conversation_history:
prompt = f"{conversation_history}\n\n=== NEW USER INPUT ===\n{base_prompt}"
else:
prompt = base_prompt
else:
# Thread not found, prepare normally
logger.warning(f"Thread {continuation_id} not found, preparing prompt normally")
prompt = await self.prepare_prompt(request)
else:
# New conversation, prepare prompt normally
prompt = await self.prepare_prompt(request)
# Add follow-up instructions for new conversations
from server import get_follow_up_instructions
follow_up_instructions = get_follow_up_instructions(0)
prompt = f"{prompt}\n\n{follow_up_instructions}"
logger.debug(f"Added follow-up instructions for new {self.get_name()} conversation")
# Validate images if any were provided
if images:
image_validation_error = self._validate_image_limits(
images, model_context=self._model_context, continuation_id=continuation_id
)
if image_validation_error:
return [TextContent(type="text", text=json.dumps(image_validation_error, ensure_ascii=False))]
# Get and validate temperature against model constraints
temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)
# Log any temperature corrections
for warning in temp_warnings:
logger.warning(warning)
# Get thinking mode with defaults
thinking_mode = self.get_request_thinking_mode(request)
if thinking_mode is None:
thinking_mode = self.get_default_thinking_mode()
# Get the provider from model context (clean OOP - no re-fetching)
provider = self._model_context.provider
# Get system prompt for this tool
base_system_prompt = self.get_system_prompt()
language_instruction = self.get_language_instruction()
system_prompt = language_instruction + base_system_prompt
# Estimate tokens for logging
from utils.token_utils import estimate_tokens
estimated_tokens = estimate_tokens(prompt)
logger.debug(f"Prompt length: {len(prompt)} characters (~{estimated_tokens:,} tokens)")
try:
from utils.progress import send_progress
send_progress(f"{self.get_name()}: Generating response (~{estimated_tokens:,} tokens)")
except Exception:
pass
# Generate AI response with fallback (free-first → paid) when applicable
import os as _os
from src.providers.registry import ModelProviderRegistry as _Registry
selected_model = self._current_model_name
tool_call_metadata = [] # collected sanitized tool-call events for UI dropdown
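# Helper invoked by the fallback chain once per candidate model: it resolves the
# provider for that model, attaches provider-native web search tools when enabled,
# and records best-effort tool-call metadata for the UI dropdown.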
def _call_with_model(_model_name: str):
import os as __os
nonlocal selected_model, provider
selected_model = _model_name
prov = _Registry.get_provider_for_model(_model_name)
if not prov:
raise RuntimeError(f"No provider available for model '{_model_name}'")
provider = prov # update for downstream logging/usage
# Provider-native web browsing via capability layer
provider_kwargs = {}
try:
from src.providers.capabilities import get_capabilities_for_provider
from utils.tool_events import ToolCallEvent, ToolEventSink
use_web = self.get_request_use_websearch(request)
caps = get_capabilities_for_provider(prov.get_provider_type())
ws = caps.get_websearch_tool_schema({"use_websearch": use_web})
web_event = None
if ws.tools:
provider_kwargs["tools"] = ws.tools
if ws.tool_choice is not None:
provider_kwargs["tool_choice"] = ws.tool_choice
# Begin web tool event timing (best-effort)
web_event = ToolCallEvent(
provider=prov.get_provider_type().value,
tool_name="web_search",
args={}
)
try:
tool_call_metadata.append({
"provider": web_event.provider,
"tool_name": web_event.tool_name,
"args": web_event.args,
"start_ts": web_event.start_ts,
})
except Exception:
pass
except Exception:
web_event = None
result = prov.generate_content(
prompt=prompt,
model_name=_model_name,
system_prompt=system_prompt,
temperature=temperature,
thinking_mode=thinking_mode if prov.supports_thinking_mode(_model_name) else None,
images=images if images else None,
**provider_kwargs,
)
return result
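# Use the registry fallback chain when the requested model is "auto" or when
# FALLBACK_ON_FAILURE (default: "true") is enabled; otherwise call the resolved provider directly.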
use_fallback = (self._current_model_name.lower() == "auto") or (_os.getenv("FALLBACK_ON_FAILURE", "true").lower() == "true")
if use_fallback:
tool_category = self.get_model_category()
logger.info(f"Using fallback chain for category {tool_category.name}")
# Derive simple hints for context-aware routing
hints = []
# Content-derived hints
try:
u = (self.get_request_prompt(request) or "").lower()
if any(k in u for k in ("image", "diagram", "vision")):
hints.append("vision")
if any(k in u for k in ("think", "reason", "chain of thought", "deep")):
hints.append("deep_reasoning")
except Exception:
pass
# Context-size hint (Option 2: dispatch metadata)
try:
if estimated_tokens and int(estimated_tokens) > 48000:
hints.append("long_context")
hints.append(f"estimated_tokens:{int(estimated_tokens)}")
except Exception:
pass
# Raw user-prompt size hint (pre-shaping): if the original user prompt itself is huge
try:
user_prompt_raw = self.get_request_prompt(request) or ""
raw_tokens = estimate_tokens(user_prompt_raw)
if int(raw_tokens) > 48000:
hints.append("long_context")
hints.append(f"estimated_tokens:{int(raw_tokens)}")
except Exception:
pass
model_response = _Registry.call_with_fallback(tool_category, _call_with_model, hints=hints)
# Sync the model context and current name to the selected model
self._current_model_name = selected_model
self._model_context.model_name = selected_model
# End web tool timing if any
try:
if 'web_event' in locals() and web_event is not None:
web_event.end(ok=True)
ToolEventSink().record(web_event)
except Exception:
pass
else:
# Direct call without fallback
logger.info(f"Sending request to {provider.get_provider_type().value} API for {self.get_name()}")
logger.info(
f"Using model: {self._model_context.model_name} via {provider.get_provider_type().value} provider"
)
# Provider-native web browsing via capability layer
provider_kwargs = {}
try:
from src.providers.capabilities import get_capabilities_for_provider
use_web = self.get_request_use_websearch(request)
caps = get_capabilities_for_provider(provider.get_provider_type())
ws = caps.get_websearch_tool_schema({"use_websearch": use_web})
if ws.tools:
provider_kwargs["tools"] = ws.tools
if ws.tool_choice is not None:
provider_kwargs["tool_choice"] = ws.tool_choice
except Exception:
pass
model_response = provider.generate_content(
prompt=prompt,
model_name=self._current_model_name,
system_prompt=system_prompt,
temperature=temperature,
thinking_mode=thinking_mode if provider.supports_thinking_mode(self._current_model_name) else None,
images=images if images else None,
**provider_kwargs,
)
# End web tool timing if any
try:
if 'web_event' in locals() and web_event is not None:
web_event.end(ok=True)
ToolEventSink().record(web_event)
except Exception:
pass
logger.info(f"Received response from {provider.get_provider_type().value} API for {self.get_name()}")
# Defensive: handle missing provider response (None)
if model_response is None:
logger.error(f"Model call returned None in {self.get_name()} - treating as error")
tool_output = ToolOutput(
status="error",
content="Model returned no response (None). Please retry or switch model; check provider logs for details.",
content_type="text",
)
return [TextContent(type="text", text=tool_output.model_dump_json())]
# Process the model's response
if getattr(model_response, "content", None):
raw_text = model_response.content
# Create model info for conversation tracking
model_info = {
"provider": provider,
"model_name": self._current_model_name,
"model_response": model_response,
"tool_calls": tool_call_metadata if 'tool_call_metadata' in locals() else [],
"effective_prompt": prompt,
}
# Parse response using the same logic as old base.py
tool_output = self._parse_response(raw_text, request, model_info)
logger.info(f"âś… {self.get_name()} tool completed successfully")
else:
# Handle cases where the model couldn't generate a response
finish_reason = model_response.metadata.get("finish_reason", "Unknown")
logger.warning(f"Response blocked or incomplete for {self.get_name()}. Finish reason: {finish_reason}")
tool_output = ToolOutput(
status="error",
content=f"Response blocked or incomplete. Finish reason: {finish_reason}",
content_type="text",
)
# Return the tool output as TextContent
# Attach tool_call_events metadata for UI dropdown if available
if isinstance(tool_output.metadata, dict):
try:
if 'tool_call_metadata' in locals() and tool_call_metadata:
tool_output.metadata.setdefault('tool_call_events', tool_call_metadata)
# Ensure dropdown visibility even when no provider-native tools were used
else:
import os as ___os, time as ___time
if ___os.getenv("EX_ALWAYS_TOOLCALL_METADATA", "false").strip().lower() == "true":
# Synthesize a minimal event so UIs show a dropdown
use_web_flag = False
try:
use_web_flag = self.get_request_use_websearch(request)
except Exception:
pass
synthetic = [{
"provider": getattr(provider.get_provider_type(), "value", str(provider)) if provider else "unknown",
"tool_name": self.get_name(),
"args": {
"model": self._current_model_name,
"use_websearch": bool(use_web_flag),
"thinking_mode": thinking_mode,
},
"start_ts": ___time.time(),
}]
tool_output.metadata.setdefault('tool_call_events', synthetic)
except Exception:
pass
return [TextContent(type="text", text=tool_output.model_dump_json())]
except Exception as e:
# Special handling for MCP size check errors
if str(e).startswith("MCP_SIZE_CHECK:"):
# Extract the JSON content after the prefix
json_content = str(e)[len("MCP_SIZE_CHECK:") :]
return [TextContent(type="text", text=json_content)]
logger.error(f"Error in {self.get_name()}: {str(e)}")
error_output = ToolOutput(
status="error",
content=f"Error in {self.get_name()}: {str(e)}",
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]
def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
"""
Parse the raw response and format it using the hook method.
This simplified version focuses on the SimpleTool pattern: format the response
using the format_response hook, then handle conversation continuation.
"""
from tools.models import ToolOutput
# Format the response using the hook method
formatted_response = self.format_response(raw_text, request, model_info)
# Handle conversation continuation like old base.py
continuation_id = self.get_request_continuation_id(request)
if continuation_id:
# Add turn to conversation memory
from utils.conversation_memory import add_turn
# Extract model metadata for conversation tracking
model_provider = None
model_name = None
model_metadata = None
if model_info:
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
model_provider = provider
else:
try:
model_provider = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
model_provider = str(provider)
model_name = model_info.get("model_name")
model_response = model_info.get("model_response")
if model_response:
model_metadata = {"usage": model_response.usage, "metadata": model_response.metadata}
# Only add the assistant's response to the conversation
# The user's turn is handled elsewhere (when thread is created/continued)
add_turn(
continuation_id, # thread_id as positional argument
"assistant", # role as positional argument
raw_text, # content as positional argument
files=self.get_request_files(request),
images=self.get_request_images(request),
tool_name=self.get_name(),
model_provider=model_provider,
model_name=model_name,
model_metadata=model_metadata,
)
# Create continuation offer like old base.py
continuation_data = self._create_continuation_offer(request, model_info)
if continuation_data:
return self._create_continuation_offer_response(formatted_response, continuation_data, request, model_info)
else:
# Build metadata with model and provider info for success response
metadata = {}
if model_info:
model_name = model_info.get("model_name")
if model_name:
metadata["model_used"] = model_name
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
metadata["provider_used"] = provider
else:
try:
metadata["provider_used"] = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
metadata["provider_used"] = str(provider)
return ToolOutput(
status="success",
content=formatted_response,
content_type="text",
metadata=metadata if metadata else None,
)
def _create_continuation_offer(self, request, model_info: Optional[dict] = None):
"""Create continuation offer following old base.py pattern"""
continuation_id = self.get_request_continuation_id(request)
try:
from utils.conversation_memory import create_thread, get_thread
if continuation_id:
# Existing conversation
thread_context = get_thread(continuation_id)
if thread_context and thread_context.turns:
turn_count = len(thread_context.turns)
from utils.conversation_memory import MAX_CONVERSATION_TURNS
if turn_count >= MAX_CONVERSATION_TURNS - 1:
return None # No more turns allowed
remaining_turns = MAX_CONVERSATION_TURNS - turn_count - 1
return {
"continuation_id": continuation_id,
"remaining_turns": remaining_turns,
"note": f"Claude can continue this conversation for {remaining_turns} more exchanges.",
}
else:
# New conversation - create thread and offer continuation
# Convert request to dict for initial_context
initial_request_dict = self.get_request_as_dict(request)
# Compute session fingerprint and friendly client name (for scoping and UX)
try:
current_args = getattr(self, "_current_arguments", {})
sess_fp = get_current_session_fingerprint(current_args)
ci = get_cached_client_info()
friendly = format_client_info(ci) if ci else None
except Exception:
sess_fp, friendly = None, None
new_thread_id = create_thread(
tool_name=self.get_name(),
initial_request=initial_request_dict,
session_fingerprint=sess_fp,
client_friendly_name=friendly,
)
# Add the initial user turn to the new thread
from utils.conversation_memory import MAX_CONVERSATION_TURNS, add_turn
user_prompt = self.get_request_prompt(request)
user_files = self.get_request_files(request)
user_images = self.get_request_images(request)
# Add user's initial turn
add_turn(
new_thread_id, "user", user_prompt, files=user_files, images=user_images, tool_name=self.get_name()
)
note_client = friendly or "Claude"
return {
"continuation_id": new_thread_id,
"remaining_turns": MAX_CONVERSATION_TURNS - 1,
"note": f"{note_client} can continue this conversation for {MAX_CONVERSATION_TURNS - 1} more exchanges.",
}
except Exception:
return None
def _create_continuation_offer_response(
self, content: str, continuation_data: dict, request, model_info: Optional[dict] = None
):
"""Create response with continuation offer following old base.py pattern"""
from tools.models import ContinuationOffer, ToolOutput
try:
continuation_offer = ContinuationOffer(
continuation_id=continuation_data["continuation_id"],
note=continuation_data["note"],
remaining_turns=continuation_data["remaining_turns"],
)
# Build metadata with model and provider info
metadata = {"tool_name": self.get_name(), "conversation_ready": True}
if model_info:
model_name = model_info.get("model_name")
if model_name:
metadata["model_used"] = model_name
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
metadata["provider_used"] = provider
else:
try:
metadata["provider_used"] = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
metadata["provider_used"] = str(provider)
return ToolOutput(
status="continuation_available",
content=content,
content_type="text",
continuation_offer=continuation_offer,
metadata=metadata,
)
except Exception:
# Fallback to simple success if continuation offer fails
return ToolOutput(status="success", content=content, content_type="text")
# Convenience methods for common tool patterns
def build_standard_prompt(
self, system_prompt: str, user_content: str, request, file_context_title: str = "CONTEXT FILES"
) -> str:
"""
Build a standard prompt with system prompt, user content, and optional files.
This is a convenience method that handles the common pattern of:
1. Adding file content if present
2. Checking token limits
3. Adding web search instructions
4. Combining everything into a well-formatted prompt
Args:
system_prompt: The system prompt for the tool
user_content: The main user request/content
request: The validated request object
file_context_title: Title for the file context section
Returns:
Complete formatted prompt ready for the AI model
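Example (a minimal prepare_prompt sketch using this helper):
async def prepare_prompt(self, request) -> str:
user_content = self.get_request_prompt(request)
return self.build_standard_prompt(self.get_system_prompt(), user_content, request)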
"""
# Add context files if provided
files = self.get_request_files(request)
if files:
file_content, processed_files = self._prepare_file_content_for_prompt(
files,
self.get_request_continuation_id(request),
"Context files",
model_context=getattr(self, "_model_context", None),
)
self._actually_processed_files = processed_files
if file_content:
user_content = f"{user_content}\n\n=== {file_context_title} ===\n{file_content}\n=== END CONTEXT ===="
# Check token limits
self._validate_token_limit(user_content, "Content")
# Add web search instruction if enabled
websearch_instruction = ""
use_websearch = self.get_request_use_websearch(request)
if use_websearch:
websearch_instruction = self.get_websearch_instruction(use_websearch, self.get_websearch_guidance())
# Combine system prompt with user content
full_prompt = f"""{system_prompt}{websearch_instruction}
=== USER REQUEST ===
{user_content}
=== END REQUEST ===
Please provide a thoughtful, comprehensive response:"""
return full_prompt
def get_prompt_content_for_size_validation(self, user_content: str) -> str:
"""
Override to use original user prompt for size validation when conversation history is embedded.
When server.py embeds conversation history into the prompt field, it also stores
the original user prompt in _original_user_prompt. We use that for size validation
to avoid incorrectly triggering size limits due to conversation history.
Args:
user_content: The user content (may include conversation history)
Returns:
The original user prompt if available, otherwise the full user content
"""
# Check if we have the current arguments from execute() method
current_args = getattr(self, "_current_arguments", None)
if current_args:
# If server.py embedded conversation history, it stores original prompt separately
original_user_prompt = current_args.get("_original_user_prompt")
if original_user_prompt is not None:
# Use original user prompt for size validation (excludes conversation history)
return original_user_prompt
# Fallback to default behavior (validate full user content)
return user_content
def get_websearch_guidance(self) -> Optional[str]:
"""
Return tool-specific web search guidance.
Override this to provide tool-specific guidance for when web searches
would be helpful. Return None to use the default guidance.
Returns:
Tool-specific web search guidance or None for default
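Example (hypothetical override):
return "Consider searching for current library documentation and recent release notes."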
"""
return None
def handle_prompt_file_with_fallback(self, request) -> str:
"""
Handle prompt.txt files with fallback to request field.
This is a convenience method for tools that accept prompts either
as a field or as a prompt.txt file. It handles the extraction
and validation automatically.
Args:
request: The validated request object
Returns:
The effective prompt content
Raises:
ValueError: If prompt is too large for MCP transport
"""
# Check for prompt.txt in files
files = self.get_request_files(request)
if files:
prompt_content, updated_files = self.handle_prompt_file(files)
# Update request files list if needed
if updated_files is not None:
self.set_request_files(request, updated_files)
else:
prompt_content = None
# Use prompt.txt content if available, otherwise use the prompt field
user_content = prompt_content if prompt_content else self.get_request_prompt(request)
# Check user input size at MCP transport boundary (excluding conversation history)
validation_content = self.get_prompt_content_for_size_validation(user_content)
size_check = self.check_prompt_size(validation_content)
if size_check:
from tools.models import ToolOutput
raise ValueError(f"MCP_SIZE_CHECK:{ToolOutput(**size_check).model_dump_json()}")
return user_content
def get_chat_style_websearch_guidance(self) -> str:
"""
Get Chat tool-style web search guidance.
Returns web search guidance that matches the original Chat tool pattern.
This is useful for tools that want to maintain the same search behavior.
Returns:
Web search guidance text
"""
return """When discussing topics, consider if searches for these would help:
- Documentation for any technologies or concepts mentioned
- Current best practices and patterns
- Recent developments or updates
- Community discussions and solutions"""
def supports_custom_request_model(self) -> bool:
"""
Indicate whether this tool supports custom request models.
Simple tools support custom request models by default. This returns True
automatically when get_request_model() returns something other than
ToolRequest, so subclasses normally don't need to override it.
Returns:
True if the tool uses a custom request model
"""
return self.get_request_model() != ToolRequest
def _validate_file_paths(self, request) -> Optional[str]:
"""
Validate that all file paths in the request are absolute paths.
This is a security measure to prevent path traversal attacks and ensure
proper access control. All file paths must be absolute (starting with '/').
Args:
request: The validated request object
Returns:
Optional[str]: Error message if validation fails, None if all paths are valid
"""
import os
from pathlib import Path
from utils.file_utils import resolve_and_validate_path
# Check if request has 'files' attribute (used by most tools)
files = self.get_request_files(request)
if files:
# Compute repository root (tools/simple/base.py -> parents[2] resolves to the repo root)
repo_root = Path(__file__).resolve().parents[2]
for file_path in files:
# 1) Absolute path requirement
if not os.path.isabs(file_path):
return (
f"Error: All file paths must be FULL absolute paths to real files / folders - DO NOT SHORTEN. "
f"Received relative path: {file_path}\n"
f"Please provide the full absolute path starting with '/' (must be FULL absolute paths to real files / folders - DO NOT SHORTEN)"
)
# 2) Secure resolution + containment check within repo root
try:
resolved = resolve_and_validate_path(file_path)
except (ValueError, PermissionError) as e:
return (
f"Error: Invalid or disallowed path: {file_path}\n"
f"Reason: {type(e).__name__}: {e}"
)
try:
# Ensure the path is within the repository root
resolved.relative_to(repo_root)
except Exception:
return (
f"Error: File path is outside the permitted project workspace.\n"
f"Path: {file_path}\n"
f"Allowed root: {repo_root}"
)
return None
def prepare_chat_style_prompt(self, request, system_prompt: Optional[str] = None) -> str:
"""
Prepare a prompt using Chat tool-style patterns.
This convenience method replicates the Chat tool's prompt preparation logic:
1. Handle prompt.txt file if present
2. Add file context with specific formatting
3. Add web search guidance
4. Format with system prompt
Args:
request: The validated request object
system_prompt: System prompt to use (uses get_system_prompt() if None)
Returns:
Complete formatted prompt
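Example (a minimal sketch inside a subclass):
async def prepare_prompt(self, request) -> str:
return self.prepare_chat_style_prompt(request)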
"""
# Use provided system prompt or get from tool
if system_prompt is None:
system_prompt = self.get_system_prompt()
# Get user content (handles prompt.txt files)
user_content = self.handle_prompt_file_with_fallback(request)
# Build standard prompt with Chat-style web search guidance
websearch_guidance = self.get_chat_style_websearch_guidance()
# Override the websearch guidance temporarily
original_guidance = self.get_websearch_guidance
self.get_websearch_guidance = lambda: websearch_guidance
try:
full_prompt = self.build_standard_prompt(system_prompt, user_content, request, "CONTEXT FILES")
finally:
# Restore original guidance method
self.get_websearch_guidance = original_guidance
return full_prompt