Skip to main content
Glama

ONEDeFi MCP Server

by JMadhan1
ai_agent.py•22.1 kB
import json import requests import logging from typing import Dict, Any, Optional import time import random import os # New imports for AI features try: import openai OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False openai = None from ai_portfolio_doctor import AIPortfolioDoctor from ai_strategy_sommelier import AIStrategySommelier from ai_chat_assistant import AIChatAssistant logger = logging.getLogger(__name__) class DeFiAIAgent: """AI Agent that can interact with DeFi MCP Server using Comput3 AI""" def __init__(self, mcp_server_url: str = "http://localhost:5000", api_key: str = None): self.mcp_server_url = mcp_server_url self.api_key = api_key self.session = requests.Session() if self.api_key: self.session.headers.update({'X-API-Key': self.api_key}) # Comput3 AI configuration self.openai_api_key = os.getenv('OPENAI_API_KEY', 'c3_api_key') self.openai_api_url = os.getenv('OPENAI_API_URL', 'https://api.comput3.ai/v1') self.small_model = os.getenv('SMALL_OPENAI_MODEL', 'llama3:70b') self.medium_model = os.getenv('MEDIUM_OPENAI_MODEL', 'llama3:70b') self.large_model = os.getenv('LARGE_OPENAI_MODEL', 'llama3:70b') # Initialize AI components self.portfolio_doctor = AIPortfolioDoctor(self.openai_api_key) self.strategy_sommelier = AIStrategySommelier(self.openai_api_key) self.chat_assistant = AIChatAssistant(self.openai_api_key) # AI decision-making parameters self.risk_tolerance = 0.3 # Low to medium risk self.max_position_size = 0.1 # Max 10% of portfolio per position self.target_apy = 0.05 # Target 5% APY minimum logger.info(f"DeFi AI Agent initialized with MCP server: {mcp_server_url}") logger.info(f"Using Comput3 AI endpoint: {self.openai_api_url}") def make_mcp_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: """Make a JSON-RPC request to the MCP server""" payload = { "jsonrpc": "2.0", "method": method, "params": params, "id": random.randint(1, 10000) } try: response = self.session.post(f"{self.mcp_server_url}/mcp", json=payload) response.raise_for_status() return response.json() except Exception as e: logger.error(f"MCP request failed: {e}") return {"error": str(e)} def ask_ai(self, prompt: str, model: str = None) -> str: """Ask Comput3 AI for intelligent decision-making""" if not model: model = self.medium_model headers = { 'Authorization': f'Bearer {self.openai_api_key}', 'Content-Type': 'application/json' } payload = { "model": model, "messages": [ { "role": "system", "content": "You are a DeFi expert AI assistant specializing in portfolio optimization, yield farming, and risk management. Provide concise, actionable advice based on market data and DeFi best practices." }, { "role": "user", "content": prompt } ], "max_tokens": 500, "temperature": 0.7 } try: response = requests.post( f"{self.openai_api_url}/chat/completions", headers=headers, json=payload, timeout=30 ) response.raise_for_status() data = response.json() return data['choices'][0]['message']['content'].strip() except Exception as e: logger.error(f"AI request failed: {e}") return f"AI analysis unavailable: {str(e)}" def analyze_portfolio(self, wallet_address: str, blockchain: str = "ethereum") -> Dict[str, Any]: """Analyze portfolio and make AI-powered investment decisions""" logger.info(f"Analyzing portfolio for wallet: {wallet_address}") # Get current portfolio portfolio_response = self.make_mcp_request("defi.portfolio", { "wallet_address": wallet_address, "blockchain": blockchain }) if "error" in portfolio_response: logger.error(f"Failed to get portfolio: {portfolio_response['error']}") return {"success": False, "error": portfolio_response["error"]} portfolio = portfolio_response.get("result", {}) # Get current positions positions_response = self.make_mcp_request("defi.positions", { "wallet_address": wallet_address, "blockchain": blockchain }) positions = positions_response.get("result", {}).get("positions", []) if "result" in positions_response else [] # Basic analysis analysis = { "total_value": portfolio.get("total_value_usd", 0), "token_count": len(portfolio.get("tokens", [])), "position_count": len(positions), "diversification_score": self._calculate_diversification(portfolio), "yield_potential": self._analyze_yield_opportunities(positions), "recommendations": [] } # Get AI insights using the Portfolio Doctor ai_prompt = f""" Analyze this DeFi portfolio: - Total Value: ${analysis['total_value']:,.2f} - Tokens: {analysis['token_count']} - Active Positions: {analysis['position_count']} - Diversification Score: {analysis['diversification_score']} - Average APY: {analysis['yield_potential']['average_apy']:.1%} Portfolio tokens: {[token.get('symbol', 'Unknown') + f" ({token.get('percentage', 0):.1f}%)" for token in portfolio.get('tokens', [])]} Provide 3 specific recommendations for optimization focusing on: 1. Risk management 2. Yield optimization 3. Diversification improvements Consider current DeFi market conditions and return brief, actionable advice. """ ai_insights = self.portfolio_doctor.analyze_portfolio_health(portfolio, analysis) # Use Portfolio Doctor analysis["ai_insights"] = ai_insights # Store AI insights # Generate traditional recommendations analysis["recommendations"] = self._generate_recommendations(portfolio, positions, analysis) return {"success": True, "analysis": analysis} def execute_strategy(self, wallet_address: str, strategy: str = "ai_optimized", blockchain: str = "ethereum") -> Dict[str, Any]: """Execute an AI-powered DeFi strategy""" logger.info(f"Executing {strategy} strategy for wallet: {wallet_address}") # First analyze the portfolio analysis_result = self.analyze_portfolio(wallet_address, blockchain) if not analysis_result["success"]: return analysis_result analysis = analysis_result["analysis"] executed_actions = [] # Get AI strategy recommendations using the Strategy Sommelier if strategy == "ai_optimized": actions = self.strategy_sommelier.get_strategy_actions(wallet_address, blockchain, analysis) elif strategy == "conservative_yield": actions = self._conservative_yield_strategy(wallet_address, blockchain, analysis) elif strategy == "aggressive_farming": actions = self._aggressive_farming_strategy(wallet_address, blockchain, analysis) elif strategy == "balanced_portfolio": actions = self._balanced_portfolio_strategy(wallet_address, blockchain, analysis) else: return {"success": False, "error": f"Unknown strategy: {strategy}"} # Execute each action for action in actions: try: result = self._execute_action(action) executed_actions.append({ "action": action, "result": result, "success": "error" not in result }) # Wait between actions to avoid rate limits time.sleep(2) except Exception as e: logger.error(f"Failed to execute action {action}: {e}") executed_actions.append({ "action": action, "result": {"error": str(e)}, "success": False }) return { "success": True, "strategy": strategy, "analysis": analysis, "executed_actions": executed_actions, "summary": self._generate_execution_summary(executed_actions) } def _calculate_diversification(self, portfolio: Dict[str, Any]) -> float: """Calculate portfolio diversification score (0-1)""" tokens = portfolio.get("tokens", []) if not tokens: return 0.0 # Simple diversification: more tokens = more diversified # In reality, this would consider correlations, sectors, etc. token_count = len(tokens) max_allocation = max([float(token.get("percentage", 0)) for token in tokens] + [0]) # Penalize high concentration diversification = min(1.0, token_count / 10) * (1 - max_allocation / 100) return round(diversification, 2) def _analyze_yield_opportunities(self, positions: list) -> Dict[str, Any]: """Analyze current yield-generating positions""" total_yield = 0 active_positions = 0 for position in positions: if position.get("position_type") in ["lending", "farming"]: active_positions += 1 apy = float(position.get("apy", 0)) total_yield += apy avg_yield = total_yield / active_positions if active_positions > 0 else 0 return { "active_positions": active_positions, "average_apy": round(avg_yield, 2), "total_estimated_yield": round(total_yield, 2), "yield_rating": "High" if avg_yield > 0.08 else "Medium" if avg_yield > 0.04 else "Low" } def _generate_recommendations(self, portfolio: Dict[str, Any], positions: list, analysis: Dict[str, Any]) -> list: """Generate AI-driven recommendations""" recommendations = [] # Diversification recommendations if analysis["diversification_score"] < 0.3: recommendations.append({ "type": "diversification", "priority": "high", "action": "Increase portfolio diversification by adding more tokens", "reason": f"Current diversification score: {analysis['diversification_score']}" }) # Yield optimization if analysis["yield_potential"]["average_apy"] < self.target_apy: recommendations.append({ "type": "yield_optimization", "priority": "medium", "action": "Consider lending assets to earn higher yields", "reason": f"Current average APY ({analysis['yield_potential']['average_apy']:.1%}) is below target ({self.target_apy:.1%})" }) # Position sizing tokens = portfolio.get("tokens", []) for token in tokens: percentage = float(token.get("percentage", 0)) if percentage > self.max_position_size * 100: recommendations.append({ "type": "risk_management", "priority": "medium", "action": f"Consider reducing {token.get('symbol', 'unknown')} position size", "reason": f"Position size ({percentage:.1f}%) exceeds recommended maximum ({self.max_position_size*100:.1f}%)" }) return recommendations def _ai_optimized_strategy(self, wallet_address: str, blockchain: str, analysis: Dict[str, Any]) -> list: """Generate AI-optimized actions based on portfolio analysis""" ai_prompt = f""" Based on this DeFi portfolio analysis, suggest 2-3 specific actions: Portfolio Summary: - Total Value: ${analysis['total_value']:,.2f} - Diversification Score: {analysis['diversification_score']} - Current Average APY: {analysis['yield_potential']['average_apy']:.1%} - Active Positions: {analysis['yield_potential']['active_positions']} Available actions: 1. LEND - Lend tokens to protocols like Aave or Compound 2. SWAP - Swap tokens for better diversification 3. FARM - Add liquidity to yield farming pools 4. WITHDRAW - Remove from underperforming positions Respond with a JSON array of actions, each with: - "action_type": "lend"|"swap"|"farm"|"withdraw" - "reasoning": "why this action" - "priority": "high"|"medium"|"low" - "risk_level": "low"|"medium"|"high" Focus on optimizing yield while managing risk. Consider current market conditions. """ ai_response = self.ask_ai(ai_prompt, self.large_model) # Parse AI response and convert to executable actions actions = [] try: # Try to extract JSON from AI response import re json_match = re.search(r'\[.*\]', ai_response, re.DOTALL) if json_match: ai_actions = json.loads(json_match.group()) for ai_action in ai_actions[:3]: # Limit to 3 actions if ai_action.get("action_type") == "lend": actions.append({ "type": "lend", "method": "defi.lend", "params": { "wallet_address": wallet_address, "blockchain": blockchain, "protocol": "aave", "token": "0xA0b86a33E6411D426C3d77A6CF0C0DFEcb7fD9A9", # USDC "amount": "100" }, "ai_reasoning": ai_action.get("reasoning", ""), "priority": ai_action.get("priority", "medium") }) elif ai_action.get("action_type") == "swap": actions.append({ "type": "swap", "method": "defi.swap", "params": { "wallet_address": wallet_address, "blockchain": blockchain, "token_in": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", # WETH "token_out": "0xA0b86a33E6411D426C3d77A6CF0C0DFEcb7fD9A9", # USDC "amount_in": "0.1", "protocol": "uniswap" }, "ai_reasoning": ai_action.get("reasoning", ""), "priority": ai_action.get("priority", "medium") }) except Exception as e: logger.warning(f"Could not parse AI strategy response: {e}") # Fallback to conservative strategy actions = self._conservative_yield_strategy(wallet_address, blockchain, analysis) if not actions: # Fallback if no AI actions generated actions = self._conservative_yield_strategy(wallet_address, blockchain, analysis) return actions def _conservative_yield_strategy(self, wallet_address: str, blockchain: str, analysis: Dict[str, Any]) -> list: """Generate actions for conservative yield strategy""" actions = [] # Focus on stable lending with established protocols if analysis["yield_potential"]["active_positions"] < 2: actions.append({ "type": "lend", "method": "defi.lend", "params": { "wallet_address": wallet_address, "blockchain": blockchain, "protocol": "aave", "token": "0xA0b86a33E6411D426C3d77A6CF0C0DFEcb7fD9A9", # USDC "amount": "100" # Demo amount } }) return actions def _aggressive_farming_strategy(self, wallet_address: str, blockchain: str, analysis: Dict[str, Any]) -> list: """Generate actions for aggressive farming strategy""" actions = [] # Add liquidity to high-yield pools actions.append({ "type": "farm", "method": "defi.farm", "params": { "wallet_address": wallet_address, "blockchain": blockchain, "protocol": "uniswap", "pool_id": "USDC-ETH-0.3%", "token_a": "0xA0b86a33E6411D426C3d77A6CF0C0DFEcb7fD9A9", # USDC "token_b": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", # WETH "amount_a": "50", "amount_b": "0.02" } }) return actions def _balanced_portfolio_strategy(self, wallet_address: str, blockchain: str, analysis: Dict[str, Any]) -> list: """Generate actions for balanced portfolio strategy""" actions = [] # Mix of swapping for diversification and yield farming if analysis["diversification_score"] < 0.5: actions.append({ "type": "swap", "method": "defi.swap", "params": { "wallet_address": wallet_address, "blockchain": blockchain, "token_in": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", # WETH "token_out": "0xA0b86a33E6411D426C3d77A6CF0C0DFEcb7fD9A9", # USDC "amount_in": "0.1", "protocol": "uniswap" } }) return actions def _execute_action(self, action: Dict[str, Any]) -> Dict[str, Any]: """Execute a single DeFi action via MCP""" method = action.get("method") params = action.get("params", {}) if not method: return {"error": "No method specified in action"} return self.make_mcp_request(method, params) def _generate_execution_summary(self, executed_actions: list) -> Dict[str, Any]: """Generate summary of executed actions""" successful = sum(1 for action in executed_actions if action["success"]) failed = len(executed_actions) - successful return { "total_actions": len(executed_actions), "successful": successful, "failed": failed, "success_rate": round(successful / len(executed_actions) * 100, 1) if executed_actions else 0 } def run_monitoring_loop(self, wallet_address: str, check_interval: int = 300): """Run continuous monitoring and optimization""" logger.info(f"Starting monitoring loop for wallet: {wallet_address}") logger.info(f"Check interval: {check_interval} seconds") while True: try: logger.info("Running portfolio analysis...") analysis = self.analyze_portfolio(wallet_address) if analysis["success"]: recommendations = analysis["analysis"]["recommendations"] if recommendations: logger.info(f"Found {len(recommendations)} recommendations") for rec in recommendations: logger.info(f"- {rec['type']}: {rec['action']}") else: logger.info("No recommendations at this time") else: logger.error(f"Analysis failed: {analysis.get('error', 'Unknown error')}") logger.info(f"Sleeping for {check_interval} seconds...") time.sleep(check_interval) except KeyboardInterrupt: logger.info("Monitoring loop stopped by user") break except Exception as e: logger.error(f"Error in monitoring loop: {e}") time.sleep(60) # Wait a minute before retrying def main(): """Demo function to show the AI agent in action""" print("šŸ¤– ONEDeFi AI Agent Demo") print("=" * 50) # Initialize agent (you would need to provide a real API key) agent = DeFiAIAgent( mcp_server_url="http://localhost:5000", api_key="demo_key_123" # This would be a real API key ) # Demo wallet address (this would be a real wallet) demo_wallet = "0x742d35Cc6641C88c4f95bbCdB96a2b0f0f3f6b7f" print("\nšŸ“Š Analyzing Portfolio...") analysis_result = agent.analyze_portfolio(demo_wallet) if analysis_result["success"]: analysis = analysis_result["analysis"] print(f"Portfolio Value: ${analysis['total_value']:,.2f}") print(f"Diversification Score: {analysis['diversification_score']}") print(f"Active Positions: {analysis['yield_potential']['active_positions']}") print(f"Average APY: {analysis['yield_potential']['average_apy']:.1%}") if analysis["recommendations"]: print("\nšŸ’” AI Recommendations:") for i, rec in enumerate(analysis["recommendations"], 1): print(f"{i}. [{rec['priority'].upper()}] {rec['action']}") print(f" Reason: {rec['reason']}") print("\nšŸš€ Executing Conservative Yield Strategy...") strategy_result = agent.execute_strategy(demo_wallet, "conservative_yield") if strategy_result["success"]: summary = strategy_result["summary"] print(f"Executed {summary['total_actions']} actions") print(f"Success rate: {summary['success_rate']}%") print("\nāœ… Demo completed!") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JMadhan1/OneDefi-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server