Skip to main content
Glama

MCP Orchestration Server

math_agent.py (24.1 kB)
#!/usr/bin/env python3
"""
Math Agent - Mathematical Calculations and Operations
Handles mathematical expressions, calculations, and problem solving
"""

import os
import re
import math
import operator
import asyncio
from datetime import datetime
from typing import Dict, List, Any, Optional, Union
import logging

# Add project root to path
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from agents.base_agent import BaseMCPAgent, AgentCapability, MCPMessage

# MongoDB integration
try:
    from mcp_mongodb_integration import MCPMongoDBIntegration
    MONGODB_AVAILABLE = True
except ImportError:
    MONGODB_AVAILABLE = False


class MathAgent(BaseMCPAgent):
    """Math agent for mathematical calculations and operations.

    Incoming text is routed to one of three paths: keyword-driven word
    problems, direct expression evaluation, or extraction of an expression
    embedded in free text.  Successful results are persisted through the
    optional MongoDB integration when it is importable.

    NOTE(review): ``self.logger``, ``self.agent_id``, ``self.get_info`` and
    ``process_message`` are used but not defined here; presumably they are
    supplied by ``BaseMCPAgent`` - confirm against the base class.
    """

    # Matches one decimal number (integer or with a fractional part).
    _NUMBER_PATTERN = r'\d+(?:\.\d+)?'

    def __init__(self):
        capabilities = [
            AgentCapability(
                name="mathematical_operations",
                description="Perform mathematical calculations and solve problems",
                input_types=["text", "dict"],
                output_types=["dict"],
                methods=["calculate", "solve", "evaluate", "process", "info"]
            )
        ]

        super().__init__("math_agent", "Math Agent", capabilities)

        # Initialize MongoDB integration
        self.mongodb_integration = None
        # Reference to the background connect task; asyncio only keeps weak
        # references to tasks, so an unreferenced one may be collected early.
        self._mongodb_init_task = None
        # True when the connection could not be scheduled at construction
        # time and must be attempted on first use instead.
        self._mongodb_init_pending = False
        if MONGODB_AVAILABLE:
            try:
                self.mongodb_integration = MCPMongoDBIntegration()
            except Exception as e:
                self.logger.error(f"Failed to initialize MongoDB: {e}")
            else:
                self._schedule_mongodb_init()

        # Mathematical operations mapping
        self.operations = {
            '+': operator.add,
            '-': operator.sub,
            '*': operator.mul,
            '/': operator.truediv,
            '//': operator.floordiv,
            '%': operator.mod,
            '**': operator.pow,
            '^': operator.pow
        }

        # Mathematical functions
        self.functions = {
            'sin': math.sin,
            'cos': math.cos,
            'tan': math.tan,
            'sqrt': math.sqrt,
            'log': math.log,
            'log10': math.log10,
            'exp': math.exp,
            'abs': abs,
            'round': round,
            'ceil': math.ceil,
            'floor': math.floor,
            'factorial': math.factorial
        }

        # Mathematical constants
        self.constants = {
            'pi': math.pi,
            'e': math.e,
            'tau': math.tau
        }

        self.logger.info("Math Agent initialized")

    def _schedule_mongodb_init(self) -> None:
        """Start the MongoDB connection in the background when possible.

        ``asyncio.create_task`` requires a running event loop.  When the agent
        is constructed from synchronous code there is none, so the connection
        is deferred to the first ``_store_calculation`` call instead of
        creating a coroutine that is never awaited.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            self._mongodb_init_pending = True
            self.logger.warning(
                "No running event loop; MongoDB connection deferred until first use"
            )
            return
        self._mongodb_init_task = loop.create_task(self._init_mongodb())

    async def _init_mongodb(self):
        """Initialize MongoDB connection.

        Errors are logged rather than raised so that a failed connection
        never prevents the agent from answering.
        """
        if self.mongodb_integration:
            try:
                connected = await self.mongodb_integration.connect()
                if connected:
                    self.logger.info("Math Agent connected to MongoDB")
                else:
                    self.logger.warning("Math Agent failed to connect to MongoDB")
            except Exception as e:
                self.logger.error(f"Math Agent MongoDB initialization error: {e}")

    async def _store_calculation(self, input_data: Dict[str, Any], result: Dict[str, Any]):
        """Store calculation result in MongoDB with force storage.

        Args:
            input_data: Request payload; its ``"expression"`` entry is used
                as the key for the forced store.
            result: Result dictionary produced by the agent.

        Storage is best effort: every failure is logged and swallowed.
        """
        if not self.mongodb_integration:
            return

        if self._mongodb_init_pending:
            # Construction happened outside an event loop; connect now.
            self._mongodb_init_pending = False
            await self._init_mongodb()

        try:
            # Primary storage method
            mongodb_id = await self.mongodb_integration.save_agent_output(
                "math_agent",
                input_data,
                result,
                {"calculation_type": result.get("operation", "unknown"),
                 "storage_type": "calculation"}
            )
            self.logger.info(f"✅ Math calculation stored in MongoDB: {mongodb_id}")

            # Also force store as backup
            await self.mongodb_integration.force_store_result(
                "math_agent",
                input_data.get("expression", "unknown"),
                result
            )
            self.logger.info("✅ Math calculation force stored as backup")

        except Exception as e:
            self.logger.error(f"❌ Failed to store math calculation: {e}")

            # Try force storage as fallback
            try:
                await self.mongodb_integration.force_store_result(
                    "math_agent",
                    input_data.get("expression", "unknown"),
                    result
                )
                self.logger.info("✅ Math calculation fallback storage successful")
            except Exception as e2:
                self.logger.error(f"❌ Math calculation fallback storage failed: {e2}")

    async def handle_process(self, message: MCPMessage) -> Dict[str, Any]:
        """Handle the main process method.

        Reads the expression from ``message.params`` (first non-empty value
        of ``"expression"``, ``"query"``, ``"text"``), evaluates it, and
        stores successful results.

        Returns:
            A result dictionary whose ``"status"`` is ``"success"`` or
            ``"error"``.
        """
        try:
            params = message.params

            # Get the mathematical expression or query
            expression = params.get("expression", "") or params.get("query", "") or params.get("text", "")

            if not expression:
                return {
                    "status": "error",
                    "message": "No mathematical expression provided",
                    "examples": [
                        "2 + 3 * 4",
                        "sqrt(16) + 5",
                        "sin(pi/2)",
                        "What is 15% of 200?",
                        "Calculate the area of a circle with radius 5"
                    ],
                    "agent": self.agent_id
                }

            # Process the mathematical expression
            result = await self.process_math_expression(expression)

            # Store in MongoDB
            if result.get("status") == "success":
                await self._store_calculation(
                    {"expression": expression, "query": expression},
                    result
                )

            return result

        except Exception as e:
            self.logger.error(f"Error in process: {e}")
            return {
                "status": "error",
                "message": str(e),
                "agent": self.agent_id
            }

    async def process_math_expression(self, expression: str) -> Dict[str, Any]:
        """Process mathematical expression or word problem.

        Routing order: word problem, then direct expression, then an
        expression extracted from surrounding text.
        """
        try:
            expression = expression.strip()

            # Check if it's a word problem
            if self.is_word_problem(expression):
                return await self.solve_word_problem(expression)

            # Check if it's a direct mathematical expression
            if self.is_math_expression(expression):
                return await self.evaluate_expression(expression)

            # Try to extract mathematical expression from text
            # NOTE(review): is_math_expression is true for any text holding a
            # digit, operator or function name, so the patterns used by
            # extract_math_from_text cannot match by the time this line is
            # reached - confirm whether the detection should be stricter.
            extracted_expr = self.extract_math_from_text(expression)
            if extracted_expr:
                return await self.evaluate_expression(extracted_expr)

            return {
                "status": "error",
                "message": "Could not identify mathematical expression",
                "input": expression,
                "suggestions": [
                    "Try a direct expression like '2 + 3'",
                    "Use word problems like 'What is 15% of 200?'",
                    "Include mathematical functions like 'sqrt(16)'"
                ],
                "agent": self.agent_id
            }

        except Exception as e:
            self.logger.error(f"Error processing math expression: {e}")
            return {
                "status": "error",
                "message": f"Failed to process expression: {str(e)}",
                "agent": self.agent_id
            }

    def is_word_problem(self, text: str) -> bool:
        """Check if the text is a word problem.

        Returns True when any indicator appears as a case-insensitive
        substring of ``text``.
        """
        word_indicators = [
            "what is", "calculate", "find", "solve", "how much", "how many",
            "percentage", "percent", "%", "area", "volume", "circumference",
            "distance", "speed", "time", "rate", "interest", "profit", "loss"
        ]

        text_lower = text.lower()
        return any(indicator in text_lower for indicator in word_indicators)

    def is_math_expression(self, text: str) -> bool:
        """Check if the text is a direct mathematical expression.

        Returns True when ``text`` contains at least one mathematical
        character or the name of a known function.
        """
        # Remove spaces and check for mathematical characters
        clean_text = text.replace(" ", "")
        math_chars = set("0123456789+-*/()^.%")
        math_functions = ["sin", "cos", "tan", "sqrt", "log", "exp", "abs"]

        # Check if it contains mathematical characters or functions
        has_math_chars = any(char in math_chars for char in clean_text)
        has_math_functions = any(func in text.lower() for func in math_functions)

        return has_math_chars or has_math_functions

    def extract_math_from_text(self, text: str) -> Optional[str]:
        """Extract mathematical expression from text.

        Returns:
            The first match of the first pattern that matches, or ``None``.
        """
        # Look for patterns like "2 + 3", "sqrt(16)", etc.
        math_patterns = [
            r'(\d+(?:\.\d+)?\s*[+\-*/^%]\s*\d+(?:\.\d+)?)',
            r'(sqrt\(\d+(?:\.\d+)?\))',
            r'(sin\([^)]+\))',
            r'(cos\([^)]+\))',
            r'(tan\([^)]+\))',
            r'(log\([^)]+\))',
            r'(\d+(?:\.\d+)?\s*\*\*\s*\d+(?:\.\d+)?)'
        ]

        for pattern in math_patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1)

        return None

    async def solve_word_problem(self, problem: str) -> Dict[str, Any]:
        """Solve word problems.

        Dispatches on keywords in ``problem``: percentage, area, then the
        four basic arithmetic operations, in that order.
        """
        try:
            problem_lower = problem.lower()

            # Percentage problems
            if "%" in problem or "percent" in problem_lower:
                return self.solve_percentage_problem(problem)

            # Area problems
            if "area" in problem_lower:
                return self.solve_area_problem(problem)

            # Basic arithmetic word problems
            if any(word in problem_lower for word in ["add", "sum", "plus", "total"]):
                return self.solve_addition_problem(problem)

            if any(word in problem_lower for word in ["subtract", "minus", "difference"]):
                return self.solve_subtraction_problem(problem)

            if any(word in problem_lower for word in ["multiply", "times", "product"]):
                return self.solve_multiplication_problem(problem)

            if any(word in problem_lower for word in ["divide", "divided", "quotient"]):
                return self.solve_division_problem(problem)

            return {
                "status": "error",
                "message": "Could not identify the type of word problem",
                "problem": problem,
                "supported_types": [
                    "Percentage calculations",
                    "Area calculations",
                    "Basic arithmetic (add, subtract, multiply, divide)"
                ],
                "agent": self.agent_id
            }

        except Exception as e:
            self.logger.error(f"Error solving word problem: {e}")
            return {
                "status": "error",
                "message": f"Failed to solve word problem: {str(e)}",
                "agent": self.agent_id
            }

    def solve_percentage_problem(self, problem: str) -> Dict[str, Any]:
        """Solve percentage problems.

        Treats the first number in ``problem`` as the percentage and the
        second as the base value ("What is X% of Y?").
        """
        try:
            # Extract numbers from the problem
            numbers = re.findall(self._NUMBER_PATTERN, problem)

            mentions_percent = "%" in problem or "percent" in problem.lower()
            if len(numbers) >= 2 and mentions_percent:
                # Format: "What is X% of Y?"
                percentage = float(numbers[0])
                value = float(numbers[1])
                result = (percentage / 100) * value

                return {
                    "status": "success",
                    "problem": problem,
                    "calculation": f"{percentage}% of {value}",
                    "result": result,
                    "formatted_result": f"{result:.2f}",
                    "explanation": f"{percentage}% of {value} = ({percentage}/100) × {value} = {result:.2f}",
                    "agent": self.agent_id
                }

            return {
                "status": "error",
                "message": "Could not extract numbers from percentage problem",
                "problem": problem,
                "agent": self.agent_id
            }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Error in percentage calculation: {str(e)}",
                "agent": self.agent_id
            }

    def solve_area_problem(self, problem: str) -> Dict[str, Any]:
        """Solve area calculation problems.

        Supports circle (first number is the radius), rectangle (first two
        numbers are length and width) and square (first number is the side).
        """
        try:
            numbers = re.findall(self._NUMBER_PATTERN, problem)
            problem_lower = problem.lower()

            if "circle" in problem_lower and len(numbers) >= 1:
                radius = float(numbers[0])
                area = math.pi * radius ** 2

                return {
                    "status": "success",
                    "problem": problem,
                    "shape": "circle",
                    "radius": radius,
                    "calculation": f"π × {radius}²",
                    "result": area,
                    "formatted_result": f"{area:.2f}",
                    "explanation": f"Area of circle = π × r² = π × {radius}² = {area:.2f}",
                    "agent": self.agent_id
                }

            elif "rectangle" in problem_lower and len(numbers) >= 2:
                length = float(numbers[0])
                width = float(numbers[1])
                area = length * width

                return {
                    "status": "success",
                    "problem": problem,
                    "shape": "rectangle",
                    "length": length,
                    "width": width,
                    "calculation": f"{length} × {width}",
                    "result": area,
                    "formatted_result": f"{area:.2f}",
                    "explanation": f"Area of rectangle = length × width = {length} × {width} = {area:.2f}",
                    "agent": self.agent_id
                }

            elif "square" in problem_lower and len(numbers) >= 1:
                side = float(numbers[0])
                area = side ** 2

                return {
                    "status": "success",
                    "problem": problem,
                    "shape": "square",
                    "side": side,
                    "calculation": f"{side}²",
                    "result": area,
                    "formatted_result": f"{area:.2f}",
                    "explanation": f"Area of square = side² = {side}² = {area:.2f}",
                    "agent": self.agent_id
                }

            return {
                "status": "error",
                "message": "Could not identify shape or extract dimensions",
                "problem": problem,
                "supported_shapes": ["circle", "rectangle", "square"],
                "agent": self.agent_id
            }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Error in area calculation: {str(e)}",
                "agent": self.agent_id
            }

    def solve_addition_problem(self, problem: str) -> Dict[str, Any]:
        """Solve addition problems by summing every number in ``problem``."""
        numbers = re.findall(self._NUMBER_PATTERN, problem)
        if len(numbers) >= 2:
            nums = [float(n) for n in numbers]
            result = sum(nums)

            return {
                "status": "success",
                "problem": problem,
                "numbers": nums,
                "operation": "addition",
                "calculation": " + ".join(numbers),
                "result": result,
                "explanation": f"{' + '.join(numbers)} = {result}",
                "agent": self.agent_id
            }

        return {"status": "error", "message": "Could not extract numbers for addition", "agent": self.agent_id}

    def solve_subtraction_problem(self, problem: str) -> Dict[str, Any]:
        """Solve subtraction problems: first number minus second number."""
        numbers = re.findall(self._NUMBER_PATTERN, problem)
        if len(numbers) >= 2:
            nums = [float(n) for n in numbers]
            result = nums[0] - nums[1]

            return {
                "status": "success",
                "problem": problem,
                "numbers": nums,
                "operation": "subtraction",
                "calculation": f"{numbers[0]} - {numbers[1]}",
                "result": result,
                "explanation": f"{numbers[0]} - {numbers[1]} = {result}",
                "agent": self.agent_id
            }

        return {"status": "error", "message": "Could not extract numbers for subtraction", "agent": self.agent_id}

    def solve_multiplication_problem(self, problem: str) -> Dict[str, Any]:
        """Solve multiplication problems: product of the first two numbers."""
        numbers = re.findall(self._NUMBER_PATTERN, problem)
        if len(numbers) >= 2:
            nums = [float(n) for n in numbers]
            result = nums[0] * nums[1]

            return {
                "status": "success",
                "problem": problem,
                "numbers": nums,
                "operation": "multiplication",
                "calculation": f"{numbers[0]} × {numbers[1]}",
                "result": result,
                "explanation": f"{numbers[0]} × {numbers[1]} = {result}",
                "agent": self.agent_id
            }

        return {"status": "error", "message": "Could not extract numbers for multiplication", "agent": self.agent_id}

    def solve_division_problem(self, problem: str) -> Dict[str, Any]:
        """Solve division problems: first number divided by the second.

        A zero divisor yields an error result instead of raising.
        """
        numbers = re.findall(self._NUMBER_PATTERN, problem)
        if len(numbers) >= 2:
            nums = [float(n) for n in numbers]
            if nums[1] != 0:
                result = nums[0] / nums[1]

                return {
                    "status": "success",
                    "problem": problem,
                    "numbers": nums,
                    "operation": "division",
                    "calculation": f"{numbers[0]} ÷ {numbers[1]}",
                    "result": result,
                    "explanation": f"{numbers[0]} ÷ {numbers[1]} = {result}",
                    "agent": self.agent_id
                }
            else:
                return {"status": "error", "message": "Division by zero is not allowed", "agent": self.agent_id}

        return {"status": "error", "message": "Could not extract numbers for division", "agent": self.agent_id}

    def _substitute_constants(self, expression: str) -> str:
        """Replace stand-alone constant names with their numeric value.

        A name is replaced only when it is not part of a longer identifier,
        attribute access or numeric literal, so ``e`` is substituted in
        ``e+1`` but left alone in ``exp(1)``, ``math.e`` and ``1e5``.
        """
        for const, value in self.constants.items():
            replacement = str(value)
            pattern = rf'(?<![\w.]){re.escape(const)}(?!\w)'
            # A callable replacement keeps the value free of backslash and
            # group-reference interpretation.
            expression = re.sub(pattern, lambda _match: replacement, expression)
        return expression

    async def evaluate_expression(self, expression: str) -> Dict[str, Any]:
        """Evaluate mathematical expression.

        Returns:
            On success a dictionary with the original and cleaned expression,
            the raw ``result`` and a ``formatted_result`` string (floats are
            rendered with up to six decimals, trailing zeros removed).  Any
            evaluation failure is reported as an ``"error"`` result.
        """
        try:
            # Clean and prepare expression
            clean_expr = self.clean_expression(expression)

            # Replace constants
            clean_expr = self._substitute_constants(clean_expr)

            # Replace ^ with **
            clean_expr = clean_expr.replace('^', '**')

            # Evaluate safely
            result = self.safe_eval(clean_expr)

            return {
                "status": "success",
                "expression": expression,
                "cleaned_expression": clean_expr,
                "result": result,
                "formatted_result": f"{result:.6f}".rstrip('0').rstrip('.') if isinstance(result, float) else str(result),
                "agent": self.agent_id
            }

        except Exception as e:
            self.logger.error(f"Error evaluating expression: {e}")
            return {
                "status": "error",
                "message": f"Failed to evaluate expression: {str(e)}",
                "expression": expression,
                "agent": self.agent_id
            }

    def clean_expression(self, expression: str) -> str:
        """Clean mathematical expression.

        Removes all whitespace and qualifies calls to known functions with
        ``math.``.  Functions that are not attributes of the ``math`` module
        (``abs``, ``round``) are left bare; ``safe_eval`` exposes them by
        name.  Calls that are already qualified are not prefixed again.
        """
        # Remove extra spaces
        clean = re.sub(r'\s+', '', expression)

        # Handle mathematical functions
        for func_name in self.functions:
            if not hasattr(math, func_name):
                continue
            pattern = rf'(?<![\w.]){re.escape(func_name)}\('
            clean = re.sub(pattern, f'math.{func_name}(', clean)

        return clean

    def safe_eval(self, expression: str) -> Union[int, float]:
        """Evaluate a prepared mathematical expression.

        The expression sees only ``math``, the configured functions and the
        configured constants; built-ins are removed from the namespace.

        Raises:
            Whatever ``eval`` raises for an invalid expression (for example
            ``SyntaxError``, ``NameError`` or ``ZeroDivisionError``).
        """
        # Create safe namespace
        safe_dict = {
            "__builtins__": {},
            "math": math,
            **self.constants
        }

        # Add mathematical functions to namespace
        safe_dict.update(self.functions)

        # NOTE(security): the expression originates from message params and
        # eval() with an empty __builtins__ is NOT a sandbox - attribute
        # traversal from any reachable object can recover arbitrary code
        # execution, and inputs such as 9**9**9 can exhaust CPU/memory.
        # Replace with an AST-based evaluator before exposing this agent to
        # untrusted callers.
        result = eval(expression, safe_dict)
        return result

    async def handle_info(self, message: MCPMessage) -> Dict[str, Any]:
        """Handle info request.

        Returns the agent info together with the supported operators,
        functions, constants and a few example inputs.
        """
        return {
            "status": "success",
            "info": self.get_info(),
            "supported_operations": list(self.operations.keys()),
            "supported_functions": list(self.functions.keys()),
            "supported_constants": list(self.constants.keys()),
            "examples": [
                "2 + 3 * 4",
                "sqrt(16) + 5",
                "sin(pi/2)",
                "What is 15% of 200?",
                "Calculate the area of a circle with radius 5"
            ],
            "agent": self.agent_id
        }


# Agent registration
def get_agent_info():
    """Get agent information for auto-discovery."""
    return {
        "name": "Math Agent",
        "description": "Performs mathematical calculations, solves word problems, and evaluates expressions",
        "version": "1.0.0",
        "author": "MCP System",
        "capabilities": ["mathematical_operations", "word_problems", "expression_evaluation"],
        "category": "specialized"
    }


def create_agent():
    """Create and return the agent instance."""
    return MathAgent()


if __name__ == "__main__":
    # Test the agent
    async def test_agent():
        """Run a few sample requests through the agent and print the outcome."""
        print("🔢 Testing Math Agent")
        print("=" * 40)

        agent = MathAgent()

        test_expressions = [
            "2 + 3 * 4",
            "sqrt(16) + 5",
            "What is 15% of 200?",
            "Calculate the area of a circle with radius 5"
        ]

        for expr in test_expressions:
            print(f"\nTesting: {expr}")

            message = MCPMessage(
                id=f"test_{datetime.now().timestamp()}",
                method="process",
                params={"expression": expr},
                timestamp=datetime.now()
            )

            result = await agent.process_message(message)

            if result["status"] == "success":
                print(f"✅ Result: {result.get('result', result.get('formatted_result', 'N/A'))}")
            else:
                print(f"❌ Error: {result['message']}")

        print("\n✅ Math Agent test completed!")

    asyncio.run(test_agent())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.