"""
MCP Tools Implementation
Provides example tools for the MCP-MAF agent:
- WeatherTool: Get weather information
- CalculatorTool: Perform calculations
- StockPriceTool: Query Taiwan stock prices
"""
import ast
import operator
from datetime import datetime
from typing import Dict, Any

import requests

from .utils.logger import setup_logger
# Module-level logger used by the tools in this file for query/result logging.
logger = setup_logger(__name__)
class BaseTool:
    """Common interface shared by every tool.

    Subclasses provide the ``name``, ``description`` and ``input_schema``
    class attributes and override :meth:`execute`.
    """

    name: str
    description: str
    input_schema: Dict[str, Any]

    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Run the tool with the given keyword arguments.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()

    def to_dict(self) -> Dict[str, Any]:
        """Describe the tool as a dictionary in the format used for MCP.

        Returns:
            A dictionary with the keys ``name``, ``description`` and
            ``inputSchema``.
        """
        descriptor: Dict[str, Any] = dict(
            name=self.name,
            description=self.description,
            inputSchema=self.input_schema,
        )
        return descriptor
class WeatherTool(BaseTool):
    """
    Weather lookup tool.

    Reports current weather information for a named location.
    """

    name = "get_weather"
    description = "Get current weather information for a specific location"
    input_schema = {
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "City name or location (e.g., 'Taipei', 'Tokyo', 'New York')"
            }
        },
        "required": ["location"]
    }

    async def execute(self, location: str, **kwargs) -> Dict[str, Any]:
        """
        Look up the weather for ``location``.

        The data is mocked: a handful of cities have fixed readings and any
        other location receives a generic fallback reading. A production
        version should call a real weather API (e.g. OpenWeather).

        Args:
            location: Location name (matched exactly against the mock table)

        Returns:
            Dictionary with ``location``, ``temperature``, ``condition``,
            ``humidity`` and an ISO-format ``timestamp``
        """
        logger.info(f"🌤️ Weather query: location={location}")

        # Fixed readings for the cities the mock knows about.
        known_cities = {
            "Taipei": {"temp": "25°C", "condition": "Sunny", "humidity": "60%"},
            "Tokyo": {"temp": "18°C", "condition": "Cloudy", "humidity": "70%"},
            "New York": {"temp": "15°C", "condition": "Rainy", "humidity": "80%"},
            "London": {"temp": "12°C", "condition": "Foggy", "humidity": "85%"},
            "Paris": {"temp": "16°C", "condition": "Partly Cloudy", "humidity": "65%"},
        }

        if location in known_cities:
            reading = known_cities[location]
        else:
            # Generic reading used for every location outside the table.
            reading = {
                "temp": "22°C",
                "condition": "Partly Cloudy",
                "humidity": "65%",
            }

        report = {
            "location": location,
            "temperature": reading["temp"],
            "condition": reading["condition"],
            "humidity": reading["humidity"],
            "timestamp": datetime.now().isoformat(),
        }

        logger.info(f"✅ Weather result: {report}")
        return report
