"""
Example custom tools for Berry MCP Server
"""
import json
from berry_mcp.tools.decorators import tool
@tool(description="Calculate the sum of a list of numbers")
def sum_numbers(numbers: list[float]) -> float:
    """
    Add up every value in a list of numbers

    Args:
        numbers: The values to add together

    Returns:
        The total of all values (0 for an empty list)
    """
    total = sum(numbers)
    return total
@tool(description="Convert text to uppercase or lowercase")
def transform_text(text: str, to_upper: bool = True) -> str:
    """
    Change the letter case of a string

    Args:
        text: The string to convert
        to_upper: Uppercase the string when true, lowercase it otherwise

    Returns:
        The case-converted string
    """
    if to_upper:
        return text.upper()
    return text.lower()
@tool(description="Count words in a text")
def count_words(text: str, include_stats: bool = False) -> int | dict[str, int]:
"""
Count words in a text
Args:
text: The text to analyze
include_stats: If True, return detailed statistics
Returns:
Word count as integer, or statistics dictionary if include_stats is True
"""
words = text.split()
word_count = len(words)
if include_stats:
chars = len(text)
chars_no_spaces = len(text.replace(" ", ""))
lines = len(text.split("\n"))
return {
"words": word_count,
"characters": chars,
"characters_no_spaces": chars_no_spaces,
"lines": lines,
}
return word_count
@tool(description="Generate a simple report from data")
def generate_report(
title: str, data: dict[str, str | int | float], format_type: str = "text"
) -> str:
"""
Generate a simple report from data
Args:
title: Report title
data: Dictionary of data to include
format_type: Output format ('text' or 'json')
Returns:
Formatted report as string
"""
if format_type.lower() == "json":
report = {"title": title, "data": data, "generated_by": "Berry MCP Server"}
return json.dumps(report, indent=2)
# Text format
lines = [
f"# {title}",
"=" * len(title),
"",
"## Data:",
]
for key, value in data.items():
lines.append(f"- {key}: {value}")
lines.extend(["", "---", "Generated by Berry MCP Server"])
return "\n".join(lines)
@tool(description="Validate and format JSON data")
def validate_json(json_string: str, pretty_print: bool = True) -> str | dict[str, str]:
"""
Validate and optionally format JSON data
Args:
json_string: JSON string to validate
pretty_print: If True, return formatted JSON
Returns:
Formatted JSON string or error dictionary
"""
try:
data = json.loads(json_string)
if pretty_print:
return json.dumps(data, indent=2, ensure_ascii=False)
else:
return json.dumps(data)
except json.JSONDecodeError as e:
return {"error": f"Invalid JSON: {str(e)}"}
@tool(description="Find and replace text with optional case sensitivity")
def find_replace_text(
text: str,
find: str,
replace: str,
case_sensitive: bool = True,
max_replacements: int | None = None,
) -> dict[str, str | int]:
"""
Find and replace text with options
Args:
text: Source text
find: Text to find
replace: Replacement text
case_sensitive: Whether search is case sensitive
max_replacements: Maximum number of replacements (None for all)
Returns:
Dictionary with result text and replacement count
"""
original_text = text
if not case_sensitive:
# For case-insensitive search, we need to handle this manually
# since str.replace() doesn't have a case-insensitive option
import re
pattern = re.escape(find)
if max_replacements:
result_text = re.sub(
pattern, replace, text, count=max_replacements, flags=re.IGNORECASE
)
else:
result_text = re.sub(pattern, replace, text, flags=re.IGNORECASE)
# Count replacements by comparing
count = len(re.findall(pattern, original_text, re.IGNORECASE))
if max_replacements:
count = min(count, max_replacements)
else:
if max_replacements:
result_text = text.replace(find, replace, max_replacements)
else:
result_text = text.replace(find, replace)
count = original_text.count(find)
if max_replacements:
count = min(count, max_replacements)
return {
"result": result_text,
"replacements_made": count,
"original_length": len(original_text),
"new_length": len(result_text),
}
# Async tool example
@tool(description="Simulate async processing with delay")
async def async_process_text(text: str, delay_seconds: float = 1.0) -> str:
    """
    Demonstrate an async tool by waiting, then transforming the text

    Args:
        text: Text to process
        delay_seconds: How long to wait before processing, in seconds

    Returns:
        The stripped, uppercased text wrapped in a message that also reports
        the delay
    """
    import asyncio

    # Stand-in for real asynchronous work.
    await asyncio.sleep(delay_seconds)

    # The "processing" itself: trim surrounding whitespace, then uppercase.
    trimmed = text.strip()
    shouted = trimmed.upper()
    return f"ASYNC PROCESSED: {shouted} (delayed {delay_seconds}s)"
if __name__ == "__main__":
    # When run as a script, print a short overview of the tools defined here.
    overview = (
        "Example custom tools for Berry MCP Server",
        "\nAvailable tools:",
        "- sum_numbers: Calculate sum of numbers",
        "- transform_text: Convert text case",
        "- count_words: Count words and characters",
        "- generate_report: Create formatted reports",
        "- validate_json: Validate and format JSON",
        "- find_replace_text: Find and replace with options",
        "- async_process_text: Async text processing example",
    )
    for line in overview:
        print(line)