class CalculatorTool(BaseTool):
    """
    Calculator Tool

    Evaluates arithmetic expressions by walking their syntax tree, so no
    arbitrary Python code is ever executed.
    """

    name = "calculate"
    description = "Perform mathematical calculations (supports +, -, *, /, parentheses)"
    input_schema = {
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Mathematical expression to evaluate (e.g., '2 + 2', '10 * 5', '(3 + 5) * 2')"
            }
        },
        "required": ["expression"]
    }

    # Characters accepted in an expression before any parsing happens.
    _ALLOWED_CHARS = frozenset("0123456789+-*/(). ")

    # Binary operators handled by a plain function call. ``//`` can be spelled
    # with the allowed characters, so it is supported alongside + - * /.
    # ``**`` is handled separately by _power() because it needs a size guard.
    _BINARY_OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
    }

    # Unary sign operators.
    _UNARY_OPS = {
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }

    # Rough cap on the bit length of an integer power result, so inputs such
    # as "9**9**9" are rejected instead of exhausting CPU and memory.
    _MAX_POWER_BITS = 4096

    @classmethod
    def _power(cls, base: Any, exponent: Any) -> Any:
        """Compute ``base ** exponent``, refusing huge integer results.

        Raises:
            ValueError: If both operands are integers and the estimated size
                of the result exceeds ``_MAX_POWER_BITS`` bits.
        """
        if isinstance(base, int) and isinstance(exponent, int) and exponent > 0:
            # bit_length(base) * exponent is an upper bound on the result size.
            if abs(base).bit_length() * exponent > cls._MAX_POWER_BITS:
                raise ValueError("Exponent too large")
        return base ** exponent

    @classmethod
    def _evaluate(cls, node: ast.AST) -> Any:
        """Recursively evaluate a parsed arithmetic expression.

        Only numeric literals, unary +/- and the binary operators
        ``+ - * / // **`` are accepted.

        Raises:
            ValueError: If the tree contains any other construct.
        """
        if isinstance(node, ast.Expression):
            return cls._evaluate(node.body)

        if isinstance(node, ast.Constant):
            value = node.value
            # bool subclasses int, so it is excluded explicitly.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
            raise ValueError("Unsupported literal in expression")

        if isinstance(node, ast.UnaryOp):
            unary = cls._UNARY_OPS.get(type(node.op))
            if unary is not None:
                return unary(cls._evaluate(node.operand))

        if isinstance(node, ast.BinOp):
            left = cls._evaluate(node.left)
            right = cls._evaluate(node.right)
            if isinstance(node.op, ast.Pow):
                return cls._power(left, right)
            binary = cls._BINARY_OPS.get(type(node.op))
            if binary is not None:
                return binary(left, right)

        raise ValueError("Unsupported expression")

    async def execute(self, expression: str, **kwargs) -> Dict[str, Any]:
        """
        Execute calculation

        The expression is first checked against a character whitelist, then
        parsed with ``ast`` and evaluated by :meth:`_evaluate`; ``eval()`` is
        never called.

        Args:
            expression: Mathematical expression

        Returns:
            On success: ``{"expression", "result", "success": True}``.
            On failure: ``{"error", "expression", "success": False}``.
            Errors are reported in the returned dictionary, not raised.
        """
        logger.info(f"🧮 Calculate: expression={expression}")

        try:
            # Security check: only allow safe characters
            if not all(c in self._ALLOWED_CHARS for c in expression):
                error_msg = "Invalid characters in expression. Only numbers and operators (+, -, *, /, parentheses) are allowed."
                logger.warning(f"⚠️ {error_msg}")
                return {
                    "error": error_msg,
                    "expression": expression,
                    "success": False
                }

            # Surrounding spaces are stripped because the parser treats
            # leading whitespace as unexpected indentation.
            tree = ast.parse(expression.strip(), mode="eval")
            result = self._evaluate(tree)

            logger.info(f"✅ Calculation result: {expression} = {result}")

            return {
                "expression": expression,
                "result": result,
                "success": True
            }

        except ZeroDivisionError:
            error_msg = "Division by zero"
            logger.warning(f"⚠️ {error_msg}")
            return {
                "error": error_msg,
                "expression": expression,
                "success": False
            }

        except Exception as e:
            # Tool boundary: syntax errors, unsupported constructs, overflow
            # and similar failures are all reported to the caller as data.
            error_msg = f"Calculation error: {str(e)}"
            logger.error(f"❌ {error_msg}")
            return {
                "error": error_msg,
                "expression": expression,
                "success": False
            }
class StockPriceTool(BaseTool):
    """
    Taiwan Stock Price Query Tool

    Query real-time stock prices from Taiwan Stock Exchange (TWSE).
    """

    name = "get_stock_price"
    description = "Get Taiwan stock price information from TWSE"
    input_schema = {
        "type": "object",
        "properties": {
            "stock_code": {
                "type": "string",
                "description": "Taiwan stock code (e.g., '2330' for TSMC, '2454' for MediaTek)"
            }
        },
        "required": ["stock_code"]
    }

    # Quote endpoint; the stock code is sent as the ``ex_ch`` query parameter.
    _QUOTE_URL = "http://mis.twse.com.tw/stock/api/getStockInfo.jsp"

    @staticmethod
    def _parse_price(raw: Any):
        """Convert a raw price field to ``float``; return ``None`` if it is
        missing or not numeric."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    def _failure(self, stock_code: Any, error_msg: str) -> Dict[str, Any]:
        """Build the error dictionary returned for a failed query."""
        return {
            "error": error_msg,
            "stock_code": stock_code,
            "success": False
        }

    async def execute(self, stock_code: str, **kwargs) -> Dict[str, Any]:
        """
        Execute stock price query

        Sends one HTTP GET to the TWSE quote endpoint and summarises the
        first entry of the ``msgArray`` field of the JSON response.

        Args:
            stock_code: Taiwan stock code (ASCII letters and digits only)

        Returns:
            On success: dictionary with ``stock_code``, ``name``, ``price``,
            ``change``, ``change_percent``, ``volume``, ``timestamp`` and
            ``success: True``.
            On failure: ``{"error", "stock_code", "success": False}``.
            Errors are reported in the returned dictionary, not raised.
        """
        logger.info(f"📊 Stock query: stock_code={stock_code}")

        # Only plain alphanumeric codes are accepted, so a crafted value
        # cannot smuggle extra query parameters into the request.
        code = str(stock_code).strip()
        if not (code.isascii() and code.isalnum()):
            error_msg = f"Invalid stock code: {stock_code}"
            logger.warning(f"⚠️ {error_msg}")
            return self._failure(stock_code, error_msg)

        try:
            # Query TWSE API. ``params`` lets requests build and encode the
            # query string itself.
            # NOTE(review): verify=False disables TLS certificate checks if
            # the request ends up on HTTPS - confirm whether it is required.
            # NOTE(review): requests.get blocks the event loop for up to the
            # timeout; consider running it in an executor.
            response = requests.get(
                self._QUOTE_URL,
                params={"ex_ch": f"tse_{code}.tw"},
                timeout=5,
                verify=False,
            )
            response.raise_for_status()

            data = response.json()

            # Parse response
            quotes = data.get("msgArray")
            if not quotes:
                error_msg = f"Stock code {stock_code} not found"
                logger.warning(f"⚠️ {error_msg}")
                return self._failure(stock_code, error_msg)

            stock = quotes[0]

            # NOTE(review): the price field may hold a non-numeric placeholder
            # (reportedly "-" when no trade has happened yet) - confirm.
            # A missing or unparsable price is an error, not a price of 0.
            current_price = self._parse_price(stock.get("z"))
            if current_price is None:
                error_msg = f"Current price unavailable for stock code {stock_code}"
                logger.warning(f"⚠️ {error_msg}")
                return self._failure(stock_code, error_msg)

            # Without a usable previous close, the change is reported as 0.
            yesterday_price = self._parse_price(stock.get("y"))
            if yesterday_price is None:
                yesterday_price = current_price

            change = current_price - yesterday_price
            change_percent = (change / yesterday_price * 100) if yesterday_price > 0 else 0

            result = {
                "stock_code": stock_code,
                "name": stock.get("n", f"股票 {stock_code}"),
                "price": f"NT$ {current_price:.2f}",
                "change": f"{'+' if change >= 0 else ''}{change:.2f}",
                "change_percent": f"{'+' if change_percent >= 0 else ''}{change_percent:.2f}%",
                "volume": stock.get("v", "N/A"),
                "timestamp": datetime.now().isoformat(),
                "success": True
            }

            logger.info(f"✅ Stock result: {stock_code} - {result['name']} - {result['price']}")
            return result

        except requests.Timeout:
            error_msg = "Request timeout - TWSE API not responding"
            logger.error(f"❌ {error_msg}")
            return self._failure(stock_code, error_msg)

        except Exception as e:
            # Tool boundary: HTTP errors, invalid JSON and unexpected payload
            # shapes are all reported to the caller as data.
            error_msg = f"Failed to query stock price: {str(e)}"
            logger.error(f"❌ {error_msg}")
            return self._failure(stock_code, error_msg)
# Registry of tool instances, one per tool class, in listing order
AVAILABLE_TOOLS = [
    tool_class()
    for tool_class in (WeatherTool, CalculatorTool, StockPriceTool)
]
def get_tools_list() -> list:
    """Return the dictionary description of every registered tool."""
    descriptions = []
    for registered in AVAILABLE_TOOLS:
        descriptions.append(registered.to_dict())
    return descriptions
def get_tool(tool_name: str) -> BaseTool:
    """
    Look up a registered tool by its name

    Args:
        tool_name: Name to match against each tool's ``name`` attribute

    Returns:
        The first registered tool instance whose name matches

    Raises:
        ValueError: If no registered tool has that name
    """
    match = next(
        (candidate for candidate in AVAILABLE_TOOLS if candidate.name == tool_name),
        None,
    )
    if match is None:
        raise ValueError(f"Tool not found: {tool_name}")
    return match
async def call_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call a tool by name

    Args:
        tool_name: Tool name
        arguments: Tool arguments, passed to the tool as keyword arguments;
            ``None`` is treated as "no arguments"

    Returns:
        Tool execution result

    Raises:
        ValueError: If no tool with that name is registered
    """
    tool = get_tool(tool_name)
    # A caller may omit arguments entirely for a tool that needs none;
    # unpacking None would raise TypeError, so fall back to an empty mapping.
    return await tool.execute(**(arguments or {}))